Skip to content
Snippets Groups Projects
scheduler.py 21.4 KiB
Newer Older
  • Learn to ignore specific revisions
  • #  Playout Daemon for autoradio project
    
    #  Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
    
    #  This file is part of engine.
    #
    #  engine is free software: you can redistribute it and/or modify
    #  it under the terms of the GNU General Public License as published by
    #  the Free Software Foundation, either version 3 of the License, or
    #  any later version.
    #
    #  engine is distributed in the hope that it will be useful,
    #  but WITHOUT ANY WARRANTY; without even the implied warranty of
    #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    #  GNU General Public License for more details.
    #
    #  You should have received a copy of the GNU General Public License
    #  along with engine. If not, see <http://www.gnu.org/licenses/>.
    
    __license__ = "GNU General Public License (GPL) Version 3"
    
    __version_info__ = (0, 0, 1)
    __author__ = 'Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>'
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
    import traceback
    
    from modules.communication.redis.messenger import RedisMessenger
    
    from modules.scheduling.calendar import AuraCalendarService
    
    from libraries.database.broadcasts import Schedule, Playlist, AuraDatabaseModel
    
    from libraries.exceptions.exception_logger import ExceptionLogger
    
    from libraries.enum.auraenumerations import ScheduleEntryType, TimerType
    
    def alchemyencoder(obj):
        """
        JSON encoder function for SQLAlchemy special classes.

        Meant to be handed to `json.dumps(..., default=alchemyencoder)`.

        Args:
            obj:    The object `json` could not serialize on its own

        Returns:
            (String):   ISO formatted text for `datetime.date` / `datetime.datetime` values
            (Float):    For `decimal.Decimal` values
            (String):   An empty string for SQLAlchemy `InstanceState` objects
            (None):     For every other type, which `json` then emits as `null`
        """
        if isinstance(obj, datetime.date):
            return obj.isoformat()
        if isinstance(obj, decimal.Decimal):
            return float(obj)
        if isinstance(obj, sqlalchemy.orm.state.InstanceState):
            # SQLAlchemy bookkeeping object, exported as an empty string
            return ""

        # The statement that used to follow here sat behind a `return` and could
        # never execute. Unsupported types keep falling through to `None`.
        return None
    
    # ------------------------------------------------------------------------------------------ #
    
    class AuraScheduler(ExceptionLogger, threading.Thread):
    
    
        - Gets data from Steering and Tanks
        - Stores and fires events for LiquidSoap
    
        Attributes:
            config (AuraConfig):                    Holds the Engine Configuration
            logger:                                 The logger
            exit_event(threading.Event):            Used to exit the thread if requested                             
            liquidsoapcommunicator:                 Stores the connection to LiquidSoap 
            last_successful_fetch (datetime):       Stores the last time a fetch from Steering/Tank was successful
    
            programme:                              The current radio programme to be played as defined in the local engine database
            active_entry(Show, Track):              This is a Tuple consisting of the currently played `Show` and `Track`
            message_timer(Array<threading.Timer>):  The message queue of tracks to be played
    
        # Class-level defaults for the scheduler state.

        # Replaced by a `RedisMessenger` built from the configuration in the constructor
        redismessenger = None

        # Engine configuration, read via `self.config.get(...)`
        config = None
        # Set to the "AuraEngine" logger in the constructor
        logger = None
        # Event object that `stop()` sets
        exit_event = None

        # Provides the `activate`, `fade_in` and `fade_out` callbacks handed to the timers
        liquidsoapcommunicator = None

        # Timestamp written by `fetch_new_programme()` after a non-empty fetch
        last_successful_fetch = None


        # Value handed out by `get_active_entry()`; falsy means nothing is active
        active_entry = None
        # Timers that are sorted and printed by `print_message_queue()`.
        # NOTE(review): this is a mutable class-level list, so it is shared by all
        # instances unless an instance rebinds it - confirm this is intended.
        message_timer = []

        #schedule_entries = None
    
    
            Args:
                config (AuraConfig):    Reads the engine configuration
    
            self.logger = logging.getLogger("AuraEngine")
            self.init_error_messages()
    
            self.redismessenger = RedisMessenger(config)
    
            # init threading
            threading.Thread.__init__(self)
    
            # init messenger.. probably not needed anymore
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
            self.redismessenger.set_channel('scheduler')
            self.redismessenger.set_section('execjob')
    
            #self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal')
    
            # Create exit event
    
            # Start thread to load new programme info every hour
    
            """
            Called when the thread is started via `start()`. It calls `self.fetch_new_programme()`
            periodically, depending on the `fetching_frequency` defined in the engine configuration.
            """
    
                # Interval between two fetches in seconds, taken from the engine configuration
                seconds_to_wait = int(self.config.get("fetching_frequency"))

                # Absolute point in time of the next fetch
                next_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait)

                # `next_time` is a timestamp, not a number of seconds, so it is reported as "at ..."
                self.logger.info("Fetch new programmes every %ss. Next fetching at %s." % (str(seconds_to_wait), str(next_time)))
    
    #            self.logger.info("emptying database")
    #            ScheduleEntry.truncate()
    
        def get_active_entry(self):
            """
            Hands out the `(Show, Track)` tuple stored in `self.active_entry`.
            Externally called via `LiquidSoapCommunicator`.

            Returns:
                (Show, Entry):  The stored tuple, or `(None, None)` when no
                                active entry is set (a warning is logged then).
            """
            # FIXME active_entry logic
            current = self.active_entry
            if current:
                return current

            self.logger.warning("No active entry set! Is currently nothing or a fallback playing?")
            return (None, None)
     
    
        def get_act_programme_as_string(self):
            """
            Returns the current programme as JSON `String`. When no programme
            is loaded yet, a fetch is triggered first.
            Also used by `ServerRedisAdapter`.

            Return:
                (String):       The programme as JSON, or an empty string when
                                the conversion failed (the reason is logged)
            """
            programme_as_string = ""

            if not self.programme:
                # The method is named `fetch_new_programme`; the former call to
                # `fetch_new_program` ended in an AttributeError.
                self.fetch_new_programme()

            try:
                programme_as_string = json.dumps([p._asdict() for p in self.programme], default=alchemyencoder)
                # FIXME Change to more specific exception
            except Exception as e:
                # Also reached when the programme is still `None` after fetching
                self.logger.error("Cannot transform programme into JSON String. Reason: " + str(e))
                traceback.print_exc()

            return programme_as_string
    
    
    
        def print_message_queue(self):
            """
            Logs all queued timers, ordered by their `diff` attribute, one per line.
            Logs a warning instead when the queue is empty.
            """
            ordered = sorted(self.message_timer, key=attrgetter('diff'))

            if ordered:
                listing = "".join(str(timer) + "\n" for timer in ordered)
                self.logger.info("Message Queue: " + listing)
            else:
                self.logger.warning("There's nothing in the Message Queue!")
    
    
        # ------------------------------------------------------------------------------------------ #
    
        def set_next_file_for(self, playlistname):
            """
            Picks the next file for the given fallback playlist and passes it
            on to the `RedisMessenger`.

            Args:
                playlistname (String):  `station`, `timeslot` or `show`

            Returns:
                (String):   The chosen file, or an empty string for an unknown playlist
            """
            self.logger.critical("HAVE TO <SET> NEXT FILE FOR: " + playlistname)
            self.logger.critical(str(self.get_active_entry()))

            # FIXME every known playlist currently maps to the same hard-coded test file
            fallback_files = {
                "station": "/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3",
                "timeslot": "/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3",
                "show": "/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3",
            }

            next_file = fallback_files.get(playlistname, "")
            if playlistname not in fallback_files:
                self.logger.critical("Should set next fallback file for " + playlistname + ", but this playlist is unknown!")

            self.logger.info("Set next fallback file for " + playlistname + ": " + next_file)
            self.redismessenger.set_next_file_for(playlistname, next_file)
            return next_file
    
    
        def get_next_file_for(self, fallbackname):
            """
            Determines the file to be played next for a fallback-type.
            Known fallback-types:

                * timeslot
                * show
                * station

            Args:
                fallbackname (String):  The fallback-type

            Returns:
                (String):   Path of the fallback file, or an empty string
                            when the fallback-type is unknown.
            """
            self.logger.critical("HAVE TO <GET> NEXT FILE FOR: " + fallbackname)
            (show, entry) = self.get_active_entry()
            self.logger.critical(str(show) + " " + str(entry))

            # FIXME all known fallback-types still share one hard-coded test file
            if fallbackname in ("timeslot", "show", "station"):
                next_file = "/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3"
            else:
                next_file = ""
                self.logger.critical("Should set next fallback file for " + fallbackname + ", but this playlist is unknown!")

            self.logger.info("Got next fallback file for '" + fallbackname + "': " + next_file)
            return next_file
    
    
    #
    #   PRIVATE METHODS
    #
    
    
        def fetch_new_programme(self):
            """
            Fetch the latest programme from `AuraCalendarService` and
            afterwards (re)load the programme from Engine's database.

            `self.last_successful_fetch` is only updated when the service
            delivered a non-empty programme; otherwise its previous value
            is kept.
            """
            self.logger.info("Trying to fetch new program...")

            acs = AuraCalendarService(self.config)
            queue = acs.get_queue()
            acs.start() # start fetching thread
            response = queue.get() # wait for the end

            aborted_prefix = "fetching_aborted"
            fetched = False

            if response is None:
                self.logger.warning("Trying to load programme from Engine Database, because AuraCalendarService returned an empty response.")

            elif isinstance(response, list):
                self.programme = response

                if response:
                    fetched = True
                else:
                    self.logger.critical("Programme fetched from Steering/Tank has no entries!")

            elif isinstance(response, str) and response.startswith(aborted_prefix):
                # Everything behind the "fetching_aborted" marker is the reason
                self.logger.warning("Trying to load programme from database, because fetching was being aborted from AuraCalendarService! Reason: " + response[len(aborted_prefix):])
            else:
                # str() keeps the log call from failing on non-string responses
                self.logger.warning("Trying to load programme from database, because i got an unknown response from AuraCalendarService: " + str(response))

            # Formerly the old timestamp was written back unconditionally, which
            # also wiped the timestamp of a fetch that had just succeeded.
            if fetched:
                self.last_successful_fetch = datetime.datetime.now()

            # The programme is loaded from the database in every case
            self.load_programme_from_db()
    
    
    
        def load_programme_from_db(self):
            """
            Reads the programme from Engine's database, attaches the playlist
            and the three fallback playlists to every schedule and passes all
            playlist items on to `self.enable_entries(..)`. Finally the
            message queue is printed.
            """
            programme = Schedule.select_act_programme()
            self.programme = programme

            if programme is None or len(programme) == 0:
                self.logger.critical("Could not load programme from database. We are in big trouble my friend!")
                return

            planned_entries = []

            for schedule in programme:
                # Regular playlist first, then the fallbacks in the order they take over:
                # show -> timeslot -> station
                schedule.playlist = Playlist.select_playlist(schedule.playlist_id)
                schedule.showfallback = Playlist.select_playlist(schedule.show_fallback_id)
                schedule.timeslotfallback = Playlist.select_playlist(schedule.timeslot_fallback_id)
                schedule.stationfallback = Playlist.select_playlist(schedule.station_fallback_id)

                planned_entries.extend(schedule.playlist)

            self.enable_entries(planned_entries)
            self.print_message_queue()
           
    
    
        # ------------------------------------------------------------------------------------------ #
        def enable_entries(self, playlist):
            """
            Enables timers for the entries of the first playlist which are
            not in the past yet.

            Args:
                playlist (list):    Planned playlists; only `playlist[0]` is
                                    evaluated at the moment (see FIXME below)
            """
            if not playlist:
                # Nothing planned, e.g. all schedules came without playlist items
                self.logger.warning("No planned entries to enable.")
                return

            # now in unixtime
            now_unix = time.mktime(datetime.datetime.now().timetuple())

            # old entry for fading out
            old_entry = None

            # FIXME Correct timing behaviour
            first = playlist[0]
            time_marker = first.start_unix

            for entry in first.entries:
                # NOTE(review): the unit of `duration` is not visible here - confirm
                # that this result fits the unix seconds of `time_marker`
                track_len = (entry.duration / 1000000 / 60)
                time_marker += track_len

                # since we get also programmes from the past, filter these out
                if time_marker > now_unix:
                    # when do we have to start?
                    diff = time_marker - now_unix
                    self.enable_timer(diff, entry, old_entry)

                # store the old entry for fading out
                old_entry = entry
    
    
        # ------------------------------------------------------------------------------------------ #
    
        def enable_timer(self, diff, entry, old_entry):
            """
            Registers the switch timer for `entry` and sets up the fading
            between `old_entry` and `entry`.

            Args:
                diff:       Delay handed on to the timers
                entry:      Entry to switch to; receives the timer as `switchtimer`
                old_entry:  The entry before `entry`, may be `None`
            """
            self.logger.critical("ENABLING SWITCHTIMER FOR " + str(entry))

            activate = self.liquidsoapcommunicator.activate
            entry.switchtimer = self.add_or_update_timer(diff, activate, [entry])

            self.enable_fading(diff, entry, old_entry)
    
    
        # ------------------------------------------------------------------------------------------ #
    
        def enable_fading(self, diff, new_entry, old_entry):
            """
            Creates the fade-out timer for `old_entry` and the fade-in timer
            for `new_entry`.

            Args:
                diff:       Delay of the switch to `new_entry`
                new_entry:  The entry which is switched to
                old_entry:  The entry before `new_entry`, may be `None`
            """
            # fading times
            fade_out_time = float(self.config.get("fade_out_time"))


            # enable fading when entry types are different
            if old_entry is not None:
                if old_entry.type != new_entry.type:

                    #self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_out, [old_entry])
                    # the fade-out is created with a delay that is `fade_out_time` shorter than the switch
                    old_entry.fadeouttimer = self.create_timer(diff-fade_out_time, self.liquidsoapcommunicator.fade_out, [old_entry], fadeout=True)
                    self.logger.critical("ENABLING FADEOUTTIMER FOR " + str(old_entry))

            # same for fadein except old_entry can be None
            # NOTE(review): as indented, the fade-in below only runs when `old_entry`
            # is not None, which contradicts the comment above. A line seems to be
            # missing here - verify against the original file.

                #self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_in, [new_entry])
                new_entry.fadeintimer = self.create_timer(diff, self.liquidsoapcommunicator.fade_in, [new_entry], fadein=True)
                self.logger.critical("ENABLING FADEINTIMER FOR " + str(new_entry))
    
        # ------------------------------------------------------------------------------------------ #
    
        def add_or_update_timer(self, diff, func, parameters):
            """
            Creates a switch timer, unless a timer for the same playlist is
            already planned at the start time of the entry.

            Args:
                diff:               Delay handed on to `create_timer`
                func:               Function the timer has to call
                parameters (list):  Parameters for `func`; the first item is the entry

            Returns:
                The newly created timer, or the already planned timer when
                that one is kept.
            """
            # The entry is handed over as first parameter (see `enable_timer`)
            entry = parameters[0]
            timer = None

            planned_timer = self.is_something_planned_at_time(entry.schedule_start)

            if planned_timer:
                # something is planned at the start time of the entry
                planned_entry = planned_timer.entry

                if planned_entry.playlist_id != entry.playlist_id:
                    # another playlist is planned => stop the old timer and create a new one
                    self.stop_timer(planned_timer)
                    timer = self.create_timer(diff, func, parameters, switcher=True)

                # if the playlist id's do not differ => reuse the old timer and do nothing
            else:
                # nothing is planned at given time => create a new timer
                timer = self.create_timer(diff, func, parameters, switcher=True)

            if timer is None:
                return planned_timer
            return timer
    
    
        # ------------------------------------------------------------------------------------------ #
        def stop_timer(self, timer):
            """
            Cancels a timer together with the fade timers of its entry and
            removes all of them from the message queue.

            Args:
                timer:  The timer to stop; its `entry` may hold a
                        `fadeintimer` and a `fadeouttimer`
            """
            entry = timer.entry
            fade_timers = (getattr(entry, "fadeintimer", None), getattr(entry, "fadeouttimer", None))

            for candidate in (timer,) + fade_timers:
                if candidate is None:
                    continue

                candidate.cancel()

                # Formerly the timer itself stayed in the queue. The membership
                # check avoids a ValueError for timers which were never queued.
                if candidate in self.message_timer:
                    self.message_timer.remove(candidate)

            self.logger.critical("REMOVED TIMER for " + str(timer.entry))
    
    
        # ------------------------------------------------------------------------------------------ #
    
        def create_timer(self, diff, func, parameters, fadein=False, fadeout=False, switcher=False):
            """
            Creates a `CallFunctionTimer`, puts it into the message queue and starts it.

            Args:
                diff:               Delay passed to `CallFunctionTimer`
                func:               Function the timer has to call
                parameters (list):  Parameters for `func`
                fadein (Boolean):   The timer fades in an entry
                fadeout (Boolean):  The timer fades out an entry
                switcher (Boolean): The timer switches to an entry

            Returns:
                (CallFunctionTimer):    The started timer

            Raises:
                (Exception):    Unless exactly one of `fadein`, `fadeout` and `switcher` is set
            """
            if [bool(fadein), bool(fadeout), bool(switcher)].count(True) != 1:
                raise Exception("You have to call me with either fadein=true, fadeout=true or switcher=True")

            t = CallFunctionTimer(diff, func, parameters, fadein, fadeout, switcher)

            # Callers store the returned timer and the queue is read by
            # `print_message_queue()`, so queue, start and return it.
            self.message_timer.append(t)
            t.start()

            return t
    
    
        # ------------------------------------------------------------------------------------------ #
    
        def is_something_planned_at_time(self, given_time):
            """
            Looks for a queued switch timer whose entry starts at `given_time`.

            NOTE(review): the body of this method was missing. It is
            reconstructed from its caller `add_or_update_timer`, which hands
            over `entry.schedule_start` and reads `.entry` of the result -
            verify against the original implementation.

            Args:
                given_time:     Start time to look for, compared with `entry.schedule_start`

            Returns:
                The first matching timer of `self.message_timer`, or `None`
            """
            for timer in self.message_timer:
                # only switch timers are of interest, fade timers belong to them
                if getattr(timer, "switcher", False) and timer.entry.schedule_start == given_time:
                    return timer

            return None
    
        def init_error_messages(self):
            """
            Load error messages from `<install_dir>/errormessages/scheduler_error.js`
            and store the parsed JSON in `self.error_data`.
            """
            error_file = self.config.get("install_dir") + "/errormessages/scheduler_error.js"

            # `with` closes the file even if the JSON cannot be parsed
            with open(error_file) as f:
                self.error_data = json.load(f)
    
        def init_database(self):
            """
            Prepares the database: recreates it when `recreate_db` is
            configured and creates the tables in case they are missing.

            Raises:
                sqlalchemy.exc.ProgrammingError:    For every database error
                                                    other than a missing table
            """
            recreate_requested = self.config.get("recreate_db") is not None
            if recreate_requested:
                AuraDatabaseModel.recreate_db(systemexit=True)

            # Probe a table; create the model when the table is not there
            try:
                Playlist.select_all()
            except sqlalchemy.exc.ProgrammingError as err:
                no_such_table = 1146
                if err.orig.args[0] != no_such_table:
                    raise

                AuraDatabaseModel().recreate_db()
    
        def stop(self):
            """
            Requests the end of the thread by setting the exit event.
            """
            exit_event = self.exit_event
            exit_event.set()
    
    
    
    # ------------------------------------------------------------------------------------------ #
    class SetNextFile(threading.Thread):
        """
        Thread which detects the next file of a fallback playlist.

        Attributes:
            fallbackname (String):  The fallback-type: `show`, `timeslot` or `station`
            show:                   Object holding `showfallback`, `timeslotfallback`
                                    and `stationfallback`
        """
        fallbackname = None
        show = None

        def __init__(self, fallbackname, show):
            threading.Thread.__init__(self)
            self.fallbackname = fallbackname
            self.show = show

        def run(self):
            """
            Passes the fallback playlist matching `fallbackname` on to
            `detect_next_file_for`. Unknown names are ignored.
            """
            if self.fallbackname == "show":
                self.detect_next_file_for(self.show.showfallback)
            elif self.fallbackname == "timeslot":
                # was misspelled as "timeslow", so this branch could never match
                self.detect_next_file_for(self.show.timeslotfallback)
            elif self.fallbackname == "station":
                self.detect_next_file_for(self.show.stationfallback)

        def detect_next_file_for(self, playlist):
            """
            Placeholder, not implemented yet.

            Returns:
                (String):   Always an empty string
            """
            return ""
            #if playlist.startswith("pool"):
            #    self.find_next_file_in_pool(playlist)

        #def find_next_file_in_pool(self, pool):
        #    return ""
    
    
    # ------------------------------------------------------------------------------------------ #
    
    class CallFunctionTimer(threading.Timer):
        """
        Timer which calls a function for an entry and knows whether it
        fades in, fades out or switches to that entry.

        Attributes:
            diff:               Delay the timer was created with
            func:               The function to call
            param (list):       Parameters for `func`
            entry:              First item of `param`, `None` if there is none
            fadein (Boolean):   The timer fades in `entry`
            fadeout (Boolean):  The timer fades out `entry`
            switcher (Boolean): The timer switches to `entry`
        """

        def __init__(self, diff, func, param, fadein=False, fadeout=False, switcher=False):
            """
            Raises:
                (Exception):    Unless exactly one of `fadein`, `fadeout` and `switcher` is set
            """
            if not fadein and not fadeout and not switcher or fadein and fadeout or fadein and switcher or fadeout and switcher:
                raise Exception("You have to create me with either fadein=true, fadeout=true or switcher=True")

            # Without the base initialisation the timer can neither be started nor cancelled
            threading.Timer.__init__(self, diff, func, param)

            # `diff` and `entry` are read by `__str__` and by the scheduler
            self.diff = diff
            self.func = func
            self.param = param
            self.entry = param[0] if param else None

            self.fadein = fadein
            self.fadeout = fadeout
            self.switcher = switcher

            self.logger = logging.getLogger("AuraEngine")

        # ------------------------------------------------------------------------------------------ #
        def __str__(self):
            prefix = "CallFunctionTimer starting in " + str(self.diff) + "s "

            if self.fadein:
                return prefix + "fading in source '" + str(self.entry)
            if self.fadeout:
                return prefix + "fading out source '" + str(self.entry)
            if self.switcher:
                return prefix + "switching to source '" + str(self.entry)

            # Formerly unreachable; `__str__` must not fall through to `None`
            return "CORRUPTED CallFunctionTimer around! How can that be?"