# timetable.py (15.35 KiB) - Ole Binder authored
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Services representing and dealing with the program timetable.
- TimetableService: Representation of the timetable containing timeslots for being scheduled.
- TimetableMerger: Processor to merge remote timeslots with the local ones.
"""
from __future__ import annotations
import logging
import os
from datetime import datetime
import confuse
import jsonpickle
import aura_engine.engine as engine
import aura_engine.scheduling.api as api
from aura_engine.base.config import AuraConfig
from aura_engine.base.lang import synchronized
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.scheduling.domain import Playlist, PlaylistItem, Timeslot
class TimetableService:
    """
    Timetable Service.

    The current program timetable as per scheduling calendar. The timetable is a set of
    timeslots over the period of 24 hours.

    The service fetches timeslots via `api.ApiFetcher`, merges them into the local timetable
    using `TimetableMerger` and offers lookups for the current and upcoming timeslots.
    """

    # Engine configuration, read from `AuraConfig.instance.config`.
    config: confuse.Configuration
    # Logger named "engine".
    logger: logging.Logger
    # The merged timetable; `None` until the first successful refresh.
    timetable: list[Timeslot] | None = None
    # Fetcher used while a refresh is in progress; reset to `None` after each fetch.
    api_fetcher: api.ApiFetcher | None = None
    # Local time of the last fetch which returned at least one timeslot.
    last_successful_fetch: datetime | None = None
    # Folder holding the cached timetable (`<cache_location>/timetable`).
    cache_location: str | None = None
    # Full path of the cached `timetable.json`.
    timetable_file: str | None = None
    # Passed as `unpicklable` to jsonpickle when persisting the timetable.
    restorable: bool

    def __init__(self, cache_location: str):
        """
        Initialize.

        Creates the folder `<cache_location>/timetable` if it does not exist yet.

        Args:
            cache_location (str): Path to folder where timetable.json is stored.
        """
        self.config = AuraConfig.instance.config
        self.logger = logging.getLogger("engine")
        jsonpickle.set_encoder_options("json", sort_keys=False, indent=2)

        # `os.path.join` copes with a missing or present trailing slash and, unlike indexing
        # the last character, does not fail on an empty string.
        cache_location = os.path.join(cache_location, "timetable")
        os.makedirs(cache_location, exist_ok=True)
        self.cache_location = cache_location
        self.timetable_file = os.path.join(self.cache_location, "timetable.json")
        self.restorable = False

    @synchronized
    def refresh(self):
        """
        Update the timetable.

        1. Fetch the latest timeslots from the API or the API cache.
        2. Merge with the current timetable.
        3. Persist to `timetable.json` (currently disabled, see FIXME below).

        If the API reports an error or returns no timeslots, the current timetable is kept.
        """
        self.logger.debug("Trying to fetch new timeslots from API endpoints...")
        if not self.api_fetcher:
            self.api_fetcher = api.ApiFetcher()
            self.api_fetcher.start()
        response = self.api_fetcher.fetch()
        # The fetcher is single-use; drop it so the next refresh creates a fresh one.
        self.api_fetcher = None

        if response.code != 0:
            msg = SU.red(f"Keep using current timetable, as API returned: {response.message}")
            self.logger.warning(msg)
            self.logger.debug("Exception in API Fetcher: \n" + str(response.exception))
            return

        # Truthiness check also covers a response carrying `None` instead of a list.
        if not response.timeslots:
            self.logger.warning("Program fetched from Steering contains no timeslots!")
            return

        self.last_successful_fetch = datetime.now()
        # Report the number of fetched timeslots, not the length of the response object.
        msg = f"Finished fetching current timeslots from API ({len(response.timeslots)})"
        self.logger.info(SU.green(msg))
        self.merge_timetable(response.timeslots)
        # FIXME: for some unknown reason storing the timetable fails. This does not affect
        #   the playout, since storing the timetable is not in use atm.
        # self.persist_timetable()

    def merge_timetable(self, remote_timeslots: list):
        """
        Merge the fetched timeslots with the current local timetable.

        The result replaces `self.timetable`.

        Args:
            remote_timeslots (list): the timeslots fetched from the API.
        """
        merger: TimetableMerger = TimetableMerger()
        self.timetable = merger.merge(self.timetable, remote_timeslots)
        self.logger.info(SU.green("Merged timetable"))

    def persist_timetable(self):
        """
        Store the current timetable to the local JSON file used for caching.

        Failures are logged and never raised, since persisting is a best-effort operation.
        """
        if self.timetable_file is None:
            self.logger.error(SU.red("No timetable file specified."))
            return

        try:
            # Ensure the directory exists. Done inside the `try`, so that a failure to
            # create it is logged like any other storage error.
            os.makedirs(os.path.dirname(self.timetable_file), exist_ok=True)
            with open(self.timetable_file, "w", encoding="utf-8") as file:
                file.write(jsonpickle.encode(self.timetable, unpicklable=self.restorable))
            self.logger.info(SU.green("timetable.json stored"))
        except FileNotFoundError as fnf_error:
            self.logger.error(SU.red(f"File not found: {self.timetable_file}. {fnf_error}"))
        except PermissionError as perm_error:
            self.logger.error(
                SU.red(f"Permission denied for file: {self.timetable_file}. {perm_error}")
            )
        except Exception as e:
            self.logger.error(SU.red(f"Unexpected error while storing {self.timetable_file}: {e}"))

    def get_current_item(self) -> PlaylistItem | None:
        """
        Retrieve the current playlist item which should be played as per timetable.

        Triggers a refresh first, if there is no timetable yet.

        Returns:
            (PlaylistItem): The track which is (or should) currently be on air, or `None` if
                there is no active timeslot, no playlist or no matching item.
        """
        now: float = engine.Engine.engine_time()

        # If necessary initialize timetable
        if not self.timetable:
            self.refresh()

        # Check for current timeslot
        current_timeslot: Timeslot | None = self.get_current_timeslot()
        if not current_timeslot:
            self.logger.warning(SU.red("There's no active timeslot"))
            return None

        # Check for scheduled playlist
        current_playlist: Playlist = current_timeslot.get_current_playlist()
        if not current_playlist:
            msg = (
                "There's no (default) playlist assigned to the current timeslot."
                " Most likely a fallback will make things okay again."
            )
            self.logger.warning(SU.red(msg))
            return None

        # Return the first item whose time span (both ends inclusive) covers `now`
        for item in current_playlist.items:
            if item.get_start() <= now <= item.get_end():
                return item

        # Nothing playing ... fallback will kick-in
        msg = f"There's no current item in playlist {current_playlist}"
        self.logger.warning(SU.red(msg))
        return None

    def get_current_timeslot(self) -> Timeslot | None:
        """
        Retrieve the timeslot currently to be played.

        Returns:
            (Timeslot): The first timeslot with `start <= now < end`, or `None` if there is
                no timetable or no such timeslot.
        """
        if not self.timetable:
            return None

        now = engine.Engine.engine_time()
        timeslot: Timeslot
        for timeslot in self.timetable:
            if timeslot.get_start() <= now < timeslot.get_end():
                return timeslot
        return None

    def get_next_timeslots(self, max_count: int = 0, window_aware=False) -> list[Timeslot]:
        """
        Retrieve the timeslots to be played after the current one.

        The method allows to return only a max count of timeslots. Also, timeslots which are
        not within the scheduling window can be optionally omitted.

        The scheduling window behaviour has these effects, when scheduling timeslots:

        - Before the scheduling window: Timeslots can still be deleted in Steering and the
            playout will respect this.
        - During the scheduling window: Timeslots and its playlists are queued as timed
            commands.
        - After the scheduling window: Such timeslots are ignored, because it doesn't make
            sense anymore to schedule them before the next timeslot starts.

        Args:
            max_count (Integer): Maximum of timeslots to return. If `0` is passed, all existing
                ones are returned.
            window_aware (bool): If set to true, only timeslots within the scheduling window are
                returned.

        Returns:
            ([Timeslot]): The upcoming timeslots, in timetable order.
        """
        if not self.timetable:
            return []

        now = engine.Engine.engine_time()
        next_timeslots: list[Timeslot] = []

        ts: Timeslot
        for ts in self.timetable:
            # Only timeslots starting in the future are of interest
            if ts.get_start() <= now:
                continue

            if window_aware and not self.is_timeslot_in_window(ts):
                start = SU.fmt_time(ts.get_start())
                end = SU.fmt_time(ts.get_end())
                t1: int = self.config.scheduler.scheduling_window_start
                t2: int = self.config.scheduler.scheduling_window_end
                msg = f"Skip timeslot #{ts.get_id()} [{start} - {end}] "
                msg += f"- not in scheduling window T¹-{t1}s to T²-{t2}s"
                self.logger.debug(msg)
                continue

            # Stop as soon as the requested amount is reached (`0` means no limit)
            if max_count != 0 and len(next_timeslots) >= max_count:
                break
            next_timeslots.append(ts)

        return next_timeslots

    def is_timeslot_in_window(self, timeslot: Timeslot) -> bool:
        """
        Check if the timeslot is within the scheduling window.

        The scheduling window represents the soonest and latest
        point some timeslot can be scheduled. It opens `scheduling_window_start` before the
        timeslot starts and closes `scheduling_window_end` before the timeslot ends.

        Args:
            timeslot (Timeslot): The timeslot to check.

        Returns:
            (bool): True if it is within the window.
        """
        now = engine.Engine.engine_time()
        window_start = self.config.scheduler.scheduling_window_start
        window_end = self.config.scheduler.scheduling_window_end
        return timeslot.get_start() - window_start < now < timeslot.get_end() - window_end

    def terminate(self):
        """
        Call this when the thread is stopped or a signal to terminate is received.

        Terminates the API fetcher, if one is currently set.
        """
        self.logger.info(SU.yellow("[TimetableService] Shutting down..."))
        if self.api_fetcher:
            self.api_fetcher.terminate()
class TimetableMerger:
    """
    Compare and merge the local with the remote set of timeslots.
    """

    # Engine configuration, read from `AuraConfig.instance.config`.
    config: confuse.Configuration
    # Logger named "engine".
    logger: logging.Logger

    def __init__(self):
        """
        Initialize.
        """
        self.config = AuraConfig.instance.config
        self.logger = logging.getLogger("engine")

    def build_map(
        self, local_timeslots: list[Timeslot] | None, remote_timeslots: list[Timeslot] | None
    ) -> dict[str, dict]:
        """
        Build a map of local and remote timeslots relations.

        The start time of any timeslot acts as the key. The map value holds a dictionary, where
        the `local` key is the local timeslot, and the `remote` key holds the remote timeslot.
        A side without a matching timeslot is set to `None`.

        Args:
            local_timeslots ([Timeslot]): The current timetable.
            remote_timeslots ([Timeslot]): The latest timeslots from the API.

        Returns:
            ({str: {}}): Map with timestamp as key and a dictionary referencing timeslots.
        """
        timeslot_map: dict[str, dict] = {}

        # Local timeslots come first; their remote counterpart is filled in below, if any.
        # Existing virtual timeslots are not filtered out here, even though they are likely
        # updated remotely.
        for ts in local_timeslots or []:
            timeslot_map[str(ts.get_start())] = {"local": ts, "remote": None}

        # Attach remote timeslots to an existing relation or create a remote-only one.
        for ts in remote_timeslots or []:
            relation = timeslot_map.setdefault(
                str(ts.get_start()), {"local": None, "remote": None}
            )
            relation["remote"] = ts

        return timeslot_map

    @synchronized
    def merge(
        self, local_timeslots: list[Timeslot] | None, remote_timeslots: list[Timeslot] | None
    ) -> list[Timeslot]:
        """
        Merge strategy for local and remote timeslots.

        Relations are processed in chronological order of their start time, so the merged
        timetable is sorted by start. For each relation:

        - Past the scheduling window start: a local timeslot is kept untouched as long as it
            has not ended yet; if there is no local one, the remote one is added.
        - Before the scheduling window start: the remote state wins. Locally existing
            timeslots are updated or removed, new remote ones are added.

        Args:
            local_timeslots ([Timeslot]): Local timeslots.
            remote_timeslots ([Timeslot]): Remote timeslots.

        Returns:
            [Timeslot]: The merged timeslots.
        """
        merged_ts: list[Timeslot] = []
        timeslot_map = self.build_map(local_timeslots, remote_timeslots)
        if not timeslot_map:
            return merged_ts

        scheduling_window_start = self.config.scheduler.scheduling_window_start
        now: float = SU.timestamp()
        resolution: str = ""
        merge_info = SU.cyan("\nMap for timetable merge:")

        # The map holds local entries first and remote-only entries last. Sort by start time,
        # so a remotely added timeslot ends up at its chronological position and not at the
        # end of the timetable.
        relations = sorted(timeslot_map.items(), key=lambda entry: float(entry[0]))

        for timestamp, ts_relation in relations:
            local: Timeslot | None = ts_relation.get("local")
            remote: Timeslot | None = ts_relation.get("remote")

            if (float(timestamp) - scheduling_window_start) < now:
                # It's past the scheduling window start, so keep the local one as is
                if local:
                    # Only keep timeslots which have not yet ended.
                    if local.get_end() > now:
                        merged_ts.append(local)
                        resolution = "add | currently playing"
                    else:
                        resolution = "skip | out of scheduling window"
                else:
                    # No local timeslot, so add it in any case.
                    resolution = "add"
                    merged_ts.append(remote)
            elif local and not remote:
                # Timeslot was deleted remotely, remove any local one
                resolution = "remove"
            elif not local and remote:
                # Timeslot was added remotely
                resolution = "add"
                merged_ts.append(remote)
            elif local and remote:
                # Timeslot existing locally, was updated or did not change remotely
                # Update the local timeslot with possibly changed data
                local.update(remote)
                merged_ts.append(local)
                resolution = "update"
            else:
                # Relations w/o local and remote timeslots should not happen
                self.logger.critical(SU.red("Skipping invalid merge case!"))
                resolution = "skip | invalid merge case"

            local_str = "local:" + str(local).ljust(25)
            remote_str = "remote: " + str(remote).ljust(25)
            merge_info += SU.cyan(
                f"\n\tTIME:{timestamp} - {local_str} | {remote_str} ↦ ({resolution})"
            )

        self.logger.debug(merge_info + "\n")
        return merged_ts