1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
|
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ViewPatterns #-}
module Index (
Index,
initIndex,
indexGetEventsLinear,
findEventIDLinear,
indexGetEventsDay,
indexNumEvents,
indexCalendar,
) where
import Prelude hiding (foldl') -- exported since GHC 9.10 (base 4.20)
import Control.Applicative (empty)
import Control.Concurrent
import Control.Monad (forM, forM_, when, guard)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Maybe
import Data.ByteString qualified as BS
import Data.ByteString (ByteString)
import Data.Char (isDigit, chr, ord)
import Data.Functor ((<&>))
import Data.IORef
import Data.List (sort, scanl', foldl', minimumBy)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (catMaybes)
import Data.Ord (comparing)
import Data.Text qualified as T
import Data.Text (Text)
import Data.Time
import Data.Vector.Storable qualified as VS
import Data.Word
import Numeric (showIntAtBase)
import System.Clock qualified as Clock
import System.Directory
import System.FilePath
import System.FSNotify qualified as FN
import Text.Read (readMaybe)
import AtomicPrint
import Debounce
import Cache
import Config (Channel(..), prettyChannel)
import ImmutGrowVector qualified as IGV
import Mmap
import Util
import ZNC
-- | Event counts of a single channel, with one slot per calendar day starting
-- at 'ciStartDay'.
data ChanIndex = ChanIndex
  { ciStartDay   :: !Day
    -- ^ first day covered by the index
  , ciCountUntil :: !(IGV.ImmutGrowVector Int)
    -- ^ number of events up to and /including/ this day
  , ciTotal      :: !Int
    -- ^ total number of events over all indexed days
  }
  deriving (Show)
-- | Last day covered by the index (inclusive). Errors on an index that covers
-- no days at all.
ciEndDay :: ChanIndex -> Day
ciEndDay ci
  | ndays <= 0 = error "ciEndDay: ChanIndex with zero days!"
  | otherwise = addDays' (ndays - 1) (ciStartDay ci)
  where
    ndays = IGV.length (ciCountUntil ci)
-- | The log index for a set of channels.
--
-- Concurrency notes:
--
-- 1. Each channel's 'ChanIndex' lives in an 'IORef', so there is no real
--    critical section around modifying it. That is acceptable because a
--    'ChanIndex' is a pure value: every update builds a fresh one and swaps
--    it in atomically.
-- 2. Without a critical section, several concurrent writers would step on
--    each other. There is only one writer (the debounced
--    'indexUpdateImport'), so this simple scheme suffices.
data Index = Index
  !FilePath
  -- ^ base directory holding the @network/channel@ log directories
  !(Map Channel (IORef ChanIndex))
  -- ^ per-channel event counts
  !(Cache (Channel, YMD) (ByteString, VS.Vector Word32))
  -- ^ recently loaded day files, paired with their 'preparseLog' line starts
-- | Textual event identifier: the letter @a@ (an ID format version tag)
-- followed by a base62 number, as produced by 'genEventID' and decoded by
-- 'parseEventID'.
type EventID = Text
-- init

-- | Build the index for the given channels, whose logs live in
-- @basedir/<network>/<channel>/YYYY-MM-DD.log@, and fork a thread that
-- watches those directories so that the index follows new events.
--
-- Every log file is read and parsed once to count its events. Afterwards,
-- file additions and modifications trigger a debounced (delay 5.0)
-- 'indexUpdateImport' for the channel; modifications and removals also
-- evict the affected day from the file cache.
--
-- NOTE(review): a channel directory without any log file makes 'minimum'
-- fail; since 'newIORef' does not force its argument, that error only
-- surfaces when the channel's 'ChanIndex' is first used.
initIndex :: FilePath -> [Channel] -> IO Index
initIndex basedir toimport = do
  cache <- cacheNew 100
  -- Elapsed time is measured on the monotonic clock so that wall-clock
  -- adjustments during indexing cannot distort the reported duration.
  c_start <- Clock.getTime Clock.Monotonic
  items <-
    forM toimport $ \(Channel nwT chT) -> do
      let chandir = basedir </> T.unpack nwT </> T.unpack chT
      files <- listDirectory chandir
      days <- fmap sort . forM files $ \fn -> do
        let path = chandir </> fn
            date = case parseFileName fn of
              Just ymd -> ymdToGregorian ymd
              Nothing -> error $ "Log file with unexpected file name: " ++ path
        events <- parseLog <$> BS.readFile path
        return (uncurry3 fromGregorian date, length events)
      let minday = minimum (map fst days)
          maxday = maximum (map fst days)
          ndays = fromIntegral @Integer @Int (diffDays maxday minday + 1)
          -- Cumulative event counts, one entry per day in [minday .. maxday].
          countScan = IGV.fromListN ndays (drop 1 $ scanl' (+) 0 (makeCounts [minday .. maxday] days))
          ntotal = sum (map snd days)
      chanindex <- newIORef $
        ChanIndex { ciStartDay = minday
                  , ciCountUntil = countScan
                  , ciTotal = ntotal }
      return (Channel nwT chT, chanindex)
  c_end <- Clock.getTime Clock.Monotonic
  let timetakenSecs = fromIntegral @_ @Double (Clock.toNanoSecs (Clock.diffTimeSpec c_start c_end)) / 1e9
  atomicPrintS $ "Parsing/indexing logs in " ++ show basedir ++ " took " ++ show timetakenSecs ++ " secs"
  let index = Index basedir (Map.fromList items) cache
  _ <- forkIO $
    FN.withManager $ \mgr -> do
      let predicate FN.Added{} = True
          predicate FN.Modified{} = True
          predicate FN.Removed{} = True
          predicate FN.WatchedDirectoryRemoved{} = True
          predicate _ = False
          -- The day covered by the event's file, if it is a log file at all.
          eventDay ev = parseFileName (takeFileName (FN.eventPath ev))
          handle deb _ FN.Added{} = debounceClick deb
          handle deb chan ev@FN.Modified{} =
            case eventDay ev of
              Just ymd -> do
                cacheInvalidate cache (chan, ymd)
                debounceClick deb
              Nothing -> return ()
          handle _ chan ev@FN.Removed{} =
            case eventDay ev of
              Just ymd -> cacheInvalidate cache (chan, ymd)
              Nothing -> return ()
          handle _ _ ev@FN.WatchedDirectoryRemoved{} = -- unclear how to recover; just report it
            atomicPrintS $ "Directory " ++ FN.eventPath ev ++ " removed!"
          -- Unreachable: 'predicate' lets no other events through.
          handle _ _ _ = return ()
      forM_ toimport $ \chan@(Channel nw ch) -> do
        let chandir = basedir </> T.unpack nw </> T.unpack ch
        deb <- makeDebounce (nw <> T.pack "/" <> ch) 5.0 (indexUpdateImport index chan)
        _ <- FN.watchDir mgr chandir predicate (handle deb chan)
        atomicPrintS $ "Watching " <> chandir
      -- Keep 'withManager' (and thus the watches) alive forever.
      let loop = threadDelay (round (1e9 :: Double)) >> loop in loop
  return index
-- | Expand a sparse, ascending list of per-day event counts into a dense list
-- with one entry per day of the given ascending day range. Days without an
-- entry get a count of zero.
--
-- Fails if an entry lies outside the range, or if a day occurs twice among
-- the entries.
makeCounts :: [Day] -> [(Day, Int)] -> [Int]
makeCounts [] entries
  | null entries = []
  | otherwise = error "makeCounts: more entries than days in range"
makeCounts (_ : rest) [] = 0 : makeCounts rest []
makeCounts (day : rest) entries@((entryDay, n) : more) =
  case compare day entryDay of
    EQ -> n : makeCounts rest more
    LT -> 0 : makeCounts rest entries
    GT -> error $ "makeCounts: duplicate entry? " ++ show (day, toGregorian day, entryDay, toGregorian entryDay)
-- | Re-count the events in the log files around "today" for one channel and
-- fold any growth into the channel's 'ChanIndex'.
--
-- The files for the day before, of and after @max today (ciEndDay ci)@ are
-- loaded through 'loadDay'. For every such day whose file now holds more
-- events than the index records, the difference is added to the cumulative
-- counts, extending the indexed day range if the day lies past its end.
-- Days whose count shrank are left alone.
indexUpdateImport :: Index -> Channel -> IO ()
indexUpdateImport index@(Index _ mp _) chan = do
  let ciRef = mp Map.! chan
  ci <- readIORef ciRef
  today <- utctDay <$> getCurrentTime
  let todayish = max today (ciEndDay ci)
  readCounts <- fmap catMaybes . forM [-1 .. 1] $ \increment -> do
    let day = increment `addDays` todayish
        ymd = dayToYMD day
        dayidx = fromIntegral @Integer @Int (day `diffDays` ciStartDay ci)
    -- A day before the start of the index has no slot in the count vector
    -- (its index would be negative), so it cannot be recorded; skip it.
    if dayidx < 0
      then return Nothing
      else loadDay index chan ymd >>= \case
        Just (_bs, lineStarts) -> return (Just (dayidx, VS.length lineStarts))
        Nothing -> return Nothing
  atomicPrint $ "Update import for " <> prettyChannel chan <> ": " <> T.show readCounts <> " (len = " <> T.show (IGV.length (ciCountUntil ci)) <> ")"
  when (not (null readCounts)) $
    atomicModifyIORef' ciRef $ \ci' ->
      let ciRes = foldl' (\ci2 (dayidx, count) -> recordCount ci2 dayidx count) ci' readCounts
      in (ciRes, ())
  where
    -- How many events do we already have on this day? Zero past the end.
    eventsOnDayIdx :: ChanIndex -> Int -> Int
    eventsOnDayIdx ci dayidx
      | dayidx == 0 = scan IGV.! 0
      | dayidx < IGV.length scan = scan IGV.! dayidx - scan IGV.! (dayidx - 1)
      | otherwise = 0
      where scan = ciCountUntil ci

    -- Raise the recorded count of day @dayidx@ to @count@ (never lower it).
    recordCount :: ChanIndex -> Int -> Int -> ChanIndex
    recordCount ci dayidx count
      | count <= alreadyHave = ci
      | otherwise =
          let ciExt = ci { ciCountUntil = extendTo dayidx (ciCountUntil ci) }
              delta = count - alreadyHave
          in ciExt { ciCountUntil = addFrom dayidx delta (ciCountUntil ciExt)
                   , ciTotal = ciTotal ciExt + delta }
      where
        alreadyHave = eventsOnDayIdx ci dayidx
        -- Ensure the scan vector has a slot for day @di@, padding with the
        -- last cumulative count (i.e. zero events on the new days). The
        -- 'max 0' is essential: when @di@ already lies inside the range the
        -- difference is negative, and a negative '!!' index throws.
        extendTo di vec =
          let nExtraDays = max 0 (di - IGV.length vec + 1)
              lastCount | IGV.length vec == 0 = 0
                        | otherwise = vec IGV.! (IGV.length vec - 1)
          in iterate (`IGV.append` lastCount) vec !! nExtraDays
        -- Add @n@ to every cumulative count from index @di@ onwards.
        addFrom di n vec
          | di < IGV.length vec = addFrom (di + 1) n (IGV.set vec di (vec IGV.! di + n))
          | otherwise = vec
-- search

-- | Returns proper lazy list of events. Reading the files happens strictly,
-- but parsing the events happens lazily.
--
-- Returns up to @count@ events of @chan@ starting at linear event index
-- @from@ (0-based, counted over all indexed days of the channel). A range
-- that starts before event 0 is clipped to its non-negative part; an empty
-- or out-of-range request yields @[]@.
indexGetEventsLinear :: Index -> Channel -> Int -> Int -> IO [(YMDHMS, EventID, Event)]
indexGetEventsLinear index@(Index _ mp _) chan from count = do
  ci <- readIORef (mp Map.! chan)
  if wanted <= 0 || begin >= ciTotal ci
    then return []
    else go ci (ciCountUntil ci)
  where
    -- Clip the requested range so that it starts at event 0 at the earliest.
    -- A negative start would otherwise produce negative in-day offsets, which
    -- end up in the generated event IDs.
    begin = max 0 from
    wanted = count - (begin - from)

    go ci scan = do
      let day1idx = binSearch scan begin
          day1 = day1idx `addDays'` ciStartDay ci
          neventBeforeDay1 | day1idx == 0 = 0
                           | otherwise = scan IGV.! (day1idx - 1)
          neventInclDay1 = scan IGV.! day1idx
          neventOnDay1 = neventInclDay1 - neventBeforeDay1
          off1 = begin - neventBeforeDay1
          -- day2 is inclusive, off2 is exclusive
          (day2, off2)
            | begin + wanted <= neventInclDay1 = (day1, off1 + wanted)
            | day1idx == IGV.length scan - 1 = (day1, neventOnDay1)
            | otherwise =
                let loop day2idx nbefore nseen
                      | nseen + nOnDay2 >= wanted =
                          (day2idx `addDays'` ciStartDay ci, wanted - nseen)
                      | day2idx == IGV.length scan - 1 =
                          (day2idx `addDays'` ciStartDay ci, nOnDay2)
                      | otherwise =
                          loop (day2idx + 1) (scan IGV.! day2idx) (nseen + nOnDay2)
                      where
                        nOnDay2 = scan IGV.! day2idx - nbefore
                in loop (day1idx + 1) (neventBeforeDay1 + neventOnDay1) (neventOnDay1 - off1)
      evs <- forM (zip [day1 .. day2] [day1idx ..]) $ \(day, dayidx) ->
        let neventsOnDay | dayidx == 0 = scan IGV.! 0
                         | otherwise = scan IGV.! dayidx - scan IGV.! (dayidx - 1)
            -- (first in-day offset, number of events) to take from this day
            range
              | day == day1 =
                  if day1 == day2
                    then (off1, Just (off2 - off1))
                    else (off1, Just (neventsOnDay - off1))
              | day == day2 = (0, Just off2)
              | otherwise = (0, Just neventsOnDay)
            ymd = ymdFromGregorian (toGregorian day)
        in if neventsOnDay > 0
             then loadDay index chan ymd <&> \case
               Just (bs, lineStarts) ->
                 let events = parseLogRange range lineStarts bs
                 in zipWith (\(hms, ev) off -> (YMDHMS ymd hms, genEventID (YMDHMS ymd hms) off, ev))
                            events [fst range ..]
               Nothing -> error $ "events on day " ++ show (dayToYMD day) ++ " but no file"
             else return []
      return (concat evs)
-- | Binary search over an ascending vector.
--
-- Returns the index of the first element @x@ such that @needle < x@. If
-- there is no such element (i.e. @needle@ is greater than or equal to the
-- last element, or the vector is empty), returns the length of the vector.
--
-- TODO: proportional binary search
binSearch :: IGV.ImmutGrowVector Int -> Int -> Int
binSearch vec needle
  -- '<=' rather than '<': when the needle equals the last element no element
  -- is strictly greater, so the answer is the length. This also guarantees
  -- the invariant of 'go' (needle < vec[hi]) on entry; without it a
  -- one-element vector equal to the needle would loop forever.
  | veclen == 0 || vec IGV.! (veclen - 1) <= needle = veclen
  | needle < vec IGV.! 0 = 0
  | otherwise = go 0 (veclen - 1)
  where
    veclen = IGV.length vec
    -- Invariant: vec[lo] <= needle < vec[hi], hence lo < hi.
    go :: Int -> Int -> Int
    go lo hi
      | lo + 1 == hi = hi
      | otherwise =
          let mid = lo + (hi - lo) `div` 2
          in if vec IGV.! mid <= needle
               then go mid hi
               else go lo mid
-- | Look up an event by its ID.
--
-- The ID encodes a timestamp and an offset within that day's log. Among the
-- events of that day carrying exactly that timestamp, the one whose offset is
-- closest to the encoded offset is chosen. If the ID is found, returns:
--
-- - Timestamp of the event
-- - Index in the linear list of events for this channel
-- - Index in the list of events in the channel on that day
--
-- Returns 'Nothing' if the ID does not parse, its day is not a valid date or
-- lies outside the channel's indexed range, the day's log cannot be loaded,
-- or no event of that day has the encoded timestamp.
findEventIDLinear :: Index -> Channel -> EventID -> IO (Maybe (YMDHMS, Int, Int))
findEventIDLinear index@(Index _ mp _) chan eid = runMaybeT $ do
  (stamp@(YMDHMS ymd hms), wantedOff) <- hoistMaybe (parseEventID eid)
  day <- hoistMaybe (uncurry3 fromGregorianValid (ymdToGregorian ymd))
  ci <- lift (readIORef (mp Map.! chan))
  guard (ciStartDay ci <= day && day <= ciEndDay ci)
  (bs, lineStarts) <- MaybeT (loadDay index chan ymd)
  let timed = zip (parseLogTimesOnly lineStarts bs) [0 ..]
      -- Assumes log lines are in chronological order, so the events of the
      -- wanted second form one contiguous run.
      sameSecond = map snd (takeWhile ((== hms) . fst) (dropWhile ((< hms) . fst) timed))
      dayidx = fromIntegral @Integer @Int (day `diffDays` ciStartDay ci)
      eventsBeforeDay
        | dayidx == 0 = 0
        | otherwise = ciCountUntil ci IGV.! (dayidx - 1)
  guard (not (null sameSecond))
  let dayoff = minimumBy (comparing (\off -> abs (off - wantedOff))) sameSecond
  pure (stamp, eventsBeforeDay + dayoff, dayoff)
-- other methods

-- | Total number of indexed events in the channel.
indexNumEvents :: Index -> Channel -> IO Int
indexNumEvents (Index _ mp _) chan = do
  ci <- readIORef (mp Map.! chan)
  return (ciTotal ci)
-- | The channel's indexed day range (inclusive on both ends), together with
-- the number of events on each day of that range, in order.
indexCalendar :: Index -> Channel -> IO ((Day, Day), [Int])
indexCalendar (Index _ mp _) chan = do
  ci <- readIORef (mp Map.! chan)
  let counts = ciCountUntil ci
      cumulative = [counts IGV.! i | i <- [0 .. IGV.length counts - 1]]
      -- Difference of consecutive cumulative counts; the first day minus 0.
      perDay = zipWith (-) cumulative (0 : cumulative)
  return ((ciStartDay ci, ciEndDay ci), perDay)
-- | All events of the channel on the given day, each with its time of day and
-- event ID. Days outside the indexed range, and days whose log file does not
-- exist, have no events.
indexGetEventsDay :: Index -> Channel -> Day -> IO [(HMS, EventID, Event)]
indexGetEventsDay index@(Index _ mp _) chan day = do
  ci <- readIORef (mp Map.! chan)
  let ymd = dayToYMD day
      inRange = ciStartDay ci <= day && day <= ciEndDay ci
      withIDs bs =
        [ (hms, genEventID (YMDHMS ymd hms) off, ev)
        | ((hms, ev), off) <- zip (parseLog bs) [0 ..]
        ]
  if not inRange
    then return []
    else maybe [] (withIDs . fst) <$> loadDay index chan ymd
-- utilities

-- | Load one day's log file of a channel, going through the cache.
--
-- On a cache miss the file is mapped with 'mapFile', its line starts are
-- computed with 'preparseLog', and the pair is cached. When 'mapFile' finds
-- no file, 'Nothing' is returned and nothing is cached.
loadDay :: Index -> Channel -> YMD -> IO (Maybe (ByteString, VS.Vector Word32))
loadDay (Index basedir _ cache) chan@(Channel network channel) ymd =
  cacheLookup cache key >>= maybe fromDisk (return . Just)
  where
    key = (chan, ymd)
    path = basedir </> T.unpack network </> T.unpack channel </> toFileName ymd
    fromDisk = do
      mapped <- mapFile path
      case mapped of
        Nothing -> return Nothing -- file didn't exist
        Just bs -> do
          let entry = (bs, preparseLog bs)
          cacheAdd cache key entry
          return (Just entry)
-- | Takes the index of the event in the day's log (the "offset") in addition
-- to the timestamp, in order to disambiguate in case there are multiple events
-- with the same timestamp.
--
-- >>> genEventID (YMDHMS (YMD 2026 4 7) (HMS 18 56 55)) 123
-- "a5d9CtBVX"
--
-- The ID is a mixed-radix number, most significant digit first:
--
-- > digits: [offset, year, month, day, hour, minute, second]
-- > radix:  [    --, 5000,    12,  31,   24,     60,     60]
--
-- For it to fit in a 'Word64' the offset must stay below
--
-- > ceiling (2 ** (64 - logBase 2 (5000 * 12 * 31 * 24 * 60 * 60)))  ==  114787088
--
-- which is conservatively rounded down to 100_000_000, i.e. at most 100
-- million events per day. The number is written in base62 (@0-9A-Za-z@) and
-- prefixed with @a@, an ID version identifier.
genEventID :: YMDHMS -> Int -> EventID
genEventID (YMDHMS (YMD y m d) (HMS hh mm ss)) off
  | off >= 100_000_000 = error "Too many events per day"
  | y >= 5000 = error "You should have better tech at this point"
  | otherwise = T.pack ('a' : showIntAtBase 62 base62digit encoded "")
  where
    w64 :: Integral a => a -> Word64
    w64 = fromIntegral
    -- Horner's scheme over (radix, digit) pairs, seeded with the offset.
    encoded =
      foldl' (\acc (radix, digit) -> acc * radix + digit) (w64 off)
        [ (5000, w64 y)
        , (12, w64 (m - 1))
        , (31, w64 (d - 1))
        , (24, w64 hh)
        , (60, w64 mm)
        , (60, w64 ss)
        ]
    base62digit i
      | i < 10 = chr (ord '0' + i)
      | i < 36 = chr (ord 'A' + (i - 10))
      | otherwise = chr (ord 'a' + (i - 36))
-- | Inverse of 'genEventID': decode an event ID into its timestamp and
-- in-day offset.
--
-- >>> parseEventID "a5d9CtBVX"
-- Just (YMDHMS (YMD 2026 4 7) (HMS 18 56 55),123)
--
-- Returns 'Nothing' unless the ID is an @a@ followed by at least one base62
-- digit (@0-9A-Za-z@), and the encoded number fits in a 'Word64' (as every
-- ID produced by 'genEventID' does).
parseEventID :: EventID -> Maybe (YMDHMS, Int)
parseEventID (T.uncons -> Just ('a', eid)) = do
  digits <- mapM unbase62char (T.unpack eid)
  guard (not (null digits))
  -- Accumulate in 'Integer' so that over-long inputs are rejected instead of
  -- silently wrapping around modulo 2^64 into some unrelated valid-looking ID.
  let numI = foldl' (\acc dgt -> acc * 62 + toInteger dgt) 0 digits
  guard (numI <= toInteger (maxBound :: Word64))
  let num = fromInteger numI :: Word64
  let (num2, ss) = num `quotRem` 60
  let (num3, mm) = num2 `quotRem` 60
  let (num4, hh) = num3 `quotRem` 24
  let (num5, d') = num4 `quotRem` 31
  let (num6, m') = num5 `quotRem` 12
  let (off , y ) = num6 `quotRem` 5000
  let cast :: Num b => Word64 -> b ; cast = fromIntegral
  return (YMDHMS (YMD (cast y) (cast m' + 1) (cast d' + 1))
                 (HMS (cast hh) (cast mm) (cast ss))
         ,cast off)
  where
    unbase62char c
      | '0' <= c, c <= '9' = Just (ord c - ord '0')
      | 'A' <= c, c <= 'Z' = Just (ord c - ord 'A' + 10)
      | 'a' <= c, c <= 'z' = Just (ord c - ord 'a' + 36)
      | otherwise = Nothing
parseEventID _ = Nothing
-- | Parse a log file name of the form @YYYY-MM-DD.log@. Names of any other
-- shape, and dates that do not exist in the Gregorian calendar, yield
-- 'Nothing'.
parseFileName :: String -> Maybe YMD
parseFileName name = do
  (yStr, '-' : afterYear) <- Just (splitAt 4 name)
  (mStr, '-' : afterMonth) <- Just (splitAt 2 afterYear)
  (dStr, ".log") <- Just (splitAt 2 afterMonth)
  guard (all (all isDigit) [yStr, mStr, dStr])
  year <- readMaybe @Int yStr
  month <- readMaybe @Int mStr
  dayOfMonth <- readMaybe @Int dStr
  guard (1 <= month && month <= 12)
  guard (1 <= dayOfMonth && dayOfMonth <= gregorianMonthLength (fromIntegral @Int @Integer year) month)
  Just (YMD year (fromIntegral @Int @Word8 month) (fromIntegral @Int @Word8 dayOfMonth))
-- | File name of the log for the given day: the date as rendered by
-- 'ymdToString', followed by the @.log@ extension.
toFileName :: YMD -> String
toFileName = (++ ".log") . ymdToString
-- | 'addDays' with an 'Int' day count (e.g. an index into a per-day vector).
addDays' :: Int -> Day -> Day
addDays' n day = addDays (toInteger n) day
|