From 654ad05185cc5a2e5b0184e0da918506e7dc6dc0 Mon Sep 17 00:00:00 2001 From: David Trattnig <david.trattnig@o94.at> Date: Thu, 21 Nov 2019 02:35:44 +0100 Subject: [PATCH] Fixed scheduling logic to allow first playout. --- modules/scheduling/scheduler.py | 437 +++++++++++++++++++------------- 1 file changed, 266 insertions(+), 171 deletions(-) diff --git a/modules/scheduling/scheduler.py b/modules/scheduling/scheduler.py index 738470b1..8f741fcf 100644 --- a/modules/scheduling/scheduler.py +++ b/modules/scheduling/scheduler.py @@ -1,5 +1,5 @@ # -# engine +# Aura Engine # # Playout Daemon for autoradio project # @@ -28,10 +28,7 @@ __license__ = "GNU General Public License (GPL) Version 3" __version_info__ = (0, 0, 1) __author__ = 'Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>' -""" -Aura Scheduler -Is holding the eventqueue -""" + import time import json import datetime @@ -69,38 +66,51 @@ def alchemyencoder(obj): class AuraScheduler(ExceptionLogger, threading.Thread): """ Aura Scheduler Class - Gets data from pv and importer, stores and fires events + + - Gets data from Steering and Tanks + - Stores and fires events for LiquidSoap + + Attributes: + config (AuraConfig): Holds the Engine Configuration + logger: The logger + exit_event(threading.Event): Used to exit the thread if requested + liquidsoapcommunicator: Stores the connection to LiquidSoap + last_successful_fetch (datetime): Stores the last time a fetch from Steering/Tank was successful + + programme: The current radio programme to be played as defined in the local engine database + active_entry(Show, Track): This is a Tuple consisting of the currently played `Show` and `Track` + message_timer(Array<threading.Timer>): The message queue of tracks to be played """ redismessenger = None - message_timer = [] job_result = {} - # stores the conn to liquidsoap + config = None + logger = None + exit_event = None liquidsoapcommunicator = None - # stores the last time when a fetch from pv/tank gone right last_successful_fetch = None - schedule_entries = None - active_entry = None - exit_event = None programme = None + active_entry = None + message_timer = [] + + #schedule_entries = None client = None - logger = None - config = None + def __init__(self, config): """ Constructor - @type config: ConfigReader - @param config: read engine.ini + + Args: + config (AuraConfig): Reads the engine configuration """ self.config = config - - # init database ? + self.logger = logging.getLogger("AuraEngine") + self.init_error_messages() self.init_database() self.redismessenger = RedisMessenger(config) - self.logger = logging.getLogger("AuraEngine") # init threading threading.Thread.__init__(self) @@ -108,88 +118,226 @@ class AuraScheduler(ExceptionLogger, threading.Thread): # init messenger.. probably not needed anymore self.redismessenger.set_channel('scheduler') self.redismessenger.set_section('execjob') - - # load error messages - error_file = self.config.get("install_dir") + "/errormessages/scheduler_error.js" - f = open(error_file) - self.error_data = json.load(f) - f.close() - #self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal') - # create exit event + # Create exit event self.exit_event = threading.Event() - # start loading new programm every hour + # Start thread to load new programme info every hour self.start() - # ------------------------------------------------------------------------------------------ # - def init_database(self): - if self.config.get("recreate_db") is not None: - AuraDatabaseModel.recreate_db(systemexit=True) - - # check if tables do exist. if not create them - try: - Playlist.select_all() - except sqlalchemy.exc.ProgrammingError as e: - errcode = e.orig.args[0] - if errcode == 1146: # error for no such table - x = AuraDatabaseModel() - x.recreate_db() - else: - raise - # ------------------------------------------------------------------------------------------ # def run(self): + """ + Called when thread is started via `start()`. It calls `self.fetch_new_program()` + periodically depending on the `fetching_frequency` define engine configuration. + """ while not self.exit_event.is_set(): - # set seconds to wait seconds_to_wait = int(self.config.get("fetching_frequency")) - - # calc next time next_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait) - - # write to logger - self.logger.info("Fetch new programmes every " + str(seconds_to_wait) + "s started. Going to start next time " + str(next_time)) + self.logger.info("Fetch new programmes every %ss. Next fetching in %ss." % (str(seconds_to_wait), str(next_time))) # empty database # self.logger.info("emptying database") # ScheduleEntry.truncate() - # fetch new programme self.fetch_new_programme() - - # and wait self.exit_event.wait(seconds_to_wait) - # ------------------------------------------------------------------------------------------ # - def stop(self): - self.exit_event.set() - # ------------------------------------------------------------------------------------------ # + +# +# PUBLIC METHODS +# + def get_active_entry(self): - now_unix = time.mktime(datetime.datetime.now().timetuple()) - lastentry = None + """ + Retrieves the current `Show` and `Track` tuple being played. + Externally called via `LiquidSoapCommunicator`. - # load programme if necessary - if self.programme is None: - self.logger.debug("want to get active channel, but have to load programme first") - self.load_programme_from_db() + Returns: + (Show, Entry): The show and track to be played next. + """ + # now_unix = time.mktime(datetime.datetime.now().timetuple()) + # lastentry = None + + # # Load programme if necessary + # if self.programme is None: + # self.logger.info("Next track requested: Need to load programme from database first.") + # self.load_programme_from_db() + + # # Get the entry currently being played + # for show in self.programme: + # for entry in show.playlist: + # # check if lastentry is set and if act entry is in the future + # if lastentry is not None and entry.start_unix > now_unix: + # # return entry if so + # return (show,entry) # actsource = entry.source + + # lastentry = entry + + # return None, None + + # FIXME active_entry logic + if not self.active_entry: + self.logger.warning("No active entry set! Is currently nothing or a fallback playing?") + return (None, None) + else: + return self.active_entry + - # get active source - for show in self.programme: - for entry in show.playlist: - # check if lastentry is set and if act entry is in the future - if lastentry is not None and entry.entry_start_unix > now_unix: - # return entry if so - return (show,entry) # actsource = entry.source - lastentry = entry + def get_act_programme_as_string(self): + """ + Fetches the latest programme and returns it as `String`. + Also used by `ServerRedisAdapter`. + + Return: + (String): Programme + + Raises: + (Exception): In case the programme cannot be converted to String + """ + programme_as_string = "" + + if self.programme is None or len(self.programme) == 0: + self.fetch_new_program() + + try: + programme_as_string = json.dumps([p._asdict() for p in self.programme], default=alchemyencoder) + # FIXME Change to more specific exception + except Exception as e: + self.logger.error("Cannot transform programme into JSON String. Reason: " + str(e)) + traceback.print_exc() + + return programme_as_string + + + + def print_message_queue(self): + """ + Prints the current message queue i.e. tracks in the queue to be played. + """ + message_queue = "" + messages = sorted(self.message_timer, key=attrgetter('diff')) + if not messages: + self.logger.warning("There's nothing in the Message Queue!") + else: + for msg in messages: + message_queue += str(msg)+"\n" + + self.logger.info("Message Queue: " + message_queue) - return None, None # ------------------------------------------------------------------------------------------ # - def load_programme_from_db(self, silent=False): + def set_next_file_for(self, playlistname): + self.logger.critical("HAVE TO <SET> NEXT FILE FOR: " + playlistname) + self.logger.critical(str(self.get_active_entry())) + + if playlistname == "station": + file = "/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3" + elif playlistname == "timeslot": + file = "/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3" + elif playlistname == "show": + file = "/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3" + else: + file = "" + self.logger.critical("Should set next fallback file for " + playlistname + ", but this playlist is unknown!") + + self.logger.info("Set next fallback file for " + playlistname + ": " + file) + self.redismessenger.set_next_file_for(playlistname, file) + return file + + + def get_next_file_for(self, fallbackname): + """ + Evaluates the next fallback file to be played for a given fallback-type. + Valid fallback-types are: + + * timeslot + * show + * station + + Returns: + (String): Absolute path to the file to be played as a fallback. + """ + self.logger.critical("HAVE TO <GET> NEXT FILE FOR: " + fallbackname) + (show, entry) = self.get_active_entry() + self.logger.critical(str(show) + " " + str(entry)) + + if fallbackname == "timeslot": + file = "/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3" + elif fallbackname == "show": + file = "/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3" + elif fallbackname == "station": + file = "/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3" + else: + file = "" + self.logger.critical("Should set next fallback file for " + fallbackname + ", but this playlist is unknown!") + + #set_next_file_thread = SetNextFile(fallbackname, show) + #set_next_file_thread.start() + + self.logger.info("Got next fallback file for '" + fallbackname + "': " + file) +# self.redismessenger.set_next_file_for(playlistname, file) + return file + + +# +# PRIVATE METHODS +# + + + def fetch_new_programme(self): + """ + Fetch the latest programme from `AuraCalendarService`. + In case no programme is successfully returned, it is tried + to retrieve the programme from Engine's database. + """ + self.logger.info("Trying to fetch new program...") + + acs = AuraCalendarService(self.config) + queue = acs.get_queue() + acs.start() # start fetching thread + response = queue.get() # wait for the end + + # Reset last successful fetch state + lsf = self.last_successful_fetch + self.last_successful_fetch = None + + if response is None: + self.logger.warning("Trying to load programme from Engine Database, because AuraCalendarService returned an empty response.") + + elif type(response) is list: + self.programme = response + + if self.programme is not None and len(self.programme) > 0: + self.last_successful_fetch = datetime.datetime.now() + + if len(self.programme) == 0: + self.logger.critical("Programme fetched from Steering/Tank has no entries!") + + # return self.get_act_programme_as_string() + elif response.startswith("fetching_aborted"): + # TODO Check why the 16th entry is logged only + self.logger.warning("Trying to load programme from database, because fetching was being aborted from AuraCalendarService! Reason: " + response[16:]) + else: + self.logger.warning("Trying to load programme from database, because i got an unknown response from AuraCalendarService: " + response) + + # if somehow the programme could not be fetched => try to load it from database + #if self.last_successful_fetch is None: + self.last_successful_fetch = lsf + self.load_programme_from_db() + + + + def load_programme_from_db(self): + """ + Loads the programme from Engine's database and enables + them via `self.enable_entries(..)`. After that, the + current message queue is printed to the console. + """ self.programme = Schedule.select_act_programme() if self.programme is None or len(self.programme) == 0: @@ -212,8 +360,8 @@ class AuraScheduler(ExceptionLogger, threading.Thread): planned_entries.append(p) self.enable_entries(planned_entries) - - self.logger.warning(self.print_message_queue()) + self.print_message_queue() + # ------------------------------------------------------------------------------------------ # def enable_entries(self, playlist): @@ -225,11 +373,16 @@ class AuraScheduler(ExceptionLogger, threading.Thread): # old entry for fading out old_entry = None - for entry in playlist: + # FIXME Correct timing behaviour + time_marker = playlist[0].start_unix + + for entry in playlist[0].entries: + track_len = (entry.duration / 1000000 / 60) + time_marker += track_len # since we get also programmes from the past, filter these out - if entry.entry_start_unix > now_unix: + if time_marker > now_unix: # when do we have to start? - diff = entry.entry_start_unix - now_unix + diff = time_marker - now_unix diff = diff/100 # testing purpose @@ -239,6 +392,7 @@ class AuraScheduler(ExceptionLogger, threading.Thread): # store the old entry for fading out old_entry = entry + # ------------------------------------------------------------------------------------------ # def enable_timer(self, diff, entry, old_entry): # create the activation threads and run them after <diff> seconds self.logger.critical("ENABLING SWITCHTIMER FOR " + str(entry)) @@ -263,6 +417,7 @@ class AuraScheduler(ExceptionLogger, threading.Thread): new_entry.fadeintimer = self.create_timer(diff, self.liquidsoapcommunicator.fade_in, [new_entry], fadein=True) self.logger.critical("ENABLING FADEINTIMER FOR " + str(new_entry)) + # ------------------------------------------------------------------------------------------ # def add_or_update_timer(self, diff, func, parameters): timer = None @@ -270,6 +425,8 @@ class AuraScheduler(ExceptionLogger, threading.Thread): planned_timer = self.is_something_planned_at_time(entry.schedule_start) # if something is planned on entry.entry_start + #FIXME + #if 1==0: if planned_timer: planned_entry = planned_timer.entry @@ -324,110 +481,48 @@ class AuraScheduler(ExceptionLogger, threading.Thread): return t return False - # ------------------------------------------------------------------------------------------ # - def get_act_programme_as_string(self): - programme_as_string = "" - - if self.programme is None or len(self.programme) == 0: - self.fetch_new_programme() - - try: - programme_as_string = json.dumps([p._asdict() for p in self.programme], default=alchemyencoder) - except Exception as e: - self.logger.error("Cannot transform programme into JSON String. Reason: " + str(e)) - traceback.print_exc() - - return programme_as_string - - # ------------------------------------------------------------------------------------------ # - def print_message_queue(self): - message_queue = "" - for t in sorted(self.message_timer, key=attrgetter('diff')): - message_queue += str(t)+"\n" - - return message_queue - - # ------------------------------------------------------------------------------------------ # - def fetch_new_programme(self): - self.logger.info("trying to fetch new programme") - - acs = AuraCalendarService(self.config) - queue = acs.get_queue() - - # start fetching thread - acs.start() - - # wait for the end - response = queue.get() - - # reset - lsf = self.last_successful_fetch - self.last_successful_fetch = None - if response is None: - self.logger.warning("Trying to load programme from database, because i got an EMPTY (None) response from AuraCalendarService.") - - elif type(response) is list: - self.programme = response + def init_error_messages(self): + """ + Load error messages + """ + error_file = self.config.get("install_dir") + "/errormessages/scheduler_error.js" + f = open(error_file) + self.error_data = json.load(f) + f.close() - if self.programme is not None and len(self.programme) > 0: - self.last_successful_fetch = datetime.datetime.now() - if len(self.programme) == 0: - self.logger.critical("Programme fetched from pv/tank has no entries!") - # return self.get_act_programme_as_string() - elif response.startswith("fetching_aborted"): - self.logger.warning("Trying to load programme from database, because fetching was being aborted from AuraCalendarService! Reason: " + response[16:]) - else: - self.logger.warning("Trying to load programme from database, because i got an unknown response from AuraCalendarService: " + response) - - # if somehow the programme could not be fetched => try to load it from database - if self.last_successful_fetch is None: - self.last_successful_fetch = lsf - self.load_programme_from_db() + def init_database(self): + """ + Initializes the database. - # ------------------------------------------------------------------------------------------ # - def set_next_file_for(self, playlistname): - self.logger.critical("HAVE TO SET NEXT FILE FOR: " + playlistname) - self.logger.critical(str(self.get_active_entry())) + Raises: + sqlalchemy.exc.ProgrammingError: In case the DB model is invalid + """ + if self.config.get("recreate_db") is not None: + AuraDatabaseModel.recreate_db(systemexit=True) - if playlistname == "station": - file = "/var/audio/fallback/eins.zwo.bombe.flac" - elif playlistname == "timeslot": - file = "/var/audio/fallback/ratm.killing.flac" - elif playlistname == "show": - file = "/var/audio/fallback/weezer.hash.pipe.flac" - else: - file = "" - self.logger.critical("Should set next fallback file for " + playlistname + ", but this playlist is unknown!") + # Check if tables exists, if not create them + try: + Playlist.select_all() + except sqlalchemy.exc.ProgrammingError as e: + errcode = e.orig.args[0] - self.logger.info("Set next fallback file for " + playlistname + ": " + file) - self.redismessenger.set_next_file_for(playlistname, file) - return file + if errcode == 1146: # Error for no such table + x = AuraDatabaseModel() + x.recreate_db() + else: + raise - # ------------------------------------------------------------------------------------------ # - def get_next_file_for(self, fallbackname): - self.logger.critical("HAVE TO SET NEXT FILE FOR: " + fallbackname) - (show, entry) = self.get_active_entry() - self.logger.critical(str(show) + " " + str(entry)) - if fallbackname == "station": - file = "/home/david/Code/aura/engine/testing/content/1.flac" - elif fallbackname == "timeslot": - file = "/home/david/Code/aura/engine/testing/content/2.flac" - elif fallbackname == "show": - file = "/home/david/Code/aura/engine/testing/content/3.flac" - else: - file = "" - self.logger.critical("Should set next fallback file for " + fallbackname + ", but this playlist is unknown!") - #set_next_file_thread = SetNextFile(fallbackname, show) - #set_next_file_thread.start() + def stop(self): + """ + Called when thread is stopped. + """ + self.exit_event.set() -# self.logger.info("Set next fallback file for " + playlistname + ": " + file) -# self.redismessenger.set_next_file_for(playlistname, file) - return file # ------------------------------------------------------------------------------------------ # class SetNextFile(threading.Thread): -- GitLab