From e88e2b622e9d7c27dd72f9a64285c0d1a7d2350d Mon Sep 17 00:00:00 2001 From: David Trattnig <david@subsquare.at> Date: Wed, 11 Oct 2023 10:54:16 +0200 Subject: [PATCH] refactor: timetable render with new domain model --- src/aura_engine/scheduling/utils.py | 214 ++++++++++++---------------- 1 file changed, 93 insertions(+), 121 deletions(-) diff --git a/src/aura_engine/scheduling/utils.py b/src/aura_engine/scheduling/utils.py index d1dea719..98bb02b0 100644 --- a/src/aura_engine/scheduling/utils.py +++ b/src/aura_engine/scheduling/utils.py @@ -22,11 +22,14 @@ A collection of utilities used for scheduling. """ +from __future__ import annotations + import logging +import aura_engine.scheduling as scheduling from aura_engine.base.config import AuraConfig from aura_engine.base.utils import SimpleUtil as SU -from aura_engine.scheduling.domain import PlaylistItem +from aura_engine.scheduling.domain import Playlist, PlaylistItem, Timeslot class M3UPlaylistProcessor: @@ -115,169 +118,140 @@ class TimetableRenderer: Display current and next timeslots in ASCII for maintenance and debugging. """ - logger = None - scheduler = None - programme = None + logger: logging.Logger + scheduler: scheduling.scheduler.AuraScheduler + timetable: timetable.TimetableService - def __init__(self, scheduler): + def __init__(self, scheduler: scheduling.scheduler.AuraScheduler): self.logger = logging.getLogger("engine") self.scheduler = scheduler - self.programme = scheduler.get_timetable() + self.timetable = scheduler.get_timetable() - def get_ascii_timeslots(self): + def get_ascii_timeslots(self) -> str: """ - Create a printable version of the current programme. + Create a printable version of the current timetable. The output contains playlists and items as per timeslot. Returns: - (String): An ASCII representation of the current and next timeslots + (String): An ASCII representation of the current and next timeslots. """ - active_timeslot = self.programme.get_current_timeslot() + active_timeslot: Timeslot = self.timetable.get_current_timeslot() s = "\n\n SCHEDULED NOW:" - s += ( - "\n┌───────────────────────────────────────────────────────────────────────" - "───────────────────────────────" - ) - if active_timeslot: - planned_playlist = None - if active_timeslot.playlist: - planned_playlist = active_timeslot.playlist + s += "\n┌".ljust(102, "-") - (playlist_type, resolved_playlist) = self.scheduler.resolve_playlist(active_timeslot) + if active_timeslot: + planned_playlist: Playlist = active_timeslot.get_current_playlist() + s += f"\n│ Playing timeslot {active_timeslot}" - s += "\n│ Playing timeslot %s " % active_timeslot if planned_playlist: - if ( - resolved_playlist - and resolved_playlist.playlist_id != planned_playlist.playlist_id - ): - s += "\n│ └── Playlist %s " % planned_playlist - s += "\n│ " - s += SU.red("↑↑↑ That's the originally planned playlist.") + ( - "Instead playing the default playlist below ↓↓↓" - ) - - if resolved_playlist: - if not planned_playlist: - s += "\n│ " - s += SU.red( - f"No playlist assigned to timeslot. Instead playing the" - f" '{playlist_type.get('name')}' playlist below ↓↓↓" - ) - - s += "\n│ └── Playlist %s " % resolved_playlist - - active_item = self.programme.get_current_item() - - # Finished items - for item in resolved_playlist.items: - if active_item == item: - break - else: - s += self.build_item_string("\n│ └── ", item, True) - - # Item currently being played - if active_item: - s += "\n│ └── Item %s | %s " % ( - str(active_item.item_num + 1), - SU.green("PLAYING > " + str(active_item)), - ) - - # Open items for current playlist - rest_of_playlist = active_item.get_next_items(False) - items = self.preprocess_items(rest_of_playlist, False) - s += self.build_playlist_string(items) - + t = planned_playlist.get_type() + s += f"\n│ └── {t}: {planned_playlist}" + + active_item = self.timetable.get_current_item() + item_num: int = 1 + + # Display previous playlist items + prev_items: [PlaylistItem] = active_item.get_all_prev() + for item in prev_items: + s += self.build_item_string("\n│ └── ", item_num, item, True) + item_num += 1 + + # Display current playlist item + active_str = SU.green(f"PLAYING > {active_item}]") + s += f"\n│ └── Item {item_num} | {active_str}" + item_num += 1 + + # Display next playlist items + next_items: [PlaylistItem] = active_item.get_all_next() + items = self.preprocess_items(next_items, False) + s += self.build_playlist_string(items, item_num) else: - s += "\n│ └── %s" % ( - SU.red( - "No active playlist." - " There should be at least some fallback playlist running..." - ) - ) + s += "\n│ " + s += SU.red("No playlist assigned. Station fallback will take over.") else: s += "\n│ Nothing. " - s += ( - "\n└───────────────────────────────────────────────────────────────────────" - "───────────────────────────────" - ) + s += "\n└".ljust(102, "-") s += "\n SCHEDULED NEXT:" - s += ( - "\n┌───────────────────────────────────────────────────────────────────────" - "───────────────────────────────" - ) + s += "\n┌".ljust(102, "-") - next_timeslots = self.programme.get_next_timeslots() + next_timeslots: [Timeslot] = self.timetable.get_next_timeslots() if not next_timeslots: s += "\n│ Nothing. " else: for timeslot in next_timeslots: - (playlist_type, resolved_playlist) = self.scheduler.resolve_playlist(timeslot) - if resolved_playlist: - s += "\n│ Queued timeslot %s " % timeslot - s += "\n│ └── Playlist %s (Type: %s)" % ( - resolved_playlist, - SU.cyan(str(playlist_type)), - ) - if resolved_playlist.end_unix > timeslot.end_unix: - s += "\n│ %s! " % ( - SU.red( - "↑↑↑ Playlist #%s ends after timeslot #%s!" - % (resolved_playlist.playlist_id, timeslot.timeslot_id) - ) - ) - - items = self.preprocess_items(resolved_playlist.items, False) - s += self.build_playlist_string(items) - - s += ( - "\n└───────────────────────────────────────────────────────────────────────" - "───────────────────────────────\n\n" - ) + playlist: Playlist = timeslot.get_current_playlist() + if playlist: + t = SU.cyan(str(playlist.get_type())) + s += f"\n│ Queued timeslot {timeslot} " + s += f"\n│ └── {t}: {playlist}" + if playlist.get_duration() > timeslot.get_end() - timeslot.get_start(): + warning = SU.red("↑↑↑ Playlist ends after timeslot!") + s += f"\n│ {warning}! " + + items = self.preprocess_items(playlist.items, False) + s += self.build_playlist_string(items, 1) + + s += "\n└".ljust(102, "-") + "\n\n" return s - def build_playlist_string(self, items): + def build_playlist_string(self, items: [PlaylistItem], num_start: int) -> str: """ Return a stringified list of items. + + Args: + items ([PlaylistItem]): The items. + num_start (int): The number of the first item. + + Returns: + (str): The resulting string to be displayed. """ s = "" is_out_of_timeslot = False + item: PlaylistItem for item in items: if ( - item.queue_state == PlaylistItem.QueueState.OUT_OF_SCHEDULE + item.play_queue_state == PlaylistItem.QueueState.OUT_OF_SCHEDULE and not is_out_of_timeslot ): - s += "\n│ %s" % SU.red( - "↓↓↓ These items won't be played because they are out of timeslot." - ) + warning = SU.red("↓↓↓ These 'out of timeslot' items won't be played.") + s += f"\n│ {warning}" is_out_of_timeslot = True - s += self.build_item_string("\n│ └── ", item, is_out_of_timeslot) + s += self.build_item_string("\n│ └── ", num_start, item, is_out_of_timeslot) + num_start += 1 return s - def build_item_string(self, prefix, item, strike): + def build_item_string(self, prefix: str, num: int, item: PlaylistItem, strike: bool) -> str: """ Return an stringified item. + + Args: + prefix (str): The prefix. + num (int): The number of the item in the playlist. + strike (bool): If the string should be striked through. + + Returns: + (str): The item as string to be displayed. """ s = "" - if item.queue_state == PlaylistItem.QueueState.CUT: - s = "\n│ %s" % SU.red("↓↓↓ This item is going to be cut.") + if item.play_queue_state == PlaylistItem.QueueState.CUT: + warning = SU.red("↓↓↓ This item is going to be cut.") + s = f"\n│ {warning}" if strike: item_str = SU.strike(item) else: item_str = str(item) - item_line = "%sItem %s | %s" % (prefix, str(item.item_num + 1), item_str) - return s + item_line + return s + f"{prefix}Item {num} | {item_str}" - def preprocess_items(self, items, cut_oos): + def preprocess_items(self, items: [PlaylistItem], cut_oos: bool) -> [PlaylistItem]: """ Analyse and mark items which are going to be cut or excluded. @@ -287,24 +261,22 @@ class TimetableRenderer: Returns: ([PlaylistItem]): The list of processed playlist items - """ clean_items = [] + item: PlaylistItem for item in items: - if item.item_start >= item.playlist.timeslot.timeslot_end: - msg = "Filtered item (%s) after end-of timeslot (%s) ... SKIPPED" % ( - item, - item.playlist.timeslot, - ) + timeslot: Timeslot = item.get_playlist().get_timeslot() + if item.get_start() >= timeslot.get_end(): + msg = f"Filtered item ({item}) after end-of timeslot ({timeslot}) ... SKIPPED" self.logger.debug(msg) - item.queue_state = PlaylistItem.QueueState.OUT_OF_SCHEDULE - elif item.end_unix > item.playlist.timeslot.end_unix: - item.queue_state = PlaylistItem.QueueState.CUT + item.play_queue_state = PlaylistItem.QueueState.OUT_OF_SCHEDULE + elif item.get_end() > timeslot.get_end(): + item.play_queue_state = PlaylistItem.QueueState.CUT else: - item.queue_state = PlaylistItem.QueueState.OKAY + item.play_queue_state = PlaylistItem.QueueState.OKAY - if not item.queue_state == PlaylistItem.QueueState.OUT_OF_SCHEDULE or not cut_oos: + if not item.play_queue_state == PlaylistItem.QueueState.OUT_OF_SCHEDULE or not cut_oos: clean_items.append(item) return clean_items -- GitLab