Skip to content
Snippets Groups Projects
scheduler.py 15.5 KiB
Newer Older
  • Learn to ignore specific revisions
  • #  engine
    
    #  Playout Daemon for autoradio project
    
    #  Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
    
    #  This file is part of engine.
    #
    #  engine is free software: you can redistribute it and/or modify
    #  it under the terms of the GNU General Public License as published by
    #  the Free Software Foundation, either version 3 of the License, or
    #  any later version.
    #
    #  engine is distributed in the hope that it will be useful,
    #  but WITHOUT ANY WARRANTY; without even the implied warranty of
    #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    #  GNU General Public License for more details.
    #
    #  You should have received a copy of the GNU General Public License
    #  along with engine. If not, see <http://www.gnu.org/licenses/>.
    
    __license__ = "GNU General Public License (GPL) Version 3"
    
    __version_info__ = (0, 0, 1)
    __author__ = 'Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>'
    
    Is holding the eventqueue
    
    """
    import time
    import simplejson
    import datetime
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
    import traceback
    
    import logging
    import threading
    
    # Die eigenen Bibliotheken
    
    from modules.communication.redis.messenger import RedisMessenger
    
    from modules.scheduling.calendar import AuraCalendarService
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
    from libraries.database.broadcasts import Schedule, ScheduleEntry, AuraDatabaseModel
    
    from libraries.exceptions.exception_logger import ExceptionLogger
    
    from libraries.enum.auraenumerations import ScheduleEntryType
    
    def alchemyencoder(obj):
        """JSON encoder function for SQLAlchemy special classes."""
        if isinstance(obj, datetime.date):
            return obj.isoformat()
        elif isinstance(obj, decimal.Decimal):
            return float(obj)
    
        elif isinstance(obj, sqlalchemy.orm.state.InstanceState):
            return ""
    
            return simplejson.dumps([obj._asdict()], default=alchemyencoder)
    
    # ------------------------------------------------------------------------------------------ #
    
    class AuraScheduler(ExceptionLogger, threading.Thread):
    
        """
        Aura Scheduler Class
        Gets data from pv and importer, stores and fires events
        """
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
        redismessenger = RedisMessenger()
    
    
        liquidsoapcommunicator = None
        schedule_entries = None
        active_entry = None
    
            @param   config:               read engine.ini
    
            self.config = config
            self.logger = logging.getLogger("AuraEngine")
    
            # init threading
            threading.Thread.__init__(self)
    
            # init messenger.. probably not needed anymore
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
            self.redismessenger.set_channel('scheduler')
            self.redismessenger.set_section('execjob')
    
            self.schedulerconfig = self.config.get("scheduler_config_file")
    
            error_file = self.config.get("install_dir") + "/errormessages/scheduler_error.js"
    
            f = open(error_file)
            self.error_data = simplejson.load(f)
            f.close()
    
    
            # init database ?
            self.init_database()
    
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
            self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal')
    
            # start loading new programm every hour
            self.start()
    
    
        # ------------------------------------------------------------------------------------------ #
        def init_database(self):
            # check if tables do exist. if not create them
            try:
                ScheduleEntry.select_all()
            except sqlalchemy.exc.ProgrammingError as e:
    
                errcode = e.orig.args[0]
    
                if errcode == 1146: # error for no such table
                    x = AuraDatabaseModel()
                    x.recreate_db()
    
        # ------------------------------------------------------------------------------------------ #
    
            # set seconds to wait
    
            seconds_to_wait = int(self.config.get("fetching_frequency"))
    
                # calc next time
                next_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait)
    
                # write to logger
    
                self.logger.info("Fetch new programmes every " + str(seconds_to_wait) + "s started. Going to start next time " + str(next_time))
    
    #            self.logger.info("emptying database")
    #            ScheduleEntry.truncate()
    
                # fetch new programme
    
        # ------------------------------------------------------------------------------------------ #
    
        # ------------------------------------------------------------------------------------------ #
    
        def get_active_entry(self):
    
            now_unix = time.mktime(datetime.datetime.now().timetuple())
            lastentry = None
    
            # load programme if necessary
    
            if self.programme is None:
    
                self.logger.debug("want to get active channel, but have to load programme first")
    
                self.load_programme_from_db()
    
            for entry in self.programme:
                # check if lastentry is set and if act entry is in the future
    
                if lastentry is not None and entry.entry_start_unix > now_unix:
    
                    # return lastentry if so
    
                    return entry # actsource = entry.source
    
            return None
    
    
        # ------------------------------------------------------------------------------------------ #
    
        def load_programme_from_db(self, silent=False):
    
            #self.programme = ScheduleEntry.select_all()
            self.programme = ScheduleEntry.select_act_programme()
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
            # now in unixtime
            now_unix = time.mktime(datetime.datetime.now().timetuple())
    
    
            # switch to check if its the first stream in loaded programme
            first_stream_in_programme = False
    
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
            for entry in self.programme:
                # since we get also programmes from act hour, filter these out
                if entry.entry_start_unix > now_unix:
    
                    # when do we have to start?
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
                    diff = entry.entry_start_unix - now_unix
    
                    # create the activation threads and run them after <diff> seconds
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
                    if entry.source.startswith("linein"):
    
                        self.add_or_update_timer(entry, diff, self.liquidsoapcommunicator.activate)
    
                    elif entry.type == ScheduleEntryType.STREAM:
    
                        if first_stream_in_programme:
                            self.liquidsoapcommunicator.next_stream_source(entry.source)
                            first_stream_in_programme = False
    
    
                        self.add_or_update_timer(entry, diff, self.liquidsoapcommunicator.activate)
    
                    elif entry.type == ScheduleEntryType.FILESYSTEM:
    
                        self.add_or_update_timer(entry, diff, self.liquidsoapcommunicator.activate)
    
                        self.logger.warning("Scheduler cannot understand source '" + entry.source + "' from " + str(entry))
                        self.logger.warning("         Not setting any activation Thread!")
    
        # ------------------------------------------------------------------------------------------ #
    
        def add_or_update_timer(self, entry, diff, func):
    
            # check if something is planned at given time
            planned_timer = self.is_something_planned_at_time(entry.entry_start)
    
    
            # if something is planned on entry.entry_start
            if planned_timer:
                planned_entry = planned_timer.entry
    
                # check if the playlist_id's are different
                if planned_entry.playlist_id != entry.playlist_id:
                    # if not stop the old timer and remove it from the list
                    self.stop_timer(planned_timer)
    
                    # and create a new one
    
                    self.create_timer(entry, diff, func)
    
                # if the playlist id's do not differ => do nothing, they are the same
    
            # if nothing is planned at given time, create a new timer
    
                self.create_timer(entry, diff, func)
    
    
        # ------------------------------------------------------------------------------------------ #
        def stop_timer(self, timer):
            # stop timer
            timer.cancel()
    
            self.message_timer.remove(timer)
    
        # ------------------------------------------------------------------------------------------ #
    
        def create_timer(self, entry, diff, func):
    
            t = CallFunctionTimer(diff, func, [entry])
    
            self.message_timer.append(t)
            t.start()
    
        # ------------------------------------------------------------------------------------------ #
    
        def is_something_planned_at_time(self, given_time):
    
                    return t
            return False
    
        # ------------------------------------------------------------------------------------------ #
        def find_entry_in_timers(self, entry):
    
            # check if a playlist id is already planned
    
            for t in self.message_timer:
                if t.entry.playlist_id == entry.playlist_id and t.entry.entry_start == entry.entry_start:
                    return t
            return False
    
    
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
        # ------------------------------------------------------------------------------------------ #
    
        def get_act_programme_as_string(self):
    
            programme_as_string = ""
    
            if self.programme is None or len(self.programme) == 0:
                self.fetch_new_programme()
    
                programme_as_string = simplejson.dumps([p._asdict() for p in self.programme], default=alchemyencoder)
    
            except Exception as e:
                self.logger.error("Cannot transform programme into JSON String. Reason: " + str(e))
    
                traceback.print_exc()
    
            return programme_as_string
    
        # ------------------------------------------------------------------------------------------ #
        def print_message_queue(self):
            message_queue = ""
            for t in self.message_timer:
                message_queue += t.get_info()+"\n"
    
            return message_queue
    
    
        # ------------------------------------------------------------------------------------------ #
    
        def swap_playlist_entries(self, indexes):
            from_entry = None
            to_entry = None
            from_idx = indexes["from_index"]
            to_idx = indexes["to_index"]
    
    
            # find the entries
    
            for p in self.programme:
                if p.programme_index == int(from_idx):
                    from_entry = p
    
                if p.programme_index == int(to_idx):
                    to_entry = p
    
    
                # break out of loop, if both entries found
    
                if from_entry is not None and to_entry is not None:
                    break
    
    
            # check if entries are found
    
            if from_entry is None or to_entry is None:
                return "From or To Entry not found!"
    
    
            # swap sources
    
            from_entry.source = to_entry.source
    
            # store to database
    
            from_entry.store(add=False, commit=False)
            to_entry.store(add=False, commit=True)
    
            # and return the programme with swapped entries
    
            return self.get_act_programme_as_string()
    
        # ------------------------------------------------------------------------------------------ #
        def delete_playlist_entry(self, index):
            found = False
    
            for p in self.programme:
                if p.programme_index == int(index):
                    p.delete(True)
                    self.load_programme_from_db()
                    found = True
                    break
    
            if not found:
    
    
            return self.get_act_programme_as_string()
    
    
        # ------------------------------------------------------------------------------------------ #
        def insert_playlist_entry(self, fromtime_source):
            fromtime = fromtime_source["fromtime"]
            source = fromtime_source["source"]
    
            entry = ScheduleEntry()
            entry.entry_start = fromtime
            entry.source = source
            entry.playlist_id = 0
            entry.schedule_id = 0
            entry.entry_num = ScheduleEntry.select_next_manual_entry_num()
    
    
            entry.store(add=True, commit=True)
    
            self.load_programme_from_db()
    
            return self.get_act_programme_as_string()
    
    
        # ------------------------------------------------------------------------------------------ #
    
            self.logger.info("trying to fetch new programme")
    
            if self.tried_fetching == self.fetch_max:
    
                msg = "Cannot connect to PV or Tank! No Programme loaded!"
                self.logger.error(msg)
    
            acs = AuraCalendarService(self.config)
            queue = acs.get_queue()
    
            if type(response) is dict:
    
                if self.programme is not None and len(self.programme) > 0:
                    self.tried_fetching = 0
    
                if len(self.programme) == 0 and self.tried_fetching == self.fetch_max:
                    self.logger.critical("Programme loaded from database has no entries!")
    
                # return self.get_act_programme_as_string()
    
            elif response.startswith("fetching_aborted"):
                self.logger.warning("Fetching was being aborted from AuraCalendarService! Are you connected? Reason: " + response)
    
                self.logger.warning("Got an unknown response from AuraCalendarService: " + response)
    
    
        # ------------------------------------------------------------------------------------------ #
    
            self.logger.critical("HAVE TO SET NEXT FILE FOR: " + playlistname)
            self.logger.critical(str(self.get_active_entry()))
    
            if playlistname == "station":
                file = "/var/audio/fallback/eins.zwo.bombe.mp3"
            elif playlistname == "timeslot":
                file = "/var/audio/fallback/ratm.killing.mp3"
            elif playlistname == "show":
                file = "/var/audio/fallback/weezer.hash.pipe.mp3"
            else:
                file = ""
                self.logger.critical("Should set next fallback file for " + playlistname + ", but this playlist is unknown!")
    
            self.logger.info("Set next fallback file for " + playlistname + ": " + file)
            self.redismessenger.set_next_file_for(playlistname, file)
            return file
    
    # ------------------------------------------------------------------------------------------ #
    
    class CallFunctionTimer(threading.Timer):
    
            self.logger = logging.getLogger("AuraEngine")
    
            msg = "CallFunctionTimer starting @ " + str(self.entry.entry_start) + " source '" + str(self.entry.source) + "' In seconds: " + str(self.diff)