#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""
Services representing and dealing with the program timetable.

- TimetableService: Representation of the timetable containing timeslots for being scheduled.
- TimetableMerger: Processor to merge remote timeslots with the local ones.
"""

from __future__ import annotations

import logging
import os
from datetime import datetime

import confuse
import jsonpickle

import aura_engine.engine as engine
import aura_engine.scheduling.api as api
from aura_engine.base.config import AuraConfig
from aura_engine.base.lang import synchronized
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.scheduling.domain import Playlist, PlaylistItem, Timeslot


class TimetableService:
    """
    Timetable Service.

    The current program timetable as per scheduling calendar. The timetable is a set of
    timeslots over the period of 24 hours.
    """

    config: confuse.Configuration
    logger: logging.Logger
    # Merged list of timeslots; `None` until the first successful refresh.
    timetable: list[Timeslot] | None = None
    # Only set while a fetch is in progress; reset to `None` after each fetch.
    api_fetcher: api.ApiFetcher | None = None
    last_successful_fetch: datetime | None = None
    cache_location: str | None = None
    timetable_file: str | None = None
    # Passed to jsonpickle as `unpicklable` when persisting the timetable.
    restorable: bool

    def __init__(self, cache_location: str):
        """
        Initialize.

        Creates the `timetable` sub-folder below the given cache location, if it does not
        exist yet.

        Args:
            cache_location (str): Path to folder where timetable.json is stored.
        """
        self.config = AuraConfig.instance.config
        self.logger = logging.getLogger("engine")
        jsonpickle.set_encoder_options("json", sort_keys=False, indent=2)

        # `os.path.join` copes with a trailing slash as well as with an empty string.
        self.cache_location = os.path.join(cache_location, "timetable")
        os.makedirs(self.cache_location, exist_ok=True)
        self.timetable_file = os.path.join(self.cache_location, "timetable.json")
        self.restorable = False

    @synchronized
    def refresh(self):
        """
        Update the timetable.

        1. Fetch the latest timeslots using the API fetcher.
        2. Merge them with the current timetable.

        Persisting to `timetable.json` is currently disabled (see FIXME below).

        If the fetcher reports an error, or returns no timeslots, the current timetable is
        kept untouched.
        """
        self.logger.debug("Trying to fetch new timeslots from API endpoints...")
        if not self.api_fetcher:
            self.api_fetcher = api.ApiFetcher()
            self.api_fetcher.start()
        response = self.api_fetcher.fetch()
        self.api_fetcher = None

        # Any code other than `0` is treated as a failed fetch.
        if response.code != 0:
            msg = SU.red(f"Keep using current timetable, as API returned: {response.message}")
            self.logger.warning(msg)
            self.logger.debug("Exception in API Fetcher: \n" + str(response.exception))
            return

        if not response.timeslots:
            self.logger.warning("Program fetched from Steering contains no timeslots!")
            return

        self.last_successful_fetch = datetime.now()
        # Report the number of fetched timeslots, not the size of the response object.
        count = len(response.timeslots)
        msg = f"Finished fetching current timeslots from API ({count})"
        self.logger.info(SU.green(msg))
        self.merge_timetable(response.timeslots)
        # FIXME: for some unknown reason storing the timetable fails. This does not affect
        # the playout, since storing the timetable is not in use atm.
        # self.persist_timetable()

    def merge_timetable(self, remote_timeslots: list):
        """
        Merge the fetched timeslots with the current local timetable.

        The result of the merge replaces the current timetable.

        Args:
            remote_timeslots (list): the timeslots fetched from the API.
        """
        self.timetable = TimetableMerger().merge(self.timetable, remote_timeslots)
        self.logger.info(SU.green("Merged timetable"))

    def persist_timetable(self):
        """
        Store the current timetable to the local JSON file used for caching.

        Errors while writing are logged and not propagated to the caller.
        """
        if self.timetable_file is None:
            self.logger.error(SU.red("No timetable file specified."))
            return

        # Ensure the directory exists
        os.makedirs(os.path.dirname(self.timetable_file), exist_ok=True)

        try:
            with open(self.timetable_file, "w", encoding="utf-8") as file:
                file.write(jsonpickle.encode(self.timetable, unpicklable=self.restorable))
            self.logger.info(SU.green("timetable.json stored"))
        except FileNotFoundError as fnf_error:
            self.logger.error(SU.red(f"File not found: {self.timetable_file}. {fnf_error}"))
        except PermissionError as perm_error:
            self.logger.error(
                SU.red(f"Permission denied for file: {self.timetable_file}. {perm_error}")
            )
        except Exception as e:
            # Best effort: a failing cache write is only logged.
            self.logger.error(SU.red(f"Unexpected error while storing {self.timetable_file}: {e}"))

    def get_current_item(self) -> PlaylistItem | None:
        """
        Retrieve the current playlist item which should be played as per timetable.

        Triggers a refresh first, if there is no timetable yet.

        Returns:
            (PlaylistItem): The track which is (or should) currently be on air. `None` if
                there is no active timeslot, no playlist or no matching item.
        """
        now: float = engine.Engine.engine_time()

        # If necessary initialize timetable
        if not self.timetable:
            self.refresh()

        # Check for current timeslot
        current_timeslot: Timeslot | None = self.get_current_timeslot()
        if not current_timeslot:
            self.logger.warning(SU.red("There's no active timeslot"))
            return None

        # Check for scheduled playlist
        current_playlist: Playlist | None = current_timeslot.get_current_playlist()
        if not current_playlist:
            msg = (
                "There's no (default) playlist assigned to the current timeslot."
                " Most likely a fallback will make things okay again."
            )
            self.logger.warning(SU.red(msg))
            return None

        # Pick the first item whose time range (both ends inclusive) contains `now`
        current_item = next(
            (item for item in current_playlist.items if item.get_start() <= now <= item.get_end()),
            None,
        )
        if not current_item:
            # Nothing playing ... fallback will kick-in
            msg = f"There's no current item in playlist {current_playlist}"
            self.logger.warning(SU.red(msg))
            return None

        return current_item

    def get_current_timeslot(self) -> Timeslot | None:
        """
        Retrieve the timeslot currently to be played.

        A timeslot is current if the engine time is at or after its start and before its end.

        Returns:
            (Timeslot): The current timeslot or `None` if there is none.
        """
        if not self.timetable:
            return None

        now = engine.Engine.engine_time()
        # First match wins; the end of a timeslot is exclusive.
        return next(
            (ts for ts in self.timetable if ts.get_start() <= now < ts.get_end()),
            None,
        )

    def get_next_timeslots(self, max_count: int = 0, window_aware=False) -> list[Timeslot]:
        """
        Retrieve the timeslots to be played after the current one.

        The method allows to return only a max count of timeslots. Also, timeslots which are
        not within the scheduling window can be optionally omitted.

        The scheduling window behaviour has these effects, when scheduling timeslots:

        - Before the scheduling window: Timeslots can still be deleted in Steering and the
            playout will respect this.
        - During the scheduling window: Timeslots and their playlists are queued as timed
            commands.
        - After the scheduling window: Such timeslots are ignored, because it doesn't make
            sense anymore to schedule them before the next timeslot starts.

        Args:
            max_count (Integer): Maximum of timeslots to return. If `0` is passed, all
                existing ones are returned.
            window_aware (bool): If set to true, only timeslots within the scheduling window
                are returned.

        Returns:
            ([Timeslot]): The upcoming timeslots.
        """
        if not self.timetable:
            return []

        now = engine.Engine.engine_time()
        next_timeslots: list[Timeslot] = []

        ts: Timeslot
        for ts in self.timetable:
            # Only timeslots starting in the future are of interest
            if ts.get_start() <= now:
                continue

            # The window check is only needed (and evaluated) in window-aware mode
            if not window_aware or self.is_timeslot_in_window(ts):
                if max_count != 0 and len(next_timeslots) >= max_count:
                    break
                next_timeslots.append(ts)
            else:
                start = SU.fmt_time(ts.get_start())
                end = SU.fmt_time(ts.get_end())
                t1: int = self.config.scheduler.scheduling_window_start
                t2: int = self.config.scheduler.scheduling_window_end
                msg = f"Skip timeslot #{ts.get_id()} [{start} - {end}] "
                msg += f"- not in scheduling window T¹-{t1}s to T²-{t2}s"
                self.logger.debug(msg)

        return next_timeslots

    def is_timeslot_in_window(self, timeslot: Timeslot) -> bool:
        """
        Check if the timeslot is within the scheduling window.

        The scheduling window represents the soonest and latest point some timeslot can be
        scheduled. It opens `scheduling_window_start` before the timeslot starts and closes
        `scheduling_window_end` before the timeslot ends.

        Args:
            timeslot (Timeslot): The timeslot to check.

        Returns:
            (bool): True if it is within the window.
        """
        now = engine.Engine.engine_time()
        window_start = self.config.scheduler.scheduling_window_start
        window_end = self.config.scheduler.scheduling_window_end

        window_opened = timeslot.get_start() - window_start < now
        window_not_closed = timeslot.get_end() - window_end > now
        return window_opened and window_not_closed

    def terminate(self):
        """
        Call this when the thread is stopped or a signal to terminate is received.

        Terminates the API fetcher, in case one is currently assigned.
        """
        self.logger.info(SU.yellow("[TimetableService] Shutting down..."))
        if self.api_fetcher:
            self.api_fetcher.terminate()


class TimetableMerger:
    """
    Compare and merge the local with the remote set of timeslots.
    """

    config: confuse.Configuration
    logger: logging.Logger

    def __init__(self):
        """
        Initialize.
        """
        self.config = AuraConfig.instance.config
        self.logger = logging.getLogger("engine")

    def build_map(
        self, local_timeslots: list[Timeslot] | None, remote_timeslots: list[Timeslot] | None
    ) -> dict[str, dict]:
        """
        Build a map of local and remote timeslots relations.

        The start time of any timeslot acts as the key. The map value holds a dictionary,
        where the `local` key is the local timeslot, and the `remote` key holds the remote
        timeslot. A side without a timeslot for that start time holds `None`.

        Args:
            local_timeslots ([Timeslot]): The current timetable.
            remote_timeslots ([Timeslot]): The latest timeslots from the API.

        Returns:
            ({str: {}}): Map with timestamp as key and a dictionary referencing timeslots.
        """
        timeslot_map: dict[str, dict] = {}

        # Existing virtual timeslots are ignored, because they are likely updated remotely.
        # if not ts.is_virtual():
        for ts in local_timeslots or []:
            relation = timeslot_map.setdefault(
                str(ts.get_start()), {"local": None, "remote": None}
            )
            relation["local"] = ts

        for ts in remote_timeslots or []:
            relation = timeslot_map.setdefault(
                str(ts.get_start()), {"local": None, "remote": None}
            )
            relation["remote"] = ts

        return timeslot_map

    @synchronized
    def merge(
        self, local_timeslots: list[Timeslot] | None, remote_timeslots: list[Timeslot] | None
    ) -> list[Timeslot]:
        """
        Merge strategy for local and remote timeslots.

        For timeslots which already passed the start of their scheduling window, the local
        timeslot is kept as long as it has not ended yet; if there is no local one, the
        remote one is added. For all other timeslots the remote state wins: timeslots
        missing remotely are dropped, new ones are added and existing ones are updated.

        Args:
            local_timeslots ([Timeslot]): Local timeslots.
            remote_timeslots ([Timeslot]): Remote timeslots.

        Returns:
            [Timeslot]: The merged timeslots.
        """
        merged_ts: list[Timeslot] = []
        timeslot_map = self.build_map(local_timeslots, remote_timeslots)
        if not timeslot_map:
            return merged_ts

        scheduling_window_start = self.config.scheduler.scheduling_window_start
        now: float = SU.timestamp()
        merge_info: list[str] = [SU.cyan("\nMap for timetable merge:")]

        for timestamp, ts_relation in timeslot_map.items():
            local: Timeslot | None = ts_relation.get("local")
            remote: Timeslot | None = ts_relation.get("remote")

            if not local and not remote:
                # Relations w/o local and remote timeslots should not happen
                self.logger.critical(SU.red("Skipping invalid merge case!"))
                resolution = "skip | invalid merge case"
            elif (float(timestamp) - scheduling_window_start) < now:
                # It's past the scheduling window start, so keep the local one as is
                if not local:
                    # No local timeslot, so add it in any case.
                    resolution = "add"
                    merged_ts.append(remote)
                elif local.end > now:
                    # Only keep timeslots which have not yet ended.
                    merged_ts.append(local)
                    resolution = "add | currently playing"
                else:
                    resolution = "skip | out of scheduling window"
            elif not remote:
                # Timeslot was deleted remotely, remove any local one
                resolution = "remove"
            elif not local:
                # Timeslot was added remotely
                resolution = "add"
                merged_ts.append(remote)
            else:
                # Timeslot existing locally, was updated or did not change remotely
                # Update the local timeslot with possibly changed data
                local.update(remote)
                merged_ts.append(local)
                resolution = "update"

            local_str = "local:" + str(local).ljust(25)
            remote_str = "remote: " + str(remote).ljust(25)
            merge_info.append(
                SU.cyan(f"\n\tTIME:{timestamp} - {local_str} | {remote_str} ↦ ({resolution})")
            )

        self.logger.debug("".join(merge_info) + "\n")
        return merged_ts