From 3521485ffb5beecb6dab78705765e01658b3f0c1 Mon Sep 17 00:00:00 2001 From: tomsmeding Date: Tue, 24 Dec 2019 11:38:58 +0100 Subject: Day 23, part 1 and 2 again, but now with actual concurrency --- 2019/23.hs | 221 ++++++++++++++++++++++++++++++++++++------------------------- 1 file changed, 130 insertions(+), 91 deletions(-) diff --git a/2019/23.hs b/2019/23.hs index 054b53a..77baccf 100644 --- a/2019/23.hs +++ b/2019/23.hs @@ -1,111 +1,150 @@ --- Part 2: 17281 is too high - -{-# LANGUAGE TupleSections #-} module Main where -import Data.Foldable (toList) -import qualified Data.Map.Strict as Map -import Data.Maybe -import qualified Data.Sequence as S +import qualified Data.Array as A +import Control.Concurrent +import Control.Concurrent.STM +import Control.Monad +import qualified Data.IntSet as IntSet -import Debug.Trace +-- import Debug.Trace import Input import IntCode -safeLast :: [a] -> Maybe a -safeLast [] = Nothing -safeLast [x] = Just x -safeLast (_:xs) = safeLast xs - -data Computer - = Computer { cCont :: Continuation - , cQueue :: S.Seq Integer } - -queueAdd :: Message -> Computer -> Computer -queueAdd (Message _ x y) comp = comp { cQueue = cQueue comp S.|> x S.|> y } - -queueAdds :: [Message] -> Computer -> Computer -queueAdds msgs comp = foldr queueAdd comp msgs - -data Message = Message Int Integer Integer - deriving (Show) +data Message = Message Int Integer Integer deriving (Show) +data CompIn = CompRecvMsg Message | IdlePoll | Quit deriving (Show) +data CompOut = CompSendMsg Int Message | Idle Int deriving (Show) +data NatIn = NatRecvMsg Message | NetworkIdle deriving (Show) +data NatOut = NatSendMsg Message | Answer Integer deriving (Show) parseMessages :: [Integer] -> [Message] parseMessages [] = [] parseMessages (to:x:y:rest) = Message (fromIntegral to) x y : parseMessages rest parseMessages _ = error "Non-3-blocked output" -runComputer :: Computer -> ([Message], Computer) -runComputer (Computer cont queue) = - let queue' = if null queue then [-1] else toList queue - (cont', output) = either id ((undefined,) . snd) (runContinue cont queue') - in (parseMessages output, Computer cont' mempty) - -step :: Map.Map Int Computer -> Int -> ([Message], Map.Map Int Computer) -step network cid = - let (msgs, computer') = runComputer (network Map.! cid) - network' = foldr (\msg@(Message to _ _) -> Map.adjust (queueAdd msg) to) - (Map.insert cid computer' network) - msgs - in (msgs, network') - -runUntilNAT :: Map.Map Int Computer -> Int -> Message -runUntilNAT network cid = - let (msgs, network') = step network cid - in case [msg | msg@(Message 255 _ _) <- msgs] of - msg:_ -> msg - [] -> runUntilNAT network' ((cid + 1) `mod` 50) - --- NAT (last recv'd msg (with to=0)) (last Y value sent to 0) (number of idle computers) -data NAT = NAT (Maybe Message) (Maybe Integer) Int - -initNAT :: NAT -initNAT = NAT Nothing Nothing 0 - -natNotify :: NAT -> Int -> [Message] -> S.Seq Integer -> NAT -natNotify (NAT origBin zeroY idle) from msgs queue = - let recv = [msg | msg@(Message 255 _ _) <- msgs] - newBin = safeLast (maybeToList origBin ++ [Message 0 x y | Message _ x y <- recv]) - idle' = if null msgs && S.null queue then idle + 1 else 0 - in -- (if not (null recv) then trace ("NAT received " ++ show recv ++ " from " ++ show from) else id) $ - (if idle' >= 50 then trace "idle!" else id) $ - NAT newBin zeroY idle' - -natPoll :: NAT -> Either Integer ([Message], NAT) --- natPoll (NAT _ _ idle) | traceShow ("poll", idle) False = undefined -natPoll (NAT (Just bin) zeroY idle) | idle >= 50 = - case (bin, zeroY) of - (Message 0 _ y, Just y') | y == y' -> Left y - (Message 0 _ y, _) -> Right ([bin], NAT Nothing (Just y) 0) - (_, _) -> Right ([bin], NAT Nothing zeroY 0) -natPoll nat@(NAT _ _ _) = Right ([], nat) - -traceMessages :: [(Int, Message)] -> a -> a -traceMessages = flip (foldr (\(from, Message to x y) -> trace ("Message " ++ show from ++ " -> " ++ show to ++ ": [" ++ show x ++ "," ++ show y ++ "]"))) - -runWithNAT :: NAT -> Map.Map Int Computer -> Int -> Integer -runWithNAT nat network cid = - -- trace ("runWithNAT cid=" ++ show cid) $ - let (msgs, network') = step network cid - nat' = natNotify nat cid msgs (cQueue (network Map.! cid)) - in trace ("step " ++ show cid ++ " in: " ++ show (cQueue (network Map.! cid)) ++ ", out: " ++ show msgs) $ - traceMessages (map (cid,) msgs) $ - case natPoll nat' of - Left res -> res - Right (natout, nat'') -> - -- (if not (null natout) then trace ("NAT sent " ++ show natout) else id) $ - traceMessages (map (-1,) natout) $ - runWithNAT nat'' (Map.adjust (queueAdds natout) 0 network') ((cid + 1) `mod` 50) +chanFunnel :: (a -> c) -> TChan a -> (b -> c) -> TChan b -> IO (ThreadId, TChan c) +chanFunnel f1 in1Chan f2 in2Chan = do + outChan <- atomically $ newTChan + th1 <- forkIO $ forever $ atomically (readTChan in1Chan) >>= atomically . writeTChan outChan . f1 + th2 <- forkIO $ forever $ atomically (readTChan in2Chan) >>= atomically . writeTChan outChan . f2 + th <- forkFinally (forever yield) (const $ killThread th1 >> killThread th2) + return (th, outChan) + +threadComputer :: Int -> [Integer] -> TChan CompIn -> TChan CompOut -> IO () +threadComputer myid nicProgram inChan outChan = + case runInterruptible nicProgram [fromIntegral myid] of + Left (cont, output) -> sendMsgs output >> loop cont + Right _ -> undefined + where + loop :: Continuation -> IO () + loop cont = do + incoming <- atomically $ readTChan inChan + case incoming of + CompRecvMsg (Message _ x y) -> do + (cont', output) <- + case runContinue cont [x, y] of + Left pair -> return pair + Right (_, out) -> do + putStrLn $ "!! Computer " ++ show myid ++ ": intcode terminated! (in recvmsg)" + return (undefined, out) + sendMsgs output + loop cont' + IdlePoll -> do + (cont', output) <- + case runContinue cont [-1] of + Left pair -> return pair + Right (_, out) -> do + putStrLn $ "!! Computer " ++ show myid ++ ": intcode terminated! (in idlepoll)" + return (undefined, out) + if null output + then atomically $ writeTChan outChan (Idle myid) + else sendMsgs output + loop cont' + Quit -> + return () + + sendMsgs :: [Integer] -> IO () + sendMsgs output = atomically $ sequence_ [writeTChan outChan (CompSendMsg myid msg) + | msg <- parseMessages output] + +threadNAT :: TChan NatIn -> TChan NatOut -> TVar (Maybe Integer) -> IO () +threadNAT inChan outChan part1 = loop Nothing Nothing + where + loop :: Maybe Message -> Maybe Integer -> IO () + loop msave mlastY = do + incoming <- atomically $ readTChan inChan + case incoming of + NatRecvMsg msg@(Message _ _ y) -> do + -- Set 'part1' to contain the first y value ever sent to the NAT + atomically $ modifyTVar' part1 (\m -> case m of + Just _ -> m + Nothing -> Just y) + loop (Just msg) mlastY + NetworkIdle -> do + -- traceM "NAT: network idle received" + case (msave, mlastY) of + (Just (Message _ _ y), Just y') | y == y' -> + atomically $ writeTChan outChan (Answer y) + (Just (Message _ x y), _) -> do + atomically $ writeTChan outChan (NatSendMsg (Message 0 x y)) + loop Nothing (Just y) + (Nothing, _) -> do + putStrLn "!! NAT deadlock: network idle but no message queued in NAT!" + atomically $ writeTChan outChan (Answer (-1)) + +arbitrator :: TChan (Either NatOut CompOut) -> A.Array Int (TChan CompIn) -> TChan NatIn -> IO Integer +arbitrator inChan compChans natChan = loop Nothing + where + loop :: Maybe IntSet.IntSet -> IO Integer + loop midle = do + yield + incoming <- atomically $ tryReadTChan inChan + case incoming of + Just (Left (NatSendMsg msg)) -> + sendMsg (-1) msg >> loop Nothing + Just (Left (Answer y)) -> do + sequence_ [atomically $ writeTChan chan Quit | chan <- A.elems compChans] + return y + Just (Right (CompSendMsg from msg)) -> + sendMsg from msg >> loop Nothing + Just (Right (Idle from)) -> + case midle of + Just idle -> + let idle' = IntSet.insert from idle + in if IntSet.size idle' == 50 + then atomically (writeTChan natChan NetworkIdle) >> loop Nothing + else loop (Just idle') + Nothing -> + loop midle + Nothing -> do + -- traceM "ar: Sending deadlock poll" + sequence_ [atomically $ writeTChan chan IdlePoll | chan <- A.elems compChans] + loop (Just IntSet.empty) + + sendMsg :: Int -> Message -> IO () + -- sendMsg from msg | trace ("ar: send " ++ show from ++ " -> " ++ show msg) False = undefined + sendMsg _ msg@(Message 255 _ _) = atomically $ writeTChan natChan (NatRecvMsg msg) + sendMsg _ msg@(Message to _ _) = atomically $ writeTChan (compChans A.! to) (CompRecvMsg msg) main :: IO () main = do nicProgram <- parse . head <$> getInput 23 - let initComputer i = Computer (initialContinuation nicProgram) (S.singleton (fromIntegral i)) - initNetwork = Map.fromList [(i, initComputer i) | i <- [0..49]] - Message _ _ y = runUntilNAT initNetwork 0 - print y + part1 <- atomically $ newTVar Nothing + + compChans <- atomically $ A.listArray (0, 49) <$> sequence (replicate 50 newTChan) + compOutChan <- atomically $ newTChan + natChan <- atomically $ newTChan + natOutChan <- atomically $ newTChan + (_, collectChan) <- chanFunnel Left natOutChan Right compOutChan + + _ <- sequence [forkIO (threadComputer i nicProgram (compChans A.! i) compOutChan) + | i <- [0..49]] - print (runWithNAT initNAT initNetwork 0) + _ <- forkIO (threadNAT natChan natOutChan part1) + + y <- arbitrator collectChan compChans natChan + Just part1y <- atomically (readTVar part1) + print part1y + print y -- cgit v1.2.3