Skip to content
Snippets Groups Projects 20.8 KiB
Newer Older
  • Learn to ignore specific revisions
  • #       Copyright 2018 Radio FRO <>, Radio Helsinki <>, Radio Orange <>
    #       This program is free software; you can redistribute it and/or modify
    #       it under the terms of the GNU General Public License as published by
    #       the Free Software Foundation; Version 3 of the License
    #       This program is distributed in the hope that it will be useful,
    #       but WITHOUT ANY WARRANTY; without even the implied warranty of
    #       GNU General Public License for more details.
    #       You should have received a copy of the GNU General Public License
    #       along with this program; if not, the license can be downloaded here:
    # Meta
    __license__ = "GNU General Public License (GPL) Version 3"
    __version_info__ = (0, 0, 1)
    __author__ = 'Gottfried Gaisbauer <>'
    Is holding the eventqueue
    import signal
    import pyev
    import os
    import os.path
    import time
    import simplejson
    import datetime
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
    import traceback
    from datetime import timedelta
    from dateutil.relativedelta import relativedelta
    import logging
    from glob import glob
    import threading
    # Die eigenen Bibliotheken
    from libraries.base.schedulerconfig import AuraSchedulerConfig
    from modules.communication.redis.messenger import RedisMessenger
    from libraries.base.calendar import AuraCalendarService
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
    from libraries.database.broadcasts import Schedule, ScheduleEntry, AuraDatabaseModel
    from libraries.exceptions.auraexceptions import NoProgrammeLoadedException
    from libraries.exceptions.exception_logger import ExceptionLogger
    def alchemyencoder(obj):
        """JSON encoder function for SQLAlchemy special classes."""
        if isinstance(obj,
            return obj.isoformat()
        elif isinstance(obj, decimal.Decimal):
            return float(obj)
        elif isinstance(obj, sqlalchemy.orm.state.InstanceState):
            return ""
            return simplejson.dumps([obj._asdict()], default=alchemyencoder)
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
    Gets data from pv and importer, stores and fires events, 
    Liefert Start und Stop Jobs an den Comba Controller, lädt XML-Playlisten und räumt auf 
    class AuraScheduler(ExceptionLogger):
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
        redismessenger = RedisMessenger()
        liquidsoapcommunicator = None
            @type    config:               ConfigReader
            @param   config:               read aura.ini
            self.auraconfig = config
            self.debug = config.get("debug")
            # Messenger für Systemzustände initieren
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
            self.redismessenger.set_mail_addresses(self.auraconfig.get('frommail'), self.auraconfig.get('adminmail'))
            self.schedulerconfig = self.auraconfig.get("scheduler_config_file")
            # Die Signale, die Abbruch signalisieren
            self.stopsignals = (signal.SIGTERM, signal.SIGINT)
            # das pyev Loop-Object
            self.loop = pyev.default_loop()
            # Das ist kein Reload
            self.initial = True
            # Der Scheduler wartet noch auf den Start Befehl
            self.ready = False
            self.scriptdir = os.path.dirname(os.path.abspath(__file__)) + '/..'
            #errors_file = os.path.dirname(os.path.realpath(__file__)) + '/error/scheduler_error.js'
            json_data = open(self.auraconfig.get("install_dir") + "/errormessages/scheduler_error.js")
            self.errorData = simplejson.load(json_data)
            # init database ?
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
            self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal')
        # ------------------------------------------------------------------------------------------ #
        def init_database(self):
            # check if tables do exist. if not create them
            except sqlalchemy.exc.ProgrammingError as e:
                if e.__dict__["code"] == "f405":
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
        # ------------------------------------------------------------------------------------------ #
    #    def set(self, key, value):
    #        """
    #        Eine property setzen
    #        @type    key: string
    #        @param   key: Der Key
    #        @type    value: mixed
    #        @param   value: Beliebiger Wert
    #        """
    #        self.__dict__[key] = value
        # ------------------------------------------------------------------------------------------ #
     #   def get(self, key, default=None):
     #       """
     #       Eine property holen
     #       @type    key: string
     #       @param   key: Der Key
     #       @type    default: mixed
     #       @param   default: Beliebiger Wert#
    #        """
     #       if key not in self.__dict__:
    #            if default:
    #                self.set(key, default)
    #            else:
    #                return None
    #        return self.__dict__[key]
        # ------------------------------------------------------------------------------------------ #
            Reload Scheduler - Config neu einlesen
            # Scheduler Config neu laden
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
                self.redismessenger.send('Scheduler reloaded by user', '0500', 'success', 'reload', None, 'appinternal')
        # ------------------------------------------------------------------------------------------ #
        def get_active_source(self):
            now_unix = time.mktime(
            actsource = ""
            if self.programme is None:
                print("want to get active channel, but have to load programme first")
            for entry in self.programme:
                # check if lastentry is set and if act entry is in the future
                if lastentry is not None and entry.entry_start_unix > now_unix:
                    # return lastentry if so
                    actsource = entry.source
            if actsource.startswith("file") or actsource.startswith("pool") or actsource.startswith("playlist"):
                print("AuraScheduler found upcoming source '" + str(entry.__dict__) + "'! returning: fs")
            elif actsource.startswith("http"):
                print("AuraScheduler found upcoming source '" + str(entry.__dict__) + "'! returning: http")
            elif actsource.startswith("linein"):
                print("AuraScheduler found upcoming source '" + str(entry.__dict__) + "'! returning: linein")
        # ------------------------------------------------------------------------------------------ #
        def load_programme_from_db(self, silent=False):
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
            self.programme = ScheduleEntry.select_all()
            if not silent:
                print("i am the scheduler and i am holding the following stuff")
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
            # now in unixtime
            now_unix = time.mktime(
            # switch to check if its the first stream in loaded programme
            first_stream_in_programme = False
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
            for entry in self.programme:
                # since we get also programmes from act hour, filter these out
                if entry.entry_start_unix > now_unix:
                    # when do we have to start?
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
                    diff = entry.entry_start_unix - now_unix
                    # create the activation threads and run them after <diff> seconds
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
                    if entry.source.startswith("linein"):
                        self.add_or_update_timer(entry, diff, self.liquidsoapcommunicator.activate, "linein")
                    elif entry.source.startswith("http"):
                        if first_stream_in_programme:
                            first_stream_in_programme = False
                        self.add_or_update_timer(entry, diff, self.liquidsoapcommunicator.activate, "http")
                    elif entry.source.startswith("file"):
                        self.add_or_update_timer(entry, diff, self.liquidsoapcommunicator.activate, "fs")
                        print("WARNING: Cannot understand source '" + entry.source + "' from " + str(entry.__dict__))
                        print("         Not setting any activation Thread!")
                if not silent:
        # ------------------------------------------------------------------------------------------ #
        def add_or_update_timer(self, entry, diff, func, type):
            # check if something is planned at given time
            planned_timer = self.is_something_planned_at_time(entry.entry_start)
            # if something is planned on entry.entry_start
            if planned_timer:
                planned_entry = planned_timer.entry
                # check if the playlist_id's are different
                if planned_entry.playlist_id != entry.playlist_id:
                    # if not stop the old timer and remove it from the list
                    # and create a new one
                    self.create_timer(entry, diff, func, type)
                # if the playlist id's do not differ => do nothing, they are the same
            # if nothing is planned at given time, create a new timer
                self.create_timer(entry, diff, func, type)
        # ------------------------------------------------------------------------------------------ #
        def stop_timer(self, timer):
            # stop timer
        # ------------------------------------------------------------------------------------------ #
        def create_timer(self, entry, diff, func, type):
            t = MessageTimer(diff, func, [entry, type], self.debug)
        # ------------------------------------------------------------------------------------------ #
        def is_something_planned_at_time(self, given_time):
                    return t
            return False
        # ------------------------------------------------------------------------------------------ #
        def find_entry_in_timers(self, entry):
            # check if a playlist id is already planned
            for t in self.message_timer:
                if t.entry.playlist_id == entry.playlist_id and t.entry.entry_start == entry.entry_start:
                    return t
            return False
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
        # ------------------------------------------------------------------------------------------ #
        def get_act_programme_as_string(self):
            programme_as_string = ""
            if self.programme is None:
                raise NoProgrammeLoadedException("")
                programme_as_string = simplejson.dumps([p._asdict() for p in self.programme], default=alchemyencoder)
            return programme_as_string
        # ------------------------------------------------------------------------------------------ #
        def print_message_queue(self):
            message_queue = ""
            for t in self.message_timer:
                message_queue += t.get_info()+"\n"
            return message_queue
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
        # ------------------------------------------------------------------------------------------ #
        # ------------------------------------------------------------------------------------------ #
        def swap_playlist_entries(self, indexes):
            from_entry = None
            to_entry = None
            from_idx = indexes["from_index"]
            to_idx = indexes["to_index"]
            # find the entries
            for p in self.programme:
                if p.programme_index == int(from_idx):
                    from_entry = p
                if p.programme_index == int(to_idx):
                    to_entry = p
                # break out of loop, if both entries found
                if from_entry is not None and to_entry is not None:
            # check if entries are found
            if from_entry is None or to_entry is None:
                return "From or To Entry not found!"
            # swap sources
            swap_source = from_entry.source
            from_entry.source = to_entry.source
            to_entry.source = swap_source
            # store to database
  , commit=False)
  , commit=True)
            # and return the programme with swapped entries
            return self.get_act_programme_as_string()
        # ------------------------------------------------------------------------------------------ #
        def delete_playlist_entry(self, index):
            found = False
            for p in self.programme:
                if p.programme_index == int(index):
                    found = True
            if not found:
                print("WARNING: Nothing to delete")
            return self.get_act_programme_as_string()
        # ------------------------------------------------------------------------------------------ #
        def insert_playlist_entry(self, fromtime_source):
            fromtime = fromtime_source["fromtime"]
            source = fromtime_source["source"]
            entry = ScheduleEntry()
            entry.entry_start = fromtime
            entry.source = source
            entry.playlist_id = 0
            entry.schedule_id = 0
            entry.entry_num = ScheduleEntry.select_next_manual_entry_num()
  , commit=True)
            return self.get_act_programme_as_string()
        # ------------------------------------------------------------------------------------------ #
            Scheduler-Config importieren
            @rtype:   boolean
            @return:  True/False
            # Wenn das Scheduling bereits läuft, muss der Scheduler nicht unbedingt angehalten werden
            error_type = 'fatal' if self.initial else 'error'
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
                self.redismessenger.send('Config is broken', '0301', error_type, 'loadConfig', None, 'config')
                if self.initial:
                    self.ready = False
                return False
                # Fehlermeldung senden, wenn keine Jobs gefunden worden sind
            if len(watcher_jobs) == 0:
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
                self.redismessenger.send('No Jobs found in Config', '0302', error_type, 'loadConfig', None, 'config')
            error_type = 'fatal' if self.initial else 'error'
                # Das scheduler.xml laden
                self.schedulerconfig = AuraSchedulerConfig(self.schedulerconfig)
                # Das scheint kein gültiges XML zu sein
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
                self.redismessenger.send('Config is broken', '0301', error_type, 'loadConfig', None, 'config')
                # Wenn das beim Start passiert können wir nix tun
                if self.initial:
                    self.ready = False
                return False
            jobs = self.schedulerconfig.getJobs()
            for job in jobs:
                if job['job'] == 'start_recording' or job['job'] == 'play_playlist':
            return jobs
        # -----------------------------------------------------------------------#
             job = {}
             job['job'] = 'stop_playlist' if startjob['job'] == 'play_playlist' else 'stop_recording'
             if startjob['day'] == 'all':
                 job['day'] = startjob['day']
                 if startjob['time'] < startjob['until']:
                     job['day'] = startjob['day']
                         day = int(startjob['day'])
                         stopday = 0 if  day > 5 else day+1
                         job['day'] = str(stopday)
                        job['day'] = 'all'
             job['time'] = startjob['until']
             return job
        # ------------------------------------------------------------------------------------------ #
        def start(self):
            Event Loop starten
            # Alle watcher starten
            for watcher in self.watchers:
            logging.debug("{0}: started".format(self))
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
                self.redismessenger.send("Loop did'nt start", '0302', 'fatal', 'appstart', None, 'appinternal')
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
                self.redismessenger.send("Scheduler started", '0100', 'success', 'appstart', None, 'appinternal')
        # ------------------------------------------------------------------------------------------ #
        def stop(self):
            Event Loop stoppen
            # alle watchers stoppen und entfernen
            while self.watchers:
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
            self.redismessenger.send("Loop stopped", '0400', 'success', 'appstart', None, 'appinternal')
        # ------------------------------------------------------------------------------------------ #
        def signal_cb(self, loop, revents):
            Signalverarbeitung bei Abbruch
            @type  loop: object
            @param loop: Das py_ev loop Objekt
            @type  revents: object
            @param revents: Event Callbacks
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
            self.redismessenger.send("Received stop signal", '1100', 'success', 'appstop', None, 'appinternal')
        # ------------------------------------------------------------------------------------------ #
        def signal_reload(self, loop, revents):
            Lädt Scheduling-Konfiguration neu bei Signal SIGUSR1
            @type  loop: object
            @param loop: Das py_ev loop Objekt
            @type  revents: object
            @param revents: Event Callbacks
            self.redismessenger.send("Comba Scheduler gracefully restarted", '1200', 'success', 'appreload', None, 'appinternal')
        # ------------------------------------------------------------------------------------------ #
        def load_playlist(self, data=None):
            Playlist laden
            store = AuraCalendarService()
            # wait until childs thread returns
            data = {}
            data['uri'] = uri
            result = self.client.playlist_load(uri)
                self.success('load_playlist', data, '00')
                self.error('load_playlist', data, '02')
        # ------------------------------------------------------------------------------------------ #
        def start_recording(self, data):
            Aufnahme starten
            result = self.client.recorder_start()
    #        store = AuraCalendarService()
    #        self._preparePlaylistStore(store,, data)
    #        uri = store.getUri()
    #        store.start()
                self.success('start_recording', result, '00')
                self.error('start_recording', result, '01')
        # ------------------------------------------------------------------------------------------ #
        def stop_recording(self, data):
            Aufnahme anhalten
            result = self.client.recorder_stop()
                self.success('stop_recording', result, '00')
                self.error('stop_recording', result, '01')
    Gottfried Gaisbauer's avatar
    Gottfried Gaisbauer committed
        def __init__(self, diff, func, param, debug=False):
            if self.debug:
                print("MessageTimer starting @ " + str(self.entry.entry_start) + " source '" + str(self.entry.source) + "' In seconds: " + str(self.diff))
            return "Calling " + str(self.func) + " @ " + str(self.entry.entry_start)