#!/usr/bin/env python3
"""Convert timestamped log files from Europe/Amsterdam local time to UTC.

The input directory is expected to contain network/#channel/YYYY-mm-dd.log
files whose lines start with a "[HH:MM:SS] " marker.  Every line is shifted
from CET/CEST to UTC and written to the same relative location under the
output directory; lines that cross midnight because of the shift end up in
the log file of the neighbouring day.  After converting, the output is
re-read and compared against the input as a sanity check.
"""
import calendar
import os
import re
import sys
import time

# Number of seconds that local time is ahead of UTC.
CET_OFFSET = 3600       # CET  = UTC+1
CEST_OFFSET = 2 * 3600  # CEST = UTC+2

_HELP_FLAGS = ("-h", "--help")
_TIME_MARKER_RE = re.compile(r"\[([0-9]{2}):([0-9]{2}):([0-9]{2})\]$")
_LOGFILE_NAME_RE = re.compile(r".*/([0-9]{4})-([0-9]{2})-([0-9]{2})\.log$")
_STAMP_RE = re.compile(r"^(....)-(..)-(..)\.log\|\[(..):(..):(..)\]$")


class Logger:
    """Collect converted events and write them out one UTC day per file."""

    def __init__(self, destdir):
        self.destdir = destdir
        self.curdate = None  # (Y, m, d) of the buffered events, or None
        self.events = []  # [((H, M, S), text)]

    def addevent(self, ymd, hms, text):
        """Buffer one event, flushing the buffer first if the date changed."""
        if self.curdate is not None and self.curdate != ymd:
            self.writeout()
        if self.curdate is None:
            self.curdate = ymd
            self.events = [(hms, text)]
        else:
            self.events.append((hms, text))

    def writeout(self):
        """Write the buffered events to <destdir>/YYYY-mm-dd.log and reset.

        Does nothing when no events are buffered, so it is safe to call for
        a channel that turned out to be empty.
        """
        if self.curdate is None:
            return
        (Y, m, d) = self.curdate
        path = os.path.join(self.destdir, f"{Y:04}-{m:02}-{d:02}.log")
        with open(path, "wb") as f:
            f.writelines(
                f"[{H:02}:{M:02}:{S:02}] ".encode("ascii") + text + b"\n"
                for (H, M, S), text in self.events
            )
        self.curdate = None
        self.events = []


def parse_line(line):
    """Split a raw log line (bytes) into ((H, M, S), text).

    A single trailing newline is dropped.  The line must start with a
    10-byte "[HH:MM:SS]" marker followed by one space; the remaining bytes
    are returned untouched as the text.

    Raises:
        Exception: if the first space is not at offset 10 or the marker is
            not of the form [HH:MM:SS].
    """
    if line.endswith(b"\n"):
        line = line[:-1]
    if line.find(b" ") != 10:
        raise Exception(f"No space found in right spot on line: {line!r}")
    try:
        # Decoding happens inside the try so that a non-ASCII marker is
        # reported as a parse failure as well.
        timestr = line[:10].decode("ascii")
        (H, M, S) = _TIME_MARKER_RE.match(timestr).groups()
    except Exception as e:
        raise Exception(f"Could not parse time marker on line: {line!r}") from e
    return ((int(H), int(M), int(S)), line[11:])


def last_sunday_in(Y, m):
    """Return the day-of-month of the last Sunday in month m of year Y."""
    date = calendar.monthrange(Y, m)[1]  # number of days in month, i.e. last date
    while calendar.weekday(Y, m, date) != calendar.SUNDAY:
        date -= 1
    return date


assert last_sunday_in(2026, 3) == 29
assert last_sunday_in(2026, 10) == 25


def convert_logfile(filename, logger):
    """Convert one YYYY-mm-dd.log file from CET/CEST to UTC.

    Args:
        filename: path to the log file; it must contain a "/" directly
            before the YYYY-mm-dd.log base name, from which the local date
            is taken.
        logger: object with an addevent(ymd, hms, text) method that receives
            every converted event in file order.

    Raises:
        Exception: if the file name or a line cannot be parsed, or if a line
            carries a local time that is skipped by the CET->CEST switch.
    """
    match = _LOGFILE_NAME_RE.match(filename)
    if match is None:
        raise Exception(f"Could not parse log file name {filename!r}")
    Y, m, d = (int(part) for part in match.groups())
    today = (m, d)

    # DST starts on the last Sunday in March at 02:00 CET
    enterdst = (3, last_sunday_in(Y, 3))
    # DST ends on the last Sunday in October at 03:00 CEST, which isn't reached
    # and instead we continue from 02:00 CET
    leavedst = (10, last_sunday_in(Y, 10))

    prevHMS = None
    switchedToCET = None  # only ever used on 'leavedst' day
    with open(filename, "rb") as f:
        for line in f:
            hms, text = parse_line(line)
            in_switch_hour = (2, 0, 0) <= hms < (3, 0, 0)

            if today == enterdst and in_switch_hour:
                raise Exception(
                    f"Invalid time in limbo between CET and CEST: {filename!r}: {line!r}"
                )

            if today == leavedst and in_switch_hour:
                # ambiguous interval: either before or after the CEST->CET switch
                if switchedToCET is None:
                    switchedToCET = False  # mark that we entered here
                if prevHMS is not None and hms < prevHMS:
                    switchedToCET = True  # time warp means that's the switch
                offset = CET_OFFSET if switchedToCET else CEST_OFFSET
            elif (
                today < enterdst
                or (today == enterdst and hms < (2, 0, 0))
                or (today == leavedst and hms >= (3, 0, 0))
                or today > leavedst
            ):
                offset = CET_OFFSET
            else:
                # enterdst from 03:00 on, any day strictly between the two
                # switches, or leavedst before 02:00
                offset = CEST_OFFSET

            # Subtract because interpreting a local time as UTC leaves you
            # too far in the future.
            timestamp = calendar.timegm((Y, m, d) + hms) - offset
            Y2, m2, d2, H2, M2, S2, *_ = time.gmtime(timestamp)
            logger.addevent((Y2, m2, d2), (H2, M2, S2), text)
            prevHMS = hms

    if switchedToCET is False:
        # Lines fell in 02:00-03:00 on the switch day but no backwards jump
        # was seen, so they were all treated as CEST.
        print(f"{filename!r}: ambiguous CEST->CET switch")


def _compute_stamp(filename, line):
    """Return the epoch seconds of line, reading its date/time as if UTC."""
    timepart = line[:10].decode("ascii")
    fields = _STAMP_RE.match(filename + "|" + timepart).groups()
    return calendar.timegm(tuple(int(field) for field in fields))


def enumerate_channel(chandir):
    """Return [(stamp, text)] for all lines of all log files in chandir.

    Files are visited in sorted name order.  The stamp combines the date in
    the file name with the line's time marker, interpreted as UTC; the text
    is everything after the first space of the line.
    """
    events = []
    for filename in sorted(os.listdir(chandir)):
        with open(os.path.join(chandir, filename), "rb") as f:
            lines = f.read().split(b"\n")
        if lines[-1] == b"":
            lines = lines[:-1]
        events += [
            (_compute_stamp(filename, line), line[line.index(b" ") + 1:])
            for line in lines
        ]
    return events


def _print_usage(progname):
    """Print the command-line help text."""
    print(f"Usage: {progname} <logdir> <outdir>")
    print("The <logdir> is expected to contain network/#channel/YYYY-mm-dd.log files.")
    print("These will be converted from Europe/Amsterdam (i.e. CET/CEST) to UTC; the")
    print("output is written to <outdir>. The <logdir> is not changed.")


def _convert_tree(logdir, outdir):
    """Convert every network/channel under logdir into a fresh outdir."""
    os.mkdir(outdir)
    for network in os.listdir(logdir):
        os.mkdir(os.path.join(outdir, network))
        for channel in os.listdir(os.path.join(logdir, network)):
            src_chan = os.path.join(logdir, network, channel)
            out_chan = os.path.join(outdir, network, channel)
            os.mkdir(out_chan)
            # One logger per channel: events shifted across midnight must be
            # able to join the neighbouring day's file.
            logger = Logger(out_chan)
            for filename in sorted(os.listdir(src_chan)):
                convert_logfile(os.path.join(src_chan, filename), logger)
            logger.writeout()


def _check_tree(logdir, outdir):
    """Compare the converted logs against the originals.

    Returns True when every channel has monotonically non-decreasing output
    timestamps; prints the offending events and returns False otherwise.

    Raises:
        AssertionError: if the texts differ or a line was shifted by
            something other than one or two hours.
    """
    for network in os.listdir(logdir):
        for channel in os.listdir(os.path.join(logdir, network)):
            src_events = enumerate_channel(os.path.join(logdir, network, channel))
            out_events = enumerate_channel(os.path.join(outdir, network, channel))
            # Raised explicitly so the checks survive `python -O`.
            if len(src_events) != len(out_events):
                raise AssertionError(
                    f"{network}/{channel}: {len(src_events)} source events "
                    f"but {len(out_events)} output events"
                )
            if [ev[1] for ev in src_events] != [ev[1] for ev in out_events]:
                raise AssertionError(f"{network}/{channel}: event texts differ")
            out_prev_stamp = 0
            for src_event, out_event in zip(src_events, out_events):
                if abs(src_event[0] - out_event[0]) not in (CET_OFFSET, CEST_OFFSET):
                    raise AssertionError(
                        f"{network}/{channel}: unexpected shift between "
                        f"{src_event!r} and {out_event!r}"
                    )
                if out_event[0] < out_prev_stamp:
                    print(src_event)
                    print(out_event)
                    return False
                out_prev_stamp = out_event[0]
    return True


def main(argv=None):
    """Run the conversion and the check; return the process exit status.

    Args:
        argv: full argument vector including the program name; defaults to
            sys.argv.
    """
    if argv is None:
        argv = sys.argv
    if len(argv) != 3 or argv[1] in _HELP_FLAGS:
        _print_usage(argv[0])
        # Only an explicit, lone help request counts as success.
        asked_for_help = len(argv) == 2 and argv[1] in _HELP_FLAGS
        return 0 if asked_for_help else 1

    logdir = argv[1]
    outdir = argv[2]

    print("Converting")
    _convert_tree(logdir, outdir)

    print("Checking")
    if not _check_tree(logdir, outdir):
        return 1
    print("OK")
    return 0


if __name__ == "__main__":
    sys.exit(main())