From 298ebf3c1a82203f0cbb72aa1a8bf994d2ce5265 Mon Sep 17 00:00:00 2001 From: David Trattnig <david.trattnig@o94.at> Date: Wed, 10 Jun 2020 15:10:26 +0200 Subject: [PATCH] Pro-active fallback handling. --- modules/scheduling/fallback_manager.py | 137 ++++++-- modules/scheduling/scheduler.py | 429 ++++++++++++++++--------- 2 files changed, 386 insertions(+), 180 deletions(-) diff --git a/modules/scheduling/fallback_manager.py b/modules/scheduling/fallback_manager.py index 05e110ba..83a301fc 100644 --- a/modules/scheduling/fallback_manager.py +++ b/modules/scheduling/fallback_manager.py @@ -21,15 +21,16 @@ import os, os.path +import ntpath import logging import random import librosa from accessify import private, protected -from modules.base.enum import FallbackType -from modules.base.utils import SimpleUtil +from modules.base.enum import PlaylistType +from modules.base.utils import SimpleUtil, EngineUtil from modules.communication.mail import AuraMailer - +from modules.base.enum import ChannelType class FallbackManager: @@ -73,39 +74,83 @@ class FallbackManager: # PUBLIC METHODS # - - def get_fallback(self, schedule, type): + def resolve_playlist(self, schedule): """ - Checks if the given schedule is valid and returns a valid fallback - if required. + Resolves the (fallback) playlist for the given schedule in case of pro-active fallback scenarios. + + A resolved playlist represents the state how it would currently be aired. For example the `FallbackManager` + evaluated, that the actually planned playlist cannot be played for various reasons (e.g. entries n/a). + Instead one of the fallback playlists should be played. If the method is called some time later, + it actually planned playlist might be valid, thus returned as the resolved playlist. + + As long the adressed schedule is still within the scheduling window, the resolved playlist can + always change. + + This method also updates `schedule.fallback_state` to the current fallback type (`PlaylistType`). + + Args: + schedule (Schedule): The schedule to resolve the playlist for + + + Returns: + (Playlist): The resolved playlist """ + playlist = None type = None - playlist_id = schedule.playlist_id + self.logger.info("Resolving playlist for schedule #%s ..." % schedule.schedule_id) - if not schedule.playlist_id: - if not schedule.show_fallback_id: - if not schedule.schedule_fallback_id: - if not schedule.station_fallback_id: - raise Exception + if not self.validate_playlist(schedule, "playlist"): + if not self.validate_playlist(schedule, "show_fallback"): + if not self.validate_playlist(schedule, "schedule_fallback"): + if not self.validate_playlist(schedule, "station_fallback"): + raise Exception("No (fallback) playlists for schedule #%s available - not even a single one!" % schedule.schedule_id) else: - type = FallbackType.STATION - playlist_id = schedule.station_fallback_id + type = PlaylistType.STATION + playlist = schedule.station_fallback else: - type = FallbackType.TIMESLOT - playlist_id = schedule.schedule_fallback_id + type = PlaylistType.TIMESLOT + playlist = schedule.schedule_fallback else: - type = FallbackType.SHOW - playlist_id = schedule.show_fallback_id + type = PlaylistType.SHOW + playlist = schedule.show_fallback + else: + type = PlaylistType.DEFAULT + playlist = schedule.playlist - if type: - self.logger.warn("Detected fallback type '%s' required for schedule %s" % (type, str(schedule))) + if type and type != PlaylistType.DEFAULT: + previous_type = schedule.fallback_state + if type == previous_type: + self.logger.info("Fallback state for schedule #%s is still '%s'" % (schedule.schedule_id, type)) + else: + self.logger.warn("Detected fallback type switch from '%s' to '%s' is required for schedule %s." % (previous_type, type, str(schedule))) + + schedule.fallback_state = type + return playlist[0] - return (type, playlist_id) + def handle_proactive_fallback(self, scheduler, playlist): + """ + This is the 1st level strategy for fallback handling. When playlist entries are pre-rolled their + state is validated. If any of them doesn't become "ready to play" in time, some fallback entries + are queued. + """ + resolved_playlist = self.resolve_playlist(playlist.schedule) + if playlist != resolved_playlist: + self.logger.info("Switching from playlist #%s to fallback playlist #%s ..." % (playlist.playlist_id, resolved_playlist.playlist_id)) + + # Destroy any existing queue timers + for entry in playlist.entries: + scheduler.stop_timer(entry.switchtimer) + self.logger.info("Stopped existing timers for entries") + + # Queue the fallback playlist + scheduler.queue_playlist_entries(resolved_playlist.schedule, resolved_playlist.entries, False, True) + self.logger.info("Queued fallback playlist entries (Fallback type: %s)" % playlist.type) + else: + self.logger.critical(SimpleUtil.red("For some strange reason the fallback playlist equals the currently failed one?!")) - def validate_playlist(self, playlist_id): - pass + def get_fallback_for(self, fallbackname): @@ -186,11 +231,45 @@ class FallbackManager: duration = librosa.get_duration(y=y, sr=sr) return duration + + # # PRIVATE METHODS # + + def validate_playlist(self, schedule, playlist_type): + """ + Checks if a playlist is valid for play-out. + """ + playlist = getattr(schedule, playlist_type) + if playlist \ + and isinstance(playlist, list) \ + and playlist[0].entries \ + and len(playlist[0].entries) > 0: + + return self.validate_entries(playlist[0].entries) + return False + + + + def validate_entries(self, entries): + """ + Checks if playlist entries are valid for play-out. + """ + for entry in entries: + if entry.get_type() == ChannelType.FILESYSTEM: + audio_store = self.config.get("audiofolder") + filepath = EngineUtil.uri_to_filepath(audio_store, entry.source) + + if not self.is_audio_file(filepath): + self.logger.warn("Invalid filesystem path '%s' in entry '%s'" % (filepath, str(entry))) + return False + return True + + + def get_playlist_items(self, schedule, fallback_key): """ Retrieves the list of tracks from a playlist defined by `fallback_key`. @@ -218,7 +297,7 @@ class FallbackManager: """ dir = self.config.fallback_music_folder files = os.listdir(dir) - audio_files = list(filter(lambda f: self.is_audio_file(dir, f), files)) + audio_files = list(filter(lambda f: self.is_audio_file(os.path.join(dir, f)), files)) if not dir or not audio_files: self.logger.error("Folder 'fallback_music_folder = %s' is empty!" % dir) @@ -253,22 +332,22 @@ class FallbackManager: - def is_audio_file(self, dir, file): + def is_audio_file(self, file): """ Checks if the passed file is an audio file i.e. has a file-extension known for audio files. Args: - (File): file: the file object. + dir (String): + file (File): the file object. Returns: (Boolean): True, if it's an audio file. """ audio_extensions = [".wav", ".flac", ".mp3", ".ogg", ".m4a"] ext = os.path.splitext(file)[1] - abs_path = os.path.join(dir, file) - if os.path.isfile(abs_path): + if os.path.isfile(file): if any(ext in s for s in audio_extensions): return True return False \ No newline at end of file diff --git a/modules/scheduling/scheduler.py b/modules/scheduling/scheduler.py index 720d2cc4..9294ff03 100644 --- a/modules/scheduling/scheduler.py +++ b/modules/scheduling/scheduler.py @@ -21,7 +21,6 @@ import time import json -import datetime import decimal import traceback import sqlalchemy @@ -29,6 +28,7 @@ import logging import threading from operator import attrgetter +from datetime import datetime, timedelta from modules.database.model import AuraDatabaseModel, Schedule, Playlist, PlaylistEntry, PlaylistEntryMetaData, SingleEntry, SingleEntryMetaData, TrackService @@ -40,6 +40,7 @@ from modules.scheduling.calendar import AuraCalendarService from modules.scheduling.fallback_manager import FallbackManager +# FIXME this is probably not needed? def alchemyencoder(obj): """JSON encoder function for SQLAlchemy special classes.""" if isinstance(obj, datetime.date): @@ -54,7 +55,7 @@ def alchemyencoder(obj): return str(obj) -# ------------------------------------------------------------------------------------------ # + class AuraScheduler(threading.Thread): """ Aura Scheduler Class @@ -127,11 +128,13 @@ class AuraScheduler(threading.Thread): def run(self): """ - Called when thread is started via `start()`. It does following: + Called when thread is started via `start()`. It does the following: 1. `self.fetch_new_programme()` periodically from the API depending on the `fetching_frequency` defined in the engine configuration. 2. Loads the latest programme from the database and sets the instance state `self.programme` with current schedules. - 3. Queues all playlists of the programm, if the soundssystem is ready to accept commands. + 3. Queues all schedules of the programme, if the soundssystem is ready to accept commands. + 4. As long the scheduling window is not reached any existing, queued item will be re-evaluationed if it has changed or if some + playlist is valid. If not the relevant fallback playlist will be queued (compare "pro-active fallback handling"). On every cycle the configuration file is reloaded, to allow modifications while running the engine. """ @@ -140,8 +143,8 @@ class AuraScheduler(threading.Thread): self.config.load_config() seconds_to_wait = int(self.config.get("fetching_frequency")) self.logger.info(SimpleUtil.cyan("== start fetching new schedules ==")) - next_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait) - self.logger.info("Fetch new programmes every %ss. Next fetching in %ss." % (str(seconds_to_wait), str(next_time))) + next_time = str(datetime.now()) + self.logger.info("Fetching new schedules every %ss. Next fetching at %ss." % (str(seconds_to_wait), next_time)) self.fetch_new_programme() # The scheduler is ready @@ -171,24 +174,34 @@ class AuraScheduler(threading.Thread): """ Called when the soundsystem is ready. """ - self.queue_programme() + # self.queue_programme() self.logger.info(self.get_ascii_programme()) - self.play_active_entry() + + try: + self.play_active_entry() + self.queue_startup_entries() + except NoActiveScheduleException: + # That's not good, but keep on working... + pass + def play_active_entry(self): """ Plays the entry scheduled for the very current moment and forwards to the scheduled position in time. Usually called when the Engine boots. + + Raises: + (NoActiveScheduleException): If there's no schedule in the programme, within the scheduling window """ sleep_offset = 10 - + active_schedule = self.get_active_schedule() active_entry = self.get_active_entry() if not active_entry: raise NoActiveScheduleException # In case of a file-system source, we need to fast-foward to the current marker as per schedule - if active_entry.type == ChannelType.FILESYSTEM: + if active_entry.get_type() == ChannelType.FILESYSTEM: # Calculate the seconds we have to fast-forward now_unix = self.get_virtual_now() @@ -204,8 +217,8 @@ class AuraScheduler(threading.Thread): self.soundsystem.play(active_entry, TransitionType.FADE) # Check if this is the last item of the schedule - if active_entry.end_unix > active_entry.playlist.schedule.end_unix: - self.queue_end_of_schedule(active_entry, True) + # if active_entry.end_unix > active_entry.playlist.schedule.end_unix: + # self.queue_end_of_schedule(active_schedule, True) # Fast-forward to the scheduled position if seconds_to_seek > 0: @@ -218,19 +231,22 @@ class AuraScheduler(threading.Thread): self.soundsystem.disable_transaction() self.logger.info("LiquidSoap seek response: " + response) - elif active_entry.type == ChannelType.HTTP \ - or active_entry.type == ChannelType.HTTPS \ - or active_entry.type == ChannelType.LIVE: + elif active_entry.get_type() == ChannelType.HTTP \ + or active_entry.get_type() == ChannelType.HTTPS \ + or active_entry.get_type() == ChannelType.LIVE: # Pre-roll and play active entry self.soundsystem.preroll(active_entry) self.soundsystem.play(active_entry, TransitionType.FADE) - self.queue_end_of_schedule(active_entry, True) + # self.queue_end_of_schedule(active_schedule, True) else: self.logger.critical("Unknown Entry Type: %s" % active_entry) - + + # Queue the fade-out of the schedule + if active_schedule and not active_schedule.fadeouttimer: + self.queue_end_of_schedule(active_schedule, True) @@ -247,59 +263,54 @@ class AuraScheduler(threading.Thread): if not self.programme: self.load_programme_from_db() - # Check for scheduled playlist - current_schedule, current_playlist = self.get_active_playlist() - + # Check for current schedule + current_schedule = self.get_active_schedule() if not current_schedule: - self.logger.warning("There's no active schedule") + self.logger.warning(SimpleUtil.red("There's no active schedule")) return None + + # Check for scheduled playlist + current_playlist = self.fallback_manager.resolve_playlist(current_schedule) if not current_playlist: - self.logger.warning("There's no active playlist for a current schedule. Most likely the playlist finished before the end of the schedule.") + msg = "There's no active playlist for a current schedule. Most likely the playlist finished before the end of the schedule." + self.logger.warning(SimpleUtil.red(msg)) return None # Iterate over playlist entries and store the current one current_entry = None for entry in current_playlist.entries: - self.logger.info(entry) if entry.start_unix <= now_unix and now_unix <= entry.end_unix: current_entry = entry break if not current_entry: # Nothing playing ... fallback will kick-in - self.logger.warning("There's no entry scheduled for playlist '%s' at time %s" % (str(current_playlist), SimpleUtil.fmt_time(now_unix))) + msg = "There's no entry scheduled for playlist '%s' at %s" % (str(current_playlist), SimpleUtil.fmt_time(now_unix)) + self.logger.warning(SimpleUtil.red(msg)) return None return current_entry - def get_active_playlist(self): + def get_active_schedule(self): """ - Retrieves the schedule and playlist currently to be played as per - schedule. If the current point in time has no playlist assigned, - only the matching schedule is returned. - + Retrieves the schedule currently to be played. + Returns: - (Schedule, Playlist): The current schedule and playlist tuple. + (Schedule): The current schedule """ current_schedule = None - current_playlist = None now_unix = self.get_virtual_now() - # Iterate over all shows and playlists and find the one to be played right now + # Iterate over all schedules and find the one to be played right now if self.programme: for schedule in self.programme: if schedule.start_unix <= now_unix and now_unix < schedule.end_unix: current_schedule = schedule - for playlist in schedule.playlist: - if playlist.start_unix <= now_unix and now_unix < playlist.end_unix: - current_playlist = playlist - break break - return current_schedule, current_playlist - + return current_schedule # FIXME Review relevance. @@ -337,7 +348,7 @@ class AuraScheduler(threading.Thread): message_queue = "" messages = sorted(self.message_timer, key=attrgetter('diff')) if not messages: - self.logger.warning("There's nothing in the Timer Queue!") + self.logger.warning(SimpleUtil.red("There's nothing in the Timer Queue!")) else: for msg in messages: message_queue += str(msg)+"\n" @@ -353,7 +364,7 @@ class AuraScheduler(threading.Thread): len_before = len(self.message_timer) self.message_timer[:] = [m for m in self.message_timer if m.is_alive()] len_after = len(self.message_timer) - self.logger.info("Removed %s finished timer objects from queue" % (len_before - len_after)) + self.logger.debug("Removed %s finished timer objects from queue" % (len_before - len_after)) @@ -411,23 +422,37 @@ class AuraScheduler(threading.Thread): Creates a printable version of the current programme (playlists and entries as per schedule) Returns: - (String): A ASCII representation of the programme + (String): An ASCII representation of the programme """ - active_schedule, active_playlist = self.get_active_playlist() - playlists = self.get_next_playlists() + active_schedule = self.get_active_schedule() s = "\n\n PLAYING NOW:" s += "\n┌──────────────────────────────────────────────────────────────────────────────────────────────────────" if active_schedule: + planned_playlist = None + if active_schedule.playlist: + planned_playlist = active_schedule.playlist[0] # FIXME Improve model without list + resolved_playlist = self.fallback_manager.resolve_playlist(active_schedule) + type = str(EngineUtil.get_playlist_type(resolved_playlist.fallback_type)) + s += "\n│ Playing schedule %s " % active_schedule - if active_playlist: - s += "\n│ └── Playlist %s " % active_playlist - - # active_entry = active_playlist.current_entry + if planned_playlist: + if resolved_playlist and resolved_playlist.playlist_id != planned_playlist.playlist_id: + s += "\n│ └── Playlist %s " % planned_playlist + s += "\n│ " + s += SimpleUtil.red("↑↑↑ That's the originally planned playlist.") + ("Instead playing the `%s` playlist below ↓↓↓" % SimpleUtil.cyan(type)) + + if resolved_playlist: + if not planned_playlist: + s += "\n│ " + s += SimpleUtil.red("No Playlist assigned to schedule. Instead playing the `%s` playlist below ↓↓↓" % SimpleUtil.cyan(type)) + + s += "\n│ └── Playlist %s " % resolved_playlist + active_entry = self.get_active_entry() # Finished entries - for entry in active_playlist.entries: + for entry in resolved_playlist.entries: if active_entry == entry: break else: @@ -444,7 +469,7 @@ class AuraScheduler(threading.Thread): s += self.build_playlist_string(entries) else: - s += "\n│ └── %s" % (SimpleUtil.red("No playlist active. Did it finish before the end of the schedule?")) + s += "\n│ └── %s" % (SimpleUtil.red("No active playlist. There should be at least some fallback playlist running...")) else: s += "\n│ Nothing. " s += "\n└──────────────────────────────────────────────────────────────────────────────────────────────────────" @@ -452,23 +477,27 @@ class AuraScheduler(threading.Thread): s += "\n PLAYING NEXT:" s += "\n┌──────────────────────────────────────────────────────────────────────────────────────────────────────" - if not playlists: + next_schedules = self.get_next_schedules() + if not next_schedules: s += "\n│ Nothing. " else: - for next_playlist in playlists: - s += "\n│ Queued schedule %s " % next_playlist.schedule - s += "\n│ └── Playlist %s " % next_playlist - if next_playlist.end_unix > next_playlist.schedule.end_unix: + for schedule in next_schedules: + resolved_playlist = self.fallback_manager.resolve_playlist(schedule) + type = str(EngineUtil.get_playlist_type(resolved_playlist.fallback_type)) + s += "\n│ Queued schedule %s " % schedule + s += "\n│ └── Playlist %s (Type: %s)" % (resolved_playlist, SimpleUtil.cyan(type)) + if resolved_playlist.end_unix > schedule.end_unix: s += "\n│ %s! " % \ - (SimpleUtil.red("↑↑↑ Playlist #%s ends after Schedule #%s!" % (next_playlist.playlist_id, next_playlist.schedule.schedule_id))) + (SimpleUtil.red("↑↑↑ Playlist #%s ends after Schedule #%s!" % (resolved_playlist.playlist_id, schedule.schedule_id))) - entries = self.preprocess_entries(next_playlist.entries, False) + entries = self.preprocess_entries(resolved_playlist.entries, False) s += self.build_playlist_string(entries) s += "\n└──────────────────────────────────────────────────────────────────────────────────────────────────────\n\n" return s + def build_playlist_string(self, entries): """ Returns a stringified list of entries @@ -487,6 +516,7 @@ class AuraScheduler(threading.Thread): return s + def build_entry_string(self, prefix, entry, strike): """ Returns an stringified entry. @@ -509,6 +539,7 @@ class AuraScheduler(threading.Thread): # PRIVATE METHODS # + def get_virtual_now(self): """ Liquidsoap is slow in executing commands, therefore it's needed to schedule @@ -523,38 +554,55 @@ class AuraScheduler(threading.Thread): - def get_next_playlists(self): + def get_next_schedules(self): """ - Retrieves the playlists to be played after the current one. + Retrieves the schedules to be played after the current one. Returns: - ([Playlist]): The next playlists + ([Schedule]): The next schedules """ now_unix = self.get_virtual_now() - next_playlists = [] + next_schedules = [] for schedule in self.programme: - if schedule.end_unix > now_unix: - for playlist in schedule.playlist: - if playlist.start_unix > now_unix: - next_playlists.append(playlist) + if schedule.start_unix > now_unix: + next_schedules.append(schedule) + + return next_schedules + + - return next_playlists + def filter_scheduling_window(self, schedules): + """ + Ignore schedules which are beyond the scheduling window. The end of the scheduling window + is defined by the config option `scheduling_window_end`. This value defines the seconds + minus the actual start time of the schedule. + """ + now_unix = self.get_virtual_now() + len_before = len(schedules) + window_start = self.config.get("scheduling_window_start") + window_end = self.config.get("scheduling_window_end") + schedules = list(filter(lambda s: (s.start_unix - window_end) > now_unix and (s.start_unix - window_start) < now_unix, schedules)) + len_after = len(schedules) + self.logger.info("For now, skipped %s future schedule(s) which are out of the scheduling window (-%ss <-> -%ss)" % ((len_before - len_after), window_start, window_end)) + return schedules - def filter_scheduling_window(self, playlists): + + def is_schedule_in_window(self, schedule): """ - Ignore playlists which are beyond the scheduling window + Checks if the schedule is within the scheduling window. """ now_unix = self.get_virtual_now() - len_before = len(playlists) - scheduling_window = self.config.get("scheduling_window_offset") - playlists = list(filter(lambda entry: entry.start_unix > now_unix+scheduling_window, playlists)) - len_after = len(playlists) - self.logger.info("Removed %s playlist(s) which are out of the scheduling window (%ss)" % ((len_before - len_after), scheduling_window)) + window_start = self.config.get("scheduling_window_start") + window_end = self.config.get("scheduling_window_end") - return playlists + if schedule.start_unix - window_start < now_unix and \ + schedule.start_unix - window_end > now_unix: + + return True + return False @@ -563,36 +611,62 @@ class AuraScheduler(threading.Thread): Queues the current programme (playlists as per schedule) by creating timed commands to Liquidsoap to enable the individual tracks of playlists. """ - current_schedule, current_playlist = self.get_active_playlist() - playlists = self.get_next_playlists() - playlists = self.filter_scheduling_window(playlists) - if current_schedule and current_playlist: - active_entry = self.get_active_entry() - # Finished entries - for entry in current_playlist.entries: - if active_entry == entry: - break + # Get a clean set of the schedules within the scheduling window + schedules = self.get_next_schedules() + schedules = self.filter_scheduling_window(schedules) + + # Queue the schedules, their playlists and entries + if schedules: + for next_schedule in schedules: + playlist = self.fallback_manager.resolve_playlist(next_schedule) + self.queue_playlist_entries(next_schedule, playlist.entries, False, True) + + # Queue the fade-out of the schedule + if not next_schedule.fadeouttimer: + self.queue_end_of_schedule(next_schedule, True) - # Entry currently being played - if active_entry: - # Open entries for current playlist - rest_of_playlist = active_entry.get_next_entries(True) - self.queue_playlist_entries(rest_of_playlist, False, True) + self.logger.info(SimpleUtil.green("Finished queuing programme.")) - if playlists: - for next_playlist in playlists: - self.queue_playlist_entries(next_playlist.entries, False, True) + + + def queue_startup_entries(self): + """ + Queues all entries after the one currently playing upon startup. Don't use + this method in any other scenario, as it doesn't respect the scheduling window. + """ + current_schedule = self.get_active_schedule() - self.logger.info(SimpleUtil.green("Finished queuing programme!")) + # Queue the (rest of the) currently playing schedule upon startup + if current_schedule: + current_playlist = self.fallback_manager.resolve_playlist(current_schedule) + + if current_playlist: + active_entry = self.get_active_entry() + + # Finished entries + for entry in current_playlist.entries: + if active_entry == entry: + break + + # Entry currently being played + if active_entry: + + # Queue open entries for current playlist + rest_of_playlist = active_entry.get_next_entries(True) + self.queue_playlist_entries(current_schedule, rest_of_playlist, False, True) + # Store them for later reference + current_schedule.queued_entries = rest_of_playlist - def queue_playlist_entries(self, entries, fade_in, fade_out): + + def queue_playlist_entries(self, schedule, entries, fade_in, fade_out): """ Creates Liquidsoap player commands for all playlist items to be executed at the scheduled time. Args: + schedule (Schedule): The schedule this entries belong to entries ([PlaylistEntry]): The playlist entries to be scheduled for playout fade_in (Boolean): Fade-in at the beginning of the set of entries fade_out (Boolean): Fade-out at the end of the set of entries @@ -612,8 +686,8 @@ class AuraScheduler(threading.Thread): for entry in clean_entries: if previous_entry == None or \ (previous_entry != None and \ - previous_entry.type == entry.type and \ - entry.type == ChannelType.FILESYSTEM): + previous_entry.get_type() == entry.get_type() and \ + entry.get_type() == ChannelType.FILESYSTEM): entry_groups[index].append(entry) else: @@ -624,19 +698,20 @@ class AuraScheduler(threading.Thread): self.logger.info("Built %s entry group(s)" % len(entry_groups)) # Schedule function calls + do_queue_schedule_end = False if len(clean_entries) > 0 and len(entry_groups) > 0: for entries in entry_groups: if not isinstance(entries, list): raise ValueError("Invalid Entry Group: %s" % str(entries)) - + + # Create timers for each entry group self.set_entries_timer(entries, fade_in, fade_out) - # Check if it's the last item, which needs special handling - if entries[-1] == clean_entries[-1]: - # The end of schedule is the actual end of the track - self.queue_end_of_schedule(entries[-1], fade_out) + # Store them for later reference + schedule.queued_entries = clean_entries + else: - self.logger.warn(SimpleUtil.red("Nothing to schedule...")) + self.logger.warn(SimpleUtil.red("Nothing to schedule ...")) @@ -660,8 +735,9 @@ class AuraScheduler(threading.Thread): transition_type = TransitionType.FADE if entries[-1].status != EntryPlayState.READY: - self.logger.critical(SimpleUtil.red("PLAY: For some reason the entry/entries is not yet ready or could not be loaded (Entries: %s)" % str(entries))) - # TODO Pro-active fallback handling here + self.logger.critical(SimpleUtil.red("PLAY: For some reason the entry/entries are not yet ready to be played (Entries: %s)" % EngineUtil.get_entries_string(entries))) + # At this point it's too late to do any pro-active fallback handling. Is it? Wait for the silence detector to deal with it. + # TODO Observe the actual handling of this section and think about possible improvements. self.soundsystem.play(entries[0], transition_type) self.logger.info(self.get_ascii_programme()) @@ -669,12 +745,12 @@ class AuraScheduler(threading.Thread): if play_timer: # Check if the Playlist IDs are different - if play_timer.entries[0].playlist.playlist_id == entries[0].playlist.playlist_id: + if self.have_entries_changed(play_timer, entries): # If not, stop and remove the old timer, create a new one self.stop_timer(play_timer) else: - # If the playlists do not differ => reuse the old timer and do nothing - self.logger.info("Playlist Entry %s is already scheduled - no new timer created!" % str(entries)) + # If the playlist entries do not differ => reuse the old timer and do nothing + self.logger.debug("Playlist Entry %s is already scheduled - no new timer created." % EngineUtil.get_entries_string(entries)) return # If nothing is planned at given time, create a new timer @@ -682,6 +758,33 @@ class AuraScheduler(threading.Thread): + def have_entries_changed(self, timer, new_entries): + """ + Checks if the new entries and playlists are matching the existing queued ones, + or if they should be updated. + + Args: + timer (CallFunctionTimer): The timer holding queued entries + new_entries ([PlaylistEntry]): The possibly updated entries + + Returns: + (Boolean): `True` if it has changed + """ + old_entries = timer.entries + + if old_entries[0].playlist and new_entries[0].playlist: + if old_entries[0].playlist.playlist_id != new_entries[0].playlist.playlist_id: + return True + if len(old_entries) != len(new_entries): + return True + + for old_entry, new_entry in zip(old_entries, new_entries): + if old_entry.source != new_entry.source: + return True + + return False + + def preprocess_entries(self, entries, cut_oos): """ Analyses and marks entries which are going to be cut or excluded. @@ -713,36 +816,42 @@ class AuraScheduler(threading.Thread): - def queue_end_of_schedule(self, entry, fade_out): + def queue_end_of_schedule(self, schedule, fade_out): """ - Queues a soundsystem action to stop/fade-out the given schedule + Queues a soundsystem action to stop/fade-out the given schedule. Args: - entry (PlaylistEntry): The last entry of the schedule - fade_out (Boolean): If the entry should be faded-out + schedule (PlaylistEntry): The schedule + fade_out (Boolean): If the schedule should be faded-out """ - schedule_end = entry.playlist.schedule.schedule_end - schedule_end_unix = entry.playlist.schedule.end_unix + schedule_end = schedule.schedule_end + schedule_end_unix = schedule.end_unix now_unix = self.get_virtual_now() fade_out_time = 0 # Stop function to be called when schedule ends - def do_stop(entries): - entry = entries[0] - self.logger.info(SimpleUtil.cyan("=== stop('%s') ===" % entry)) + def do_stop(schedule): + last_entry = schedule.queued_entries[-1] + self.logger.info(SimpleUtil.cyan("=== stop('%s') ===" % str(last_entry.playlist.schedule))) transition_type = TransitionType.INSTANT if fade_out: transition_type = TransitionType.FADE - self.soundsystem.stop(entry, transition_type) + self.soundsystem.stop(last_entry, transition_type) if fade_out == True: fade_out_time = int(round(float(self.config.get("fade_out_time")))) #FIXME Use float + # Stop any existing fade-out timer + if schedule.fadeouttimer: + schedule.fadeouttimer.cancel() + self.message_timer.remove(schedule.fadeouttimer) + + # Create timer to fade-out start_fade_out = schedule_end_unix - now_unix - fade_out_time - entry.fadeouttimer = self.create_timer(start_fade_out, do_stop, [entry], fadeout=True) + # last_entry = schedule.queued_entries[-1] + schedule.fadeouttimer = self.create_timer(start_fade_out, do_stop, schedule, fadeout=True) - self.logger.info("Fading out schedule in %s seconds at %s | Last entry: %s" % \ - (str(start_fade_out), str(schedule_end), entry)) + self.logger.info("Fading out schedule in %s seconds at %s | Schedule: %s" % (str(start_fade_out), str(schedule_end), schedule)) @@ -753,30 +862,33 @@ class AuraScheduler(threading.Thread): """ # Fetch programme from API endpoints - self.logger.info("Trying to fetch new programe from API endpoints...") + self.logger.debug("Trying to fetch new programe from API endpoints...") acs = AuraCalendarService(self.config) queue = acs.get_queue() acs.start() # start fetching thread response = queue.get() # wait for the end - self.logger.info("... Programme fetch via API done!") + self.logger.debug("... Programme fetch via API done!") # Reset last successful fetch state lsf = self.last_successful_fetch self.last_successful_fetch = None if response is None: - self.logger.warning("Trying to load programme from Engine Database, because AuraCalendarService returned an empty response.") + msg = SimpleUtil.red("Trying to load programme from Engine Database, because AuraCalendarService returned an empty response.") + self.logger.warning(msg) elif type(response) is list: self.programme = response if self.programme is not None and len(self.programme) > 0: - self.last_successful_fetch = datetime.datetime.now() + self.last_successful_fetch = datetime.now() self.logger.info(SimpleUtil.green("Finished fetching current programme from API")) if len(self.programme) == 0: self.logger.critical("Programme fetched from Steering/Tank has no entries!") elif response.startswith("fetching_aborted"): - self.logger.warning("Trying to load programme from database, because fetching was being aborted from AuraCalendarService! Reason: " + response[16:]) + msg = SimpleUtil.red("Trying to load programme from database only, because fetching was being aborted from AuraCalendarService! Reason: ") + self.logger.warning(msg + response[16:]) else: - self.logger.warning("Trying to load programme from database, because i got an unknown response from AuraCalendarService: " + response) + msg = SimpleUtil.red("Trying to load programme from database only, because of an unknown response from AuraCalendarService: " + response) + self.logger.warning(msg) # Always load latest programme from the database self.last_successful_fetch = lsf @@ -807,28 +919,32 @@ class AuraScheduler(threading.Thread): Checks for existing timers at the given time. """ for t in self.message_timer: - if t.entries[0].start_unix == given_time and (t.fadein or t.switcher): - return t + if t.fadein or t.switcher: + if t.entries[0].start_unix == given_time: + return t return False - def create_timer(self, diff, func, entries, fadein=False, fadeout=False, switcher=False): + def create_timer(self, diff, func, param, fadein=False, fadeout=False, switcher=False): """ Creates a new timer for timed execution of mixer commands. Args: diff (Integer): The difference in seconds from now, when the call should happen func (Function): The function to call - entries ([]): The entries to be scheduled + param ([]): A schedule or list of entries + Returns: + (CallFunctionTimer, CallFunctionTimer): In case of a "switch" command, the switch and pre-roll timer is returned + (CallFunctionTimer): In all other cases only the timer for the command is returned """ if not fadein and not fadeout and not switcher or fadein and fadeout or fadein and switcher or fadeout and switcher: raise ValueError("You have to call me with either fadein=true, fadeout=true or switcher=True") - if not isinstance(entries, list): - raise ValueError("No list of entries passed!") + if not isinstance(param, list) and not isinstance(param, Schedule): + raise ValueError("No list of entries nor schedule passed!") - t = CallFunctionTimer(diff=diff, func=func, param=entries, fadein=fadein, fadeout=fadeout, switcher=switcher) + t = CallFunctionTimer(diff=diff, func=func, param=param, fadein=fadein, fadeout=fadeout, switcher=switcher) self.message_timer.append(t) t.start() @@ -836,18 +952,26 @@ class AuraScheduler(threading.Thread): # Pre-roll function to be called by timer def do_preroll(entries): try: - if entries[0].type == ChannelType.FILESYSTEM: + if entries[0].get_type() == ChannelType.FILESYSTEM: self.logger.info(SimpleUtil.cyan("=== preroll_group('%s') ===" % EngineUtil.get_entries_string(entries))) self.soundsystem.preroll_group(entries) else: self.logger.info(SimpleUtil.cyan("=== preroll('%s') ===" % EngineUtil.get_entries_string(entries))) self.soundsystem.preroll(entries[0]) except LoadSourceException as e: - self.logger("Could not load entries %s:" % EngineUtil.get_entries_string(entries), e) - # TODO Fallback logic here + self.logger.critical(SimpleUtil.red("Could not pre-roll entries %s" % EngineUtil.get_entries_string(entries)), e) + + # Pro-active fallback handling, avoiding the need of the silence detector kicking-in. + self.fallback_manager.handle_proactive_fallback(self, entries[0].playlist) + + if entries[-1].status != EntryPlayState.READY: + self.logger.critical(SimpleUtil.red("Entries didn't reach 'ready' state during pre-rolling (Entries: %s)" % EngineUtil.get_entries_string(entries))) + + # Pro-active fallback handling, avoiding the need of the silence detector kicking-in. + self.fallback_manager.handle_proactive_fallback(self, entries[0].playlist) loader_diff = diff - self.config.get("preroll_offset") - loader = CallFunctionTimer(diff=loader_diff, func=do_preroll, param=entries, fadein=fadein, fadeout=fadeout, switcher=False, loader=True) + loader = CallFunctionTimer(diff=loader_diff, func=do_preroll, param=param, fadein=fadein, fadeout=fadeout, switcher=False, loader=True) self.message_timer.append(loader) loader.start() return (t, loader) @@ -866,20 +990,21 @@ class AuraScheduler(threading.Thread): timer.cancel() count = 1 - if timer.entries[0].loadtimer is not None: - timer.entries[0].loadtimer.cancel() - self.message_timer.remove(timer.entries[0].loadtimer) - count += 1 + for entry in timer.entries: + if entry.loadtimer is not None: + entry.loadtimer.cancel() + self.message_timer.remove(entry.loadtimer) + count += 1 - # if timer.entries[0].fadeintimer is not None: - # timer.entries[0].fadeintimer.cancel() - # self.message_timer.remove(timer.entries[0].fadeintimer) - # count += 1 + # if timer.entries[0].fadeintimer is not None: + # timer.entries[0].fadeintimer.cancel() + # self.message_timer.remove(timer.entries[0].fadeintimer) + # count += 1 - if timer.entries[0].fadeouttimer is not None: - timer.entries[0].fadeouttimer.cancel() - self.message_timer.remove(timer.entries[0].fadeouttimer) - count += 1 + # if entry.fadeouttimer is not None: + # entry.fadeouttimer.cancel() + # self.message_timer.remove(entry.fadeouttimer) + # count += 1 # Remove it from message queue self.message_timer.remove(timer) @@ -952,6 +1077,7 @@ class CallFunctionTimer(threading.Timer): param = None entries = None diff = None + dt = None fadein = False fadeout = False switcher = False @@ -971,9 +1097,11 @@ class CallFunctionTimer(threading.Timer): raise Exception("You have to create me with either fadein=True, fadeout=True or switcher=True") self.diff = diff + self.dt = datetime.now() + timedelta(seconds=diff) + self.func = func self.param = param - self.entries = param + self.entries = param # TODO Refactor since param can hold [entries] or a schedule, depending on the timer type self.fadein = fadein self.fadeout = fadeout self.switcher = switcher @@ -985,13 +1113,12 @@ class CallFunctionTimer(threading.Timer): String represenation of the timer. """ status = "Timer (Alive: %s)" % self.is_alive() - time_start = SimpleUtil.fmt_time(SimpleUtil.timestamp() + self.diff) - status += " starting in " + str(self.diff) + "s (" + time_start + ") " + status += " starting at " + str(self.dt) if self.fadein: return status + " fading in entries '" + EngineUtil.get_entries_string(self.entries) elif self.fadeout: - return status + " fading out schedule '" + str(self.entries[0].playlist.schedule) + return status + " fading out schedule '" + str(self.param) elif self.switcher: return status + " switching to entries '" + EngineUtil.get_entries_string(self.entries) elif self.loader: -- GitLab