{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ViewPatterns #-}
-- | Per-channel index over day-based log files.
--
-- For every channel the index keeps a running (prefix-sum) count of events per
-- day, which allows translating a linear event number into a (day, offset)
-- pair and back. File contents are loaded on demand through a 'Cache'.
module Index (
  Index,
  initIndex,
  indexGetEventsLinear,
  findEventIDLinear,
  indexGetEventsDay,
  indexNumEvents,
  indexCalendar,
) where

import Prelude hiding (foldl')  -- exported since GHC 9.10 (base 4.20)

import Control.Applicative (empty)
import Control.Concurrent
import Control.Monad (forM, forM_, when, guard)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Maybe
import Data.ByteString qualified as BS
import Data.ByteString (ByteString)
import Data.Char (isDigit, chr, ord)
import Data.Functor ((<&>))
import Data.IORef
import Data.List (sort, scanl', foldl', minimumBy)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (catMaybes)
import Data.Ord (comparing)
import Data.Text qualified as T
import Data.Text (Text)
import Data.Time
import Data.Vector.Storable qualified as VS
import Data.Word
import Numeric (showIntAtBase)
import System.Clock qualified as Clock
import System.Directory
import System.FilePath
import System.FSNotify qualified as FN
import Text.Read (readMaybe)

import AtomicPrint
import Debounce
import Cache
import Config (Channel(..), prettyChannel)
import ImmutGrowVector qualified as IGV
import Mmap
import Util
import ZNC


-- | Index data for a single channel.
data ChanIndex = ChanIndex
  { ciStartDay :: !Day
      -- ^ the day that entry 0 of 'ciCountUntil' refers to
  , ciCountUntil :: !(IGV.ImmutGrowVector Int)
      -- ^ number of events up to and /including/ this day
  , ciTotal :: !Int
      -- ^ total number of events in the channel
  }
  deriving (Show)

-- | Inclusive. Calls 'error' if the index contains zero days.
ciEndDay :: ChanIndex -> Day
ciEndDay ci =
  let ndays = IGV.length (ciCountUntil ci)
  in if ndays > 0
       then (ndays - 1) `addDays'` ciStartDay ci
       else error "ciEndDay: ChanIndex with zero days!"

-- 1. The ChanIndex is contained in an IORef, which means that we can't have a
--    proper critical section around modifying it. This is fine because it's
--    all pure, so we just allocate a new ChanIndex each time.
-- 2. There being no critical section also means that having multiple writers
--    would be problematic. Fortunately, there is only one writer, so this
--    simple data structure is fine.
--
-- Fields: base directory of the logs, per-channel index, and the cache of
-- loaded day files (file contents plus the result of 'preparseLog').
data Index = Index !FilePath
                   !(Map Channel (IORef ChanIndex))
                   !(Cache (Channel, YMD) (ByteString, VS.Vector Word32))

type EventID = Text


-- init

-- | Build the index for the given channels.
--
-- For each channel, every file in @basedir\/network\/channel@ is read and
-- parsed to count its events. A file whose name is not accepted by
-- 'parseFileName' is a fatal error, as is a channel directory without any log
-- files (the index cannot represent a channel with zero days).
--
-- Afterwards a thread is forked that watches the channel directories: changed
-- files are dropped from the cache, and additions/modifications trigger a
-- debounced 'indexUpdateImport'.
initIndex :: FilePath -> [Channel] -> IO Index
initIndex basedir toimport = do
  cache <- cacheNew 100

  -- Monotonic clock: this is an elapsed-time measurement, which should not be
  -- affected by wall-clock adjustments.
  c_start <- Clock.getTime Clock.Monotonic
  items <- forM toimport $ \(Channel nwT chT) -> do
    let nw = T.unpack nwT
        ch = T.unpack chT
        chandir = basedir </> nw </> ch
    files <- listDirectory chandir
    days <- fmap sort . forM files $ \fn -> do
      let path = chandir </> fn
          date = case parseFileName fn of
                   Just ymd -> ymdToGregorian ymd
                   Nothing -> error $ "Log file with unexpected file name: " ++ path
      -- atomicPrintS $ "Parsing " ++ path ++ " (" ++ show date ++ " -> " ++ show (dateToDay date) ++ ")"
      events <- parseLog <$> BS.readFile path
      return (uncurry3 fromGregorian date, length events)
    -- Without any days there is no start day; fail with a useful message
    -- instead of an anonymous "minimum: empty list".
    when (null days) $
      error $ "No log files found in " ++ chandir
    let minday = minimum (map fst days)
        maxday = maximum (map fst days)
        ndays = fromIntegral @Integer @Int (diffDays maxday minday + 1)
    -- traceM $ nw ++ "/" ++ ch ++ ": days = " ++ show [(toFileName (dayToYMD d), i) | (d, i) <- days]
    let countScan = IGV.fromListN ndays (drop 1 $ scanl' (+) 0 (makeCounts [minday..maxday] days))
        ntotal = sum (map snd days)
    chanindex <- newIORef $ ChanIndex
      { ciStartDay = minday
      , ciCountUntil = countScan
      , ciTotal = ntotal }
    return (Channel nwT chT, chanindex)
  c_end <- Clock.getTime Clock.Monotonic
  let timetakenSecs =
        fromIntegral @_ @Double (Clock.toNanoSecs (Clock.diffTimeSpec c_start c_end)) / 1e9

  atomicPrintS $ "Parsing/indexing logs in " ++ show basedir ++ " took " ++ show timetakenSecs ++ " secs"

  let index = Index basedir (Map.fromList items) cache

  _ <- forkIO $ do
    FN.withManager $ \mgr -> do
      let predicate FN.Added{} = True
          predicate FN.Modified{} = True
          predicate FN.Removed{} = True
          predicate FN.WatchedDirectoryRemoved{} = True
          predicate _ = False

          handle deb _ FN.Added{} = debounceClick deb
          handle deb chan ev@FN.Modified{} = do
            -- atomicPrintS $ "path = " ++ show (FN.eventPath ev) ++ " fn = " ++ takeFileName (FN.eventPath ev)
            case parseFileName (takeFileName (FN.eventPath ev)) of
              Just ymd -> do
                cacheInvalidate cache (chan, ymd)
                debounceClick deb
              Nothing -> return ()
          handle _ chan ev@FN.Removed{} = do
            -- atomicPrintS $ "path = " ++ show (FN.eventPath ev) ++ " fn = " ++ takeFileName (FN.eventPath ev)
            case parseFileName (takeFileName (FN.eventPath ev)) of
              Just ymd -> cacheInvalidate cache (chan, ymd)
              Nothing -> return ()
          handle _ _ ev@FN.WatchedDirectoryRemoved{} =
            -- not sure what to do here...
            atomicPrintS $ "Directory " ++ FN.eventPath ev ++ " removed!"
          -- Other events are rejected by 'predicate'; ignore them rather than
          -- crash the watcher should one slip through anyway.
          handle _ _ _ = return ()

      forM_ toimport $ \chan@(Channel nw ch) -> do
        deb <- makeDebounce (nw <> T.pack "/" <> ch) 5.0 (indexUpdateImport index chan)
        _ <- FN.watchDir mgr (basedir </> T.unpack nw </> T.unpack ch) predicate (handle deb chan)
        atomicPrintS $ "Watching " <> basedir </> T.unpack nw </> T.unpack ch

      -- Keep this thread (and thereby the watch manager) alive forever.
      let loop = threadDelay (round (1e9 :: Double)) >> loop in loop

  return index

-- | Given all days in a range (ascending) and a sorted list of
-- (day, event count) entries within that range, produce the event count for
-- every day in the range; days without an entry get count 0.
--
-- Calls 'error' if an entry lies outside the range or a day occurs twice.
makeCounts :: [Day] -> [(Day, Int)] -> [Int]
makeCounts [] [] = []
makeCounts [] _ = error "makeCounts: more entries than days in range"
makeCounts (d:ds) ents@((d',n):ents')
  | d == d' = n : makeCounts ds ents'
  | d < d' = 0 : makeCounts ds ents
  | otherwise = error $ "makeCounts: duplicate entry? " ++ show (d, toGregorian d, d', toGregorian d')
makeCounts (_:ds) [] = 0 : makeCounts ds []

-- | Re-read the logs around "today" for a channel and record any growth in
-- the number of events in the channel's index.
--
-- The days inspected are the day before, the day of, and the day after
-- @max today (last indexed day)@. Counts only ever grow: a day for which
-- fewer or equally many events are found than already recorded is left
-- untouched.
indexUpdateImport :: Index -> Channel -> IO ()
indexUpdateImport index@(Index _ mp _) chan = do
  let ciRef = mp Map.! chan
  ci <- readIORef ciRef
  today <- utctDay <$> getCurrentTime
  let todayish = max today (ciEndDay ci)
  readCounts <- fmap catMaybes . forM [-1 .. 1] $ \increment -> do
    let day = increment `addDays` todayish
        ymd = dayToYMD day
        dayidx = fromIntegral @Integer @Int (day `diffDays` ciStartDay ci)
    loadDay index chan ymd >>= \case
      Just (_bs, lineStarts) -> return (Just (dayidx, VS.length lineStarts))
      Nothing -> return Nothing
  atomicPrint $ "Update import for " <> prettyChannel chan <> ": " <> T.show readCounts <> " (len = " <> T.show (IGV.length (ciCountUntil ci)) <> ")"
  when (not (null readCounts)) $
    atomicModifyIORef' ciRef $ \ci' ->
      let ciRes = foldl' (\ci2 (dayidx, count) -> recordCount ci2 dayidx count) ci' readCounts
      in (ciRes, ())
  where
    -- | How many events do we already have on this day? Days past the end of
    -- the index have zero events.
    eventsOnDayIdx :: ChanIndex -> Int -> Int
    eventsOnDayIdx ci dayidx
      | dayidx >= IGV.length scan = 0
      | dayidx == 0 = scan IGV.! 0
      | otherwise = scan IGV.! dayidx - scan IGV.! (dayidx - 1)
      where scan = ciCountUntil ci

    -- | Record that the day with the given index now has @count@ events.
    recordCount :: ChanIndex -> Int -> Int -> ChanIndex
    recordCount ci dayidx count
      -- A day before the start day cannot be represented in the index (this
      -- happens for "yesterday" if the channel's first log is from today).
      | dayidx < 0 = ci
      | count <= alreadyHave = ci
      | otherwise =
          let ciExt =
                -- Ensure that there is space in the scan vector for our record.
                -- If the day is already inside the vector (e.g. yesterday's
                -- log grew after today was indexed), nothing needs to be
                -- appended; clamp to 0 because (!!) rejects negative indices.
                let nExtraDays = max 0 (dayidx - IGV.length (ciCountUntil ci) + 1)
                    lastCount | IGV.length (ciCountUntil ci) == 0 = 0
                              | otherwise = ciCountUntil ci IGV.! (IGV.length (ciCountUntil ci) - 1)
                in ci { ciCountUntil = iterate (`IGV.append` lastCount) (ciCountUntil ci) !! nExtraDays }
              -- The vector is a running total, so the new events must be added
              -- to this day and to every day after it.
              addFrom di n vec
                | di < IGV.length vec = addFrom (di + 1) n (IGV.set vec di (vec IGV.! di + n))
                | otherwise = vec
          in ciExt { ciCountUntil = addFrom dayidx (count - alreadyHave) (ciCountUntil ciExt)
                   , ciTotal = ciTotal ciExt + count - alreadyHave }
      where alreadyHave = eventsOnDayIdx ci dayidx


-- search

-- | Returns proper lazy list of events. Reading the files happens strictly,
-- but parsing the events happens lazily.
--
-- The arguments after the channel are the linear index of the first event
-- (@from@) and the number of events requested (@count@). The result is empty
-- if @from + count < 0@ or @from@ is at or past the total number of events.
indexGetEventsLinear :: Index -> Channel -> Int -> Int -> IO [(YMDHMS, EventID, Event)]
indexGetEventsLinear index@(Index _ mp _) chan from count = do
  ci <- readIORef (mp Map.! chan)
  if from + count < 0 || from >= ciTotal ci
    then return []
    else go ci (ciCountUntil ci)
  where
    go ci scan = do
      let -- First day whose running total exceeds 'from', i.e. the day that
          -- contains event number 'from'.
          day1idx = binSearch scan from
          day1 = day1idx `addDays'` ciStartDay ci
          neventBeforeDay1 | day1idx == 0 = 0
                           | otherwise = scan IGV.! (day1idx - 1)
          neventInclDay1 = scan IGV.! day1idx
          neventOnDay1 = neventInclDay1 - neventBeforeDay1
          off1 = from - neventBeforeDay1
          -- day2 is inclusive, off2 is exclusive
          (day2, off2)
            | from + count <= neventInclDay1 = (day1, off1 + count)
            | day1idx == IGV.length scan - 1 = (day1, neventOnDay1)
            | otherwise =
                let loop day2idx nbefore nseen
                      | nseen + nOnDay2 >= count = (day2idx `addDays'` ciStartDay ci, count - nseen)
                      | day2idx == IGV.length scan - 1 = (day2idx `addDays'` ciStartDay ci, nOnDay2)
                      | otherwise = loop (day2idx + 1) (scan IGV.! day2idx) (nseen + nOnDay2)
                      where nOnDay2 = scan IGV.! day2idx - nbefore
                in loop (day1idx + 1) (neventBeforeDay1 + neventOnDay1) (neventOnDay1 - off1)

      -- traceM ("ci = " ++ show ci)
      -- traceM ("binSearch " ++ show from ++ " =")
      -- traceM ("  " ++ show day1idx)
      -- traceM ("day1 = " ++ show day1)
      -- traceM ("off1 = " ++ show off1)
      -- traceM ("neventOnDay1 = " ++ show neventOnDay1)
      -- traceM ("count = " ++ show count)
      -- traceM ("day2 = " ++ show day2)
      -- traceM ("off2 = " ++ show off2)

      evs <- forM (zip [day1 .. day2] [day1idx..]) $ \(day, dayidx) ->
        let neventsOnDay | dayidx == 0 = scan IGV.! 0
                         | otherwise = scan IGV.! dayidx - scan IGV.! (dayidx - 1)
            -- (offset of first event on this day, number of events to take)
            range | day == day1 = if day1 == day2 then (off1, Just (off2 - off1))
                                                  else (off1, Just (neventsOnDay - off1))
                  | day == day2 = (0, Just off2)
                  | otherwise = (0, Just neventsOnDay)
            ymd = ymdFromGregorian (toGregorian day)
        in if neventsOnDay > 0
             then loadDay index chan ymd <&> \case
                    Just (bs, lineStarts) ->
                      let events = parseLogRange range lineStarts bs
                      in zipWith (\(hms, ev) off -> (YMDHMS ymd hms, genEventID (YMDHMS ymd hms) off, ev))
                                 events [fst range ..]
                    Nothing -> error $ "events on day " ++ show (dayToYMD day) ++ " but no file"
             else return []
      return (concat evs)

-- | The vector must be sorted.
-- Returns index of the first element x such that needle < x. If there is no
-- such element (i.e. needle is greater-equal the last element of vec), returns
-- the length of vec.
--
-- TODO: proportional binary search
binSearch :: IGV.ImmutGrowVector Int -> Int -> Int
binSearch vec needle
  -- Note '<=': if the last element equals the needle there is no element
  -- strictly greater than it, and the invariant of 'go' would not hold.
  | veclen == 0 || vec IGV.! (veclen - 1) <= needle = veclen
  | needle < vec IGV.! 0 = 0
  | otherwise = go 0 (veclen - 1)
  where
    veclen = IGV.length vec

    -- Invariant: vec[lo] <= needle < vec[hi]
    go :: Int -> Int -> Int
    go lo hi
      | lo + 1 == hi = hi
      | otherwise =
          let mid = lo + (hi - lo) `div` 2
          in if vec IGV.! mid <= needle then go mid hi else go lo mid

-- | If ID is found, returns:
-- - Timestamp of the event
-- - Index in the linear list of events for this channel
-- - Index in the list of events in the channel on that day
--
-- If multiple events on that day carry the timestamp from the ID, the one
-- whose offset is closest to the offset encoded in the ID is chosen.
findEventIDLinear :: Index -> Channel -> EventID -> IO (Maybe (YMDHMS, Int, Int))
findEventIDLinear index@(Index _ mp _) chan eid = runMaybeT $ do
  (ymdhms@(YMDHMS ymd hms), idoff) <- hoistMaybe (parseEventID eid)
  day <- hoistMaybe (uncurry3 fromGregorianValid (ymdToGregorian ymd))
  ci <- lift $ readIORef (mp Map.! chan)
  guard (ciStartDay ci <= day && day <= ciEndDay ci)
  (bs, lineStarts) <- MaybeT $ loadDay index chan ymd
  -- Offsets (within the day) of all events with exactly the requested time.
  let candidates = map snd $ takeWhile ((== hms) . fst) $ dropWhile ((< hms) . fst) $
                     zip (parseLogTimesOnly lineStarts bs) [0..]
  case candidates of
    [] -> empty
    _ -> do
      let dayidx = fromIntegral @Integer @Int (day `diffDays` ciStartDay ci)
          eventsBeforeDay | dayidx == 0 = 0
                          | otherwise = ciCountUntil ci IGV.! (dayidx - 1)
      let dayoff = minimumBy (comparing (\off -> abs (off - idoff))) candidates
      return (ymdhms, eventsBeforeDay + dayoff, dayoff)


-- other methods

-- | Total number of events currently indexed for the channel.
indexNumEvents :: Index -> Channel -> IO Int
indexNumEvents (Index _ mp _) chan =
  ciTotal <$> readIORef (mp Map.! chan)

-- | The (inclusive) range of days covered by the channel's index, together
-- with the number of events on each day in that range.
indexCalendar :: Index -> Channel -> IO ((Day, Day), [Int])
indexCalendar (Index _ mp _) chan = do
  ci <- readIORef (mp Map.! chan)
  let scan = ciCountUntil ci
  return ((ciStartDay ci, ciEndDay ci)
         ,[if i == 0 then scan IGV.! 0 else scan IGV.! i - scan IGV.! (i - 1)
          | i <- [0 .. IGV.length scan - 1]])

-- | All events of the channel on the given day. Returns the empty list if the
-- day lies outside the indexed range or there is no log file for it.
indexGetEventsDay :: Index -> Channel -> Day -> IO [(HMS, EventID, Event)]
indexGetEventsDay index@(Index _ mp _) chan day = do
  ci <- readIORef (mp Map.! chan)
  let ymd = dayToYMD day
  if day < ciStartDay ci || day > ciEndDay ci
    then return []
    else loadDay index chan ymd <&> \case
           Just (bs, _lineStarts) ->
             let events = parseLog bs
             in zipWith (\(hms, ev) off -> (hms, genEventID (YMDHMS ymd hms) off, ev))
                        events [0..]
           Nothing -> []  -- if the file doesn't exist, there ain't no events


-- utilities

-- | Get the contents of a day's log file together with the result of
-- 'preparseLog' on it. The cache is consulted first; on a miss the file
-- @basedir\/network\/channel\/\<day\>.log@ is mapped and the result is added
-- to the cache. Returns 'Nothing' if 'mapFile' yields 'Nothing'.
loadDay :: Index -> Channel -> YMD -> IO (Maybe (ByteString, VS.Vector Word32))
loadDay (Index basedir _ cache) chan@(Channel network channel) ymd = do
  cacheLookup cache (chan, ymd) >>= \case
    Nothing -> do
      mapFile (basedir </> T.unpack network </> T.unpack channel </> toFileName ymd) >>= \case
        Just bs -> do
          let lineStarts = preparseLog bs
          cacheAdd cache (chan, ymd) (bs, lineStarts)
          return (Just (bs, lineStarts))
        Nothing -> return Nothing  -- file didn't exist
    Just (bs, lineStarts) -> return (Just (bs, lineStarts))

-- | Takes the index of the event in the day's log (the "offset") in addition
-- to the timestamp, in order to disambiguate in case there are multiple events
-- with the same timestamp.
--
-- >>> genEventID (YMDHMS (YMD 2026 4 7) (HMS 18 56 55)) 123
-- "a5d9CtBVX"
genEventID :: YMDHMS -> Int -> EventID
genEventID (YMDHMS (YMD y m d) (HMS hh mm ss)) off
  -- An event ID is a mixed-radix number.
  -- Components: [offset, year, month, day, hour, minute, second]
  -- Radix:      [  --,   5000,    12,  31,   24,     60,     60]
  -- Maximal offset is determined by:
  -- > ceiling (2 ** (64 - logBase 2 (5000 * 12 * 31 * 24 * 60 * 60)))
  -- 114787088
  -- to fit the ID number in a Word64.
  -- Let's round that down conservatively to 100_000_000, i.e. 100 million events per day max.
  --
  -- The result number is encoded in base62, and an 'a' is prefixed as an ID version identifier.
  | off >= 100_000_000 = error "Too many events per day"
  | y >= 5000 = error "You should have better tech at this point"
  | otherwise =
      let cast :: Integral a => a -> Word64 ; cast = fromIntegral
          num = (((((cast off
                     * 5000 + cast y)
                     * 12 + cast (m - 1))
                     * 31 + cast (d - 1))
                     * 24 + cast hh)
                     * 60 + cast mm)
                     * 60 + cast ss
      in T.pack ('a' : showIntAtBase 62 base62char num "")
  where
    base62char = chr . fromIntegral . (base62alphabet `BS.index`)
    base62alphabet = BS.pack (map (fromIntegral . ord) (['0'..'9'] ++ ['A'..'Z'] ++ ['a'..'z']))

-- | Inverse of 'genEventID': decode an event ID into its timestamp and
-- offset. Returns 'Nothing' if the ID does not start with the version prefix
-- @\'a\'@ or contains a character outside the base62 alphabet. The decoded
-- date is not checked for validity here.
--
-- >>> parseEventID "a5d9CtBVX"
-- Just (YMDHMS (YMD 2026 4 7) (HMS 18 56 55),123)
parseEventID :: EventID -> Maybe (YMDHMS, Int)
parseEventID (T.uncons -> Just ('a', eid)) = do
  num <- multiply <$> mapM (fmap (fromIntegral @Int @Word64) . unbase62char) (T.unpack eid)
  -- Peel off the mixed-radix components, least significant first.
  let (num2, ss) = num `quotRem` 60
  let (num3, mm) = num2 `quotRem` 60
  let (num4, hh) = num3 `quotRem` 24
  let (num5, d') = num4 `quotRem` 31
  let (num6, m') = num5 `quotRem` 12
  let (off , y ) = num6 `quotRem` 5000
  let cast :: Num b => Word64 -> b ; cast = fromIntegral
  return (YMDHMS (YMD (cast y) (cast m' + 1) (cast d' + 1)) (HMS (cast hh) (cast mm) (cast ss))
         ,cast off)
  where
    -- Interpret a list of base62 digits (most significant first) as a number.
    multiply = sum . map (uncurry (*)) . zip (iterate (*62) 1) . reverse

    unbase62char c
      | '0' <= c, c <= '9' = Just (ord c - ord '0')
      | 'A' <= c, c <= 'Z' = Just (ord c - ord 'A' + 10)
      | 'a' <= c, c <= 'z' = Just (ord c - ord 'a' + 36)
      | otherwise = Nothing
parseEventID _ = Nothing

-- | Parse a log file name of the form @YYYY-MM-DD.log@ into its date.
-- Returns 'Nothing' if the name does not have exactly this shape or the
-- month/day do not form a valid date in that year.
parseFileName :: String -> Maybe YMD
parseFileName name
  | (y, '-' : s1) <- splitAt 4 name
  , (m, '-' : s2) <- splitAt 2 s1
  , (d, ".log") <- splitAt 2 s2
  , all isDigit y, all isDigit m, all isDigit d
  , let y' = read' @Int y ; m' = read' @Int m ; d' = read' @Int d
  , 1 <= m', m' <= 12
  , 1 <= d', d' <= gregorianMonthLength (fromIntegral @Int @Integer y') m'
  = Just (YMD y' (fromIntegral @Int @Word8 m') (fromIntegral @Int @Word8 d'))
  | otherwise = Nothing
  where
    -- Only applied to strings already checked to consist of digits.
    read' :: Read a => String -> a
    read' s = case readMaybe s of
                Just r -> r
                Nothing -> error $ "No read: " ++ show s

-- | File name of the log file for the given day: 'ymdToString' of the date
-- followed by @.log@.
toFileName :: YMD -> String
toFileName ymd = ymdToString ymd ++ ".log"

-- | 'addDays' taking the number of days as an 'Int'.
addDays' :: Int -> Day -> Day
addDays' = addDays . fromIntegral