#!/usr/bin/env python3
"""Convert clog-format IRC logs in US/Pacific time to ZNC-format logs in UTC.

The input directory holds one file per (Pacific-time) day named yy.mm.dd; the
output directory receives one file per UTC day named YYYY-MM-DD.log.  The
channel name (#haskell) is hard-coded in the patterns below.
"""

import calendar
import os
import re
import sys
import time


class Logger:
    """Buffer the events of one UTC day and write them out as a ZNC log file."""

    def __init__(self, destdir):
        self.destdir = destdir
        self.curdate = None  # (Y, m, d) of the buffered day, None if empty
        self.events = []  # [((H, M, S), text)] with text as bytes

    def addevent(self, ymd, hms, text):
        """Record an event; flushes the buffered day first if `ymd` differs."""
        if self.curdate is not None and self.curdate != ymd:
            self.writeout()
        if self.curdate is None:
            self.curdate = ymd
            self.events = [(hms, text)]
        else:
            assert self.curdate == ymd
            self.events.append((hms, text))

    def writeout(self):
        """Write the buffered day to <destdir>/YYYY-MM-DD.log and reset.

        Does nothing when no event is buffered (e.g. an empty input
        directory).  An existing file for that date is overwritten.
        """
        if self.curdate is None:
            return
        (Y, m, d) = self.curdate
        path = os.path.join(self.destdir, f"{Y:04}-{m:02}-{d:02}.log")
        with open(path, "wb") as f:
            for (H, M, S), text in self.events:
                f.write(f"[{H:02}:{M:02}:{S:02}] ".encode("ascii") + text + b"\n")
        self.curdate = None
        self.events = []


# Lines that carry no channel event and are dropped from the output.
_META_PATTERNS = (
    rb"^--- log: (started|ended|stopped) haskell/[0-9.]{8}$",
    rb"^--- topic: '.*",
    rb"^--- topic: set by .*",
    rb"^--- names: list \(.*",
)


def convert_clog_to_znc(fname, line):
    """Translate the text part of one clog line into ZNC log syntax.

    `line` is the bytes after the HH:MM:SS time and its trailing space (for
    status lines that is the part starting at '---').  `fname` is only used
    in the diagnostic printed for unrecognised lines.

    Returns the converted bytes, or None if the line is a meta-marker that
    has no counterpart in the output.  Unrecognised lines are reported on
    stdout and returned as a "#parseerror ..." line.
    """
    if any(re.match(pattern, line) for pattern in _META_PATTERNS):
        return None

    # The order of the checks matters: more specific patterns come first.
    m = re.match(rb"^--- join: ([^ ]*) \(([^)]*)\) joined #haskell$", line)
    if m is not None:
        return b"*** Joins: " + m[1] + b" (" + m[2] + b")"
    m = re.match(rb"^--- join: ([^ ]*) joined #haskell$", line)  # join without host
    if m is not None:
        return b"*** Joins: " + m[1] + b" ()"
    m = re.match(rb"^--- quit: ([^ ]*) \((.*)\)$", line)
    if m is not None:
        return b"*** Quits: " + m[1] + b" () (" + m[2] + b")"
    m = re.match(rb"^--- part: ([^ ]*) left #haskell$", line)
    if m is not None:
        return b"*** Parts: " + m[1] + b" () ()"
    m = re.match(rb"^--- topic: set to '(.*)' by ([^ ]*)$", line)
    if m is not None:
        return b"*** " + m[2] + b" changes topic to '" + m[1] + b"'"
    m = re.match(rb"^--- nick: ([^ ]*) -> ([^ ]*)$", line)
    if m is not None:
        return b"*** " + m[1] + b" is now known as " + m[2]
    m = re.match(rb"^--- mode: ([^ ]*) set mode: (.*)$", line)
    if m is not None:
        return b"*** " + m[1] + b" sets mode: " + m[2]
    m = re.match(rb"^--- mode: ([^ ]*) set ([+-].*)$", line)
    if m is not None:
        return b"*** " + m[1] + b" sets mode: " + m[2]
    m = re.match(rb"^--- kick: ([^ ]*) was kicked by ([^ ]*) \((.*)\)$", line)
    if m is not None:
        # groups: 1 = kicked nick, 2 = kicker, 3 = reason (m[0] is the whole match)
        return b"*** " + m[1] + b" was kicked by " + m[2] + b" (" + m[3] + b")"
    m = re.match(rb"^<([^>]*)> (.*)", line)
    if m is not None:
        return line  # ordinary message: same syntax in both formats
    m = re.match(rb"^-([^(]*)\(([^)]*)\)- (.*)", line)  # notice
    if m is not None:
        # groups: 1 = nick, 2 = host (dropped), 3 = notice text
        return b"-" + m[1] + b"- " + m[3]
    m = re.match(rb"^\* ([^ ]*) (.*)", line)
    if m is not None:
        return b"* " + m[1] + b" " + m[2]
    m = re.match(rb"^\* ([^ ]*)$", line)  # empty action
    if m is not None:
        return b"* " + m[1] + b" "

    print(f"Cannot parse: {line!r} ({fname})")
    return b"#parseerror " + repr(line).encode("utf-8")


def parse_line(fname, line):
    """Split one clog line into its logged time and converted text.

    Returns ((H, M, S), text).  `text` is None if the line is no event but a
    meta-marker (log started or ended).  A line whose first space is not at
    index 8 is reported on stdout and returned as a "#parseerror ..." event
    at time (0, 0, 0).

    Raises ValueError if the first eight bytes are not an HH:MM:SS time.
    """
    space = line.find(b" ")
    if space != 8:
        print(f"Unparseable time: {line!r} ({fname})")
        return ((0, 0, 0), b"#parseerror " + repr(line).encode("utf-8"))

    m = re.match(rb"([0-9]{2}):([0-9]{2}):([0-9]{2})$", line[:8])
    if m is None:
        raise ValueError(f"Could not parse time marker on line: {line!r}")
    (H, M, S) = m.groups()

    text = convert_clog_to_znc(fname, line[9:])
    return ((int(H), int(M), int(S)), text)


def last_sunday_in(Y, m):
    """Return the day of the month of the last Sunday in month `m` of year `Y`."""
    date = calendar.monthrange(Y, m)[1]  # number of days in month, i.e. last date
    while calendar.weekday(Y, m, date) != calendar.SUNDAY:
        date -= 1
    return date


def nth_sunday_in(n, Y, m):
    """Return the day of the month of the n-th Sunday (n >= 1); n == -1 means the last."""
    if n == -1:
        return last_sunday_in(Y, m)
    assert n >= 1
    date = 1
    while calendar.weekday(Y, m, date) != calendar.SUNDAY:
        date += 1
    return date + (n - 1) * 7


# Import-time sanity checks of the Sunday helpers.
assert last_sunday_in(2026, 3) == 29
assert last_sunday_in(2026, 10) == 25
assert nth_sunday_in(2, 2026, 3) == 8
assert nth_sunday_in(1, 2026, 10) == 4


def _dst_transition_days(Y):
    """Return ((month, day) DST starts, (month, day) DST ends) for US/Pacific in year Y."""
    if Y <= 2006:
        # DST starts on the first Sunday in April at 02:00 PST
        enterdst = (4, nth_sunday_in(1, Y, 4))
        # DST ends on the last Sunday in October at 02:00 PDT, which isn't reached
        # and instead we continue from 01:00 PST
        leavedst = (10, last_sunday_in(Y, 10))
    else:
        # DST starts on the second Sunday in March at 02:00 PST
        enterdst = (3, nth_sunday_in(2, Y, 3))
        # DST ends on the first Sunday in November at 02:00 PDT, which isn't reached
        # and instead we continue from 01:00 PST
        leavedst = (11, nth_sunday_in(1, Y, 11))
    return enterdst, leavedst


def convert_logfile(filename, logger):
    """Convert one clog day file and feed its events, in UTC, to `logger`.

    `filename` must contain a '/' and end in yy.mm.dd (the separators are
    matched loosely, as any single character); the year is taken as 20yy.
    Logged times are interpreted as US/Pacific wall-clock time.

    Events whose UTC time would go backwards are reported on stdout and
    logged at the previous event's time with a "#non-monotonic-time" prefix
    and the raw input line, so output timestamps never decrease.

    Raises ValueError if the file name cannot be parsed or a line carries a
    time inside the skipped hour on the day DST starts.
    """
    name_match = re.match(r".*/([0-9]{2}).([0-9]{2}).([0-9]{2})$", filename)
    if name_match is None:
        raise ValueError(f"Could not parse log file name {filename!r}")
    Y = 2000 + int(name_match[1])
    m = int(name_match[2])
    d = int(name_match[3])

    enterdst, leavedst = _dst_transition_days(Y)
    day = (m, d)

    with open(filename, "rb") as f:
        prevHMS = None  # logged time of the previous event line
        prevYmdHMS2 = None  # UTC time of the last event logged in order
        switchedToWinter = None  # only ever used on 'leavedst' day

        for line in f:
            # Strip the line terminator; endswith() is safe on blank lines.
            if line.endswith(b"\n"):
                line = line[:-1]
            if line.endswith(b"\r"):
                line = line[:-1]

            (H, M, S), text = parse_line(filename, line)
            if text is None:
                continue  # meta-marker
            hms = (H, M, S)

            if day < enterdst or \
                    (day == enterdst and hms < (2, 0, 0)) or \
                    (day == leavedst and hms >= (2, 0, 0)) or \
                    day > leavedst:
                # PST = UTC-8; add because interpreting a PST time as UTC
                # leaves you 8 hours in the past of what it should be
                offset_hours = 8
            elif (day == enterdst and hms >= (3, 0, 0)) or \
                    (enterdst < day < leavedst) or \
                    (day == leavedst and hms < (1, 0, 0)):
                # PDT = UTC-7
                offset_hours = 7
            elif day == enterdst:
                # 02:00 <= time < 03:00 does not exist on this day
                raise ValueError(f"Invalid time in limbo between PST and PDT: {filename!r}: {line!r}")
            else:
                # day == leavedst and 01:00 <= time < 02:00: ambiguous interval,
                # either before or after the PDT->PST switch
                if switchedToWinter is None:
                    switchedToWinter = False  # mark that we entered here
                if prevHMS is not None and hms < prevHMS:
                    # time warp means that's the switch
                    switchedToWinter = True
                offset_hours = 8 if switchedToWinter else 7  # PST else PDT

            timestamp = calendar.timegm((Y, m, d, H, M, S)) + offset_hours * 3600
            Y2, m2, d2, H2, M2, S2, *_ = time.gmtime(timestamp)
            utc = ((Y2, m2, d2), (H2, M2, S2))

            if prevYmdHMS2 is not None and utc < prevYmdHMS2:
                print(f"Time reversal: {line!r} ({filename})")
                logger.addevent(*prevYmdHMS2, b"#non-monotonic-time " + line)
            else:
                logger.addevent(utc[0], utc[1], text)
                prevYmdHMS2 = utc
            prevHMS = hms

        if switchedToWinter is False:
            # Lines fell in the ambiguous hour but no backwards jump was seen,
            # so all of them were treated as PDT.
            print(f"{filename!r}: ambiguous PDT->PST switch")


def main(argv):
    """Run the conversion for `argv` (as in sys.argv); return the exit status."""
    help_requested = len(argv) >= 2 and argv[1] in ("-h", "--help")
    if len(argv) != 3 or help_requested:
        print(f"Usage: {argv[0]} <logdir> <outdir>")
        print("The <logdir> is expected to contain yy.mm.dd files in clog format (tunes.org/~nef/logs).")
        print("These will be converted from US/Pacific (i.e. PST/PDT) to UTC and to ZNC log format; the")
        print("output is written to <outdir>. The <logdir> is not changed.")
        # Only an explicit, lone help request counts as success.
        return 0 if help_requested and len(argv) == 2 else 1

    logdir = argv[1]
    outdir = argv[2]
    os.mkdir(outdir)  # fails if <outdir> already exists

    print("Converting")
    logger = Logger(outdir)
    # File names sort chronologically because they are yy.mm.dd.
    for filename in sorted(os.listdir(logdir)):
        convert_logfile(os.path.join(logdir, filename), logger)
    logger.writeout()
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))


# Disabled consistency check, kept for reference.
# NOTE(review): it appears to expect <network>/<channel> subdirectories with
# ZNC-style YYYY-MM-DD.log files under both logdir and outdir, which is not the
# layout produced above -- confirm before re-enabling.
#
# print("Checking")
# def enumerate_channel(chandir):
#     events = []
#     for filename in sorted(os.listdir(chandir)):
#         with open(os.path.join(chandir, filename), "rb") as f:
#             lines = f.read().split(b"\n")
#         if lines[-1] == b"": lines = lines[:-1]
#         def compute_stamp(line):
#             timepart = line[:10].decode("ascii")
#             Y, m, d, H, M, S, *_ = re.match(r"^(....)-(..)-(..)\.log\|\[(..):(..):(..)\]$", filename + "|" + timepart).groups()
#             return calendar.timegm((int(Y), int(m), int(d), int(H), int(M), int(S)))
#         events += [(compute_stamp(line), line[line.index(b" "[0])+1:])
#                    for line in lines]
#     return events
# for network in os.listdir(logdir):
#     for channel in os.listdir(os.path.join(logdir, network)):
#         src_events = enumerate_channel(os.path.join(logdir, network, channel))
#         out_events = enumerate_channel(os.path.join(outdir, network, channel))
#         assert len(src_events) == len(out_events)
#         assert [ev[1] for ev in src_events] == [ev[1] for ev in out_events]
#         out_prev_stamp = 0
#         for i in range(len(src_events)):
#             assert abs(src_events[i][0] - out_events[i][0]) in [3600, 7200]
#             if out_events[i][0] < out_prev_stamp:
#                 print(src_events[i])
#                 print(out_events[i])
#                 sys.exit(1)
#             out_prev_stamp = out_events[i][0]
# print("OK")