Skip to content
Snippets Groups Projects
scheduler.py 17.3 KiB
Newer Older
  • Learn to ignore specific revisions
  • #  engine
    
    #  Playout Daemon for autoradio project
    
    #  Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
    
    #  This file is part of engine.
    #
    #  engine is free software: you can redistribute it and/or modify
    #  it under the terms of the GNU General Public License as published by
    #  the Free Software Foundation, either version 3 of the License, or
    #  any later version.
    #
    #  engine is distributed in the hope that it will be useful,
    #  but WITHOUT ANY WARRANTY; without even the implied warranty of
    #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    #  GNU General Public License for more details.
    #
    #  You should have received a copy of the GNU General Public License
    #  along with engine. If not, see <http://www.gnu.org/licenses/>.
    
    __license__ = "GNU General Public License (GPL) Version 3"
    
    __version_info__ = (0, 0, 1)
    __author__ = 'Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>'
    
    Is holding the eventqueue
    
    """
    import time
    import simplejson
    import datetime
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
    import traceback
    
    from modules.communication.redis.messenger import RedisMessenger
    
    from modules.scheduling.calendar import AuraCalendarService
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
    from libraries.database.broadcasts import Schedule, ScheduleEntry, AuraDatabaseModel
    
    from libraries.exceptions.exception_logger import ExceptionLogger
    
    from libraries.enum.auraenumerations import ScheduleEntryType
    
    def alchemyencoder(obj):
        """
        JSON encoder function for SQLAlchemy special classes.

        Passed as the ``default`` callback to ``simplejson.dumps`` so that values
        the encoder cannot handle itself are converted here.

        @param   obj:   the value the JSON encoder could not serialise
        @return         ISO formatted string for dates and datetimes, a float for
                        decimals, an empty string for SQLAlchemy instance states,
                        a JSON string for objects offering ``_asdict()``,
                        otherwise None
        """
        if isinstance(obj, datetime.date):
            # datetime.datetime is a subclass of datetime.date, so it ends up here as well
            return obj.isoformat()
        elif isinstance(obj, decimal.Decimal):
            return float(obj)
        elif isinstance(obj, sqlalchemy.orm.state.InstanceState):
            # SQLAlchemy bookkeeping object, exported as an empty string
            return ""
        elif hasattr(obj, "_asdict"):
            # anything that can describe itself as a dict is serialised recursively
            return simplejson.dumps([obj._asdict()], default=alchemyencoder)

        # unknown types yield None, which the encoder then emits as null
        return None
    
    # ------------------------------------------------------------------------------------------ #
    
    class AuraScheduler(ExceptionLogger, threading.Thread):
    
        """
        Aura Scheduler Class
        Gets data from pv and importer, stores and fires events
        """
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
        # Redis messenger. It is created once when the class body is executed,
        # so all AuraScheduler instances share this single object.
        redismessenger = RedisMessenger()


        # Object whose activate / fade_in / fade_out / next_stream_source methods are
        # handed to the timers. None by default and not assigned in the visible code.
        liquidsoapcommunicator = None
        # NOTE(review): the two attributes below are not referenced by any method
        # visible in this file -- confirm they are still needed.
        schedule_entries = None
        active_entry = None
    
            @param   config:               read engine.ini
    
            self.config = config
            self.logger = logging.getLogger("AuraEngine")
    
            # init threading
            threading.Thread.__init__(self)
    
            # init messenger.. probably not needed anymore
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
            self.redismessenger.set_channel('scheduler')
            self.redismessenger.set_section('execjob')
    
            error_file = self.config.get("install_dir") + "/errormessages/scheduler_error.js"
    
            f = open(error_file)
            self.error_data = simplejson.load(f)
            f.close()
    
    
            # init database ?
            self.init_database()
    
    
            #self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal')
    
            # start loading new programm every hour
            self.start()
    
    
        # ------------------------------------------------------------------------------------------ #
        def init_database(self):
            """
            Check if the database tables exist and create them if they do not.

            A probing select is issued; when it fails with error code 1146
            (no such table) the database is recreated. Every other
            ProgrammingError is logged and re-raised instead of being dropped
            silently.

            @raise   sqlalchemy.exc.ProgrammingError:   for any error other than 1146
            """
            try:
                ScheduleEntry.select_all()
            except sqlalchemy.exc.ProgrammingError as e:
                # the driver level exception may be missing or carry no arguments
                orig_args = getattr(getattr(e, "orig", None), "args", None) or ()
                errcode = orig_args[0] if len(orig_args) > 0 else None

                if errcode == 1146: # error for no such table
                    x = AuraDatabaseModel()
                    x.recreate_db()
                else:
                    # not a missing table: do not hide the problem from the caller
                    self.logger.critical("Cannot initialise database. Reason: " + str(e))
                    raise
    
        # ------------------------------------------------------------------------------------------ #
    
                # set seconds to wait
                seconds_to_wait = int(self.config.get("fetching_frequency"))
    
    
                # calc next time
                next_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait)
    
                # write to logger
    
                self.logger.info("Fetch new programmes every " + str(seconds_to_wait) + "s started. Going to start next time " + str(next_time))
    
    #            self.logger.info("emptying database")
    #            ScheduleEntry.truncate()
    
                # fetch new programme
    
        # ------------------------------------------------------------------------------------------ #
    
        # ------------------------------------------------------------------------------------------ #
    
        def get_active_entry(self):
            """
            Walk through the programme and return the first entry starting in the
            future that has a predecessor in the programme.

            The programme is loaded from the database first if it has not been
            loaded yet.

            @return   the matching entry, or None if there is no such entry
            """
            now_unix = time.mktime(datetime.datetime.now().timetuple())
            lastentry = None

            # load programme if necessary
            if self.programme is None:
                self.logger.debug("want to get active channel, but have to load programme first")
                self.load_programme_from_db()

            for entry in self.programme:
                # check if lastentry is set and if act entry is in the future
                if lastentry is not None and entry.entry_start_unix > now_unix:
                    # NOTE(review): this hands back the upcoming entry, not its
                    # predecessor (lastentry) -- confirm which one callers expect
                    return entry

                # remember the entry, otherwise the check above can never succeed
                lastentry = entry

            return None
    
    
        # ------------------------------------------------------------------------------------------ #
    
        def load_programme_from_db(self, silent=False):
            """
            Load the programme from the database and plan timers for all entries
            that start in the future.

            For every upcoming entry the fade timers are planned via
            enable_fading() and an activation timer via add_or_update_timer().
            Entries with an unknown source or type only produce a warning.
            Finally the planned timers are written to the log.

            @param   silent:   not used by this method; kept so existing callers keep working
            """
            self.programme = ScheduleEntry.select_act_programme()

            # now in unixtime
            now_unix = time.mktime(datetime.datetime.now().timetuple())

            # switch to check if its the first stream in loaded programme
            # NOTE(review): starts as False, so next_stream_source() below is never
            # called -- confirm whether this is intended
            first_stream_in_programme = False

            # fading times
            fade_in_time = float(self.config.get("fade_in_time"))
            fade_out_time = float(self.config.get("fade_out_time"))

            # old entry for fading out
            old_entry = None

            for entry in self.programme:
                # since we get also programmes from act hour, filter these out
                if entry.entry_start_unix > now_unix:
                    # when do we have to start?
                    diff = entry.entry_start_unix - now_unix

                    # TODO(review): shortens every delay by factor 1000; marked as
                    # testing aid by the author, remove before productive use
                    diff = diff/1000 # testing purpose

                    self.enable_fading(fade_in_time, fade_out_time, old_entry, entry, diff)

                    # create the activation threads and run them after <diff> seconds
                    if entry.source.startswith("linein"):
                        self.add_or_update_timer(diff, self.liquidsoapcommunicator.activate, [entry])

                    elif entry.type == ScheduleEntryType.STREAM:
                        if first_stream_in_programme:
                            self.liquidsoapcommunicator.next_stream_source(entry.source)
                            first_stream_in_programme = False

                        self.add_or_update_timer(diff, self.liquidsoapcommunicator.activate, [entry])

                    elif entry.type == ScheduleEntryType.FILESYSTEM:
                        self.add_or_update_timer(diff, self.liquidsoapcommunicator.activate, [entry])

                    else:
                        # only complain about entries none of the branches above handled
                        self.logger.warning("Scheduler cannot understand source '" + entry.source + "' from " + str(entry))
                        self.logger.warning("         Not setting any activation Thread!")

                # store the old entry for fading out
                old_entry = entry

            self.logger.warning(self.print_message_queue())
    
        # ------------------------------------------------------------------------------------------ #
        def enable_fading(self, fade_in_time, fade_out_time, old_entry, new_entry, diff):
            """
            Plan the fade timers for the change from one entry to the next.

            Nothing is planned if there is no previous entry or if both entries
            are of the same type.

            @param   fade_in_time:    fade in time in seconds, 0 disables the fade in
            @param   fade_out_time:   fade out time in seconds, 0 disables the fade out
            @param   old_entry:       entry preceding new_entry, or None
            @param   new_entry:       entry which is about to start
            @param   diff:            delay handed on to add_or_update_timer
            """
            # no predecessor => nothing to fade between
            if old_entry is None:
                return

            # fading is only wanted when the entry types are different
            type_changed = old_entry.type != new_entry.type
            if not type_changed:
                return

            if fade_out_time != 0:
                # the old entry is faded out; the fading time is passed on negated
                self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_out, [old_entry, -fade_out_time])

            if fade_in_time != 0:
                # the new entry is faded in
                self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_in, [new_entry, fade_in_time])
    
        # ------------------------------------------------------------------------------------------ #
    
        def add_or_update_timer(self, diff, func, parameters):
            """
            Plan a timer for an entry, replacing an already planned timer if it
            belongs to a different playlist.

            @param   diff:         delay handed on to create_timer
            @param   func:         function the timer has to call
            @param   parameters:   [entry] or [entry, fadingtime]; a negative
                                   fadingtime (seconds) moves the looked up time
                                   before the start of the entry
            @raise   ValueError:   if parameters has neither one nor two elements
            """
            length = len(parameters)

            if length == 2:
                entry, fadingtime = parameters
            elif length == 1:
                entry = parameters[0]
                fadingtime = 0
            else:
                raise ValueError("parameters must be [entry] or [entry, fadingtime], got " + str(length) + " elements")

            # check if something is planned at given time
            planned_at = entry.entry_start
            if fadingtime < 0:
                planned_at = planned_at + datetime.timedelta(seconds=fadingtime)

            planned_timer = self.is_something_planned_at_time(planned_at)

            if planned_timer:
                # the planned timer is only replaced if the playlist_id's are different
                if planned_timer.entry.playlist_id != entry.playlist_id:
                    self.stop_timer(planned_timer)
                    self.create_timer(diff, func, parameters)

                # if the playlist id's do not differ => reuse the old timer and do nothing, they are the same
            else:
                # if nothing is planned at given time, create a new timer
                self.create_timer(diff, func, parameters)
    
    
        # ------------------------------------------------------------------------------------------ #
        def stop_timer(self, timer):
            """
            Cancel a timer and take it out of the list of planned timers.

            @param   timer:   the timer to cancel; has to be contained in message_timer
            """
            # cancel first, then forget about it
            timer.cancel()

            planned_timers = self.message_timer
            planned_timers.remove(timer)
    
        # ------------------------------------------------------------------------------------------ #
    
        def create_timer(self, diff, func, parameters):
            """
            Create a CallFunctionTimer, remember it in message_timer and start it.

            @param   diff:         first argument handed to CallFunctionTimer
            @param   func:         function handed to CallFunctionTimer
            @param   parameters:   parameter list handed to CallFunctionTimer
            """
            new_timer = CallFunctionTimer(diff, func, parameters)

            # register before starting, so the timer is known as soon as it runs
            self.message_timer.append(new_timer)
            new_timer.start()
    
        # ------------------------------------------------------------------------------------------ #
    
        def is_something_planned_at_time(self, given_time):
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
        # ------------------------------------------------------------------------------------------ #
    
        def get_act_programme_as_string(self):
            """
            Transform the loaded programme into a JSON string.

            If no programme is loaded, or the loaded one is empty, a new
            programme is fetched first.

            @return   JSON string holding one object per programme entry, or an
                      empty string if the transformation failed
            """
            programme_as_string = ""

            if self.programme is None or len(self.programme) == 0:
                self.fetch_new_programme()

            # a failing transformation is logged and answered with an empty string
            try:
                programme_as_string = simplejson.dumps([p._asdict() for p in self.programme], default=alchemyencoder)
            except Exception as e:
                self.logger.error("Cannot transform programme into JSON String. Reason: " + str(e))

                traceback.print_exc()

            return programme_as_string
    
        # ------------------------------------------------------------------------------------------ #
        def print_message_queue(self):
            message_queue = ""
            for t in self.message_timer:
    
        # ------------------------------------------------------------------------------------------ #
    
        def swap_playlist_entries(self, indexes):
            """
            Swap the sources of two programme entries and store both entries.

            @param   indexes:   dict with the keys "from_index" and "to_index",
                                holding the programme_index of the two entries
            @return             the programme as JSON string, or an error
                                message if one of the entries was not found
            """
            from_entry = None
            to_entry = None
            from_idx = int(indexes["from_index"])
            to_idx = int(indexes["to_index"])

            # find the entries
            for p in self.programme:
                if p.programme_index == from_idx:
                    from_entry = p

                if p.programme_index == to_idx:
                    to_entry = p

                # break out of loop, if both entries found
                if from_entry is not None and to_entry is not None:
                    break

            # check if entries are found
            if from_entry is None or to_entry is None:
                return "From or To Entry not found!"

            # swap sources: both sides are exchanged, not just copied one way
            from_entry.source, to_entry.source = to_entry.source, from_entry.source

            # store to database; only the second store commits
            from_entry.store(add=False, commit=False)
            to_entry.store(add=False, commit=True)

            # and return the programme with swapped entries
            return self.get_act_programme_as_string()
    
        # ------------------------------------------------------------------------------------------ #
        def delete_playlist_entry(self, index):
            """
            Delete the programme entry with the given index and reload the programme.

            If no entry carries the index, a warning is logged and nothing is deleted.

            @param   index:   programme_index of the entry to delete (int or numeric string)
            @return           the programme as JSON string
            """
            wanted = int(index)

            for p in self.programme:
                if p.programme_index == wanted:
                    p.delete(True)
                    self.load_programme_from_db()
                    break
            else:
                # the loop ended without a hit
                self.logger.warning("Nothing to delete: no programme entry with index " + str(index))

            return self.get_act_programme_as_string()
    
    
        # ------------------------------------------------------------------------------------------ #
        def insert_playlist_entry(self, fromtime_source):
            """
            Store a manually added entry and reload the programme.

            The new entry gets playlist_id 0 and schedule_id 0; its entry_num is
            taken from ScheduleEntry.select_next_manual_entry_num().

            @param   fromtime_source:   dict with the keys "fromtime" (start of the
                                        entry) and "source"
            @return                     the programme as JSON string
            """
            start = fromtime_source["fromtime"]
            src = fromtime_source["source"]

            new_entry = ScheduleEntry()
            for attribute, value in (
                ("entry_start", start),
                ("source", src),
                ("playlist_id", 0),
                ("schedule_id", 0),
            ):
                setattr(new_entry, attribute, value)
            new_entry.entry_num = ScheduleEntry.select_next_manual_entry_num()

            new_entry.store(add=True, commit=True)

            # the stored entry has to show up in the loaded programme
            self.load_programme_from_db()

            return self.get_act_programme_as_string()
    
    
        # ------------------------------------------------------------------------------------------ #
    
            self.logger.info("trying to fetch new programme")
    
            if self.tried_fetching == self.fetch_max:
    
                msg = "Cannot connect to PV or Tank! No Programme loaded!"
                self.logger.error(msg)
    
            acs = AuraCalendarService(self.config)
            queue = acs.get_queue()
    
            if type(response) is dict:
    
                if self.programme is not None and len(self.programme) > 0:
                    self.tried_fetching = 0
    
                if len(self.programme) == 0 and self.tried_fetching == self.fetch_max:
                    self.logger.critical("Programme loaded from database has no entries!")
    
                # return self.get_act_programme_as_string()
    
            elif response.startswith("fetching_aborted"):
                self.logger.warning("Fetching was being aborted from AuraCalendarService! Are you connected? Reason: " + response)
    
                self.logger.warning("Got an unknown response from AuraCalendarService: " + response)
    
    
        # ------------------------------------------------------------------------------------------ #
    
            self.logger.critical("HAVE TO SET NEXT FILE FOR: " + playlistname)
            self.logger.critical(str(self.get_active_entry()))
    
            if playlistname == "station":
                file = "/var/audio/fallback/eins.zwo.bombe.mp3"
            elif playlistname == "timeslot":
                file = "/var/audio/fallback/ratm.killing.mp3"
            elif playlistname == "show":
                file = "/var/audio/fallback/weezer.hash.pipe.mp3"
            else:
                file = ""
                self.logger.critical("Should set next fallback file for " + playlistname + ", but this playlist is unknown!")
    
            self.logger.info("Set next fallback file for " + playlistname + ": " + file)
            self.redismessenger.set_next_file_for(playlistname, file)
            return file
    
    # ------------------------------------------------------------------------------------------ #
    
    class CallFunctionTimer(threading.Timer):
    
            self.logger = logging.getLogger("AuraEngine")
    
        # ------------------------------------------------------------------------------------------ #
        def __str__(self):
            if len(self.param) >= 2:
                # fading in
                if self.param[1] > 0:
                    return "CallFunctionTimer starting @ " + str(self.entry.entry_start) + " fading in source '" + str(self.entry.source) + "' in seconds: " + str(self.diff)
                # fading out
                else:
                    return "CallFunctionTimer starting @ " + str(self.entry.entry_start + datetime.timedelta(seconds=self.param[1])) + " fading out source '" + str(self.entry.source) + "' in seconds: " + str(self.diff+self.param[1])
            else:
                return "CallFunctionTimer starting @ " + str(self.entry.entry_start) + " switching to source '" + str(self.entry.source) + "' in seconds: " + str(self.diff)