Skip to content
Snippets Groups Projects
scheduler.py 18.2 KiB
Newer Older
  • Learn to ignore specific revisions
  • #  engine
    
    #  Playout Daemon for autoradio project
    
    #  Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
    
    #  This file is part of engine.
    #
    #  engine is free software: you can redistribute it and/or modify
    #  it under the terms of the GNU General Public License as published by
    #  the Free Software Foundation, either version 3 of the License, or
    #  any later version.
    #
    #  engine is distributed in the hope that it will be useful,
    #  but WITHOUT ANY WARRANTY; without even the implied warranty of
    #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    #  GNU General Public License for more details.
    #
    #  You should have received a copy of the GNU General Public License
    #  along with engine. If not, see <http://www.gnu.org/licenses/>.
    
    __license__ = "GNU General Public License (GPL) Version 3"
    
    __version_info__ = (0, 0, 1)
    __author__ = 'Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>'
    
    Is holding the eventqueue
    
    """
    import time
    import simplejson
    import datetime
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
    import traceback
    
    from modules.communication.redis.messenger import RedisMessenger
    
    from modules.scheduling.calendar import AuraCalendarService
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
    from libraries.database.broadcasts import Schedule, ScheduleEntry, AuraDatabaseModel
    
    from libraries.exceptions.exception_logger import ExceptionLogger
    
    from libraries.enum.auraenumerations import ScheduleEntryType, TimerType
    
    def alchemyencoder(obj):
        """JSON encoder function for SQLAlchemy special classes."""
        if isinstance(obj, datetime.date):
            return obj.isoformat()
        elif isinstance(obj, decimal.Decimal):
            return float(obj)
    
        elif isinstance(obj, sqlalchemy.orm.state.InstanceState):
            return ""
    
            return simplejson.dumps([obj._asdict()], default=alchemyencoder)
    
    # ------------------------------------------------------------------------------------------ #
    
    class AuraScheduler(ExceptionLogger, threading.Thread):
    
        """
        Aura Scheduler Class
        Gets data from pv and importer, stores and fires events
        """
    
        redismessenger = None
    
    
        liquidsoapcommunicator = None
        schedule_entries = None
        active_entry = None
    
            @param   config:               read engine.ini
    
            self.redismessenger = RedisMessenger(config)
    
            self.logger = logging.getLogger("AuraEngine")
    
            # init threading
            threading.Thread.__init__(self)
    
            # init messenger.. probably not needed anymore
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
            self.redismessenger.set_channel('scheduler')
            self.redismessenger.set_section('execjob')
    
            error_file = self.config.get("install_dir") + "/errormessages/scheduler_error.js"
    
            f = open(error_file)
            self.error_data = simplejson.load(f)
            f.close()
    
    
            # init database ?
            self.init_database()
    
    
            #self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal')
    
            # start loading new programm every hour
            self.start()
    
    
        # ------------------------------------------------------------------------------------------ #
        def init_database(self):
            # check if tables do exist. if not create them
            try:
                ScheduleEntry.select_all()
            except sqlalchemy.exc.ProgrammingError as e:
    
                errcode = e.orig.args[0]
    
                if errcode == 1146: # error for no such table
                    x = AuraDatabaseModel()
                    x.recreate_db()
    
        # ------------------------------------------------------------------------------------------ #
    
                # set seconds to wait
                seconds_to_wait = int(self.config.get("fetching_frequency"))
    
    
                # calc next time
                next_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait)
    
                # write to logger
    
                self.logger.info("Fetch new programmes every " + str(seconds_to_wait) + "s started. Going to start next time " + str(next_time))
    
    #            self.logger.info("emptying database")
    #            ScheduleEntry.truncate()
    
                # fetch new programme
    
        # ------------------------------------------------------------------------------------------ #
    
        # ------------------------------------------------------------------------------------------ #
    
        def get_active_entry(self):
    
            now_unix = time.mktime(datetime.datetime.now().timetuple())
            lastentry = None
    
            # load programme if necessary
    
            if self.programme is None:
    
                self.logger.debug("want to get active channel, but have to load programme first")
    
                self.load_programme_from_db()
    
            for show in self.programme:
                for entry in show.playlist:
                    # check if lastentry is set and if act entry is in the future
                    if lastentry is not None and entry.entry_start_unix > now_unix:
                        # return entry if so
                        return (show,entry) # actsource = entry.source
    
            return None
    
    
        # ------------------------------------------------------------------------------------------ #
    
        def load_programme_from_db(self, silent=False):
    
            self.programme = Schedule.select_act_programme()
    
            planned_entries = []
    
            for schedule in self.programme:
                # playlist to play
                schedule.playlist = ScheduleEntry.select_playlist(schedule.playlist_id)
                # show fallback is played when playlist fails
                schedule.showfallback = ScheduleEntry.select_playlist(schedule.show_fallback_id)
                # timeslot fallback is played when show fallback fails
                schedule.timeslotfallback = ScheduleEntry.select_playlist(schedule.timeslot_fallback_id)
                # station fallback is played when timeslot fallback fails
                schedule.stationfallback = ScheduleEntry.select_playlist(schedule.station_fallback_id)
    
                for p in schedule.playlist:
                    planned_entries.append(p)
    
            self.enable_entries(planned_entries)
    
            self.logger.warning(self.print_message_queue())
    
        # ------------------------------------------------------------------------------------------ #
        def enable_entries(self, playlist):
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
            # now in unixtime
            now_unix = time.mktime(datetime.datetime.now().timetuple())
    
    
            # switch to check if its the first stream in loaded programme
    
            # old entry for fading out
            old_entry = None
    
    
            for entry in playlist:
                # since we get also programmes from the past, filter these out
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
                if entry.entry_start_unix > now_unix:
    
                    # when do we have to start?
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
                    diff = entry.entry_start_unix - now_unix
    
                    # enable the three timer
                    self.enable_timer(diff, entry, old_entry)
    
                # store the old entry for fading out
                old_entry = entry
    
    
        def enable_timer(self, diff, entry, old_entry):
            # create the activation threads and run them after <diff> seconds
            self.logger.critical("ENABLING SWITCHTIMER FOR " + str(entry))
            entry.switchtimer = self.add_or_update_timer(diff, self.liquidsoapcommunicator.activate, [entry])
            self.enable_fading(diff, entry, old_entry)
    
    
        # ------------------------------------------------------------------------------------------ #
    
        def enable_fading(self, diff, new_entry, old_entry):
            # fading times
            fade_out_time = float(self.config.get("fade_out_time"))
    
    
            # enable fading when entry types are different
            if old_entry is not None:
                if old_entry.type != new_entry.type:
    
                    #self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_out, [old_entry])
                    old_entry.fadeouttimer = self.create_timer(diff-fade_out_time, self.liquidsoapcommunicator.fade_out, [old_entry], fadeout=True)
                    self.logger.critical("ENABLING FADEOUTTIMER FOR " + str(old_entry))
    
            # same for fadein except old_entry can be None
            if old_entry.type != new_entry.type:
                #self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_in, [new_entry])
                new_entry.fadeintimer = self.create_timer(diff, self.liquidsoapcommunicator.fade_in, [new_entry], fadein=True)
                self.logger.critical("ENABLING FADEINTIMER FOR " + str(new_entry))
    
        # ------------------------------------------------------------------------------------------ #
    
        def add_or_update_timer(self, diff, func, parameters):
    
            timer = None
            entry = parameters[0]
            planned_timer = self.is_something_planned_at_time(entry.entry_start)
    
            # if something is planned on entry.entry_start
            if planned_timer:
                planned_entry = planned_timer.entry
    
                # check if the playlist_id's are different
                if planned_entry.playlist_id != entry.playlist_id:
                    # if not stop the old timer and remove it from the list
                    self.stop_timer(planned_timer)
    
                    # and create a new one
    
                    timer = self.create_timer(diff, func, parameters, switcher=True)
    
                # if the playlist id's do not differ => reuse the old timer and do nothing, they are the same
    
            # if nothing is planned at given time, create a new timer
    
                timer = self.create_timer(diff, func, parameters, switcher=True)
    
            if timer is None:
                return planned_timer
            return timer
    
    
        # ------------------------------------------------------------------------------------------ #
        def stop_timer(self, timer):
            # stop timer
            timer.cancel()
    
            if timer.entry.fadeintimer is not None:
                timer.entry.fadeintimer.cancel()
                self.message_timer.remove(timer.entry.fadeintimer)
            if timer.entry.fadeouttimer is not None:
                timer.entry.fadeouttimer.cancel()
                self.message_timer.remove(timer.entry.fadeouttimer)
    
    
            self.logger.critical("REMOVED TIMER for " + str(timer.entry))
    
    
        # ------------------------------------------------------------------------------------------ #
    
        def create_timer(self, diff, func, parameters, fadein=False, fadeout=False, switcher=False):
            if not fadein and not fadeout and not switcher or fadein and fadeout or fadein and switcher or fadeout and switcher:
                raise Exception("You have to call me with either fadein=true, fadeout=true or switcher=True")
    
            t = CallFunctionTimer(diff, func, parameters, fadein, fadeout, switcher)
    
    
        # ------------------------------------------------------------------------------------------ #
    
        def is_something_planned_at_time(self, given_time):
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
        # ------------------------------------------------------------------------------------------ #
    
        def get_act_programme_as_string(self):
    
            programme_as_string = ""
    
            if self.programme is None or len(self.programme) == 0:
                self.fetch_new_programme()
    
                programme_as_string = simplejson.dumps([p._asdict() for p in self.programme], default=alchemyencoder)
    
            except Exception as e:
                self.logger.error("Cannot transform programme into JSON String. Reason: " + str(e))
    
                traceback.print_exc()
    
            return programme_as_string
    
        # ------------------------------------------------------------------------------------------ #
        def print_message_queue(self):
            message_queue = ""
    
            for t in sorted(self.message_timer, key=attrgetter('diff')):
    
        # ------------------------------------------------------------------------------------------ #
    
            self.logger.info("trying to fetch new programme")
    
            if self.tried_fetching == self.fetch_max:
    
                msg = "Cannot connect to PV or Tank! No Programme loaded!"
                self.logger.error(msg)
    
            acs = AuraCalendarService(self.config)
            queue = acs.get_queue()
    
            if type(response) is dict:
    
                if self.programme is not None and len(self.programme) > 0:
                    self.tried_fetching = 0
    
                if len(self.programme) == 0 and self.tried_fetching == self.fetch_max:
                    self.logger.critical("Programme loaded from database has no entries!")
    
                # return self.get_act_programme_as_string()
    
                self.logger.warning("Fetching was being aborted from AuraCalendarService! Reason: " + response)
    
                self.logger.warning("Got an unknown response from AuraCalendarService: " + response)
    
    
        # ------------------------------------------------------------------------------------------ #
    
            self.logger.critical("HAVE TO SET NEXT FILE FOR: " + playlistname)
            self.logger.critical(str(self.get_active_entry()))
    
                file = "/var/audio/fallback/eins.zwo.bombe.flac"
    
                file = "/var/audio/fallback/ratm.killing.flac"
    
                file = "/var/audio/fallback/weezer.hash.pipe.flac"
    
            else:
                file = ""
                self.logger.critical("Should set next fallback file for " + playlistname + ", but this playlist is unknown!")
    
            self.logger.info("Set next fallback file for " + playlistname + ": " + file)
            self.redismessenger.set_next_file_for(playlistname, file)
            return file
    
        # ------------------------------------------------------------------------------------------ #
        def get_next_file_for(self, fallbackname):
            self.logger.critical("HAVE TO SET NEXT FILE FOR: " + fallbackname)
            (show, entry) = self.get_active_entry()
            self.logger.critical(str(show) + " " + str(entry))
    
            if fallbackname == "station":
                file = "/var/audio/fallback/eins.zwo.bombe.flac"
            elif fallbackname == "timeslot":
                file = "/var/audio/fallback/ratm.killing.flac"
            elif fallbackname == "show":
                file = "/var/audio/fallback/weezer.hash.pipe.flac"
            else:
                file = ""
                self.logger.critical("Should set next fallback file for " + fallbackname + ", but this playlist is unknown!")
    
            #set_next_file_thread = SetNextFile(fallbackname, show)
            #set_next_file_thread.start()
    
    #        self.logger.info("Set next fallback file for " + playlistname + ": " + file)
    #        self.redismessenger.set_next_file_for(playlistname, file)
            return file
    
    # ------------------------------------------------------------------------------------------ #
    class SetNextFile(threading.Thread):
        fallbackname = None
        show = None
    
        def __init__(self, fallbackname, show):
            threading.Thread.__init__(self)
            self.fallbackname = fallbackname
            self.show = show
    
        def run(self):
            if self.fallbackname == "show":
                self.detect_next_file_for(self.show.showfallback)
            elif self.fallbackname == "timeslow":
                self.detect_next_file_for(self.show.timeslotfallback)
            elif self.fallbackname == "station":
                self.detect_next_file_for(self.show.stationfallback)
    
        def detect_next_file_for(self, playlist):
            return ""
            #if playlist.startswith("pool"):
            #    self.find_next_file_in_pool(playlist)
    
        #def find_next_file_in_pool(self, pool):
        #    return ""
    
    
    # ------------------------------------------------------------------------------------------ #
    
    class CallFunctionTimer(threading.Timer):
    
        def __init__(self, diff, func, param, fadein=False, fadeout=False, switcher=False):
    
            if not fadein and not fadeout and not switcher or fadein and fadeout or fadein and switcher or fadeout and switcher:
                raise Exception("You have to create me with either fadein=true, fadeout=true or switcher=True")
    
    
            self.fadein = fadein
            self.fadeout = fadeout
            self.switcher = switcher
    
            self.logger = logging.getLogger("AuraEngine")
    
        # ------------------------------------------------------------------------------------------ #
        def __str__(self):
    
            if self.fadein:
                return "CallFunctionTimer starting in " + str(self.diff) + "s fading in source '" + str(self.entry)
            elif self.fadeout:
                return "CallFunctionTimer starting in " + str(self.diff) + "s fading out source '" + str(self.entry)
            elif self.switcher:
                return "CallFunctionTimer starting in " + str(self.diff) + "s switching to source '" + str(self.entry)
    
                return "CORRUPTED CallFunctionTimer around! How can that be?"