generate pretty table from Timetable class, display with cv2
This commit is contained in:
347
main.py
347
main.py
@@ -1,147 +1,238 @@
|
||||
import requests
|
||||
from datetime import datetime, timedelta
|
||||
import xmltodict
|
||||
from cachetools import cached, TTLCache
|
||||
from dataclasses import dataclass, field
|
||||
from src.api import Timetable, Stop
|
||||
|
||||
eva = 8002377
|
||||
from matplotlib.colors import hex2color
|
||||
|
||||
base_url = "https://iris.noncd.db.de/iris-tts/timetable"
|
||||
import cv2
|
||||
from PIL import ImageFont, ImageDraw, Image
|
||||
import numpy as np
|
||||
import time
|
||||
|
||||
@cached(cache=TTLCache(maxsize=128, ttl=60))
|
||||
def _get_changes(eva):
|
||||
url = f"{base_url}/fchg/{eva}"
|
||||
x = requests.get(url)
|
||||
changes = xmltodict.parse(x.text)['timetable']['s']
|
||||
return changes
|
||||
badge_colors = {
|
||||
"S1": {"fill": "#18bae7", "text": "#C6D3D7"},
|
||||
"S2": {"fill": "#74b72b", "text": "#C6D3D7"},
|
||||
"S3": {"fill": "#941d81", "text": "#C6D3D7"},
|
||||
"S4": {"fill": "#e20b1c", "text": "#C6D3D7"},
|
||||
"S5": {"fill": "#005280", "text": "#C6D3D7"},
|
||||
"S6": {"fill": "#008c59", "text": "#C6D3D7"},
|
||||
"S7": {"fill": "#892f24", "text": "#C6D3D7"},
|
||||
"S8": {"fill": "#0d0d11", "text": "#eeaa00"},
|
||||
"S20": {"fill": "#e8526d", "text": "#C6D3D7"},
|
||||
}
|
||||
|
||||
class Departure:
|
||||
def __init__(self, event, station=None, eva=None):
|
||||
dp = event['dp']
|
||||
self.id = event['@id']
|
||||
try:
|
||||
self.line = f"{event['tl']['@c']}{dp['@l']}"
|
||||
except KeyError:
|
||||
self.line = '??'
|
||||
self.time = datetime.strptime(dp['@pt'], "%y%m%d%H%M")
|
||||
self.old_time = self.time
|
||||
self.delayed = None
|
||||
self.platform = dp['@pp']
|
||||
path = dp['@ppth']
|
||||
self.target = path.split('|')[-1]
|
||||
self.path = path.split('|')
|
||||
self.station = station
|
||||
self.eva = eva
|
||||
@dataclass
|
||||
class ImageSetup:
|
||||
width: int = 1920
|
||||
height: int = 1080
|
||||
bg_color: str = "#282a35"
|
||||
fonts: dict = field(
|
||||
default_factory=lambda: {
|
||||
"title": ("./fonts/Rubik-Bold.ttf", 80),
|
||||
"badge": ("./fonts/Rubik-SemiBold.ttf", 70),
|
||||
"column_title": ("./fonts/Rubik-Regular.ttf", 50),
|
||||
"default": ("./fonts/Rubik-Regular.ttf", 60),
|
||||
}
|
||||
)
|
||||
|
||||
text_colors: dict = field(
|
||||
default_factory=lambda: {
|
||||
"title": "#899194",
|
||||
"column_title": "#555A5C",
|
||||
"default": "#899194"
|
||||
}
|
||||
)
|
||||
margins: tuple = (60, 60, 60, 60) # t, b, l, r
|
||||
|
||||
|
||||
def denormalize(coords, setup: ImageSetup, use_margins=True):
|
||||
(x, y) = coords
|
||||
margins = setup.margins if use_margins else (0, 0, 0, 0)
|
||||
return (
|
||||
int(x * (setup.width - margins[2] - margins[3]) + margins[2]),
|
||||
int(y * (setup.height - margins[0] - margins[1]) + margins[0]),
|
||||
)
|
||||
|
||||
|
||||
def create_image(tt: Timetable, setup: ImageSetup | None = None):
|
||||
if setup is None:
|
||||
setup = ImageSetup()
|
||||
|
||||
img = np.zeros((setup.height, setup.width, 3), dtype=np.uint8)
|
||||
|
||||
# fill with background color
|
||||
for i, c in enumerate(hex2color(setup.bg_color)):
|
||||
img[:, :, i] = int(c * 255)
|
||||
|
||||
### setup done ###
|
||||
|
||||
draw = ImageDraw.Draw(Image.fromarray(img))
|
||||
|
||||
title_font = ImageFont.truetype(*setup.fonts["title"])
|
||||
# ctitle_font = ImageFont.truetype(*setup.fonts["column_title"])
|
||||
badge_font = ImageFont.truetype(*setup.fonts["badge"])
|
||||
default_font = ImageFont.truetype(*setup.fonts["default"])
|
||||
|
||||
draw.text(
|
||||
xy=denormalize((0, 0), setup),
|
||||
text=tt.station,
|
||||
font=title_font,
|
||||
anchor="la",
|
||||
fill=setup.text_colors["title"]
|
||||
)
|
||||
|
||||
draw.text(
|
||||
xy=denormalize((0.815, 0), setup),
|
||||
text=tt.timestamp.strftime(r"%H:%M:%S"),
|
||||
font=title_font,
|
||||
anchor="la",
|
||||
fill=setup.text_colors["title"]
|
||||
)
|
||||
|
||||
|
||||
|
||||
line_badge_size = (2*setup.fonts["badge"][1], setup.fonts["badge"][1])
|
||||
line_height = setup.fonts["badge"][1]*1.75
|
||||
|
||||
def fetch_actual(self, eva):
|
||||
changes = _get_changes(eva)
|
||||
for event in changes:
|
||||
if event['@id'] == self.id:
|
||||
self.time = datetime.strptime(event['dp']['@ct'], "%y%m%d%H%M")
|
||||
self.delayed = self.time - self.old_time
|
||||
self.delayed = int(self.delayed.total_seconds()/60)
|
||||
if self.delayed == 0:
|
||||
self.delayed = None
|
||||
pass
|
||||
xy_start = denormalize((0,0.15), setup)
|
||||
dest_x = denormalize((0.12, 0), setup)[0]
|
||||
dep_x = denormalize((0.5, 0), setup)[0]
|
||||
max_dest_len = dep_x - dest_x - 60
|
||||
until_x = denormalize((0.5+0.5-0.12, 0), setup)[0]
|
||||
# col_title_y = denormalize((0, 0.15), setup)[1]
|
||||
|
||||
def __repr__(self):
|
||||
# draw.text(
|
||||
# xy=(dest_x, col_title_y),
|
||||
# text="Richtung",
|
||||
# font=ctitle_font,
|
||||
# anchor="lm",
|
||||
# fill=setup.text_colors["column_title"],
|
||||
# )
|
||||
|
||||
# draw.text(
|
||||
# xy=(dep_x, col_title_y),
|
||||
# text="Zeit",
|
||||
# font=ctitle_font,
|
||||
# anchor="lm",
|
||||
# fill=setup.text_colors["column_title"],
|
||||
# )
|
||||
|
||||
# draw.text(
|
||||
# xy=(until_x, col_title_y),
|
||||
# text="fährt in",
|
||||
# font=ctitle_font,
|
||||
# anchor="lm",
|
||||
# fill=setup.text_colors["column_title"],
|
||||
# )
|
||||
|
||||
current_xys_line_badge = [*xy_start, xy_start[0]+line_badge_size[0], xy_start[1]+line_badge_size[1]]
|
||||
for stop in tt.stops[:min(tt.min_stop_count, len(tt.stops), 7)]:
|
||||
stop: Stop = stop
|
||||
draw.rounded_rectangle(
|
||||
current_xys_line_badge,
|
||||
radius=min(current_xys_line_badge[2]-current_xys_line_badge[0], current_xys_line_badge[3]-current_xys_line_badge[1])//2,
|
||||
fill=badge_colors[stop.line]["fill"],
|
||||
outline=badge_colors[stop.line]["fill"],
|
||||
width=10
|
||||
)
|
||||
|
||||
center_xy = ((current_xys_line_badge[0]+current_xys_line_badge[2])//2, (current_xys_line_badge[1]+current_xys_line_badge[3])//2)
|
||||
|
||||
draw.text(
|
||||
xy=center_xy,
|
||||
text=stop.line,
|
||||
font=badge_font,
|
||||
anchor="mm",
|
||||
fill=badge_colors[stop.line]["text"],
|
||||
)
|
||||
|
||||
dest_text :str = stop.destination
|
||||
dest_text = dest_text.replace("München-", "")
|
||||
dest_text = dest_text.replace("München ", "")
|
||||
paren_index = dest_text.find("(")
|
||||
if paren_index != -1:
|
||||
dest_text = dest_text[:paren_index]
|
||||
|
||||
cutoff = 0
|
||||
while default_font.getlength(dest_text) > max_dest_len:
|
||||
dest_text = (dest_text[:len(dest_text)-cutoff+1]+ ("." if cutoff > 0 else "")).replace(" .", "")
|
||||
cutoff += 1
|
||||
|
||||
dep_text = stop.departure_time.strftime(r"%H:%M")
|
||||
|
||||
time_until = round((stop.departure_time - tt.timestamp).total_seconds()/60)
|
||||
|
||||
until_text = f"{time_until:d} min"
|
||||
|
||||
dep_color = setup.text_colors["default"]
|
||||
if stop.departure_delay is not None:
|
||||
dep_text = f"{dep_text} ({stop.departure_delay:+d})"
|
||||
|
||||
if time_until <= 6:
|
||||
dep_color = "#c83f4e"
|
||||
elif 6 < time_until < 10:
|
||||
dep_color = "#c8633f"
|
||||
|
||||
if stop.departure_canceled:
|
||||
dep_text = "Fällt aus :("
|
||||
dep_color = "#9f3c76"
|
||||
until_text = ""
|
||||
|
||||
draw.text(
|
||||
xy=(dest_x, center_xy[1]),
|
||||
text=dest_text,
|
||||
font=default_font,
|
||||
anchor="lm",
|
||||
fill=dep_color,
|
||||
)
|
||||
|
||||
draw.text(
|
||||
xy=(dep_x, center_xy[1]),
|
||||
text=dep_text,
|
||||
font=default_font,
|
||||
anchor="lm",
|
||||
fill=dep_color,
|
||||
)
|
||||
|
||||
draw.text(
|
||||
xy=(until_x, center_xy[1]),
|
||||
text=until_text,
|
||||
font=default_font,
|
||||
anchor="lm",
|
||||
fill=dep_color,
|
||||
)
|
||||
|
||||
current_xys_line_badge[1] += line_height
|
||||
current_xys_line_badge[3] += line_height
|
||||
|
||||
|
||||
# last step, convert from rgb to bgr
|
||||
img = np.asarray(draw.im, dtype=np.uint8).reshape(setup.height, setup.width, 3)
|
||||
return np.flip(img, axis=-1)
|
||||
|
||||
return f"{self.line} to {self.target}: {self.time.strftime("%Y-%m-%d %H:%M")}{f" ({self.delayed:+d})" if self.delayed is not None else ""} @ platform {self.platform}"
|
||||
|
||||
def main():
|
||||
# eva = 8002377
|
||||
eva = 8004158
|
||||
|
||||
# current_timestamp = datetime(year=2025, month=8, day=22, hour=22, minute=51, second=0)
|
||||
retrieve_and_print_schedule(eva, number_future_deps=100)
|
||||
cv2.namedWindow("fs", cv2.WND_PROP_FULLSCREEN)
|
||||
cv2.setWindowProperty("fs", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
|
||||
cv2.moveWindow("fs", 1920, 0)
|
||||
|
||||
pass
|
||||
tt = Timetable(eva=eva)
|
||||
|
||||
def retrieve_and_print_schedule(eva, target_timestamp = None, future_only = True, number_future_deps=6):
|
||||
current_timestamp = datetime.now() if target_timestamp is None else target_timestamp
|
||||
while True:
|
||||
start = time.time()
|
||||
tt.get_stops()
|
||||
img = create_image(tt)
|
||||
cv2.imshow("fs", img)
|
||||
|
||||
timetable = fetch_timetable(current_timestamp, eva)
|
||||
departures = extract_departures(timetable, eva)
|
||||
if cv2.waitKey(1) & 0xFF == ord("q"):
|
||||
break
|
||||
|
||||
if current_timestamp.minute < 10 and not future_only:
|
||||
timetable = fetch_timetable(current_timestamp - timedelta(hours=1), eva)
|
||||
departures = extract_departures(timetable, eva, departures)
|
||||
# elif current_timestamp.minute > 50:
|
||||
# # current_timestamp = timedelta(hours=1)
|
||||
# timetable = fetch_timetable(current_timestamp + timedelta(hours=1), eva)
|
||||
# departures = extract_departures(timetable, eva, departures)
|
||||
|
||||
if future_only:
|
||||
split_index = -1
|
||||
for di, dep in enumerate(departures):
|
||||
if dep.time < current_timestamp:
|
||||
split_index = di
|
||||
departures = departures[split_index+1:]
|
||||
elapsed = time.time() - start
|
||||
time.sleep(max(0, 1.0 - elapsed))
|
||||
|
||||
cnt = 0
|
||||
while len(departures) < number_future_deps and cnt <= 24:
|
||||
current_timestamp += timedelta(hours=1)
|
||||
timetable = fetch_timetable(current_timestamp, eva)
|
||||
departures = extract_departures(timetable, eva, departures)
|
||||
cnt += 1
|
||||
|
||||
# departures = departures[:number_future_deps]
|
||||
|
||||
print(f"{departures[0].station} ({departures[0].station})")
|
||||
for abf in departures:
|
||||
# if abf.time >= current_timestamp:
|
||||
print(abf)
|
||||
|
||||
def fetch_timetable(target_datetime: datetime, eva):
|
||||
|
||||
"""
|
||||
timetable:
|
||||
@station: // station name
|
||||
s: // stop, contains list of the following
|
||||
@id // id
|
||||
tl: // trip label
|
||||
@f // distance class, F: Fern, N: Nah, S: Stadt
|
||||
@t // trip type: e, p, z, s, h, n -> normally p
|
||||
@o // owner: EVU-Number 800725 is S-Bahn München
|
||||
@c // category: CE, IC, EC, IRE, RE, RB, S, MEX, TGJ, NJ, Bus
|
||||
@n // train number
|
||||
ar: // arrival
|
||||
see dp
|
||||
dp: // departure
|
||||
@pt // yyMMddHHmm
|
||||
@pp // platform number
|
||||
@l // line. -> tl.@c + dp.@l make up the train line (eg S3)
|
||||
@ppth // path, stations separated by |. for the last station in dp.@ppth is the destination, the first station in ar.@ppth is the origin
|
||||
|
||||
"""
|
||||
|
||||
daystamp = target_datetime.strftime(r"%y%m%d")
|
||||
hourstamp = target_datetime.strftime(r"%H")
|
||||
|
||||
url = f"{base_url}/plan/{eva}/{daystamp}/{hourstamp}"
|
||||
x = requests.get(url)
|
||||
if x.status_code != 200:
|
||||
return None
|
||||
timetable = xmltodict.parse(x.text)['timetable']
|
||||
return timetable
|
||||
|
||||
def extract_departures(timetable, eva, departures=None):
|
||||
if departures is None:
|
||||
departures = []
|
||||
if timetable is None:
|
||||
return departures
|
||||
|
||||
events = timetable['s']
|
||||
if not isinstance(events, list):
|
||||
events = [events]
|
||||
for event in events:
|
||||
if event['tl']['@f'] != 'S':
|
||||
# only s-bahnen are supported for now
|
||||
continue
|
||||
departure = Departure(event, station=timetable['@station'], eva=eva)
|
||||
departure.fetch_actual(eva)
|
||||
departures.append(departure)
|
||||
|
||||
departures.sort(key=lambda abf: abf.time)
|
||||
return departures
|
||||
cv2.destroyAllWindows()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user