Skip to content
Snippets Groups Projects
scheduler.py 18.5 KiB
Newer Older
  • Learn to ignore specific revisions
  • #  engine
    
    #  Playout Daemon for autoradio project
    
    #  Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
    
    #  This file is part of engine.
    #
    #  engine is free software: you can redistribute it and/or modify
    #  it under the terms of the GNU General Public License as published by
    #  the Free Software Foundation, either version 3 of the License, or
    #  any later version.
    #
    #  engine is distributed in the hope that it will be useful,
    #  but WITHOUT ANY WARRANTY; without even the implied warranty of
    #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    #  GNU General Public License for more details.
    #
    #  You should have received a copy of the GNU General Public License
    #  along with engine. If not, see <http://www.gnu.org/licenses/>.
    
    __license__ = "GNU General Public License (GPL) Version 3"
    
    __version_info__ = (0, 0, 1)
    __author__ = 'Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>'
    
    Is holding the eventqueue
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
    import traceback
    
    from modules.communication.redis.messenger import RedisMessenger
    
    from modules.scheduling.calendar import AuraCalendarService
    
    from libraries.database.broadcasts import Schedule, Playlist, AuraDatabaseModel
    
    from libraries.exceptions.exception_logger import ExceptionLogger
    
    from libraries.enum.auraenumerations import ScheduleEntryType, TimerType
    
    def alchemyencoder(obj):
        """JSON encoder function for SQLAlchemy special classes."""
        if isinstance(obj, datetime.date):
            return obj.isoformat()
        elif isinstance(obj, decimal.Decimal):
            return float(obj)
    
        elif isinstance(obj, sqlalchemy.orm.state.InstanceState):
            return ""
    
            return json.dumps([obj._asdict()], default=alchemyencoder)
    
    # ------------------------------------------------------------------------------------------ #
    
    class AuraScheduler(ExceptionLogger, threading.Thread):
    
        """
        Aura Scheduler Class
        Gets data from pv and importer, stores and fires events
        """
    
        redismessenger = None
    
    
        liquidsoapcommunicator = None
        schedule_entries = None
        active_entry = None
    
            @param   config:               read engine.ini
    
            self.redismessenger = RedisMessenger(config)
    
            self.logger = logging.getLogger("AuraEngine")
    
            # init threading
            threading.Thread.__init__(self)
    
            # init messenger.. probably not needed anymore
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
            self.redismessenger.set_channel('scheduler')
            self.redismessenger.set_section('execjob')
    
            error_file = self.config.get("install_dir") + "/errormessages/scheduler_error.js"
    
            self.error_data = json.load(f)
    
            #self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal')
    
            # start loading new programm every hour
            self.start()
    
    
        # ------------------------------------------------------------------------------------------ #
        def init_database(self):
    
            if self.config.get("recreate_db") is not None:
                AuraDatabaseModel.recreate_db(systemexit=True)
    
    
            # check if tables do exist. if not create them
            try:
    
            except sqlalchemy.exc.ProgrammingError as e:
    
                errcode = e.orig.args[0]
    
                if errcode == 1146: # error for no such table
                    x = AuraDatabaseModel()
                    x.recreate_db()
    
        # ------------------------------------------------------------------------------------------ #
    
                # set seconds to wait
                seconds_to_wait = int(self.config.get("fetching_frequency"))
    
    
                # calc next time
                next_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait)
    
                # write to logger
    
                self.logger.info("Fetch new programmes every " + str(seconds_to_wait) + "s started. Going to start next time " + str(next_time))
    
    #            self.logger.info("emptying database")
    #            ScheduleEntry.truncate()
    
                # fetch new programme
    
        # ------------------------------------------------------------------------------------------ #
    
        # ------------------------------------------------------------------------------------------ #
    
        def get_active_entry(self):
    
            now_unix = time.mktime(datetime.datetime.now().timetuple())
            lastentry = None
    
            # load programme if necessary
    
            if self.programme is None:
    
                self.logger.debug("want to get active channel, but have to load programme first")
    
                self.load_programme_from_db()
    
            for show in self.programme:
                for entry in show.playlist:
                    # check if lastentry is set and if act entry is in the future
                    if lastentry is not None and entry.entry_start_unix > now_unix:
                        # return entry if so
                        return (show,entry) # actsource = entry.source
    
            return None
    
    
        # ------------------------------------------------------------------------------------------ #
    
        def load_programme_from_db(self, silent=False):
    
            self.programme = Schedule.select_act_programme()
    
            planned_entries = []
    
            for schedule in self.programme:
                # playlist to play
    
                schedule.playlist = Playlist.select_playlist(schedule.playlist_id)
    
                # show fallback is played when playlist fails
    
                schedule.showfallback = Playlist.select_playlist(schedule.show_fallback_id)
    
                # timeslot fallback is played when show fallback fails
    
                schedule.timeslotfallback = Playlist.select_playlist(schedule.timeslot_fallback_id)
    
                # station fallback is played when timeslot fallback fails
    
                schedule.stationfallback = Playlist.select_playlist(schedule.station_fallback_id)
    
                for p in schedule.playlist:
                    planned_entries.append(p)
    
            self.enable_entries(planned_entries)
    
            self.logger.warning(self.print_message_queue())
    
        # ------------------------------------------------------------------------------------------ #
        def enable_entries(self, playlist):
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
            # now in unixtime
            now_unix = time.mktime(datetime.datetime.now().timetuple())
    
    
            # switch to check if its the first stream in loaded programme
    
            # old entry for fading out
            old_entry = None
    
    
            for entry in playlist:
                # since we get also programmes from the past, filter these out
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
                if entry.entry_start_unix > now_unix:
    
                    # when do we have to start?
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
                    diff = entry.entry_start_unix - now_unix
    
                    # enable the three timer
                    self.enable_timer(diff, entry, old_entry)
    
                # store the old entry for fading out
                old_entry = entry
    
    
        def enable_timer(self, diff, entry, old_entry):
            # create the activation threads and run them after <diff> seconds
            self.logger.critical("ENABLING SWITCHTIMER FOR " + str(entry))
            entry.switchtimer = self.add_or_update_timer(diff, self.liquidsoapcommunicator.activate, [entry])
            self.enable_fading(diff, entry, old_entry)
    
    
        # ------------------------------------------------------------------------------------------ #
    
        def enable_fading(self, diff, new_entry, old_entry):
            # fading times
            fade_out_time = float(self.config.get("fade_out_time"))
    
    
            # enable fading when entry types are different
            if old_entry is not None:
                if old_entry.type != new_entry.type:
    
                    #self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_out, [old_entry])
                    old_entry.fadeouttimer = self.create_timer(diff-fade_out_time, self.liquidsoapcommunicator.fade_out, [old_entry], fadeout=True)
                    self.logger.critical("ENABLING FADEOUTTIMER FOR " + str(old_entry))
    
            # same for fadein except old_entry can be None
            if old_entry.type != new_entry.type:
                #self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_in, [new_entry])
                new_entry.fadeintimer = self.create_timer(diff, self.liquidsoapcommunicator.fade_in, [new_entry], fadein=True)
                self.logger.critical("ENABLING FADEINTIMER FOR " + str(new_entry))
    
        # ------------------------------------------------------------------------------------------ #
    
        def add_or_update_timer(self, diff, func, parameters):
    
            timer = None
            entry = parameters[0]
            planned_timer = self.is_something_planned_at_time(entry.entry_start)
    
            # if something is planned on entry.entry_start
            if planned_timer:
                planned_entry = planned_timer.entry
    
                # check if the playlist_id's are different
                if planned_entry.playlist_id != entry.playlist_id:
                    # if not stop the old timer and remove it from the list
                    self.stop_timer(planned_timer)
    
                    # and create a new one
    
                    timer = self.create_timer(diff, func, parameters, switcher=True)
    
                # if the playlist id's do not differ => reuse the old timer and do nothing, they are the same
    
            # if nothing is planned at given time, create a new timer
    
                timer = self.create_timer(diff, func, parameters, switcher=True)
    
            if timer is None:
                return planned_timer
            return timer
    
    
        # ------------------------------------------------------------------------------------------ #
        def stop_timer(self, timer):
            # stop timer
            timer.cancel()
    
            if timer.entry.fadeintimer is not None:
                timer.entry.fadeintimer.cancel()
                self.message_timer.remove(timer.entry.fadeintimer)
            if timer.entry.fadeouttimer is not None:
                timer.entry.fadeouttimer.cancel()
                self.message_timer.remove(timer.entry.fadeouttimer)
    
    
            self.logger.critical("REMOVED TIMER for " + str(timer.entry))
    
    
        # ------------------------------------------------------------------------------------------ #
    
        def create_timer(self, diff, func, parameters, fadein=False, fadeout=False, switcher=False):
            if not fadein and not fadeout and not switcher or fadein and fadeout or fadein and switcher or fadeout and switcher:
                raise Exception("You have to call me with either fadein=true, fadeout=true or switcher=True")
    
            t = CallFunctionTimer(diff, func, parameters, fadein, fadeout, switcher)
    
    
        # ------------------------------------------------------------------------------------------ #
    
        def is_something_planned_at_time(self, given_time):
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
        # ------------------------------------------------------------------------------------------ #
    
        def get_act_programme_as_string(self):
    
            programme_as_string = ""
    
            if self.programme is None or len(self.programme) == 0:
                self.fetch_new_programme()
    
                programme_as_string = json.dumps([p._asdict() for p in self.programme], default=alchemyencoder)
    
            except Exception as e:
                self.logger.error("Cannot transform programme into JSON String. Reason: " + str(e))
    
                traceback.print_exc()
    
            return programme_as_string
    
        # ------------------------------------------------------------------------------------------ #
        def print_message_queue(self):
            message_queue = ""
    
            for t in sorted(self.message_timer, key=attrgetter('diff')):
    
        # ------------------------------------------------------------------------------------------ #
    
            self.logger.info("trying to fetch new programme")
    
            if self.tried_fetching == self.fetch_max:
    
                msg = "Cannot connect to PV or Tank! No Programme loaded!"
                self.logger.error(msg)
    
            acs = AuraCalendarService(self.config)
            queue = acs.get_queue()
    
            if response is None:
                self.logger.critical("Got an EMPTY response from AuraCalendarService: " + str(response))
            elif type(response) is list:
                self.logger.critical("not loading from db")
                self.programme = response
                # self.load_programme_from_db()
    
                if self.programme is not None and len(self.programme) > 0:
                    self.tried_fetching = 0
    
                if len(self.programme) == 0 and self.tried_fetching == self.fetch_max:
                    self.logger.critical("Programme loaded from database has no entries!")
    
                # return self.get_act_programme_as_string()
    
                self.logger.warning("Fetching was being aborted from AuraCalendarService! Reason: " + response)
    
                self.logger.warning("Got an unknown response from AuraCalendarService: " + response)
    
    
        # ------------------------------------------------------------------------------------------ #
    
            self.logger.critical("HAVE TO SET NEXT FILE FOR: " + playlistname)
            self.logger.critical(str(self.get_active_entry()))
    
                file = "/var/audio/fallback/eins.zwo.bombe.flac"
    
                file = "/var/audio/fallback/ratm.killing.flac"
    
                file = "/var/audio/fallback/weezer.hash.pipe.flac"
    
            else:
                file = ""
                self.logger.critical("Should set next fallback file for " + playlistname + ", but this playlist is unknown!")
    
            self.logger.info("Set next fallback file for " + playlistname + ": " + file)
            self.redismessenger.set_next_file_for(playlistname, file)
            return file
    
        # ------------------------------------------------------------------------------------------ #
        def get_next_file_for(self, fallbackname):
            self.logger.critical("HAVE TO SET NEXT FILE FOR: " + fallbackname)
            (show, entry) = self.get_active_entry()
            self.logger.critical(str(show) + " " + str(entry))
    
            if fallbackname == "station":
                file = "/var/audio/fallback/eins.zwo.bombe.flac"
            elif fallbackname == "timeslot":
                file = "/var/audio/fallback/ratm.killing.flac"
            elif fallbackname == "show":
                file = "/var/audio/fallback/weezer.hash.pipe.flac"
            else:
                file = ""
                self.logger.critical("Should set next fallback file for " + fallbackname + ", but this playlist is unknown!")
    
            #set_next_file_thread = SetNextFile(fallbackname, show)
            #set_next_file_thread.start()
    
    #        self.logger.info("Set next fallback file for " + playlistname + ": " + file)
    #        self.redismessenger.set_next_file_for(playlistname, file)
            return file
    
    # ------------------------------------------------------------------------------------------ #
    class SetNextFile(threading.Thread):
        fallbackname = None
        show = None
    
        def __init__(self, fallbackname, show):
            threading.Thread.__init__(self)
            self.fallbackname = fallbackname
            self.show = show
    
        def run(self):
            if self.fallbackname == "show":
                self.detect_next_file_for(self.show.showfallback)
            elif self.fallbackname == "timeslow":
                self.detect_next_file_for(self.show.timeslotfallback)
            elif self.fallbackname == "station":
                self.detect_next_file_for(self.show.stationfallback)
    
        def detect_next_file_for(self, playlist):
            return ""
            #if playlist.startswith("pool"):
            #    self.find_next_file_in_pool(playlist)
    
        #def find_next_file_in_pool(self, pool):
        #    return ""
    
    
    # ------------------------------------------------------------------------------------------ #
    
    class CallFunctionTimer(threading.Timer):
    
        def __init__(self, diff, func, param, fadein=False, fadeout=False, switcher=False):
    
            if not fadein and not fadeout and not switcher or fadein and fadeout or fadein and switcher or fadeout and switcher:
                raise Exception("You have to create me with either fadein=true, fadeout=true or switcher=True")
    
    
            self.fadein = fadein
            self.fadeout = fadeout
            self.switcher = switcher
    
            self.logger = logging.getLogger("AuraEngine")
    
        # ------------------------------------------------------------------------------------------ #
        def __str__(self):
    
            if self.fadein:
                return "CallFunctionTimer starting in " + str(self.diff) + "s fading in source '" + str(self.entry)
            elif self.fadeout:
                return "CallFunctionTimer starting in " + str(self.diff) + "s fading out source '" + str(self.entry)
            elif self.switcher:
                return "CallFunctionTimer starting in " + str(self.diff) + "s switching to source '" + str(self.entry)
    
                return "CORRUPTED CallFunctionTimer around! How can that be?"