diff options
Diffstat (limited to 'src/Index.hs')
| -rw-r--r-- | src/Index.hs | 291 |
1 files changed, 221 insertions, 70 deletions
diff --git a/src/Index.hs b/src/Index.hs index 94c3f9e..3703f65 100644 --- a/src/Index.hs +++ b/src/Index.hs @@ -1,10 +1,17 @@ {-# LANGUAGE BangPatterns #-} +{-# LANGUAGE DerivingVia #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE TypeOperators #-} {-# LANGUAGE ViewPatterns #-} module Index ( Index, EventID, + CountKind(..), initIndex, indexGetEventsLinear, findEventIDLinear, @@ -25,14 +32,19 @@ import Data.ByteString (ByteString) import Data.Char (isDigit, chr, ord) import Data.Functor ((<&>)) import Data.IORef -import Data.List (sort, scanl', foldl', minimumBy) +import Data.List (sort, scanl', foldl', minimumBy, intercalate) import Data.Map.Strict (Map) import Data.Map.Strict qualified as Map import Data.Maybe (catMaybes) +import Data.Monoid (Endo(..)) import Data.Ord (comparing) import Data.Text qualified as T import Data.Text (Text) import Data.Time +import Data.Vector.Generic qualified as VG +import Data.Vector.Generic.Mutable qualified as VGM +import Data.Vector.Unboxed qualified as VU +import Data.Vector.Unboxed.Base qualified as VU (Vector(V_2)) import Data.Vector.Storable qualified as VS import Data.Word import System.Clock qualified as Clock @@ -52,10 +64,62 @@ import Util import ZNC +-- This module keeps an index both for the full list of events, as well as a +-- /compressed/ list of events. Compression here means that sequences of events +-- with type not in a list of "important" events (see 'isImportant') are +-- compressed into a single pseudo-event, recording the existence of +-- unimportant events but not their details. This compression happens within a +-- single UTC day, so if there is a sequence of unimportant events crossing a +-- UTC day boundary, the compressed stream will have two compressed events. +-- Ideally these would also be compressed but that would complicate the code. +-- +-- The 'Counts' data type represents the number of events in a particular time +-- period, as counted in each of the tracked ways (two for now). + + +data CountKind = CKAll | CKCompressed + deriving (Show, Eq) + +data Counts = Counts { coAll :: {-# UNPACK #-} !Int + , coCmpr :: {-# UNPACK #-} !Int } + deriving (Show, Eq) + +instance Num Counts where + Counts a i + Counts a' i' = Counts (a + a') (i + i') + Counts a i - Counts a' i' = Counts (a - a') (i - i') + (*) = error "(*) on Counts" + abs = error "abs on Counts" + signum = error "signum on Counts" + fromInteger 0 = Counts 0 0 + fromInteger _ = error "non-zero fromInteger on Counts" + +instance VU.IsoUnbox Counts (Int, Int) where + toURepr (Counts a i) = (a, i) ; {-# INLINE toURepr #-} + fromURepr (a, i) = Counts a i ; {-# INLINE fromURepr #-} +newtype instance VU.MVector s Counts = MV_Counts (VU.MVector s (Int, Int)) +newtype instance VU.Vector Counts = V_Counts (VU.Vector (Int, Int)) +deriving via (Counts `VU.As` (Int, Int)) instance VGM.MVector VU.MVector Counts +deriving via (Counts `VU.As` (Int, Int)) instance VG.Vector VU.Vector Counts +instance VU.Unbox Counts + +getCount :: CountKind -> Counts -> Int +getCount CKAll = coAll +getCount CKCompressed = coCmpr + +getCountVector :: CountKind -> VU.Vector Counts -> VU.Vector Int +getCountVector kind (V_Counts (VU.V_2 _ vAll vCmpr)) = + case kind of + CKAll -> vAll + CKCompressed -> vCmpr + + +-- | The index for a single channel. data ChanIndex = ChanIndex { ciStartDay :: !Day - , ciCountUntil :: !(IGV.ImmutGrowVector Int) -- ^ number of events up to and /including/ this day - , ciTotal :: !Int } + , ciCountUntil :: !(IGV.ImmutGrowVector Counts) + -- ^ Number of events up to and /including/ this day. Values: + -- (all events, compressed events). + , ciTotal :: !Counts } deriving (Show) -- | Inclusive. @@ -87,6 +151,7 @@ initIndex basedir toimport = do c_start <- Clock.getTime Clock.Realtime items <- forM toimport $ \(Channel nwT chT) -> do + atomicPrint $ "Indexing " <> nwT <> "/" <> chT let nw = T.unpack nwT ch = T.unpack chT files <- listDirectory (basedir </> nw </> ch) @@ -98,13 +163,15 @@ initIndex basedir toimport = do -- atomicPrintS $ "Parsing " ++ path ++ " (" ++ show date ++ " -> " ++ show (dateToDay date) ++ ")" events <- parseLog <$> BS.readFile path let !nevents = length events - return (uncurry3 fromGregorian date, nevents) - let minday = minimum (map fst days) - maxday = maximum (map fst days) + !ccpr = countCompressed (map snd events) + return (uncurry3 fromGregorian date, nevents, ccpr) + let minday = minimum [day | (day, _, _) <- days] + maxday = maximum [day | (day, _, _) <- days] ndays = fromIntegral @Integer @Int (diffDays maxday minday + 1) -- traceM $ nw ++ "/" ++ ch ++ ": days = " ++ show [(toFileName (dayToYMD d), i) | (d, i) <- days] - let countScan = IGV.fromListN ndays (drop 1 $ scanl' (+) 0 (makeCounts [minday..maxday] days)) - ntotal = sum (map snd days) + let daysEvents = [(day, Counts nev ncompr) | (day, nev, ncompr) <- days] + countScan = IGV.fromListN ndays (drop 1 $ scanl' (+) 0 (makeCounts [minday..maxday] daysEvents)) + ntotal = sum [Counts nev ncompr | (_, nev, ncompr) <- days] chanindex <- newIORef $! ChanIndex { ciStartDay = minday , ciCountUntil = countScan @@ -150,7 +217,7 @@ initIndex basedir toimport = do return index -makeCounts :: [Day] -> [(Day, Int)] -> [Int] +makeCounts :: Num a => [Day] -> [(Day, a)] -> [a] makeCounts [] [] = [] makeCounts [] _ = error "makeCounts: more entries than days in range" makeCounts (d:ds) ents@((d',n):ents') @@ -159,6 +226,56 @@ makeCounts (d:ds) ents@((d',n):ents') | otherwise = error $ "makeCounts: duplicate entry? " ++ show (d, toGregorian d, d', toGregorian d') makeCounts (_:ds) [] = 0 : makeCounts ds [] +countCompressed :: [Event] -> Int +countCompressed = goI 0 + where + goI !n [] = n + goI n (e:es) + | isImportant e = goI (n+1) es + | otherwise = goU n es + + goU !n [] = n + 1 -- 1 for the unimportant sequence + goU n (e:es) + | isImportant e = goI (n+2) es -- 1 for the unimportant sequence, 1 for this important event + | otherwise = goU n es + +-- | Compresses every sequence of unimportant events down to one event, with +-- the tag of the first in the sequence. +compressEvents :: [(a, Event)] -> [(a, Event)] +compressEvents [] = [] +compressEvents events = + let (evsU, evs1) = break (isImportant . snd) events + (evsI, evs2) = span (isImportant . snd) evs1 + tl = evsI ++ compressEvents evs2 + in case evsU of + (tag, e) : ps -> (tag, compress (e : map snd ps)) : tl + [] -> tl + where + compress :: [Event] -> Event + compress = Compressed . describe . (`appEndo` CompressedCount 0 0 0 0) . foldMap (Endo . collect) + + collect :: Event -> CompressedCount -> CompressedCount + collect Join{} = \cc -> cc { ccJoins = ccJoins cc + 1 } + collect Part{} = \cc -> cc { ccLeaves = ccLeaves cc + 1 } + collect Quit{} = \cc -> cc { ccLeaves = ccLeaves cc + 1 } + collect Mode{} = \cc -> cc { ccModes = ccModes cc + 1 } + collect ParseError{} = \cc -> cc { ccParseErrors = ccParseErrors cc + 1 } + collect _ = id + + describe :: CompressedCount -> Text + describe (CompressedCount nj nl nm np) = + T.pack $ intercalate ", " $ + [show nj ++ " joined" | nj > 0] ++ + [show nl ++ " left" | nl > 0] ++ + [show nm ++ " set a mode" | nm > 0] ++ + [show np ++ " parse errors" | np > 0] + +data CompressedCount = CompressedCount + { ccJoins :: Int + , ccLeaves :: Int + , ccModes :: Int + , ccParseErrors :: Int } + indexUpdateImport :: Index -> Channel -> IO () indexUpdateImport index@(Index _ mp _) chan = do let ciRef = mp Map.! chan @@ -172,50 +289,55 @@ indexUpdateImport index@(Index _ mp _) chan = do dayidx = fromIntegral @Integer @Int (day `diffDays` ciStartDay ci) loadDay index chan ymd >>= \case - Just (_bs, lineStarts) -> return (Just (dayidx, VS.length lineStarts)) + Just (bs, lineStarts) -> + return (Just (dayidx, Counts (VS.length lineStarts) (countCompressed (map snd (parseLog bs))))) Nothing -> return Nothing atomicPrint $ "Update import for " <> prettyChannel chan <> ": " <> T.show readCounts <> " (len = " <> T.show (IGV.length (ciCountUntil ci)) <> ")" when (not (null readCounts)) $ atomicModifyIORef' ciRef $ \ci' -> - let ciRes = foldl' (\ci2 (dayidx, count) -> recordCount ci2 dayidx count) ci' readCounts + let ciRes = foldl' (\ci2 (dayidx, counts) -> recordCounts ci2 dayidx counts) ci' readCounts in (ciRes, ()) where -- | How many events do we already have on this day? - eventsOnDayIdx :: ChanIndex -> Int -> Int + eventsOnDayIdx :: ChanIndex -> Int -> Counts eventsOnDayIdx ci dayidx | dayidx == 0 = scan IGV.! 0 | dayidx < IGV.length scan = scan IGV.! dayidx - scan IGV.! (dayidx - 1) | otherwise = 0 where scan = ciCountUntil ci - recordCount :: ChanIndex -> Int -> Int -> ChanIndex - recordCount ci dayidx count - | count <= alreadyHave = ci + recordCounts :: ChanIndex -> Int -> Counts -> ChanIndex + recordCounts ci dayidx counts + | counts == alreadyHave = ci | otherwise = - let ciExt = -- Ensure that there is space in the scan vector for our record - let nExtraDays = dayidx - IGV.length (ciCountUntil ci) + 1 - lastCount | IGV.length (ciCountUntil ci) == 0 = 0 - | otherwise = ciCountUntil ci IGV.! (IGV.length (ciCountUntil ci) - 1) + let ciExt = + -- Ensure that there is space in the scan vector for our record + -- by copying the last entry a few times + let currentLen = IGV.length (ciCountUntil ci) + nExtraDays = dayidx - currentLen + 1 + lastCount | currentLen == 0 = 0 + | otherwise = ciCountUntil ci IGV.! (currentLen - 1) in ci { ciCountUntil = iterate (`IGV.append` lastCount) (ciCountUntil ci) !! nExtraDays } - addFrom di n vec - | di < IGV.length vec = addFrom (di + 1) n (IGV.set vec di (vec IGV.! di + n)) - | otherwise = vec - in ciExt { ciCountUntil = addFrom dayidx (count - alreadyHave) (ciCountUntil ciExt) - , ciTotal = ciTotal ciExt + count - alreadyHave } + addFrom di n vec = + foldl' (\vec' di' -> IGV.set vec' di' (vec' IGV.! di' + n)) + vec [di .. IGV.length vec - 1] + in ciExt { ciCountUntil = addFrom dayidx (counts - alreadyHave) (ciCountUntil ciExt) + , ciTotal = ciTotal ciExt + counts - alreadyHave } where alreadyHave = eventsOnDayIdx ci dayidx -- search -- | Returns proper lazy list of events. Reading the files happens strictly, -- but parsing the events happens lazily. -indexGetEventsLinear :: Index -> Channel -> Int -> Int -> IO [(YMDHMS, EventID, Event)] -indexGetEventsLinear index@(Index _ mp _) chan from count = do +indexGetEventsLinear :: Index -> Channel -> CountKind -> Int -> Int -> IO [(YMDHMS, EventID, Event)] +indexGetEventsLinear index@(Index _ mp _) chan kind from count = do + atomicPrintS $ "indexGetEventsLinear " ++ show chan ++ " " ++ show kind ++ " " ++ show from ++ " " ++ show count ci <- readIORef (mp Map.! chan) - if from + count < 0 || from >= ciTotal ci + if from + count < 0 || from >= getCount kind (ciTotal ci) then return [] - else go ci (ciCountUntil ci) + else go ci (IGV.mapUVector (getCountVector kind) (ciCountUntil ci)) where go ci scan = do let day1idx = binSearch scan from @@ -241,34 +363,36 @@ indexGetEventsLinear index@(Index _ mp _) chan from count = do nOnDay2 = scan IGV.! day2idx - nbefore in loop (day1idx + 1) (neventBeforeDay1 + neventOnDay1) (neventOnDay1 - off1) - -- traceM ("ci = " ++ show ci) - -- traceM ("binSearch " ++ show from ++ " =") - -- traceM (" " ++ show day1idx) - -- traceM ("day1 = " ++ show day1) - -- traceM ("off1 = " ++ show off1) - -- traceM ("neventOnDay1 = " ++ show neventOnDay1) - -- traceM ("count = " ++ show count) - -- traceM ("day2 = " ++ show day2) - -- traceM ("off2 = " ++ show off2) + -- atomicPrintS ("ci = " ++ show ci) + -- atomicPrintS ("binSearch " ++ show from ++ " =") + -- atomicPrintS (" " ++ show day1idx) + -- atomicPrintS ("day1 = " ++ show day1) + -- atomicPrintS ("off1 = " ++ show off1) + -- atomicPrintS ("neventOnDay1 = " ++ show neventOnDay1) + -- atomicPrintS ("count = " ++ show count) + -- atomicPrintS ("day2 = " ++ show day2) + -- atomicPrintS ("off2 = " ++ show off2) evs <- forM (zip [day1 .. day2] [day1idx..]) $ \(day, dayidx) -> - let neventsOnDay | dayidx == 0 = scan IGV.! 0 - | otherwise = scan IGV.! dayidx - scan IGV.! (dayidx - 1) - range - | day == day1 = - if day1 == day2 - then (off1, Just (off2 - off1)) - else (off1, Just (neventsOnDay - off1)) - | day == day2 = (0, Just off2) - | otherwise = (0, Just neventsOnDay) - ymd = ymdFromGregorian (toGregorian day) - in if neventsOnDay > 0 - then loadDay index chan ymd <&> \case - Just (bs, lineStarts) -> - let events = parseLogRange range lineStarts bs - in zipWith (\(hms, ev) off -> (YMDHMS ymd hms, genEventID (YMDHMS ymd hms) off, ev)) - events [fst range ..] - Nothing -> error $ "events on day " ++ show (dayToYMD day) ++ " but no file" - else return [] + let neventsOnDay | dayidx == 0 = scan IGV.! 0 + | otherwise = scan IGV.! dayidx - scan IGV.! (dayidx - 1) + rangeStart = if day == day1 then off1 else 0 + rangeEnd = if day == day2 then off2 else neventsOnDay + range = (rangeStart, Just (rangeEnd - rangeStart)) + ymd = ymdFromGregorian (toGregorian day) + in if neventsOnDay > 0 + then loadDay index chan ymd <&> \case + Just (bs, lineStarts) -> case kind of + CKAll -> + let events = parseLogRange range lineStarts bs + in [(YMDHMS ymd hms, genEventID (YMDHMS ymd hms) off, ev) + | ((hms, ev), off) <- zip events [rangeStart ..]] + CKCompressed -> + let events = parseLog bs + events' = take (rangeEnd - rangeStart) $ drop rangeStart $ + compressEvents [((hms, off), ev) | ((hms, ev), off) <- zip events [0..]] + in [(YMDHMS ymd hms, genEventID (YMDHMS ymd hms) off, ev) | ((hms, off), ev) <- events'] + Nothing -> error $ "events on day " ++ show (dayToYMD day) ++ " but no file" + else return [] return (concat evs) @@ -299,8 +423,8 @@ binSearch vec needle -- - Timestamp of the event -- - Index in the linear list of events for this channel -- - Index in the list of events in the channel on that day -findEventIDLinear :: Index -> Channel -> EventID -> IO (Maybe (YMDHMS, Int, Int)) -findEventIDLinear index@(Index _ mp _) chan eid = runMaybeT $ do +findEventIDLinear :: Index -> Channel -> CountKind -> EventID -> IO (Maybe (YMDHMS, Int, Int)) +findEventIDLinear index@(Index _ mp _) chan kind eid = runMaybeT $ do (ymdhms@(YMDHMS ymd hms), idoff) <- hoistMaybe (parseEventID eid) day <- hoistMaybe (uncurry3 fromGregorianValid (ymdToGregorian ymd)) @@ -308,24 +432,29 @@ findEventIDLinear index@(Index _ mp _) chan eid = runMaybeT $ do guard (ciStartDay ci <= day && day <= ciEndDay ci) (bs, lineStarts) <- MaybeT $ loadDay index chan ymd - let candidates = + let candidates = -- [(event offset, index in possibly compressed event list)] map snd $ takeWhile ((== hms) . fst) $ dropWhile ((< hms) . fst) $ - zip (parseLogTimesOnly lineStarts bs) [0..] + case kind of + CKAll -> zip (parseLogTimesOnly lineStarts bs) (zip [0..] [0..]) + CKCompressed -> + let compressed = compressEvents [((hms', off), ev) + | ((hms', ev), off) <- zip (parseLog bs) [0..]] + in [(hms', (off, idx)) | (((hms', off), _ev), idx) <- zip compressed [0..]] case candidates of [] -> empty _ -> do let dayidx = fromIntegral @Integer @Int (day `diffDays` ciStartDay ci) eventsBeforeDay | dayidx == 0 = 0 - | otherwise = ciCountUntil ci IGV.! (dayidx - 1) - let dayoff = minimumBy (comparing (\off -> abs (off - idoff))) candidates - return (ymdhms, eventsBeforeDay + dayoff, dayoff) + | otherwise = getCount kind (ciCountUntil ci IGV.! (dayidx - 1)) + let (_dayoff, daylistidx) = minimumBy (comparing (\(off, _listidx) -> abs (off - idoff))) candidates + return (ymdhms, eventsBeforeDay + daylistidx, daylistidx) -- other methods -indexNumEvents :: Index -> Channel -> IO Int -indexNumEvents (Index _ mp _) chan = ciTotal <$> readIORef (mp Map.! chan) +indexNumEvents :: Index -> Channel -> CountKind -> IO Int +indexNumEvents (Index _ mp _) chan kind = getCount kind . ciTotal <$> readIORef (mp Map.! chan) indexCalendar :: Index -> Channel -> IO ((Day, Day), [Int]) indexCalendar (Index _ mp _) chan = do @@ -333,12 +462,12 @@ indexCalendar (Index _ mp _) chan = do let scan = ciCountUntil ci return ((ciStartDay ci, ciEndDay ci) ,[if i == 0 - then scan IGV.! 0 - else scan IGV.! i - scan IGV.! (i - 1) + then coAll (scan IGV.! 0) + else coAll (scan IGV.! i) - coAll (scan IGV.! (i - 1)) | i <- [0 .. IGV.length scan - 1]]) -indexGetEventsDay :: Index -> Channel -> Day -> IO [(HMS, EventID, Event)] -indexGetEventsDay index@(Index _ mp _) chan day = do +indexGetEventsDay :: Index -> Channel -> CountKind -> Day -> IO [(HMS, EventID, Event)] +indexGetEventsDay index@(Index _ mp _) chan kind day = do ci <- readIORef (mp Map.! chan) let ymd = dayToYMD day if day < ciStartDay ci || day > ciEndDay ci @@ -346,8 +475,14 @@ indexGetEventsDay index@(Index _ mp _) chan day = do else loadDay index chan ymd <&> \case Just (bs, _lineStarts) -> let events = parseLog bs - in zipWith (\(hms, ev) off -> (hms, genEventID (YMDHMS ymd hms) off, ev)) - events [0..] + in case kind of + CKAll -> + [(hms, genEventID (YMDHMS ymd hms) off, ev) + | ((hms, ev), off) <- zip events [0..]] + CKCompressed -> + [(hms, genEventID (YMDHMS ymd hms) off, ev) + | ((hms, off), ev) <- compressEvents [((hms, off), ev) + | ((hms, ev), off) <- zip events [0..]]] Nothing -> [] -- if the file doesn't exist, there ain't no events -- utilities @@ -364,6 +499,22 @@ loadDay (Index basedir _ cache) chan@(Channel network channel) ymd = do Nothing -> return Nothing -- file didn't exist Just (bs, lineStarts) -> return (Just (bs, lineStarts)) +isImportant :: Event -> Bool +isImportant ReNick{} = True +isImportant Talk{} = True +isImportant Notice{} = True +isImportant Act{} = True +isImportant Kick{} = True +isImportant Topic{} = True + +isImportant Join{} = False +isImportant Part{} = False +isImportant Quit{} = False +isImportant Mode{} = False +isImportant ParseError{} = False + +isImportant Compressed{} = error "isImportant: why am I getting a compressed event" + -- | Takes the index of the event in the day's log (the "offset") in addition -- to the timestamp, in order to disambiguate in case there are multiple events -- with the same timestamp. |
