summaryrefslogtreecommitdiff
path: root/log-tz-convert.py
diff options
context:
space:
mode:
author Tom Smeding <tom@tomsmeding.com> 2026-05-08 01:05:28 +0200
committer Tom Smeding <tom@tomsmeding.com> 2026-05-08 01:05:28 +0200
commit 5ba0fc25d8bfdaedd1a8ff7743ed2b0224d34ff3 (patch)
tree 731c4d954084a977ae34dbaecc34fd46a56ec36f /log-tz-convert.py
parent b1429cd1cf064b69cd4bf8935885bba2e9dcfcb1 (diff)
Rename log-tz-convert.py to -znc-NL
Diffstat (limited to 'log-tz-convert.py')
-rwxr-xr-x log-tz-convert.py 161
1 files changed, 0 insertions, 161 deletions
diff --git a/log-tz-convert.py b/log-tz-convert.py
deleted file mode 100755
index 85d3f54..0000000
--- a/log-tz-convert.py
+++ /dev/null
@@ -1,161 +0,0 @@
-#!/usr/bin/env python3
-import re, sys, os, calendar, time
-
# Command-line handling. Exactly two positional arguments are required:
# the source log directory and the (not yet existing) output directory.
#
# The usage text is printed both when help is requested and when the
# invocation is malformed.  Only a plain help request ("-h" or "--help" as the
# sole argument) exits with status 0; every malformed invocation exits with
# status 1 so that calling scripts can detect the misuse.
help_flag_first = len(sys.argv) >= 2 and sys.argv[1] in ("-h", "--help")
if len(sys.argv) != 3 or help_flag_first:
    print(f"Usage: {sys.argv[0]} <logdir> <outdir>")
    print("The <logdir> is expected to contain network/#channel/YYYY-mm-dd.log files.")
    print("These will be converted from Europe/Amsterdam (i.e. CET/CEST) to UTC; the")
    print("output is written to <outdir>. The <logdir> is not changed.")
    sys.exit(0 if (help_flag_first and len(sys.argv) == 2) else 1)

logdir = sys.argv[1]
outdir = sys.argv[2]
# os.mkdir raises FileExistsError if <outdir> already exists, so an earlier
# conversion result is never silently overwritten.
os.mkdir(outdir)
-
class Logger:
    """Buffer the events of one day and write them to a per-day log file.

    Events are collected for a single date at a time.  As soon as an event for
    a different date is added, the buffered day is written to
    ``<destdir>/YYYY-mm-dd.log`` and a new buffer is started.  Files are opened
    in "wb" mode, so adding events for a date that was already flushed
    overwrites the earlier file; callers are expected to add events grouped by
    date.
    """

    def __init__(self, destdir):
        """Create a logger that writes its day files into directory *destdir*."""
        self.destdir = destdir
        self.curdate = None  # (Y, m, d) of the buffered day, None if nothing is buffered
        self.events = []  # [((H, M, S), text)] with text as bytes

    def addevent(self, ymd, hms, text):
        """Record one event.

        ymd: (Y, m, d) date of the event.
        hms: (H, M, S) time of the event.
        text: bytes of the log line after the time marker.

        If *ymd* differs from the date currently buffered, the buffered day is
        written out first.
        """
        if self.curdate is not None and self.curdate != ymd:
            self.writeout()

        if self.curdate is None:
            # Either the very first event, or the previous day was just flushed.
            self.curdate = ymd
            self.events = []
        self.events.append((hms, text))

    def writeout(self):
        """Write the buffered day to its log file and reset the buffer.

        Does nothing when no events are buffered, so it is safe to call
        unconditionally at the end of a channel, even an empty one.
        """
        if self.curdate is None:
            return

        (Y, m, d) = self.curdate
        with open(os.path.join(self.destdir, f"{Y:04}-{m:02}-{d:02}.log"), "wb") as f:
            for (H, M, S), text in self.events:
                f.write(f"[{H:02}:{M:02}:{S:02}] ".encode("ascii") + text + b"\n")
        self.curdate = None
        self.events = []
-
def parse_line(line):
    """Split one raw log line into its time marker and its text.

    line: bytes of the form ``[HH:MM:SS] text``, optionally terminated by a
        single newline byte.

    Returns ((H, M, S), text): the time as three ints and the remainder of the
    line (after the separating space, without the trailing newline) as bytes.

    Raises ValueError if the first space is not directly after a 10-byte time
    marker, or if that marker is not of the form ``[dd:dd:dd]``.  This also
    covers empty input.
    """
    if line.endswith(b"\n"):
        line = line[:-1]

    # The marker "[HH:MM:SS]" is exactly 10 bytes, so the first space on the
    # line must sit at index 10.
    if line.find(b" ") != 10:
        raise ValueError(f"No space found in right spot on line: {line!r}")

    # Match on the raw bytes; this avoids a separate decode step that could
    # fail on non-ASCII input before the marker was validated.
    marker = re.fullmatch(rb"\[([0-9]{2}):([0-9]{2}):([0-9]{2})\]", line[:10])
    if marker is None:
        raise ValueError(f"Could not parse time marker on line: {line!r}")

    H, M, S = (int(field) for field in marker.groups())
    return ((H, M, S), line[11:])
-
def last_sunday_in(Y, m):
    """Return the day of the month on which the last Sunday of month *m* in year *Y* falls."""
    final_day = calendar.monthrange(Y, m)[1]  # length of the month == its last date
    # Number of days by which the month's final day lies past a Sunday
    # (0 when the final day is itself a Sunday).
    days_past_sunday = (calendar.weekday(Y, m, final_day) - calendar.SUNDAY) % 7
    return final_day - days_past_sunday


# Self-check against two known dates.
assert last_sunday_in(2026, 3) == 29
assert last_sunday_in(2026, 10) == 25
-
def convert_logfile(filename, logger):
    """Convert one Europe/Amsterdam day log to UTC and feed it into *logger*.

    filename: path ending in "/YYYY-mm-dd.log"; the local date of all lines in
        the file is taken from this name.
    logger: object with an ``addevent(ymd, hms, text)`` method; it is called
        once per line with the UTC date, the UTC time and the line's text.

    Raises Exception if the file name cannot be parsed, or if a line carries a
    local time that does not exist (the hour that is skipped when DST starts).
    Errors raised by parse_line for malformed lines propagate unchanged.

    On the day DST ends, the local hour 02:00-03:00 occurs twice.  Lines in
    that hour are treated as CEST until a line is seen whose time is earlier
    than that of the line before it; from then on they are treated as CET.
    If such lines occur but no backwards jump is ever seen, a warning is
    printed because the switch point could not be determined.
    """
    name_match = re.match(r".*/([0-9]{4})-([0-9]{2})-([0-9]{2})\.log$", filename)
    if name_match is None:
        raise Exception(f"Could not parse log file name {filename!r}")
    Y, m, d = (int(part) for part in name_match.groups())

    # DST starts on the last Sunday in March at 02:00 CET
    enterdst = (3, last_sunday_in(Y, 3))
    # DST ends on the last Sunday in October at 03:00 CEST, which isn't reached
    # and instead we continue from 02:00 CET
    leavedst = (10, last_sunday_in(Y, 10))

    with open(filename, "rb") as f:
        prevHMS = None
        switchedToCET = None  # only ever used on 'leavedst' day

        for line in f:
            (H, M, S), text = parse_line(line)
            if (m, d) < enterdst or \
                    ((m, d) == enterdst and (H, M, S) < (2, 0, 0)) or \
                    ((m, d) == leavedst and (H, M, S) >= (3, 0, 0)) or \
                    (m, d) > leavedst:
                # CET = UTC+1; subtract because interpreting a CET time as UTC
                # leaves you an hour too far in the future
                timestamp = calendar.timegm((Y, m, d, H, M, S)) - 3600

            elif ((m, d) == enterdst and (H, M, S) >= (3, 0, 0)) or \
                    ((m, d) > enterdst and (m, d) < leavedst) or \
                    ((m, d) == leavedst and (H, M, S) < (2, 0, 0)):
                # CEST = UTC+2
                timestamp = calendar.timegm((Y, m, d, H, M, S)) - 2*3600

            elif (m, d) == enterdst and (2, 0, 0) <= (H, M, S) < (3, 0, 0):
                # This local hour is skipped when the clocks go forward.
                raise Exception(
                    f"Invalid time in limbo between CET and CEST: {filename!r}: {line!r}")

            elif (m, d) == leavedst and (2, 0, 0) <= (H, M, S) < (3, 0, 0):
                # ambiguous interval: either before or after the CEST->CET switch
                if switchedToCET is None:
                    switchedToCET = False  # mark that we entered here
                # A time warp means that's the switch.  prevHMS is None when
                # this is the first line of the file; there is nothing to
                # compare against then.
                if prevHMS is not None and (H, M, S) < prevHMS:
                    switchedToCET = True
                if not switchedToCET:
                    timestamp = calendar.timegm((Y, m, d, H, M, S)) - 2*3600
                else:
                    timestamp = calendar.timegm((Y, m, d, H, M, S)) - 3600

            Y2, m2, d2, H2, M2, S2, *_ = time.gmtime(timestamp)
            logger.addevent((Y2, m2, d2), (H2, M2, S2), text)

            prevHMS = (H, M, S)

        # False (as opposed to None or True) means: lines in the ambiguous hour
        # were seen, but never a backwards jump.
        if switchedToCET is False:
            print(f"{filename!r}: ambiguous CEST->CET switch")
-
# Conversion pass: recreate the network/channel directory layout of logdir
# below outdir and convert every channel's day logs in sorted file-name order.
# One Logger per channel collects the converted events; it is flushed once more
# after the channel's last file.
print("Converting")
for network in os.listdir(logdir):
    src_network_dir = os.path.join(logdir, network)
    out_network_dir = os.path.join(outdir, network)
    os.mkdir(out_network_dir)
    for channel in os.listdir(src_network_dir):
        src_channel_dir = os.path.join(src_network_dir, channel)
        out_channel_dir = os.path.join(out_network_dir, channel)
        os.mkdir(out_channel_dir)
        logger = Logger(out_channel_dir)
        for filename in sorted(os.listdir(src_channel_dir)):
            convert_logfile(os.path.join(src_channel_dir, filename), logger)
        logger.writeout()
-
print("Checking")


def enumerate_channel(chandir):
    """Collect all events of a channel directory as (stamp, text) pairs.

    chandir: directory containing YYYY-mm-dd.log files.

    The files are visited in sorted file-name order.  For each line, the stamp
    is calendar.timegm applied to the date from the file name combined with
    the line's [HH:MM:SS] marker, read as if it were UTC; the text is
    everything after the first space on the line, as bytes.
    """
    stamp_pattern = r"^(....)-(..)-(..)\.log\|\[(..):(..):(..)\]$"
    collected = []
    for logname in sorted(os.listdir(chandir)):
        with open(os.path.join(chandir, logname), "rb") as handle:
            rows = handle.read().split(b"\n")
        # A file that ends in a newline yields one empty trailing piece.
        if rows[-1] == b"":
            rows.pop()

        for row in rows:
            marker = row[:10].decode("ascii")
            fields = re.match(stamp_pattern, logname + "|" + marker).groups()
            stamp = calendar.timegm(tuple(int(field) for field in fields))
            payload = row[row.index(b" "[0]) + 1:]
            collected.append((stamp, payload))
    return collected
-
# Verification pass: compare every source channel with its converted
# counterpart.  Both must contain the same texts in the same order, every
# converted stamp must differ from its source stamp by exactly one or two
# hours, and the converted stamps must never go backwards.
#
# The checks raise AssertionError explicitly instead of using `assert`
# statements, because those are removed when Python runs with -O and the
# script would then report "OK" without having verified anything.
for network in os.listdir(logdir):
    for channel in os.listdir(os.path.join(logdir, network)):
        src_events = enumerate_channel(os.path.join(logdir, network, channel))
        out_events = enumerate_channel(os.path.join(outdir, network, channel))

        if len(src_events) != len(out_events):
            raise AssertionError(
                f"{network}/{channel}: {len(src_events)} source events but "
                f"{len(out_events)} converted events")
        for index, (src_event, out_event) in enumerate(zip(src_events, out_events)):
            if src_event[1] != out_event[1]:
                raise AssertionError(
                    f"{network}/{channel}: text of event {index} differs: "
                    f"{src_event[1]!r} != {out_event[1]!r}")

        out_prev_stamp = 0
        for src_event, out_event in zip(src_events, out_events):
            if abs(src_event[0] - out_event[0]) not in (3600, 7200):
                raise AssertionError(
                    f"{network}/{channel}: unexpected time shift between "
                    f"{src_event!r} and {out_event!r}")
            if out_event[0] < out_prev_stamp:
                # Converted output went back in time: show the offending pair.
                print(src_event)
                print(out_event)
                sys.exit(1)
            out_prev_stamp = out_event[0]

print("OK")