Commit fe585e9f authored by David Trattnig's avatar David Trattnig
Browse files

Dependency between related cmds. #44

parent 7115e3b6
......@@ -25,6 +25,7 @@ from enum import Enum
from threading import Thread, Timer
from datetime import datetime, timedelta
from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil as SU
from modules.base.mail import AuraMailer
from modules.core.resources import ResourceClass
......@@ -66,80 +67,49 @@ class FallbackManager:
logger = None
mailer = None
scheduler = None
message_timer = None
def __init__(self, config, logger, scheduler, message_timer):
def __init__(self, scheduler):
"""
Constructor
Args:
config (AuraConfig): Holds the engine configuration
"""
self.config = config
self.logger = logger
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.mailer = AuraMailer(self.config)
self.scheduler = scheduler
# self.message_timer = message_timer
self.message_timer = []
#
# PUBLIC METHODS
# METHODS
#
def schedule_fallback_playlist(self, schedule, schedule_now=False):
def queue_fallback_playlist(self, timeslot):
"""
Evaluates the scheduled fallback and queues it using a timed thread.
Args:
schedule_now (Boolean): If `True` it is executed immediately
"""
timer_start = None
timer_end = None
(fallback_type, playlist) = self.get_fallback_playlist(schedule)
(fallback_type, playlist) = self.get_fallback_playlist(timeslot)
if playlist:
self.logger.info(f"Resolved {fallback_type.value} fallback")
def do_schedule(entries):
self.logger.info(SU.cyan(f"=== set_fallback_playlist('{entries}') ==="))
self.scheduler.engine.player.start_fallback_playlist(entries)
def do_unschedule():
self.logger.info(SU.cyan("=== clear_fallback_playlist() ==="))
self.scheduler.engine.player.stop_fallback_playlist()
if schedule_now == True:
# Update queue immediately
thread = Thread(target = do_schedule, args = (playlist.entries,))
thread.start()
else:
# Update queue at the beginning of the timeslot
timer_start = FallbackCommandTimer(schedule.start_unix, do_schedule, playlist.entries)
self.message_timer.append(timer_start)
timer_start.start()
# Update fallback channel to be cleared at the end of the timeslot
timer_end = FallbackCommandTimer(schedule.end_unix, do_unschedule)
self.message_timer.append(timer_end)
timer_end.start()
return (timer_start, timer_end)
self.logger.info(f"Resolved {fallback_type.value} fallback")
return FallbackCommand(timeslot, playlist.entries)
else:
msg = f"There is no schedule- or show-fallback defined for timeslot#{schedule.schedule_id}. "
msg = f"There is no schedule- or show-fallback defined for timeslot#{timeslot.schedule_id}. "
msg += f"The station fallback will be used automatically."
self.logger.info(msg)
def resolve_playlist(self, schedule):
def resolve_playlist(self, timeslot):
"""
Retrieves the currently planned (fallback) playlist. If a normal playlist is available,
this one is returned. In case of station fallback no playlist is returned.
Args:
schedule (Schedule)
timeslot (Schedule)
Returns:
(FallbackType, Playlist)
......@@ -147,22 +117,22 @@ class FallbackManager:
planned_playlist = None
fallback_type = None
if self.validate_playlist(schedule, "playlist"):
planned_playlist = schedule.get_playlist()
if self.validate_playlist(timeslot, "playlist"):
planned_playlist = timeslot.get_playlist()
fallback_type = FallbackType.NONE
else:
(fallback_type, planned_playlist) = self.get_fallback_playlist(schedule)
(fallback_type, planned_playlist) = self.get_fallback_playlist(timeslot)
return (fallback_type, planned_playlist)
def get_fallback_playlist(self, schedule):
def get_fallback_playlist(self, timeslot):
"""
Retrieves the playlist to be used in a fallback scenario.
Args:
schedule (Schedule)
timeslot (Schedule)
Returns:
(Playlist)
......@@ -170,23 +140,18 @@ class FallbackManager:
playlist = None
fallback_type = FallbackType.STATION
if self.validate_playlist(schedule, "schedule_fallback"):
playlist = schedule.schedule_fallback[0]
if self.validate_playlist(timeslot, "schedule_fallback"):
playlist = timeslot.schedule_fallback[0]
fallback_type = FallbackType.SCHEDULE
elif self.validate_playlist(schedule, "show_fallback"):
playlist = schedule.show_fallback[0]
elif self.validate_playlist(timeslot, "show_fallback"):
playlist = timeslot.show_fallback[0]
fallback_type = FallbackType.SHOW
return (fallback_type, playlist)
#
# PRIVATE METHODS
#
def validate_playlist(self, schedule, playlist_type):
def validate_playlist(self, timeslot, playlist_type):
"""
Checks if a playlist is valid for play-out.
......@@ -202,7 +167,7 @@ class FallbackManager:
Otherwise, if a fallback playlist contains Live or Stream entries,
the exact playout behaviour can hardly be predicted.
"""
playlist = getattr(schedule, playlist_type)
playlist = getattr(timeslot, playlist_type)
if playlist \
and isinstance(playlist, list) \
and playlist[0].entries \
......@@ -229,12 +194,14 @@ class FallbackManager:
class EngineCommandTimer(Timer):
class EngineExecutor(Timer):
"""
Timer for timed executing of Engine commands.
Base class for timed or threaded execution of Engine commands.
"""
logger = logging.getLogger("AuraEngine")
timer_store = {}
logger = logging.getLogger("AuraEngine")
child_timer = None
direct_exec = None
timer_id = None
timer_type = None
param = None
......@@ -242,82 +209,155 @@ class EngineCommandTimer(Timer):
dt = None
def __init__(self, timer_type="BASE", due_time=None, func=None, param=None):
def __init__(self, timer_type="BASE", child_timer=None, due_time=None, func=None, param=None):
"""
Constructor
"""
Args:
timer_type (String): Prefix used for the `timer_id` to make it unique
child_timer (EngineExeuctor): Child action which is bound to this timer
due_time (Float): When timer should be executed. For values <= 0 execution happens immediately in a threaded way
func (function): The function to be called
param (object): Parameter passt to the function
"""
from modules.core.engine import Engine
now_unix = Engine.engine_time()
self.child_timer = child_timer
self.direct_exec = False
self.timer_type = timer_type
self.timer_id = f"{timer_type}:{func.__name__}:{due_time}"
diff = due_time - now_unix
if diff < 0.0:
msg = f"Trying to create timer in the past: {self.timer_id}"
self.logger.error(SU.red(msg))
raise Exception(msg)
if not due_time:
diff = 0
else:
diff = due_time - now_unix
self.diff = diff
self.dt = datetime.now() + timedelta(seconds=diff)
self.func = func
self.param = param
if diff < 0:
msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..."
self.logger.error(SU.red(msg))
self.exec_now()
elif diff == 0:
self.logger.info(f"Timer '{self.timer_id}' to be executed immediately")
self.exec_now()
else:
self.exec_timed()
self.start()
self.update_store()
def exec_now(self):
"""
Immediate execution within a thread. It's not stored in the timer store.
"""
self.direct_exec = True
thread = Thread(target = self.func, args = (self.param,))
thread.start()
def exec_timed(self):
"""
Timed execution in a thread.
"""
def wrapper_func(param=None):
# Remove from cache
# Remove from store
self.logger.info(SU.green(f"Removing old timer with ID: {self.timer_id}"))
del EngineCommandTimer.timer_store[self.timer_id]
del EngineExecutor.timer_store[self.timer_id]
# Call actual function
if param: func(param,)
else: func()
if param: self.func(param,)
else: self.func()
Timer.__init__(self, diff, wrapper_func, (param,))
self.update_cache()
self.logger.info(SU.green(f"Created command timer with ID: {self.timer_id}"))
Timer.__init__(self, self.diff, wrapper_func, (self.param,))
def update_cache(self):
def update_store(self):
"""
Adds the instance to the cache and cancels any previously existing commands.
Adds the instance to the store and cancels any previously existing commands.
"""
existing_command = None
if self.timer_id in EngineCommandTimer.timer_store:
existing_command = EngineCommandTimer.timer_store[self.timer_id]
if self.timer_id in EngineExecutor.timer_store:
existing_command = EngineExecutor.timer_store[self.timer_id]
if existing_command:
self.logger.info(SU.green(f"Cancelling previous timer with ID: {self.timer_id}"))
existing_command.cancel()
EngineCommandTimer.timer_store[self.timer_id] = self
if existing_command.child_timer:
self.logger.info(SU.green(f"Cancelling child timer with ID: {existing_command.child_timer.timer_id}"))
EngineExecutor.timer_store[self.timer_id] = self
self.logger.info(SU.green(f"Created command timer with ID: {self.timer_id}"))
def print_active_timers(self):
def is_alive(self):
"""
Prints a list of active timers to the log.
Returns true if the command is still due to be executed.
"""
for id, timer in EngineCommandTimer.timer_store.values():
EngineCommandTimer.logger.info(str(timer))
if self.direct_exec == True:
return False
return super().is_alive()
def __str__(self):
"""
String represenation of the timer.
"""
return f"[{self.timer_id}] COMMAND TIMER due at {str(self.dt)} (alive: {self.is_alive()})"
return f"[{self.timer_id}] exec at {str(self.dt)} (alive: {self.is_alive()})"
@staticmethod
def log_commands():
"""
Prints a list of active timers to the log.
"""
timers = EngineExecutor.timer_store.values()
msg = "\n [ ENGINE COMMAND QUEUE ]\n"
if not timers:
msg += "None available!\n"
else:
for timer in timers:
msg += f" => {str(timer)}\n"
if timer.child_timer:
msg += f" => {str(timer.child_timer)}\n"
EngineExecutor.logger.info(msg + "\n")
class FallbackCommandTimer(EngineCommandTimer):
class FallbackCommand(EngineExecutor):
"""
Timer for executing timed scheduling of fallback playlists.
Command composition for executing timed scheduling and unscheduling of fallback playlists.
Based on the `timeslot.start_date` and `timeslot.end_date` two `EngineExecutor commands
are created.
"""
def __init__(self, diff=None, func=None, param=None):
def __init__(self, timeslot, entries):
"""
Constructor
Args:
timeslot (Timeslot): The timeslot any fallback entries should be scheduled for
entries (List): List of entries to be scheduled as fallback
"""
super().__init__("FALLBACK", diff, func, param)
self.logger.info("Executing scheduled fallback playlist update '%s' in %s seconds..." % \
(str(func.__name__), str(diff)))
from modules.core.engine import Engine
def do_play(entries):
self.logger.info(SU.cyan(f"=== start_fallback_playlist('{entries}') ==="))
Engine.get_instance().player.start_fallback_playlist(entries)
def do_stop():
self.logger.info(SU.cyan("=== stop_fallback_playlist() ==="))
Engine.get_instance().player.stop_fallback_playlist()
child_timer = EngineExecutor("FALLBACK", None, timeslot.end_unix, do_stop, None)
super().__init__("FALLBACK", child_timer, timeslot.start_unix, do_play, entries)
\ No newline at end of file
......@@ -36,7 +36,7 @@ from modules.core.channels import ChannelType, TransitionType,
from modules.core.resources import ResourceClass, ResourceUtil
from modules.scheduling.calendar import AuraCalendarService
from modules.scheduling.fallback_manager import FallbackManager
from modules.scheduling.fallback_manager import EngineExecutor
......@@ -98,7 +98,7 @@ class AuraScheduler(threading.Thread):
self.logger = logging.getLogger("AuraEngine")
self.init_database()
self.fallback_manager = FallbackManager(config, self.logger, self, None)
self.fallback_manager = FallbackManager(self)
self.redismessenger = RedisMessenger(config)
self.engine = engine
self.engine.scheduler = self
......@@ -138,9 +138,7 @@ class AuraScheduler(threading.Thread):
try:
self.config.load_config()
seconds_to_wait = int(self.config.get("fetching_frequency"))
self.logger.info(SU.cyan("== start fetching new schedules =="))
next_time = str(datetime.now())
self.logger.info("Fetching new schedules every %ss. Next fetching at %ss." % (str(seconds_to_wait), next_time))
self.logger.info(SU.cyan(f"== start fetching new schedules (every {seconds_to_wait} seconds) =="))
self.fetch_new_programme()
# The scheduler is ready
......@@ -154,10 +152,18 @@ class AuraScheduler(threading.Thread):
self.queue_programme()
except Exception as e:
self.logger.critical(SU.red("Unhandled error while fetching & scheduling new programme! (%s)" % str(e)), e)
self.logger.critical(SU.red(f"Unhandled error while fetching & scheduling new programme! ({str(e)})"), e)
# Keep on working anyway
self.clean_timer_queue()
self.print_timer_queue()
# FIXME better location for call
if self.engine.event_dispatcher:
current_timeslot = self.get_active_schedule()
self.engine.event_dispatcher.on_timeslot(current_timeslot)
EngineExecutor.log_commands()
self.exit_event.wait(seconds_to_wait)
......@@ -202,7 +208,7 @@ class AuraScheduler(threading.Thread):
# Schedule any available fallback playlist
if active_schedule:
self.fallback_manager.schedule_fallback_playlist(active_schedule, True)
self.fallback_manager.queue_fallback_playlist(active_schedule)
# Queue the fade-out of the schedule
if not active_schedule.fadeouttimer:
self.queue_end_of_schedule(active_schedule, True)
......@@ -312,8 +318,7 @@ class AuraScheduler(threading.Thread):
if schedule.start_unix <= now_unix and now_unix < schedule.end_unix:
current_schedule = schedule
break
self.engine.event_dispatcher.on_timeslot(current_schedule)
return current_schedule
......@@ -603,7 +608,7 @@ class AuraScheduler(threading.Thread):
if schedules:
for next_schedule in schedules:
# Schedule any available fallback playlist
self.fallback_manager.schedule_fallback_playlist(next_schedule, False)
self.fallback_manager.queue_fallback_playlist(next_schedule)
if next_schedule.playlist:
self.queue_playlist_entries(next_schedule, next_schedule.get_playlist().entries, False, True)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment