From 287d9e5c4fc50bcca2474b9783148181d7ede872 Mon Sep 17 00:00:00 2001 From: Tom Smeding Date: Mon, 6 Apr 2026 23:35:05 +0200 Subject: Log watching --- src/Index.hs | 290 +++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 202 insertions(+), 88 deletions(-) (limited to 'src/Index.hs') diff --git a/src/Index.hs b/src/Index.hs index 5486692..8c605d3 100644 --- a/src/Index.hs +++ b/src/Index.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE OverloadedStrings #-} module Index ( Index, initIndex, @@ -7,24 +8,33 @@ module Index ( indexCalendar, ) where -import Data.Time.Calendar -import Control.Monad (forM) +import Prelude hiding (foldl') -- exported since GHC 9.10 (base 4.20) + +import Control.Concurrent +import Control.Monad (forM, forM_, when) import Data.ByteString qualified as BS import Data.ByteString (ByteString) import Data.Char (isDigit) -import Data.List (sort, scanl') +import Data.IORef +import Data.List (sort, scanl', foldl') import Data.Map.Strict (Map) import Data.Map.Strict qualified as Map +import Data.Maybe (catMaybes) import Data.Text qualified as T +import Data.Time import Data.Vector.Storable qualified as VS -import Data.Word (Word32) +import Data.Word import System.Clock qualified as Clock import System.Directory import System.FilePath +import System.FSNotify qualified as FN import Text.Read (readMaybe) +import AtomicPrint +import Debounce import Cache -import Config (Channel(..)) +import Config (Channel(..), prettyChannel) +import ImmutGrowVector qualified as IGV import Mmap import Util import ZNC @@ -32,13 +42,26 @@ import ZNC data ChanIndex = ChanIndex { ciStartDay :: !Day - , ciEndDay :: !Day -- ^ inclusive - , ciCountUntil :: !(VS.Vector Int) -- ^ number of events up to and /including/ this day + , ciCountUntil :: !(IGV.ImmutGrowVector Int) -- ^ number of events up to and /including/ this day , ciTotal :: !Int } deriving (Show) +-- | Inclusive. +ciEndDay :: ChanIndex -> Day +ciEndDay ci = + let ndays = IGV.length (ciCountUntil ci) + in if ndays > 0 + then (ndays - 1) `addDays'` ciStartDay ci + else error "ciEndDay: ChanIndex with zero days!" + +-- 1. The ChanIndex is contained in an IORef, which means that we can't have a +-- proper critical section around modifying it. This is fine because it's +-- all pure, so we just allocate a new ChanIndex each time. +-- 2. There being no critical section also means that having multiple writers +-- would be problematic. Fortunately, there is only one writer, so this +-- simple data structure is fine. data Index = Index !FilePath - !(Map Channel ChanIndex) + !(Map Channel (IORef ChanIndex)) !(Cache (Channel, YMD) (ByteString, VS.Vector Word32)) -- init @@ -49,33 +72,68 @@ initIndex basedir toimport = do c_start <- Clock.getTime Clock.Realtime items <- - fmap concat . forM (map chanNetwork toimport) $ \nwT -> do + forM toimport $ \(Channel nwT chT) -> do let nw = T.unpack nwT - forM [ch | Channel nwT' ch <- toimport, nwT' == nwT] $ \chT -> do - let ch = T.unpack chT - files <- listDirectory (basedir nw ch) - days <- fmap sort . forM files $ \fn -> do - let path = basedir nw ch fn - -- putStrLn $ "Parsing " ++ path ++ " (" ++ show (parseFileName fn) ++ " -> " ++ show (dateToDay (parseFileName fn)) ++ ")" - events <- parseLog <$> BS.readFile path - return (uncurry3 fromGregorian (parseFileName fn), length events) - let minday = minimum (map fst days) - maxday = maximum (map fst days) - ndays = fromIntegral @Integer @Int (diffDays maxday minday + 1) - -- traceM $ nw ++ "/" ++ ch ++ ": days = " ++ show [(toFileName (dayToYMD d), i) | (d, i) <- days] - let countScan = VS.fromListN ndays (drop 1 $ scanl' (+) 0 (makeCounts [minday..maxday] days)) - let ntotal = sum (map snd days) - return (Channel nwT chT - ,ChanIndex - { ciStartDay = minday - , ciEndDay = maxday + ch = T.unpack chT + files <- listDirectory (basedir nw ch) + days <- fmap sort . forM files $ \fn -> do + let path = basedir nw ch fn + date = case parseFileName fn of + Just ymd -> ymdToGregorian ymd + Nothing -> error $ "Log file with unexpected file name: " ++ path + -- atomicPrintS $ "Parsing " ++ path ++ " (" ++ show date ++ " -> " ++ show (dateToDay date) ++ ")" + events <- parseLog <$> BS.readFile path + return (uncurry3 fromGregorian date, length events) + let minday = minimum (map fst days) + maxday = maximum (map fst days) + ndays = fromIntegral @Integer @Int (diffDays maxday minday + 1) + -- traceM $ nw ++ "/" ++ ch ++ ": days = " ++ show [(toFileName (dayToYMD d), i) | (d, i) <- days] + let countScan = IGV.fromListN ndays (drop 1 $ scanl' (+) 0 (makeCounts [minday..maxday] days)) + ntotal = sum (map snd days) + chanindex <- newIORef $ + ChanIndex { ciStartDay = minday , ciCountUntil = countScan - , ciTotal = ntotal}) + , ciTotal = ntotal } + return (Channel nwT chT, chanindex) c_end <- Clock.getTime Clock.Realtime let timetakenSecs = fromIntegral @_ @Double (Clock.toNanoSecs (Clock.diffTimeSpec c_start c_end)) / 1e9 - putStrLn $ "Parsing/indexing logs in " ++ show basedir ++ " took " ++ show timetakenSecs ++ " secs" + atomicPrintS $ "Parsing/indexing logs in " ++ show basedir ++ " took " ++ show timetakenSecs ++ " secs" + + let index = Index basedir (Map.fromList items) cache + + _ <- forkIO $ do + FN.withManager $ \mgr -> do + let predicate FN.Added{} = True + predicate FN.Modified{} = True + predicate FN.Removed{} = True + predicate FN.WatchedDirectoryRemoved{} = True + predicate _ = False + + handle deb _ FN.Added{} = debounceClick deb + handle deb chan ev@FN.Modified{} = do + atomicPrintS $ "path = " ++ show (FN.eventPath ev) ++ " fn = " ++ takeFileName (FN.eventPath ev) + case parseFileName (takeFileName (FN.eventPath ev)) of + Just ymd -> do + cacheInvalidate cache (chan, ymd) + debounceClick deb + Nothing -> return () + handle _ chan ev@FN.Removed{} = do + atomicPrintS $ "path = " ++ show (FN.eventPath ev) ++ " fn = " ++ takeFileName (FN.eventPath ev) + case parseFileName (takeFileName (FN.eventPath ev)) of + Just ymd -> do + cacheInvalidate cache (chan, ymd) + Nothing -> return () + handle _ _ ev@FN.WatchedDirectoryRemoved{} = -- not sure what to do here... + atomicPrintS $ "Directory " ++ FN.eventPath ev ++ " removed!" + handle _ _ _ = undefined - return (Index basedir (Map.fromList items) cache) + forM_ toimport $ \chan@(Channel nw ch) -> do + deb <- makeDebounce (nw <> T.pack "/" <> ch) 5.0 (indexUpdateImport index chan) + _ <- FN.watchDir mgr (basedir T.unpack nw T.unpack ch) predicate (handle deb chan) + atomicPrintS $ "Watching " <> basedir T.unpack nw T.unpack ch + let loop = threadDelay (round (1e9 :: Double)) >> loop in loop + + return index makeCounts :: [Day] -> [(Day, Int)] -> [Int] makeCounts [] [] = [] @@ -86,37 +144,86 @@ makeCounts (d:ds) ents@((d',n):ents') | otherwise = error $ "makeCounts: duplicate entry? " ++ show (d, toGregorian d, d', toGregorian d') makeCounts (_:ds) [] = 0 : makeCounts ds [] +indexUpdateImport :: Index -> Channel -> IO () +indexUpdateImport index@(Index _ mp _) chan = do + let ciRef = mp Map.! chan + ci <- readIORef ciRef + today <- utctDay <$> getCurrentTime + let todayish = max today (ciEndDay ci) + + readCounts <- fmap catMaybes . forM [-1 .. 1] $ \increment -> do + let day = increment `addDays` todayish + ymd = dayToYMD day + dayidx = fromIntegral @Integer @Int (day `diffDays` ciStartDay ci) + + loadDay index chan ymd >>= \case + Just (_bs, lineStarts) -> return (Just (dayidx, VS.length lineStarts)) + Nothing -> return Nothing + + atomicPrint $ "Update import for " <> prettyChannel chan <> ": " <> T.show readCounts <> " (len = " <> T.show (IGV.length (ciCountUntil ci)) <> ")" + + when (not (null readCounts)) $ + atomicModifyIORef' ciRef $ \ci' -> + let ciRes = foldl' (\ci2 (dayidx, count) -> recordCount ci2 dayidx count) ci' readCounts + in (ciRes, ()) + where + -- | How many events do we already have on this day? + eventsOnDayIdx :: ChanIndex -> Int -> Int + eventsOnDayIdx ci dayidx + | dayidx == 0 = scan IGV.! 0 + | dayidx < IGV.length scan = scan IGV.! dayidx - scan IGV.! (dayidx - 1) + | otherwise = 0 + where scan = ciCountUntil ci + + recordCount :: ChanIndex -> Int -> Int -> ChanIndex + recordCount ci dayidx count + | count <= alreadyHave = ci + | otherwise = + let ciExt = -- Ensure that there is space in the scan vector for our record + let nExtraDays = dayidx - IGV.length (ciCountUntil ci) + 1 + lastCount | IGV.length (ciCountUntil ci) == 0 = 0 + | otherwise = ciCountUntil ci IGV.! (IGV.length (ciCountUntil ci) - 1) + in ci { ciCountUntil = iterate (`IGV.append` lastCount) (ciCountUntil ci) !! nExtraDays } + addFrom di n vec + | di < IGV.length vec = addFrom (di + 1) n (IGV.set vec di (vec IGV.! di + n)) + | otherwise = vec + in ciExt { ciCountUntil = addFrom dayidx (count - alreadyHave) (ciCountUntil ciExt) + , ciTotal = ciTotal ciExt + count - alreadyHave } + where alreadyHave = eventsOnDayIdx ci dayidx + -- search -- | Returns proper lazy list of events. Reading the files happens strictly, -- but parsing the events happens lazily. indexGetEventsLinear :: Index -> Channel -> Int -> Int -> IO [(YMDHMS, Event)] -indexGetEventsLinear index@(Index _ mp _) chan from count - | from + count < 0 = return [] - | from >= ciTotal ci = return [] - | otherwise = do - let scan = ciCountUntil ci - day1idx = binSearch scan from +indexGetEventsLinear index@(Index _ mp _) chan from count = do + ci <- readIORef (mp Map.! chan) + if from + count < 0 || from >= ciTotal ci + then return [] + else go ci (ciCountUntil ci) + where + go ci scan = do + let day1idx = binSearch scan from day1 = day1idx `addDays'` ciStartDay ci neventBeforeDay1 | day1idx == 0 = 0 - | otherwise = scan VS.! (day1idx - 1) - neventInclDay1 = scan VS.! day1idx + | otherwise = scan IGV.! (day1idx - 1) + neventInclDay1 = scan IGV.! day1idx neventOnDay1 = neventInclDay1 - neventBeforeDay1 off1 = from - neventBeforeDay1 -- day2 is inclusive, off2 is exclusive (day2, off2) | from + count <= neventInclDay1 = (day1, off1 + count) - | day1idx == VS.length scan - 1 = (day1, neventOnDay1) + | day1idx == IGV.length scan - 1 = (day1, neventOnDay1) | otherwise = let loop day2idx nbefore nseen | nseen + nOnDay2 >= count = (day2idx `addDays'` ciStartDay ci, count - nseen) - | day2idx == VS.length scan - 1 = + | day2idx == IGV.length scan - 1 = (day2idx `addDays'` ciStartDay ci, nOnDay2) | otherwise = - loop (day2idx + 1) (scan VS.! day2idx) (nseen + nOnDay2) + loop (day2idx + 1) (scan IGV.! day2idx) (nseen + nOnDay2) where - nOnDay2 = scan VS.! day2idx - nbefore + nOnDay2 = scan IGV.! day2idx - nbefore in loop (day1idx + 1) (neventBeforeDay1 + neventOnDay1) (neventOnDay1 - off1) -- traceM ("ci = " ++ show ci) @@ -129,26 +236,24 @@ indexGetEventsLinear index@(Index _ mp _) chan from count -- traceM ("day2 = " ++ show day2) -- traceM ("off2 = " ++ show off2) evs <- forM (zip [day1 .. day2] [day1idx..]) $ \(day, dayidx) -> - let neventsOnDay | dayidx == 0 = scan VS.! 0 - | otherwise = scan VS.! dayidx - scan VS.! (dayidx - 1) - parse + let neventsOnDay | dayidx == 0 = scan IGV.! 0 + | otherwise = scan IGV.! dayidx - scan IGV.! (dayidx - 1) + range | day == day1 = if day1 == day2 - then parseLogRange (off1, Just (off2 - off1)) - else parseLogRange (off1, Nothing) - | day == day2 = parseLogRange (0, Just off2) - | otherwise = \_lineStarts -> parseLog - (y, month, d) = toGregorian day - ymd = YMD (fromIntegral y) (fromIntegral month) (fromIntegral d) + then (off1, Just (off2 - off1)) + else (off1, Just (neventsOnDay - off1)) + | day == day2 = (0, Just off2) + | otherwise = (0, Just neventsOnDay) + ymd = ymdFromGregorian (toGregorian day) fixDate = map $ \(tod, ev) -> (YMDHMS ymd tod, ev) in if neventsOnDay > 0 - then do (bs, lineStarts) <- loadDay index chan ymd - return (fixDate (parse lineStarts bs)) + then loadDay index chan ymd >>= \case + Just (bs, lineStarts) -> return (fixDate (parseLogRange range lineStarts bs)) + Nothing -> error $ "events on day " ++ show (dayToYMD day) ++ " but no file" else return [] return (concat evs) - where - ci = mp Map.! chan -- | The vector must be sorted. -- Returns index of the first element x such that needle < x. If there is no @@ -156,13 +261,13 @@ indexGetEventsLinear index@(Index _ mp _) chan from count -- the length of vec. -- -- TODO: proportional binary search -binSearch :: VS.Vector Int -> Int -> Int +binSearch :: IGV.ImmutGrowVector Int -> Int -> Int binSearch vec needle - | veclen == 0 || vec VS.! (veclen - 1) < needle = veclen - | needle < vec VS.! 0 = 0 + | veclen == 0 || vec IGV.! (veclen - 1) < needle = veclen + | needle < vec IGV.! 0 = 0 | otherwise = go 0 (veclen - 1) where - veclen = VS.length vec + veclen = IGV.length vec -- Invariant: vec[lo] <= needle < vec[hi] go :: Int -> Int -> Int @@ -170,62 +275,71 @@ binSearch vec needle | lo + 1 == hi = hi | otherwise = let mid = lo + (hi - lo) `div` 2 - in if vec VS.! mid <= needle + in if vec IGV.! mid <= needle then go mid hi else go lo mid -- other methods -indexNumEvents :: Index -> Channel -> Int -indexNumEvents (Index _ mp _) chan = ciTotal (mp Map.! chan) +indexNumEvents :: Index -> Channel -> IO Int +indexNumEvents (Index _ mp _) chan = ciTotal <$> readIORef (mp Map.! chan) -indexCalendar :: Index -> Channel -> ((Day, Day), [Int]) -indexCalendar (Index _ mp _) chan = - let ci = mp Map.! chan - in ((ciStartDay ci, ciEndDay ci) - ,[if i == 0 - then ciCountUntil ci VS.! 0 - else ciCountUntil ci VS.! i - ciCountUntil ci VS.! (i - 1) - | i <- [0 .. VS.length (ciCountUntil ci) - 1]]) +indexCalendar :: Index -> Channel -> IO ((Day, Day), [Int]) +indexCalendar (Index _ mp _) chan = do + ci <- readIORef (mp Map.! chan) + let scan = ciCountUntil ci + return ((ciStartDay ci, ciEndDay ci) + ,[if i == 0 + then scan IGV.! 0 + else scan IGV.! i - scan IGV.! (i - 1) + | i <- [0 .. IGV.length scan - 1]]) indexGetEventsDay :: Index -> Channel -> Day -> IO [(HMS, Event)] -indexGetEventsDay index@(Index _ mp _) chan day - | day < ciStartDay ci || day > ciEndDay ci = return [] - | otherwise = do - (bs, _lineStarts) <- loadDay index chan (dayToYMD day) - return (parseLog bs) - where - ci = mp Map.! chan +indexGetEventsDay index@(Index _ mp _) chan day = do + ci <- readIORef (mp Map.! chan) + if day < ciStartDay ci || day > ciEndDay ci + then return [] + else loadDay index chan (dayToYMD day) >>= \case + Just (bs, _lineStarts) -> return (parseLog bs) + Nothing -> return [] -- if the file doesn't exist, there ain't no events -- utilities -loadDay :: Index -> Channel -> YMD -> IO (ByteString, VS.Vector Word32) +loadDay :: Index -> Channel -> YMD -> IO (Maybe (ByteString, VS.Vector Word32)) loadDay (Index basedir _ cache) chan@(Channel network channel) ymd = do cacheLookup cache (chan, ymd) >>= \case Nothing -> do - bs <- mapFile (basedir T.unpack network T.unpack channel toFileName ymd) - let lineStarts = preparseLog bs - cacheAdd cache (chan, ymd) (bs, lineStarts) - return (bs, lineStarts) - Just (bs, lineStarts) -> return (bs, lineStarts) + mapFile (basedir T.unpack network T.unpack channel toFileName ymd) >>= \case + Just bs -> do + let lineStarts = preparseLog bs + cacheAdd cache (chan, ymd) (bs, lineStarts) + return (Just (bs, lineStarts)) + Nothing -> return Nothing -- file didn't exist + Just (bs, lineStarts) -> return (Just (bs, lineStarts)) -parseFileName :: String -> (Year, MonthOfYear, DayOfMonth) +parseFileName :: String -> Maybe YMD parseFileName name | (y, '-' : s1) <- splitAt 4 name , (m, '-' : s2) <- splitAt 2 s1 , (d, ".log") <- splitAt 2 s2 , all isDigit y, all isDigit m, all isDigit d - , let y' = read' y ; m' = read' m ; d' = read' d + , let y' = read' @Int y ; m' = read' @Int m ; d' = read' @Int d , 1 <= m', m' <= 12 - , 1 <= d', d' <= gregorianMonthLength y' m' - = (y', m', d') + , 1 <= d', d' <= gregorianMonthLength (fromIntegral @Int @Integer y') m' + = Just (YMD y' (fromIntegral @Int @Word8 m') (fromIntegral @Int @Word8 d')) | otherwise - = error $ "Invalid ZNC log file name: " ++ name + = Nothing where read' :: Read a => String -> a read' s = case readMaybe s of Just r -> r Nothing -> error $ "No read: " ++ show s +ymdToGregorian :: YMD -> (Year, MonthOfYear, DayOfMonth) +ymdToGregorian (YMD y m d) = (fromIntegral y, fromIntegral m, fromIntegral d) + +ymdFromGregorian :: (Year, MonthOfYear, DayOfMonth) -> YMD +ymdFromGregorian (y, m, d) = YMD (fromIntegral y) (fromIntegral m) (fromIntegral d) + toFileName :: YMD -> String toFileName ymd = ymdToString ymd ++ ".log" -- cgit v1.3