summaryrefslogtreecommitdiff
path: root/src/Index.hs
diff options
context:
space:
mode:
authorTom Smeding <tom@tomsmeding.com>2026-06-28 12:47:28 +0200
committerTom Smeding <tom@tomsmeding.com>2026-06-28 12:50:26 +0200
commit352f64c7171cf62f2e1a7578fb8e786dead90d9f (patch)
tree2fcb0663f1509b2fe5d1f2533f1e8859ddac36ad /src/Index.hs
parent08e042b949ca358a86c256d137379e76f3881bfc (diff)
Prototype compressed event listing
Diffstat (limited to 'src/Index.hs')
-rw-r--r--src/Index.hs291
1 files changed, 221 insertions, 70 deletions
diff --git a/src/Index.hs b/src/Index.hs
index 94c3f9e..3703f65 100644
--- a/src/Index.hs
+++ b/src/Index.hs
@@ -1,10 +1,17 @@
{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE DerivingVia #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE StandaloneDeriving #-}
+{-# LANGUAGE TypeFamilies #-}
+{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE ViewPatterns #-}
module Index (
Index,
EventID,
+ CountKind(..),
initIndex,
indexGetEventsLinear,
findEventIDLinear,
@@ -25,14 +32,19 @@ import Data.ByteString (ByteString)
import Data.Char (isDigit, chr, ord)
import Data.Functor ((<&>))
import Data.IORef
-import Data.List (sort, scanl', foldl', minimumBy)
+import Data.List (sort, scanl', foldl', minimumBy, intercalate)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (catMaybes)
+import Data.Monoid (Endo(..))
import Data.Ord (comparing)
import Data.Text qualified as T
import Data.Text (Text)
import Data.Time
+import Data.Vector.Generic qualified as VG
+import Data.Vector.Generic.Mutable qualified as VGM
+import Data.Vector.Unboxed qualified as VU
+import Data.Vector.Unboxed.Base qualified as VU (Vector(V_2))
import Data.Vector.Storable qualified as VS
import Data.Word
import System.Clock qualified as Clock
@@ -52,10 +64,62 @@ import Util
import ZNC
+-- This module keeps an index for both the full list of events and a
+-- /compressed/ list of events. Compression here means that each maximal run of
+-- consecutive events whose type is not "important" (see 'isImportant') is
+-- replaced by a single pseudo-event, recording the existence of the
+-- unimportant events but not their details. This compression happens within a
+-- single UTC day, so if a run of unimportant events crosses a UTC day
+-- boundary, the compressed stream will contain two compressed events, one on
+-- each side of the boundary. Ideally these would be merged into one as well,
+-- but that would complicate the code.
+--
+-- The 'Counts' data type represents the number of events in a particular time
+-- period, as counted in each of the tracked ways (two for now: all events and
+-- compressed events).
+
+
+data CountKind = CKAll | CKCompressed -- selects a tally of 'Counts': every event (CKAll) or the compressed stream (CKCompressed); see getCount
+ deriving (Show, Eq)
+
+data Counts = Counts { coAll :: {-# UNPACK #-} !Int -- number of events in the full (uncompressed) event list
+ , coCmpr :: {-# UNPACK #-} !Int } -- number of events in the compressed event list (see countCompressed)
+ deriving (Show, Eq)
+
+instance Num Counts where -- deliberately partial: only (+), (-) and the literal 0 are supported
+ Counts a i + Counts a' i' = Counts (a + a') (i + i') -- field-wise sum
+ Counts a i - Counts a' i' = Counts (a - a') (i - i') -- field-wise difference
+ (*) = error "(*) on Counts"
+ abs = error "abs on Counts"
+ signum = error "signum on Counts"
+ fromInteger 0 = Counts 0 0 -- lets the literal 0 stand for the empty tally
+ fromInteger _ = error "non-zero fromInteger on Counts"
+
+instance VU.IsoUnbox Counts (Int, Int) where -- Counts is represented in unboxed vectors as the pair (coAll, coCmpr)
+ toURepr (Counts a i) = (a, i) ; {-# INLINE toURepr #-}
+ fromURepr (a, i) = Counts a i ; {-# INLINE fromURepr #-}
+newtype instance VU.MVector s Counts = MV_Counts (VU.MVector s (Int, Int))
+newtype instance VU.Vector Counts = V_Counts (VU.Vector (Int, Int)) -- getCountVector pattern-matches on this wrapper
+deriving via (Counts `VU.As` (Int, Int)) instance VGM.MVector VU.MVector Counts
+deriving via (Counts `VU.As` (Int, Int)) instance VG.Vector VU.Vector Counts
+instance VU.Unbox Counts
+
+-- | Project the tally selected by the given 'CountKind' out of a 'Counts'.
+getCount :: CountKind -> Counts -> Int
+getCount kind = case kind of
+  CKAll        -> coAll
+  CKCompressed -> coCmpr
+
+getCountVector :: CountKind -> VU.Vector Counts -> VU.Vector Int
+getCountVector kind (V_Counts (VU.V_2 _ vAll vCmpr)) = -- unwraps the pair-of-vectors representation and returns one component as-is
+ case kind of
+ CKAll -> vAll -- the coAll component (first element of the toURepr pair)
+ CKCompressed -> vCmpr -- the coCmpr component (second element of the toURepr pair)
+
+
+-- | The index for a single channel.
data ChanIndex = ChanIndex
{ ciStartDay :: !Day
- , ciCountUntil :: !(IGV.ImmutGrowVector Int) -- ^ number of events up to and /including/ this day
- , ciTotal :: !Int }
+ , ciCountUntil :: !(IGV.ImmutGrowVector Counts)
+ -- ^ Number of events up to and /including/ this day. Values:
+ -- (all events, compressed events).
+ , ciTotal :: !Counts }
deriving (Show)
-- | Inclusive.
@@ -87,6 +151,7 @@ initIndex basedir toimport = do
c_start <- Clock.getTime Clock.Realtime
items <-
forM toimport $ \(Channel nwT chT) -> do
+ atomicPrint $ "Indexing " <> nwT <> "/" <> chT
let nw = T.unpack nwT
ch = T.unpack chT
files <- listDirectory (basedir </> nw </> ch)
@@ -98,13 +163,15 @@ initIndex basedir toimport = do
-- atomicPrintS $ "Parsing " ++ path ++ " (" ++ show date ++ " -> " ++ show (dateToDay date) ++ ")"
events <- parseLog <$> BS.readFile path
let !nevents = length events
- return (uncurry3 fromGregorian date, nevents)
- let minday = minimum (map fst days)
- maxday = maximum (map fst days)
+ !ccpr = countCompressed (map snd events)
+ return (uncurry3 fromGregorian date, nevents, ccpr)
+ let minday = minimum [day | (day, _, _) <- days]
+ maxday = maximum [day | (day, _, _) <- days]
ndays = fromIntegral @Integer @Int (diffDays maxday minday + 1)
-- traceM $ nw ++ "/" ++ ch ++ ": days = " ++ show [(toFileName (dayToYMD d), i) | (d, i) <- days]
- let countScan = IGV.fromListN ndays (drop 1 $ scanl' (+) 0 (makeCounts [minday..maxday] days))
- ntotal = sum (map snd days)
+ let daysEvents = [(day, Counts nev ncompr) | (day, nev, ncompr) <- days]
+ countScan = IGV.fromListN ndays (drop 1 $ scanl' (+) 0 (makeCounts [minday..maxday] daysEvents))
+ ntotal = sum [Counts nev ncompr | (_, nev, ncompr) <- days]
chanindex <- newIORef $!
ChanIndex { ciStartDay = minday
, ciCountUntil = countScan
@@ -150,7 +217,7 @@ initIndex basedir toimport = do
return index
-makeCounts :: [Day] -> [(Day, Int)] -> [Int]
+makeCounts :: Num a => [Day] -> [(Day, a)] -> [a]
makeCounts [] [] = []
makeCounts [] _ = error "makeCounts: more entries than days in range"
makeCounts (d:ds) ents@((d',n):ents')
@@ -159,6 +226,56 @@ makeCounts (d:ds) ents@((d',n):ents')
| otherwise = error $ "makeCounts: duplicate entry? " ++ show (d, toGregorian d, d', toGregorian d')
makeCounts (_:ds) [] = 0 : makeCounts ds []
+-- | Length of an event list after compression: every important event (see
+-- 'isImportant') counts once, and every maximal run of consecutive
+-- unimportant events counts once as a whole.
+countCompressed :: [Event] -> Int
+countCompressed = go False 0
+  where
+    -- The flag records whether the previous event was unimportant, i.e.
+    -- whether we are inside a run that has already been counted.
+    go :: Bool -> Int -> [Event] -> Int
+    go _ !total [] = total
+    go inRun !total (ev : rest)
+      | isImportant ev = go False (total + 1) rest
+      | inRun = go True total rest
+      | otherwise = go True (total + 1) rest -- first event of a new run
+
+-- | Compresses every maximal run of consecutive unimportant events (see
+-- 'isImportant') down to one 'Compressed' pseudo-event that carries the tag of
+-- the first event of the run. Important events are passed through unchanged
+-- and in their original order.
+--
+-- The text of the pseudo-event summarises the run, e.g. @"3 joined, 1 left"@.
+compressEvents :: [(a, Event)] -> [(a, Event)]
+compressEvents [] = []
+compressEvents events =
+  let (evsU, evs1) = break (isImportant . snd) events
+      (evsI, evs2) = span (isImportant . snd) evs1
+      tl = evsI ++ compressEvents evs2
+  in case evsU of
+       (tag, e) : ps -> (tag, compress (e : map snd ps)) : tl
+       [] -> tl
+  where
+    -- Tally the run with a strict left fold: together with the strict fields
+    -- of 'CompressedCount' this keeps the counters evaluated instead of
+    -- building one closure and one thunk per event in the run.
+    compress :: [Event] -> Event
+    compress = Compressed . describe . foldl' (flip collect) (CompressedCount 0 0 0 0)
+
+    -- Parts and quits are both reported as "left". Event kinds not listed
+    -- here leave the tally untouched.
+    collect :: Event -> CompressedCount -> CompressedCount
+    collect Join{} cc = cc { ccJoins = ccJoins cc + 1 }
+    collect Part{} cc = cc { ccLeaves = ccLeaves cc + 1 }
+    collect Quit{} cc = cc { ccLeaves = ccLeaves cc + 1 }
+    collect Mode{} cc = cc { ccModes = ccModes cc + 1 }
+    collect ParseError{} cc = cc { ccParseErrors = ccParseErrors cc + 1 }
+    collect _ cc = cc
+
+    -- Only non-zero counters are mentioned; the parts are comma-separated.
+    describe :: CompressedCount -> Text
+    describe (CompressedCount nj nl nm np) =
+      T.pack $ intercalate ", " $
+        [show nj ++ " joined" | nj > 0] ++
+        [show nl ++ " left" | nl > 0] ++
+        [show nm ++ " set a mode" | nm > 0] ++
+        [show np ++ (if np == 1 then " parse error" else " parse errors") | np > 0]
+
+-- | Per-kind tally of the unimportant events in one compressed run.
+data CompressedCount = CompressedCount
+  { ccJoins :: !Int
+  , ccLeaves :: !Int -- ^ parts and quits together
+  , ccModes :: !Int
+  , ccParseErrors :: !Int }
+
indexUpdateImport :: Index -> Channel -> IO ()
indexUpdateImport index@(Index _ mp _) chan = do
let ciRef = mp Map.! chan
@@ -172,50 +289,55 @@ indexUpdateImport index@(Index _ mp _) chan = do
dayidx = fromIntegral @Integer @Int (day `diffDays` ciStartDay ci)
loadDay index chan ymd >>= \case
- Just (_bs, lineStarts) -> return (Just (dayidx, VS.length lineStarts))
+ Just (bs, lineStarts) ->
+ return (Just (dayidx, Counts (VS.length lineStarts) (countCompressed (map snd (parseLog bs)))))
Nothing -> return Nothing
atomicPrint $ "Update import for " <> prettyChannel chan <> ": " <> T.show readCounts <> " (len = " <> T.show (IGV.length (ciCountUntil ci)) <> ")"
when (not (null readCounts)) $
atomicModifyIORef' ciRef $ \ci' ->
- let ciRes = foldl' (\ci2 (dayidx, count) -> recordCount ci2 dayidx count) ci' readCounts
+ let ciRes = foldl' (\ci2 (dayidx, counts) -> recordCounts ci2 dayidx counts) ci' readCounts
in (ciRes, ())
where
-- | How many events do we already have on this day?
- eventsOnDayIdx :: ChanIndex -> Int -> Int
+ eventsOnDayIdx :: ChanIndex -> Int -> Counts
eventsOnDayIdx ci dayidx
| dayidx == 0 = scan IGV.! 0
| dayidx < IGV.length scan = scan IGV.! dayidx - scan IGV.! (dayidx - 1)
| otherwise = 0
where scan = ciCountUntil ci
- recordCount :: ChanIndex -> Int -> Int -> ChanIndex
- recordCount ci dayidx count
- | count <= alreadyHave = ci
+ recordCounts :: ChanIndex -> Int -> Counts -> ChanIndex
+ recordCounts ci dayidx counts
+ | counts == alreadyHave = ci
| otherwise =
- let ciExt = -- Ensure that there is space in the scan vector for our record
- let nExtraDays = dayidx - IGV.length (ciCountUntil ci) + 1
- lastCount | IGV.length (ciCountUntil ci) == 0 = 0
- | otherwise = ciCountUntil ci IGV.! (IGV.length (ciCountUntil ci) - 1)
+ let ciExt =
+ -- Ensure that there is space in the scan vector for our record
+ -- by copying the last entry a few times
+ let currentLen = IGV.length (ciCountUntil ci)
+ nExtraDays = dayidx - currentLen + 1
+ lastCount | currentLen == 0 = 0
+ | otherwise = ciCountUntil ci IGV.! (currentLen - 1)
in ci { ciCountUntil = iterate (`IGV.append` lastCount) (ciCountUntil ci) !! nExtraDays }
- addFrom di n vec
- | di < IGV.length vec = addFrom (di + 1) n (IGV.set vec di (vec IGV.! di + n))
- | otherwise = vec
- in ciExt { ciCountUntil = addFrom dayidx (count - alreadyHave) (ciCountUntil ciExt)
- , ciTotal = ciTotal ciExt + count - alreadyHave }
+ addFrom di n vec =
+ foldl' (\vec' di' -> IGV.set vec' di' (vec' IGV.! di' + n))
+ vec [di .. IGV.length vec - 1]
+ in ciExt { ciCountUntil = addFrom dayidx (counts - alreadyHave) (ciCountUntil ciExt)
+ , ciTotal = ciTotal ciExt + counts - alreadyHave }
where alreadyHave = eventsOnDayIdx ci dayidx
-- search
-- | Returns proper lazy list of events. Reading the files happens strictly,
-- but parsing the events happens lazily.
-indexGetEventsLinear :: Index -> Channel -> Int -> Int -> IO [(YMDHMS, EventID, Event)]
-indexGetEventsLinear index@(Index _ mp _) chan from count = do
+indexGetEventsLinear :: Index -> Channel -> CountKind -> Int -> Int -> IO [(YMDHMS, EventID, Event)]
+indexGetEventsLinear index@(Index _ mp _) chan kind from count = do
+ atomicPrintS $ "indexGetEventsLinear " ++ show chan ++ " " ++ show kind ++ " " ++ show from ++ " " ++ show count
ci <- readIORef (mp Map.! chan)
- if from + count < 0 || from >= ciTotal ci
+ if from + count < 0 || from >= getCount kind (ciTotal ci)
then return []
- else go ci (ciCountUntil ci)
+ else go ci (IGV.mapUVector (getCountVector kind) (ciCountUntil ci))
where
go ci scan = do
let day1idx = binSearch scan from
@@ -241,34 +363,36 @@ indexGetEventsLinear index@(Index _ mp _) chan from count = do
nOnDay2 = scan IGV.! day2idx - nbefore
in loop (day1idx + 1) (neventBeforeDay1 + neventOnDay1) (neventOnDay1 - off1)
- -- traceM ("ci = " ++ show ci)
- -- traceM ("binSearch " ++ show from ++ " =")
- -- traceM (" " ++ show day1idx)
- -- traceM ("day1 = " ++ show day1)
- -- traceM ("off1 = " ++ show off1)
- -- traceM ("neventOnDay1 = " ++ show neventOnDay1)
- -- traceM ("count = " ++ show count)
- -- traceM ("day2 = " ++ show day2)
- -- traceM ("off2 = " ++ show off2)
+ -- atomicPrintS ("ci = " ++ show ci)
+ -- atomicPrintS ("binSearch " ++ show from ++ " =")
+ -- atomicPrintS (" " ++ show day1idx)
+ -- atomicPrintS ("day1 = " ++ show day1)
+ -- atomicPrintS ("off1 = " ++ show off1)
+ -- atomicPrintS ("neventOnDay1 = " ++ show neventOnDay1)
+ -- atomicPrintS ("count = " ++ show count)
+ -- atomicPrintS ("day2 = " ++ show day2)
+ -- atomicPrintS ("off2 = " ++ show off2)
evs <- forM (zip [day1 .. day2] [day1idx..]) $ \(day, dayidx) ->
- let neventsOnDay | dayidx == 0 = scan IGV.! 0
- | otherwise = scan IGV.! dayidx - scan IGV.! (dayidx - 1)
- range
- | day == day1 =
- if day1 == day2
- then (off1, Just (off2 - off1))
- else (off1, Just (neventsOnDay - off1))
- | day == day2 = (0, Just off2)
- | otherwise = (0, Just neventsOnDay)
- ymd = ymdFromGregorian (toGregorian day)
- in if neventsOnDay > 0
- then loadDay index chan ymd <&> \case
- Just (bs, lineStarts) ->
- let events = parseLogRange range lineStarts bs
- in zipWith (\(hms, ev) off -> (YMDHMS ymd hms, genEventID (YMDHMS ymd hms) off, ev))
- events [fst range ..]
- Nothing -> error $ "events on day " ++ show (dayToYMD day) ++ " but no file"
- else return []
+ let neventsOnDay | dayidx == 0 = scan IGV.! 0
+ | otherwise = scan IGV.! dayidx - scan IGV.! (dayidx - 1)
+ rangeStart = if day == day1 then off1 else 0
+ rangeEnd = if day == day2 then off2 else neventsOnDay
+ range = (rangeStart, Just (rangeEnd - rangeStart))
+ ymd = ymdFromGregorian (toGregorian day)
+ in if neventsOnDay > 0
+ then loadDay index chan ymd <&> \case
+ Just (bs, lineStarts) -> case kind of
+ CKAll ->
+ let events = parseLogRange range lineStarts bs
+ in [(YMDHMS ymd hms, genEventID (YMDHMS ymd hms) off, ev)
+ | ((hms, ev), off) <- zip events [rangeStart ..]]
+ CKCompressed ->
+ let events = parseLog bs
+ events' = take (rangeEnd - rangeStart) $ drop rangeStart $
+ compressEvents [((hms, off), ev) | ((hms, ev), off) <- zip events [0..]]
+ in [(YMDHMS ymd hms, genEventID (YMDHMS ymd hms) off, ev) | ((hms, off), ev) <- events']
+ Nothing -> error $ "events on day " ++ show (dayToYMD day) ++ " but no file"
+ else return []
return (concat evs)
@@ -299,8 +423,8 @@ binSearch vec needle
-- - Timestamp of the event
-- - Index in the linear list of events for this channel
-- - Index in the list of events in the channel on that day
-findEventIDLinear :: Index -> Channel -> EventID -> IO (Maybe (YMDHMS, Int, Int))
-findEventIDLinear index@(Index _ mp _) chan eid = runMaybeT $ do
+findEventIDLinear :: Index -> Channel -> CountKind -> EventID -> IO (Maybe (YMDHMS, Int, Int))
+findEventIDLinear index@(Index _ mp _) chan kind eid = runMaybeT $ do
(ymdhms@(YMDHMS ymd hms), idoff) <- hoistMaybe (parseEventID eid)
day <- hoistMaybe (uncurry3 fromGregorianValid (ymdToGregorian ymd))
@@ -308,24 +432,29 @@ findEventIDLinear index@(Index _ mp _) chan eid = runMaybeT $ do
guard (ciStartDay ci <= day && day <= ciEndDay ci)
(bs, lineStarts) <- MaybeT $ loadDay index chan ymd
- let candidates =
+ let candidates = -- [(event offset, index in possibly compressed event list)]
map snd $
takeWhile ((== hms) . fst) $
dropWhile ((< hms) . fst) $
- zip (parseLogTimesOnly lineStarts bs) [0..]
+ case kind of
+ CKAll -> zip (parseLogTimesOnly lineStarts bs) (zip [0..] [0..])
+ CKCompressed ->
+ let compressed = compressEvents [((hms', off), ev)
+ | ((hms', ev), off) <- zip (parseLog bs) [0..]]
+ in [(hms', (off, idx)) | (((hms', off), _ev), idx) <- zip compressed [0..]]
case candidates of
[] -> empty
_ -> do
let dayidx = fromIntegral @Integer @Int (day `diffDays` ciStartDay ci)
eventsBeforeDay | dayidx == 0 = 0
- | otherwise = ciCountUntil ci IGV.! (dayidx - 1)
- let dayoff = minimumBy (comparing (\off -> abs (off - idoff))) candidates
- return (ymdhms, eventsBeforeDay + dayoff, dayoff)
+ | otherwise = getCount kind (ciCountUntil ci IGV.! (dayidx - 1))
+ let (_dayoff, daylistidx) = minimumBy (comparing (\(off, _listidx) -> abs (off - idoff))) candidates
+ return (ymdhms, eventsBeforeDay + daylistidx, daylistidx)
-- other methods
-indexNumEvents :: Index -> Channel -> IO Int
-indexNumEvents (Index _ mp _) chan = ciTotal <$> readIORef (mp Map.! chan)
+indexNumEvents :: Index -> Channel -> CountKind -> IO Int
+indexNumEvents (Index _ mp _) chan kind = getCount kind . ciTotal <$> readIORef (mp Map.! chan) -- total for the requested kind; Map.! throws if chan is not in the index
indexCalendar :: Index -> Channel -> IO ((Day, Day), [Int])
indexCalendar (Index _ mp _) chan = do
@@ -333,12 +462,12 @@ indexCalendar (Index _ mp _) chan = do
let scan = ciCountUntil ci
return ((ciStartDay ci, ciEndDay ci)
,[if i == 0
- then scan IGV.! 0
- else scan IGV.! i - scan IGV.! (i - 1)
+ then coAll (scan IGV.! 0)
+ else coAll (scan IGV.! i) - coAll (scan IGV.! (i - 1))
| i <- [0 .. IGV.length scan - 1]])
-indexGetEventsDay :: Index -> Channel -> Day -> IO [(HMS, EventID, Event)]
-indexGetEventsDay index@(Index _ mp _) chan day = do
+indexGetEventsDay :: Index -> Channel -> CountKind -> Day -> IO [(HMS, EventID, Event)]
+indexGetEventsDay index@(Index _ mp _) chan kind day = do
ci <- readIORef (mp Map.! chan)
let ymd = dayToYMD day
if day < ciStartDay ci || day > ciEndDay ci
@@ -346,8 +475,14 @@ indexGetEventsDay index@(Index _ mp _) chan day = do
else loadDay index chan ymd <&> \case
Just (bs, _lineStarts) ->
let events = parseLog bs
- in zipWith (\(hms, ev) off -> (hms, genEventID (YMDHMS ymd hms) off, ev))
- events [0..]
+ in case kind of
+ CKAll ->
+ [(hms, genEventID (YMDHMS ymd hms) off, ev)
+ | ((hms, ev), off) <- zip events [0..]]
+ CKCompressed ->
+ [(hms, genEventID (YMDHMS ymd hms) off, ev)
+ | ((hms, off), ev) <- compressEvents [((hms, off), ev)
+ | ((hms, ev), off) <- zip events [0..]]]
Nothing -> [] -- if the file doesn't exist, there ain't no events
-- utilities
@@ -364,6 +499,22 @@ loadDay (Index basedir _ cache) chan@(Channel network channel) ymd = do
Nothing -> return Nothing -- file didn't exist
Just (bs, lineStarts) -> return (Just (bs, lineStarts))
+-- | Classifies an event for compression: important events ('True') are kept
+-- as they are by 'compressEvents', unimportant ones ('False') are folded into
+-- a 'Compressed' pseudo-event.
+--
+-- Every constructor is listed explicitly instead of using a wildcard.
+isImportant :: Event -> Bool
+isImportant event = case event of
+  ReNick{} -> True
+  Talk{} -> True
+  Notice{} -> True
+  Act{} -> True
+  Kick{} -> True
+  Topic{} -> True
+
+  Join{} -> False
+  Part{} -> False
+  Quit{} -> False
+  Mode{} -> False
+  ParseError{} -> False
+
+  -- Already-compressed events should never be fed back into compression.
+  Compressed{} -> error "isImportant: why am I getting a compressed event"
+
-- | Takes the index of the event in the day's log (the "offset") in addition
-- to the timestamp, in order to disambiguate in case there are multiple events
-- with the same timestamp.