summaryrefslogtreecommitdiff
path: root/src/Index.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Index.hs')
-rw-r--r--src/Index.hs290
1 files changed, 202 insertions, 88 deletions
diff --git a/src/Index.hs b/src/Index.hs
index 5486692..8c605d3 100644
--- a/src/Index.hs
+++ b/src/Index.hs
@@ -1,3 +1,4 @@
+{-# LANGUAGE OverloadedStrings #-}
module Index (
Index,
initIndex,
@@ -7,24 +8,33 @@ module Index (
indexCalendar,
) where
-import Data.Time.Calendar
-import Control.Monad (forM)
+import Prelude hiding (foldl') -- exported since GHC 9.10 (base 4.20)
+
+import Control.Concurrent
+import Control.Monad (forM, forM_, when)
import Data.ByteString qualified as BS
import Data.ByteString (ByteString)
import Data.Char (isDigit)
-import Data.List (sort, scanl')
+import Data.IORef
+import Data.List (sort, scanl', foldl')
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
+import Data.Maybe (catMaybes)
import Data.Text qualified as T
+import Data.Time
import Data.Vector.Storable qualified as VS
-import Data.Word (Word32)
+import Data.Word
import System.Clock qualified as Clock
import System.Directory
import System.FilePath
+import System.FSNotify qualified as FN
import Text.Read (readMaybe)
+import AtomicPrint
+import Debounce
import Cache
-import Config (Channel(..))
+import Config (Channel(..), prettyChannel)
+import ImmutGrowVector qualified as IGV
import Mmap
import Util
import ZNC
@@ -32,13 +42,26 @@ import ZNC
data ChanIndex = ChanIndex
{ ciStartDay :: !Day
- , ciEndDay :: !Day -- ^ inclusive
- , ciCountUntil :: !(VS.Vector Int) -- ^ number of events up to and /including/ this day
+ , ciCountUntil :: !(IGV.ImmutGrowVector Int) -- ^ number of events up to and /including/ this day
, ciTotal :: !Int }
deriving (Show)
+-- | Inclusive.
+ciEndDay :: ChanIndex -> Day
+ciEndDay ci =
+ let ndays = IGV.length (ciCountUntil ci)
+ in if ndays > 0
+ then (ndays - 1) `addDays'` ciStartDay ci
+ else error "ciEndDay: ChanIndex with zero days!"
+
+-- 1. The ChanIndex is contained in an IORef, which means that we can't have a
+-- proper critical section around modifying it. This is fine because it's
+-- all pure, so we just allocate a new ChanIndex each time.
+-- 2. There being no critical section also means that having multiple writers
+-- would be problematic. Fortunately, there is only one writer, so this
+-- simple data structure is fine.
data Index = Index !FilePath
- !(Map Channel ChanIndex)
+ !(Map Channel (IORef ChanIndex))
!(Cache (Channel, YMD) (ByteString, VS.Vector Word32))
-- init
@@ -49,33 +72,68 @@ initIndex basedir toimport = do
c_start <- Clock.getTime Clock.Realtime
items <-
- fmap concat . forM (map chanNetwork toimport) $ \nwT -> do
+ forM toimport $ \(Channel nwT chT) -> do
let nw = T.unpack nwT
- forM [ch | Channel nwT' ch <- toimport, nwT' == nwT] $ \chT -> do
- let ch = T.unpack chT
- files <- listDirectory (basedir </> nw </> ch)
- days <- fmap sort . forM files $ \fn -> do
- let path = basedir </> nw </> ch </> fn
- -- putStrLn $ "Parsing " ++ path ++ " (" ++ show (parseFileName fn) ++ " -> " ++ show (dateToDay (parseFileName fn)) ++ ")"
- events <- parseLog <$> BS.readFile path
- return (uncurry3 fromGregorian (parseFileName fn), length events)
- let minday = minimum (map fst days)
- maxday = maximum (map fst days)
- ndays = fromIntegral @Integer @Int (diffDays maxday minday + 1)
- -- traceM $ nw ++ "/" ++ ch ++ ": days = " ++ show [(toFileName (dayToYMD d), i) | (d, i) <- days]
- let countScan = VS.fromListN ndays (drop 1 $ scanl' (+) 0 (makeCounts [minday..maxday] days))
- let ntotal = sum (map snd days)
- return (Channel nwT chT
- ,ChanIndex
- { ciStartDay = minday
- , ciEndDay = maxday
+ ch = T.unpack chT
+ files <- listDirectory (basedir </> nw </> ch)
+ days <- fmap sort . forM files $ \fn -> do
+ let path = basedir </> nw </> ch </> fn
+ date = case parseFileName fn of
+ Just ymd -> ymdToGregorian ymd
+ Nothing -> error $ "Log file with unexpected file name: " ++ path
+ -- atomicPrintS $ "Parsing " ++ path ++ " (" ++ show date ++ " -> " ++ show (dateToDay date) ++ ")"
+ events <- parseLog <$> BS.readFile path
+ return (uncurry3 fromGregorian date, length events)
+ let minday = minimum (map fst days)
+ maxday = maximum (map fst days)
+ ndays = fromIntegral @Integer @Int (diffDays maxday minday + 1)
+ -- traceM $ nw ++ "/" ++ ch ++ ": days = " ++ show [(toFileName (dayToYMD d), i) | (d, i) <- days]
+ let countScan = IGV.fromListN ndays (drop 1 $ scanl' (+) 0 (makeCounts [minday..maxday] days))
+ ntotal = sum (map snd days)
+ chanindex <- newIORef $
+ ChanIndex { ciStartDay = minday
, ciCountUntil = countScan
- , ciTotal = ntotal})
+ , ciTotal = ntotal }
+ return (Channel nwT chT, chanindex)
c_end <- Clock.getTime Clock.Realtime
let timetakenSecs = fromIntegral @_ @Double (Clock.toNanoSecs (Clock.diffTimeSpec c_start c_end)) / 1e9
- putStrLn $ "Parsing/indexing logs in " ++ show basedir ++ " took " ++ show timetakenSecs ++ " secs"
+ atomicPrintS $ "Parsing/indexing logs in " ++ show basedir ++ " took " ++ show timetakenSecs ++ " secs"
+
+ let index = Index basedir (Map.fromList items) cache
+
+ _ <- forkIO $ do
+ FN.withManager $ \mgr -> do
+ let predicate FN.Added{} = True
+ predicate FN.Modified{} = True
+ predicate FN.Removed{} = True
+ predicate FN.WatchedDirectoryRemoved{} = True
+ predicate _ = False
+
+ handle deb _ FN.Added{} = debounceClick deb
+ handle deb chan ev@FN.Modified{} = do
+ atomicPrintS $ "path = " ++ show (FN.eventPath ev) ++ " fn = " ++ takeFileName (FN.eventPath ev)
+ case parseFileName (takeFileName (FN.eventPath ev)) of
+ Just ymd -> do
+ cacheInvalidate cache (chan, ymd)
+ debounceClick deb
+ Nothing -> return ()
+ handle _ chan ev@FN.Removed{} = do
+ atomicPrintS $ "path = " ++ show (FN.eventPath ev) ++ " fn = " ++ takeFileName (FN.eventPath ev)
+ case parseFileName (takeFileName (FN.eventPath ev)) of
+ Just ymd -> do
+ cacheInvalidate cache (chan, ymd)
+ Nothing -> return ()
+ handle _ _ ev@FN.WatchedDirectoryRemoved{} = -- not sure what to do here...
+ atomicPrintS $ "Directory " ++ FN.eventPath ev ++ " removed!"
+ handle _ _ _ = undefined
- return (Index basedir (Map.fromList items) cache)
+ forM_ toimport $ \chan@(Channel nw ch) -> do
+ deb <- makeDebounce (nw <> T.pack "/" <> ch) 5.0 (indexUpdateImport index chan)
+ _ <- FN.watchDir mgr (basedir </> T.unpack nw </> T.unpack ch) predicate (handle deb chan)
+ atomicPrintS $ "Watching " <> basedir </> T.unpack nw </> T.unpack ch
+ let loop = threadDelay (round (1e9 :: Double)) >> loop in loop
+
+ return index
makeCounts :: [Day] -> [(Day, Int)] -> [Int]
makeCounts [] [] = []
@@ -86,37 +144,86 @@ makeCounts (d:ds) ents@((d',n):ents')
| otherwise = error $ "makeCounts: duplicate entry? " ++ show (d, toGregorian d, d', toGregorian d')
makeCounts (_:ds) [] = 0 : makeCounts ds []
+indexUpdateImport :: Index -> Channel -> IO ()
+indexUpdateImport index@(Index _ mp _) chan = do
+ let ciRef = mp Map.! chan
+ ci <- readIORef ciRef
+ today <- utctDay <$> getCurrentTime
+ let todayish = max today (ciEndDay ci)
+
+ readCounts <- fmap catMaybes . forM [-1 .. 1] $ \increment -> do
+ let day = increment `addDays` todayish
+ ymd = dayToYMD day
+ dayidx = fromIntegral @Integer @Int (day `diffDays` ciStartDay ci)
+
+ loadDay index chan ymd >>= \case
+ Just (_bs, lineStarts) -> return (Just (dayidx, VS.length lineStarts))
+ Nothing -> return Nothing
+
+ atomicPrint $ "Update import for " <> prettyChannel chan <> ": " <> T.show readCounts <> " (len = " <> T.show (IGV.length (ciCountUntil ci)) <> ")"
+
+ when (not (null readCounts)) $
+ atomicModifyIORef' ciRef $ \ci' ->
+ let ciRes = foldl' (\ci2 (dayidx, count) -> recordCount ci2 dayidx count) ci' readCounts
+ in (ciRes, ())
+ where
+ -- | How many events do we already have on this day?
+ eventsOnDayIdx :: ChanIndex -> Int -> Int
+ eventsOnDayIdx ci dayidx
+ | dayidx == 0 = scan IGV.! 0
+ | dayidx < IGV.length scan = scan IGV.! dayidx - scan IGV.! (dayidx - 1)
+ | otherwise = 0
+ where scan = ciCountUntil ci
+
+ recordCount :: ChanIndex -> Int -> Int -> ChanIndex
+ recordCount ci dayidx count
+ | count <= alreadyHave = ci
+ | otherwise =
+ let ciExt = -- Ensure that there is space in the scan vector for our record
+ let nExtraDays = dayidx - IGV.length (ciCountUntil ci) + 1
+ lastCount | IGV.length (ciCountUntil ci) == 0 = 0
+ | otherwise = ciCountUntil ci IGV.! (IGV.length (ciCountUntil ci) - 1)
+ in ci { ciCountUntil = iterate (`IGV.append` lastCount) (ciCountUntil ci) !! nExtraDays }
+ addFrom di n vec
+ | di < IGV.length vec = addFrom (di + 1) n (IGV.set vec di (vec IGV.! di + n))
+ | otherwise = vec
+ in ciExt { ciCountUntil = addFrom dayidx (count - alreadyHave) (ciCountUntil ciExt)
+ , ciTotal = ciTotal ciExt + count - alreadyHave }
+ where alreadyHave = eventsOnDayIdx ci dayidx
+
-- search
-- | Returns proper lazy list of events. Reading the files happens strictly,
-- but parsing the events happens lazily.
indexGetEventsLinear :: Index -> Channel -> Int -> Int -> IO [(YMDHMS, Event)]
-indexGetEventsLinear index@(Index _ mp _) chan from count
- | from + count < 0 = return []
- | from >= ciTotal ci = return []
- | otherwise = do
- let scan = ciCountUntil ci
- day1idx = binSearch scan from
+indexGetEventsLinear index@(Index _ mp _) chan from count = do
+ ci <- readIORef (mp Map.! chan)
+ if from + count < 0 || from >= ciTotal ci
+ then return []
+ else go ci (ciCountUntil ci)
+ where
+ go ci scan = do
+ let day1idx = binSearch scan from
day1 = day1idx `addDays'` ciStartDay ci
neventBeforeDay1 | day1idx == 0 = 0
- | otherwise = scan VS.! (day1idx - 1)
- neventInclDay1 = scan VS.! day1idx
+ | otherwise = scan IGV.! (day1idx - 1)
+ neventInclDay1 = scan IGV.! day1idx
neventOnDay1 = neventInclDay1 - neventBeforeDay1
off1 = from - neventBeforeDay1
-- day2 is inclusive, off2 is exclusive
(day2, off2)
| from + count <= neventInclDay1 = (day1, off1 + count)
- | day1idx == VS.length scan - 1 = (day1, neventOnDay1)
+ | day1idx == IGV.length scan - 1 = (day1, neventOnDay1)
| otherwise =
let loop day2idx nbefore nseen
| nseen + nOnDay2 >= count =
(day2idx `addDays'` ciStartDay ci, count - nseen)
- | day2idx == VS.length scan - 1 =
+ | day2idx == IGV.length scan - 1 =
(day2idx `addDays'` ciStartDay ci, nOnDay2)
| otherwise =
- loop (day2idx + 1) (scan VS.! day2idx) (nseen + nOnDay2)
+ loop (day2idx + 1) (scan IGV.! day2idx) (nseen + nOnDay2)
where
- nOnDay2 = scan VS.! day2idx - nbefore
+ nOnDay2 = scan IGV.! day2idx - nbefore
in loop (day1idx + 1) (neventBeforeDay1 + neventOnDay1) (neventOnDay1 - off1)
-- traceM ("ci = " ++ show ci)
@@ -129,26 +236,24 @@ indexGetEventsLinear index@(Index _ mp _) chan from count
-- traceM ("day2 = " ++ show day2)
-- traceM ("off2 = " ++ show off2)
evs <- forM (zip [day1 .. day2] [day1idx..]) $ \(day, dayidx) ->
- let neventsOnDay | dayidx == 0 = scan VS.! 0
- | otherwise = scan VS.! dayidx - scan VS.! (dayidx - 1)
- parse
+ let neventsOnDay | dayidx == 0 = scan IGV.! 0
+ | otherwise = scan IGV.! dayidx - scan IGV.! (dayidx - 1)
+ range
| day == day1 =
if day1 == day2
- then parseLogRange (off1, Just (off2 - off1))
- else parseLogRange (off1, Nothing)
- | day == day2 = parseLogRange (0, Just off2)
- | otherwise = \_lineStarts -> parseLog
- (y, month, d) = toGregorian day
- ymd = YMD (fromIntegral y) (fromIntegral month) (fromIntegral d)
+ then (off1, Just (off2 - off1))
+ else (off1, Just (neventsOnDay - off1))
+ | day == day2 = (0, Just off2)
+ | otherwise = (0, Just neventsOnDay)
+ ymd = ymdFromGregorian (toGregorian day)
fixDate = map $ \(tod, ev) -> (YMDHMS ymd tod, ev)
in if neventsOnDay > 0
- then do (bs, lineStarts) <- loadDay index chan ymd
- return (fixDate (parse lineStarts bs))
+ then loadDay index chan ymd >>= \case
+ Just (bs, lineStarts) -> return (fixDate (parseLogRange range lineStarts bs))
+ Nothing -> error $ "events on day " ++ show (dayToYMD day) ++ " but no file"
else return []
return (concat evs)
- where
- ci = mp Map.! chan
-- | The vector must be sorted.
-- Returns index of the first element x such that needle < x. If there is no
@@ -156,13 +261,13 @@ indexGetEventsLinear index@(Index _ mp _) chan from count
-- the length of vec.
--
-- TODO: proportional binary search
-binSearch :: VS.Vector Int -> Int -> Int
+binSearch :: IGV.ImmutGrowVector Int -> Int -> Int
binSearch vec needle
- | veclen == 0 || vec VS.! (veclen - 1) < needle = veclen
- | needle < vec VS.! 0 = 0
+ | veclen == 0 || vec IGV.! (veclen - 1) < needle = veclen
+ | needle < vec IGV.! 0 = 0
| otherwise = go 0 (veclen - 1)
where
- veclen = VS.length vec
+ veclen = IGV.length vec
-- Invariant: vec[lo] <= needle < vec[hi]
go :: Int -> Int -> Int
@@ -170,62 +275,71 @@ binSearch vec needle
| lo + 1 == hi = hi
| otherwise =
let mid = lo + (hi - lo) `div` 2
- in if vec VS.! mid <= needle
+ in if vec IGV.! mid <= needle
then go mid hi else go lo mid
-- other methods
-indexNumEvents :: Index -> Channel -> Int
-indexNumEvents (Index _ mp _) chan = ciTotal (mp Map.! chan)
+indexNumEvents :: Index -> Channel -> IO Int
+indexNumEvents (Index _ mp _) chan = ciTotal <$> readIORef (mp Map.! chan)
-indexCalendar :: Index -> Channel -> ((Day, Day), [Int])
-indexCalendar (Index _ mp _) chan =
- let ci = mp Map.! chan
- in ((ciStartDay ci, ciEndDay ci)
- ,[if i == 0
- then ciCountUntil ci VS.! 0
- else ciCountUntil ci VS.! i - ciCountUntil ci VS.! (i - 1)
- | i <- [0 .. VS.length (ciCountUntil ci) - 1]])
+indexCalendar :: Index -> Channel -> IO ((Day, Day), [Int])
+indexCalendar (Index _ mp _) chan = do
+ ci <- readIORef (mp Map.! chan)
+ let scan = ciCountUntil ci
+ return ((ciStartDay ci, ciEndDay ci)
+ ,[if i == 0
+ then scan IGV.! 0
+ else scan IGV.! i - scan IGV.! (i - 1)
+ | i <- [0 .. IGV.length scan - 1]])
indexGetEventsDay :: Index -> Channel -> Day -> IO [(HMS, Event)]
-indexGetEventsDay index@(Index _ mp _) chan day
- | day < ciStartDay ci || day > ciEndDay ci = return []
- | otherwise = do
- (bs, _lineStarts) <- loadDay index chan (dayToYMD day)
- return (parseLog bs)
- where
- ci = mp Map.! chan
+indexGetEventsDay index@(Index _ mp _) chan day = do
+ ci <- readIORef (mp Map.! chan)
+ if day < ciStartDay ci || day > ciEndDay ci
+ then return []
+ else loadDay index chan (dayToYMD day) >>= \case
+ Just (bs, _lineStarts) -> return (parseLog bs)
+ Nothing -> return [] -- if the file doesn't exist, there ain't no events
-- utilities
-loadDay :: Index -> Channel -> YMD -> IO (ByteString, VS.Vector Word32)
+loadDay :: Index -> Channel -> YMD -> IO (Maybe (ByteString, VS.Vector Word32))
loadDay (Index basedir _ cache) chan@(Channel network channel) ymd = do
cacheLookup cache (chan, ymd) >>= \case
Nothing -> do
- bs <- mapFile (basedir </> T.unpack network </> T.unpack channel </> toFileName ymd)
- let lineStarts = preparseLog bs
- cacheAdd cache (chan, ymd) (bs, lineStarts)
- return (bs, lineStarts)
- Just (bs, lineStarts) -> return (bs, lineStarts)
+ mapFile (basedir </> T.unpack network </> T.unpack channel </> toFileName ymd) >>= \case
+ Just bs -> do
+ let lineStarts = preparseLog bs
+ cacheAdd cache (chan, ymd) (bs, lineStarts)
+ return (Just (bs, lineStarts))
+ Nothing -> return Nothing -- file didn't exist
+ Just (bs, lineStarts) -> return (Just (bs, lineStarts))
-parseFileName :: String -> (Year, MonthOfYear, DayOfMonth)
+parseFileName :: String -> Maybe YMD
parseFileName name
| (y, '-' : s1) <- splitAt 4 name
, (m, '-' : s2) <- splitAt 2 s1
, (d, ".log") <- splitAt 2 s2
, all isDigit y, all isDigit m, all isDigit d
- , let y' = read' y ; m' = read' m ; d' = read' d
+ , let y' = read' @Int y ; m' = read' @Int m ; d' = read' @Int d
, 1 <= m', m' <= 12
- , 1 <= d', d' <= gregorianMonthLength y' m'
- = (y', m', d')
+ , 1 <= d', d' <= gregorianMonthLength (fromIntegral @Int @Integer y') m'
+ = Just (YMD y' (fromIntegral @Int @Word8 m') (fromIntegral @Int @Word8 d'))
| otherwise
- = error $ "Invalid ZNC log file name: " ++ name
+ = Nothing
where
read' :: Read a => String -> a
read' s = case readMaybe s of
Just r -> r
Nothing -> error $ "No read: " ++ show s
+ymdToGregorian :: YMD -> (Year, MonthOfYear, DayOfMonth)
+ymdToGregorian (YMD y m d) = (fromIntegral y, fromIntegral m, fromIntegral d)
+
+ymdFromGregorian :: (Year, MonthOfYear, DayOfMonth) -> YMD
+ymdFromGregorian (y, m, d) = YMD (fromIntegral y) (fromIntegral m) (fromIntegral d)
+
toFileName :: YMD -> String
toFileName ymd = ymdToString ymd ++ ".log"