diff --git a/modules/scheduling/fallback_manager.py b/modules/scheduling/fallback_manager.py index fc4f73426a8f53ae3cce789ddec26baaf38416c1..535f76d721ecff31be5ebd82aac8b9a9fbebe391 100644 --- a/modules/scheduling/fallback_manager.py +++ b/modules/scheduling/fallback_manager.py @@ -25,6 +25,7 @@ from enum import Enum from threading import Thread, Timer from datetime import datetime, timedelta +from modules.base.config import AuraConfig from modules.base.utils import SimpleUtil as SU from modules.base.mail import AuraMailer from modules.core.resources import ResourceClass @@ -66,80 +67,49 @@ class FallbackManager: logger = None mailer = None scheduler = None - message_timer = None - - def __init__(self, config, logger, scheduler, message_timer): + def __init__(self, scheduler): """ Constructor Args: - config (AuraConfig): Holds the engine configuration + """ - self.config = config - self.logger = logger + self.config = AuraConfig.config() + self.logger = logging.getLogger("AuraEngine") self.mailer = AuraMailer(self.config) self.scheduler = scheduler - # self.message_timer = message_timer - self.message_timer = [] # - # PUBLIC METHODS + # METHODS # - def schedule_fallback_playlist(self, schedule, schedule_now=False): + + def queue_fallback_playlist(self, timeslot): """ Evaluates the scheduled fallback and queues it using a timed thread. - - Args: - schedule_now (Boolean): If `True` it is executed immediately """ - timer_start = None - timer_end = None - (fallback_type, playlist) = self.get_fallback_playlist(schedule) + (fallback_type, playlist) = self.get_fallback_playlist(timeslot) if playlist: - self.logger.info(f"Resolved {fallback_type.value} fallback") - - def do_schedule(entries): - self.logger.info(SU.cyan(f"=== set_fallback_playlist('{entries}') ===")) - self.scheduler.engine.player.start_fallback_playlist(entries) - def do_unschedule(): - self.logger.info(SU.cyan("=== clear_fallback_playlist() ===")) - self.scheduler.engine.player.stop_fallback_playlist() - - if schedule_now == True: - # Update queue immediately - thread = Thread(target = do_schedule, args = (playlist.entries,)) - thread.start() - else: - # Update queue at the beginning of the timeslot - timer_start = FallbackCommandTimer(schedule.start_unix, do_schedule, playlist.entries) - self.message_timer.append(timer_start) - timer_start.start() - - # Update fallback channel to be cleared at the end of the timeslot - timer_end = FallbackCommandTimer(schedule.end_unix, do_unschedule) - self.message_timer.append(timer_end) - timer_end.start() - return (timer_start, timer_end) - + self.logger.info(f"Resolved {fallback_type.value} fallback") + return FallbackCommand(timeslot, playlist.entries) else: - msg = f"There is no schedule- or show-fallback defined for timeslot#{schedule.schedule_id}. " + msg = f"There is no schedule- or show-fallback defined for timeslot#{timeslot.schedule_id}. " msg += f"The station fallback will be used automatically." self.logger.info(msg) - def resolve_playlist(self, schedule): + def resolve_playlist(self, timeslot): """ Retrieves the currently planned (fallback) playlist. If a normal playlist is available, this one is returned. In case of station fallback no playlist is returned. Args: - schedule (Schedule) + timeslot (Schedule) Returns: (FallbackType, Playlist) @@ -147,22 +117,22 @@ class FallbackManager: planned_playlist = None fallback_type = None - if self.validate_playlist(schedule, "playlist"): - planned_playlist = schedule.get_playlist() + if self.validate_playlist(timeslot, "playlist"): + planned_playlist = timeslot.get_playlist() fallback_type = FallbackType.NONE else: - (fallback_type, planned_playlist) = self.get_fallback_playlist(schedule) + (fallback_type, planned_playlist) = self.get_fallback_playlist(timeslot) return (fallback_type, planned_playlist) - def get_fallback_playlist(self, schedule): + def get_fallback_playlist(self, timeslot): """ Retrieves the playlist to be used in a fallback scenario. Args: - schedule (Schedule) + timeslot (Schedule) Returns: (Playlist) @@ -170,23 +140,18 @@ class FallbackManager: playlist = None fallback_type = FallbackType.STATION - if self.validate_playlist(schedule, "schedule_fallback"): - playlist = schedule.schedule_fallback[0] + if self.validate_playlist(timeslot, "schedule_fallback"): + playlist = timeslot.schedule_fallback[0] fallback_type = FallbackType.SCHEDULE - elif self.validate_playlist(schedule, "show_fallback"): - playlist = schedule.show_fallback[0] + elif self.validate_playlist(timeslot, "show_fallback"): + playlist = timeslot.show_fallback[0] fallback_type = FallbackType.SHOW return (fallback_type, playlist) - # - # PRIVATE METHODS - # - - - def validate_playlist(self, schedule, playlist_type): + def validate_playlist(self, timeslot, playlist_type): """ Checks if a playlist is valid for play-out. @@ -202,7 +167,7 @@ class FallbackManager: Otherwise, if a fallback playlist contains Live or Stream entries, the exact playout behaviour can hardly be predicted. """ - playlist = getattr(schedule, playlist_type) + playlist = getattr(timeslot, playlist_type) if playlist \ and isinstance(playlist, list) \ and playlist[0].entries \ @@ -229,12 +194,14 @@ class FallbackManager: -class EngineCommandTimer(Timer): +class EngineExecutor(Timer): """ - Timer for timed executing of Engine commands. + Base class for timed or threaded execution of Engine commands. """ + logger = logging.getLogger("AuraEngine") timer_store = {} - logger = logging.getLogger("AuraEngine") + child_timer = None + direct_exec = None timer_id = None timer_type = None param = None @@ -242,82 +209,155 @@ class EngineCommandTimer(Timer): dt = None - def __init__(self, timer_type="BASE", due_time=None, func=None, param=None): + def __init__(self, timer_type="BASE", child_timer=None, due_time=None, func=None, param=None): """ Constructor - """ + + Args: + timer_type (String): Prefix used for the `timer_id` to make it unique + child_timer (EngineExeuctor): Child action which is bound to this timer + due_time (Float): When timer should be executed. For values <= 0 execution happens immediately in a threaded way + func (function): The function to be called + param (object): Parameter passt to the function + """ + from modules.core.engine import Engine now_unix = Engine.engine_time() + self.child_timer = child_timer + self.direct_exec = False self.timer_type = timer_type self.timer_id = f"{timer_type}:{func.__name__}:{due_time}" - - diff = due_time - now_unix - if diff < 0.0: - msg = f"Trying to create timer in the past: {self.timer_id}" - self.logger.error(SU.red(msg)) - raise Exception(msg) + + if not due_time: + diff = 0 + else: + diff = due_time - now_unix self.diff = diff self.dt = datetime.now() + timedelta(seconds=diff) self.func = func self.param = param + if diff < 0: + msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..." + self.logger.error(SU.red(msg)) + self.exec_now() + elif diff == 0: + self.logger.info(f"Timer '{self.timer_id}' to be executed immediately") + self.exec_now() + else: + self.exec_timed() + self.start() + + self.update_store() + + + + def exec_now(self): + """ + Immediate execution within a thread. It's not stored in the timer store. + """ + self.direct_exec = True + thread = Thread(target = self.func, args = (self.param,)) + thread.start() + + + + def exec_timed(self): + """ + Timed execution in a thread. + """ def wrapper_func(param=None): - # Remove from cache + # Remove from store self.logger.info(SU.green(f"Removing old timer with ID: {self.timer_id}")) - del EngineCommandTimer.timer_store[self.timer_id] + del EngineExecutor.timer_store[self.timer_id] + # Call actual function - if param: func(param,) - else: func() + if param: self.func(param,) + else: self.func() - Timer.__init__(self, diff, wrapper_func, (param,)) - self.update_cache() - self.logger.info(SU.green(f"Created command timer with ID: {self.timer_id}")) - + Timer.__init__(self, self.diff, wrapper_func, (self.param,)) - - def update_cache(self): + + def update_store(self): """ - Adds the instance to the cache and cancels any previously existing commands. + Adds the instance to the store and cancels any previously existing commands. """ existing_command = None - if self.timer_id in EngineCommandTimer.timer_store: - existing_command = EngineCommandTimer.timer_store[self.timer_id] + if self.timer_id in EngineExecutor.timer_store: + existing_command = EngineExecutor.timer_store[self.timer_id] if existing_command: self.logger.info(SU.green(f"Cancelling previous timer with ID: {self.timer_id}")) existing_command.cancel() - EngineCommandTimer.timer_store[self.timer_id] = self + if existing_command.child_timer: + self.logger.info(SU.green(f"Cancelling child timer with ID: {existing_command.child_timer.timer_id}")) + EngineExecutor.timer_store[self.timer_id] = self + self.logger.info(SU.green(f"Created command timer with ID: {self.timer_id}")) - def print_active_timers(self): + def is_alive(self): """ - Prints a list of active timers to the log. + Returns true if the command is still due to be executed. """ - for id, timer in EngineCommandTimer.timer_store.values(): - EngineCommandTimer.logger.info(str(timer)) - + if self.direct_exec == True: + return False + return super().is_alive() def __str__(self): """ String represenation of the timer. """ - return f"[{self.timer_id}] COMMAND TIMER due at {str(self.dt)} (alive: {self.is_alive()})" + return f"[{self.timer_id}] exec at {str(self.dt)} (alive: {self.is_alive()})" + @staticmethod + def log_commands(): + """ + Prints a list of active timers to the log. + """ + timers = EngineExecutor.timer_store.values() + msg = "\n [ ENGINE COMMAND QUEUE ]\n" + + if not timers: + msg += "None available!\n" + else: + for timer in timers: + msg += f" => {str(timer)}\n" + if timer.child_timer: + msg += f" => {str(timer.child_timer)}\n" + + EngineExecutor.logger.info(msg + "\n") + -class FallbackCommandTimer(EngineCommandTimer): +class FallbackCommand(EngineExecutor): """ - Timer for executing timed scheduling of fallback playlists. + Command composition for executing timed scheduling and unscheduling of fallback playlists. + + Based on the `timeslot.start_date` and `timeslot.end_date` two `EngineExecutor commands + are created. """ - def __init__(self, diff=None, func=None, param=None): + + def __init__(self, timeslot, entries): """ Constructor + + Args: + timeslot (Timeslot): The timeslot any fallback entries should be scheduled for + entries (List): List of entries to be scheduled as fallback """ - super().__init__("FALLBACK", diff, func, param) - self.logger.info("Executing scheduled fallback playlist update '%s' in %s seconds..." % \ - (str(func.__name__), str(diff))) + from modules.core.engine import Engine + + def do_play(entries): + self.logger.info(SU.cyan(f"=== start_fallback_playlist('{entries}') ===")) + Engine.get_instance().player.start_fallback_playlist(entries) + + def do_stop(): + self.logger.info(SU.cyan("=== stop_fallback_playlist() ===")) + Engine.get_instance().player.stop_fallback_playlist() + child_timer = EngineExecutor("FALLBACK", None, timeslot.end_unix, do_stop, None) + super().__init__("FALLBACK", child_timer, timeslot.start_unix, do_play, entries) \ No newline at end of file diff --git a/modules/scheduling/scheduler.py b/modules/scheduling/scheduler.py index 34e8a420def83db70e990e29e669e337c2df4c92..4f73c25771df948d969ef1b19d446de669205801 100644 --- a/modules/scheduling/scheduler.py +++ b/modules/scheduling/scheduler.py @@ -36,7 +36,7 @@ from modules.core.channels import ChannelType, TransitionType, from modules.core.resources import ResourceClass, ResourceUtil from modules.scheduling.calendar import AuraCalendarService from modules.scheduling.fallback_manager import FallbackManager - +from modules.scheduling.fallback_manager import EngineExecutor @@ -98,7 +98,7 @@ class AuraScheduler(threading.Thread): self.logger = logging.getLogger("AuraEngine") self.init_database() - self.fallback_manager = FallbackManager(config, self.logger, self, None) + self.fallback_manager = FallbackManager(self) self.redismessenger = RedisMessenger(config) self.engine = engine self.engine.scheduler = self @@ -138,9 +138,7 @@ class AuraScheduler(threading.Thread): try: self.config.load_config() seconds_to_wait = int(self.config.get("fetching_frequency")) - self.logger.info(SU.cyan("== start fetching new schedules ==")) - next_time = str(datetime.now()) - self.logger.info("Fetching new schedules every %ss. Next fetching at %ss." % (str(seconds_to_wait), next_time)) + self.logger.info(SU.cyan(f"== start fetching new schedules (every {seconds_to_wait} seconds) ==")) self.fetch_new_programme() # The scheduler is ready @@ -154,10 +152,18 @@ class AuraScheduler(threading.Thread): self.queue_programme() except Exception as e: - self.logger.critical(SU.red("Unhandled error while fetching & scheduling new programme! (%s)" % str(e)), e) + self.logger.critical(SU.red(f"Unhandled error while fetching & scheduling new programme! ({str(e)})"), e) + # Keep on working anyway self.clean_timer_queue() self.print_timer_queue() + + # FIXME better location for call + if self.engine.event_dispatcher: + current_timeslot = self.get_active_schedule() + self.engine.event_dispatcher.on_timeslot(current_timeslot) + + EngineExecutor.log_commands() self.exit_event.wait(seconds_to_wait) @@ -202,7 +208,7 @@ class AuraScheduler(threading.Thread): # Schedule any available fallback playlist if active_schedule: - self.fallback_manager.schedule_fallback_playlist(active_schedule, True) + self.fallback_manager.queue_fallback_playlist(active_schedule) # Queue the fade-out of the schedule if not active_schedule.fadeouttimer: self.queue_end_of_schedule(active_schedule, True) @@ -312,8 +318,7 @@ class AuraScheduler(threading.Thread): if schedule.start_unix <= now_unix and now_unix < schedule.end_unix: current_schedule = schedule break - - self.engine.event_dispatcher.on_timeslot(current_schedule) + return current_schedule @@ -603,7 +608,7 @@ class AuraScheduler(threading.Thread): if schedules: for next_schedule in schedules: # Schedule any available fallback playlist - self.fallback_manager.schedule_fallback_playlist(next_schedule, False) + self.fallback_manager.queue_fallback_playlist(next_schedule) if next_schedule.playlist: self.queue_playlist_entries(next_schedule, next_schedule.get_playlist().entries, False, True)