# -*- coding: utf-8 -*-
#
# scheduler.py
#
# Copyright 2018 Radio FRO <https://fro.at>, Radio Helsinki <https://helsinki.at>, Radio Orange <https://o94.at>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; Version 3 of the License
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, the license can be downloaded here:
#
# http://www.gnu.org/licenses/gpl.html

"""
Comba scheduler class.

Holds the event queue.
"""

# Meta
__version__ = '0.0.1'
__license__ = "GNU General Public License (GPL) Version 3"
__version_info__ = (0, 0, 1)
__author__ = 'Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>'

import signal
import pyev
import os
import os.path
import time
import simplejson
import datetime
import decimal
import traceback
import sqlalchemy
import MySQLdb
import sys

from datetime import timedelta
from dateutil.relativedelta import relativedelta

import logging
from glob import glob
import threading

# Project libraries
from libraries.base.schedulerconfig import AuraSchedulerConfig
from modules.communication.redis.messenger import RedisMessenger
from libraries.base.calendar import AuraCalendarService
from libraries.database.broadcasts import Schedule, ScheduleEntry, AuraDatabaseModel
from libraries.exceptions.auraexceptions import NoProgrammeLoadedException
from libraries.exceptions.exception_logger import ExceptionLogger
from libraries.enum.scheduleentrytype import ScheduleEntryType


def alchemyencoder(obj):
    """JSON encoder function for SQLAlchemy special classes."""
    if isinstance(obj, datetime.date):
        return obj.isoformat()
    elif isinstance(obj, decimal.Decimal):
        return float(obj)
    elif isinstance(obj, sqlalchemy.orm.state.InstanceState):
        return ""
    elif isinstance(obj, Schedule):
        return simplejson.dumps([obj._asdict()], default=alchemyencoder)
    else:
        return str(obj)


class AuraScheduler(ExceptionLogger):
    """
    Aura Scheduler Class

    Gets data from pv and importer, stores and fires events,
    delivers start and stop jobs to the Comba controller, loads XML
    playlists and cleans up.
    """

    # NOTE(review): the messenger is created once at class-definition time
    # and is therefore shared by every AuraScheduler instance.
    redismessenger = RedisMessenger()
    liquidsoapcommunicator = None
    schedule_entries = None
    # Class-level defaults only; __init__ rebinds fresh per-instance
    # containers so that instances never share timers or job results.
    message_timer = []
    schedulerconfig = None
    job_result = {}
    programme = None
    client = None
    debug = False
    active_entry = None

    def __init__(self, config):
        """
        Constructor

        @type config: ConfigReader
        @param config: read aura.ini
        """
        # Model.recreate_db(True)
        self.auraconfig = config
        self.debug = config.get("debug")

        # Per-instance mutable state (do not share the class-level objects)
        self.message_timer = []
        self.job_result = {}
        # start() and stop() iterate over the watchers, so the list has to
        # exist even when no watcher has been registered yet
        self.watchers = []

        # Initialise the messenger used for system state messages
        self.redismessenger.set_channel('scheduler')
        self.redismessenger.set_section('execjob')
        self.redismessenger.set_mail_addresses(self.auraconfig.get('frommail'), self.auraconfig.get('adminmail'))

        self.schedulerconfig = self.auraconfig.get("scheduler_config_file")

        # The signals that indicate termination
        self.stopsignals = (signal.SIGTERM, signal.SIGINT)

        # The pyev loop object
        self.loop = pyev.default_loop()

        # This is not a reload
        self.initial = True

        # The scheduler is still waiting for the start command
        self.ready = False

        # Load the config
        # self.__load_config__()

        self.scriptdir = os.path.dirname(os.path.abspath(__file__)) + '/..'

        #errors_file = os.path.dirname(os.path.realpath(__file__)) + '/error/scheduler_error.js'
        error_file = self.auraconfig.get("install_dir") + "/errormessages/scheduler_error.js"
        # Use a context manager so the file handle is closed deterministically
        with open(error_file) as error_fh:
            self.error_data = simplejson.load(error_fh)

        # init database ?
        self.init_database()

        self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal')

    # ------------------------------------------------------------------------------------------ #
    def init_database(self):
        """
        Check whether the tables exist and create them if they do not.

        Any ProgrammingError whose code is not "f405" is re-raised.
        """
        try:
            ScheduleEntry.select_all()
        except sqlalchemy.exc.ProgrammingError as e:
            # getattr() also finds a class-level ``code`` and does not raise
            # KeyError when the instance dict carries no "code" entry
            if getattr(e, "code", None) == "f405":
                AuraDatabaseModel.recreate_db()
            else:
                raise

    # ------------------------------------------------------------------------------------------ #
    # def set(self, key, value):
    #     """
    #     Set a property
    #     @type key: string
    #     @param key: The key
    #     @type value: mixed
    #     @param value: Arbitrary value
    #     """
    #     self.__dict__[key] = value

    # ------------------------------------------------------------------------------------------ #
    # def get(self, key, default=None):
    #     """
    #     Get a property
    #     @type key: string
    #     @param key: The key
    #     @type default: mixed
    #     @param default: Arbitrary value
    #
    #     """
    #     if key not in self.__dict__:
    #         if default:
    #             self.set(key, default)
    #         else:
    #             return None
    #     return self.__dict__[key]

    # ------------------------------------------------------------------------------------------ #
    def reload_config(self):
        """
        Reload the scheduler: stop it, re-read the config and start it again.
        """
        self.stop()
        # Reload the scheduler config
        if self.__load_config__():
            self.redismessenger.send('Scheduler reloaded by user', '0500', 'success', 'reload', None, 'appinternal')
        self.start()

    # ------------------------------------------------------------------------------------------ #
    def get_active_entry(self):
        """
        Walk through the programme and return an entry relative to "now".

        Loads the programme from the database first if none is loaded.

        @rtype: ScheduleEntry or None
        @return: the first entry starting in the future that is preceded by
                 another entry, or None if there is no such entry
        """
        now_unix = time.mktime(datetime.datetime.now().timetuple())
        lastentry = None

        # load programme if necessary
        if self.programme is None:
            print("want to get active channel, but have to load programme first")
            self.load_programme_from_db()

        # get active source
        for entry in self.programme:
            # check if lastentry is set and if act entry is in the future
            if lastentry is not None and entry.entry_start_unix > now_unix:
                # NOTE(review): the original comment says "return lastentry
                # if so", but the code returns the upcoming entry. Behaviour
                # is left unchanged here - confirm which one callers expect.
                return entry
                # actsource = entry.source
                # break

            lastentry = entry

        # if actsource.startswith("file") or actsource.startswith("pool") or actsource.startswith("playlist"):
        #     print("AuraScheduler found upcoming source '" + str(entry.__dict__) + "'! returning: fs")
        #     return "fs"
        # elif actsource.startswith("http"):
        #     print("AuraScheduler found upcoming source '" + str(entry.__dict__) + "'! returning: http")
        #     return "http"
        # elif actsource.startswith("linein"):
        #     print("AuraScheduler found upcoming source '" + str(entry.__dict__) + "'! returning: linein")
        #     return "linein"

        return None

    # ------------------------------------------------------------------------------------------ #
    def load_programme_from_db(self, silent=False):
        """
        Load all schedule entries and plan an activation timer for every
        entry that starts in the future.

        @type silent: boolean
        @param silent: if True, do not print the loaded entries
        """
        self.programme = ScheduleEntry.select_all()

        if not silent:
            print("i am the scheduler and i am holding the following stuff")

        # now in unixtime
        now_unix = time.mktime(datetime.datetime.now().timetuple())

        # switch to check if its the first stream in loaded programme
        # NOTE(review): initialised to False and never set to True, so the
        # next_stream_source() branch below never runs - confirm the intent.
        first_stream_in_programme = False

        for entry in self.programme:
            # since we get also programmes from act hour, filter these out
            if entry.entry_start_unix > now_unix:
                # when do we have to start?
                diff = entry.entry_start_unix - now_unix

                # TODO(review): testing hack - shortens every delay by a
                # factor of 50000; remove before production use.
                diff = diff/50000  # testing purpose

                # create the activation threads and run them after <diff> seconds
                if entry.source.startswith("linein"):
                    self.add_or_update_timer(entry, diff, self.liquidsoapcommunicator.activate)

                elif entry.type == ScheduleEntryType.STREAM:
                    if first_stream_in_programme:
                        self.liquidsoapcommunicator.next_stream_source(entry.source)
                        first_stream_in_programme = False

                    self.add_or_update_timer(entry, diff, self.liquidsoapcommunicator.activate)

                elif entry.type == ScheduleEntryType.FILESYSTEM:
                    self.add_or_update_timer(entry, diff, self.liquidsoapcommunicator.activate)

                else:
                    print("WARNING: Cannot understand source '" + entry.source + "' from " + str(entry.__dict__))
                    print(" Not setting any activation Thread!")

            if not silent:
                print(entry.__dict__)

    # ------------------------------------------------------------------------------------------ #
    def add_or_update_timer(self, entry, diff, func):
        """
        Plan ``func(entry)`` to run in ``diff`` seconds.

        A timer already planned for the same start time is replaced when its
        playlist id differs, and left untouched when the ids are equal.
        """
        # check if something is planned at given time
        planned_timer = self.is_something_planned_at_time(entry.entry_start)

        # if something is planned on entry.entry_start
        if planned_timer:
            planned_entry = planned_timer.entry

            # check if the playlist_id's are different
            if planned_entry.playlist_id != entry.playlist_id:
                # if so, stop the old timer and remove it from the list
                self.stop_timer(planned_timer)

                # and create a new one
                self.create_timer(entry, diff, func)

            # if the playlist id's do not differ => do nothing, they are the same

        # if nothing is planned at given time, create a new timer
        else:
            self.create_timer(entry, diff, func)

    # ------------------------------------------------------------------------------------------ #
    def stop_timer(self, timer):
        """Cancel ``timer`` and remove it from the list of planned timers."""
        # stop timer
        timer.cancel()

        # and remove it from message queue
        self.message_timer.remove(timer)

    # ------------------------------------------------------------------------------------------ #
    def create_timer(self, entry, diff, func):
        """Create, register and start a MessageTimer calling ``func(entry)`` after ``diff`` seconds."""
        t = MessageTimer(diff, func, [entry], self.debug)
        self.message_timer.append(t)
        t.start()

    # ------------------------------------------------------------------------------------------ #
    def is_something_planned_at_time(self, given_time):
        """
        @return: the planned timer whose entry starts at ``given_time``,
                 or False if there is none
        """
        for t in self.message_timer:
            if t.entry.entry_start == given_time:
                return t
        return False

    # ------------------------------------------------------------------------------------------ #
    def find_entry_in_timers(self, entry):
        """
        @return: the planned timer with the same playlist id and start time
                 as ``entry``, or False if there is none
        """
        # check if a playlist id is already planned
        for t in self.message_timer:
            if t.entry.playlist_id == entry.playlist_id and t.entry.entry_start == entry.entry_start:
                return t
        return False

    # ------------------------------------------------------------------------------------------ #
    def get_act_programme_as_string(self):
        """
        Serialise the loaded programme to a JSON string.

        Serialisation errors are printed as a traceback and an empty string
        is returned (best effort).

        @rtype: string
        @raise NoProgrammeLoadedException: if no programme is loaded
        """
        programme_as_string = ""

        if self.programme is None:
            raise NoProgrammeLoadedException("")

        try:
            programme_as_string = simplejson.dumps([p._asdict() for p in self.programme], default=alchemyencoder)
        except Exception:
            # best effort: report, but do not swallow KeyboardInterrupt/SystemExit
            traceback.print_exc()

        return programme_as_string

    # ------------------------------------------------------------------------------------------ #
    def print_message_queue(self):
        """
        @rtype: string
        @return: one info line (see MessageTimer.get_info) per planned timer
        """
        return "".join(t.get_info() + "\n" for t in self.message_timer)

    # ------------------------------------------------------------------------------------------ #
    # ------------------------------------------------------------------------------------------ #
    def swap_playlist_entries(self, indexes):
        """
        Swap the sources of two programme entries and store both entries.

        @type indexes: dict
        @param indexes: holds the keys "from_index" and "to_index"
        @rtype: string
        @return: the programme as JSON string, or an error text if one of
                 the entries was not found
        """
        from_entry = None
        to_entry = None
        # convert once instead of on every loop iteration
        from_idx = int(indexes["from_index"])
        to_idx = int(indexes["to_index"])

        # find the entries
        for p in self.programme:
            if p.programme_index == from_idx:
                from_entry = p

            if p.programme_index == to_idx:
                to_entry = p

            # break out of loop, if both entries found
            if from_entry is not None and to_entry is not None:
                break

        # check if entries are found
        if from_entry is None or to_entry is None:
            return "From or To Entry not found!"

        # swap sources
        from_entry.source, to_entry.source = to_entry.source, from_entry.source

        # store to database
        from_entry.store(add=False, commit=False)
        to_entry.store(add=False, commit=True)

        # and return the programme with swapped entries
        return self.get_act_programme_as_string()

    # ------------------------------------------------------------------------------------------ #
    def delete_playlist_entry(self, index):
        """
        Delete the programme entry with the given programme index and reload
        the programme.

        @rtype: string
        @return: the programme as JSON string
        """
        wanted_index = int(index)
        found = False

        for p in self.programme:
            if p.programme_index == wanted_index:
                p.delete(True)
                # the reload rebinds self.programme, so leave the loop right away
                self.load_programme_from_db()
                found = True
                break

        if not found:
            print("WARNING: Nothing to delete")

        return self.get_act_programme_as_string()

    # ------------------------------------------------------------------------------------------ #
    def insert_playlist_entry(self, fromtime_source):
        """
        Store a manually inserted entry and reload the programme.

        @type fromtime_source: dict
        @param fromtime_source: holds the keys "fromtime" and "source"
        @rtype: string
        @return: the programme as JSON string
        """
        fromtime = fromtime_source["fromtime"]
        source = fromtime_source["source"]

        entry = ScheduleEntry()
        entry.entry_start = fromtime
        entry.source = source
        entry.playlist_id = 0
        entry.schedule_id = 0
        entry.entry_num = ScheduleEntry.select_next_manual_entry_num()

        entry.store(add=True, commit=True)
        self.load_programme_from_db()

        return self.get_act_programme_as_string()

    # ------------------------------------------------------------------------------------------ #
    def __load_config__(self):
        """
        Import the scheduler config

        @rtype: boolean
        @return: True/False
        """
        # If scheduling is already running, the scheduler does not
        # necessarily have to be halted
        error_type = 'fatal' if self.initial else 'error'
        # watcher_jobs = self.getJobs()
        try:
            # The jobs from the config ...
            watcher_jobs = self.get_jobs()
        except Exception:
            self.redismessenger.send('Config is broken', '0301', error_type, 'loadConfig', None, 'config')
            if self.initial:
                self.ready = False
            return False

        # get_jobs() signals a broken config by returning False and has
        # already reported it; calling len() on False would raise TypeError
        if watcher_jobs is False:
            return False

        # Send an error message if no jobs have been found
        if len(watcher_jobs) == 0:
            self.redismessenger.send('No Jobs found in Config', '0302', error_type, 'loadConfig', None, 'config')

        return True

    def get_jobs(self):
        """
        Load the scheduler config and return its jobs, extended by a stop job
        for every 'start_recording' and 'play_playlist' job.

        @rtype: list or boolean
        @return: the jobs, or False if the config could not be loaded
        """
        error_type = 'fatal' if self.initial else 'error'
        try:
            # Load the scheduler.xml
            # NOTE(review): this replaces the config file path held in
            # self.schedulerconfig with the parsed config object, so a later
            # reload passes that object to the constructor - confirm that
            # AuraSchedulerConfig copes with it.
            self.schedulerconfig = AuraSchedulerConfig(self.schedulerconfig)
        except Exception:
            # This does not seem to be valid XML
            self.redismessenger.send('Config is broken', '0301', error_type, 'loadConfig', None, 'config')
            # If this happens at startup there is nothing we can do
            if self.initial:
                self.ready = False
            return False

        jobs = self.schedulerconfig.getJobs()

        # iterate over a snapshot: stop jobs are appended to ``jobs`` below
        for job in list(jobs):
            if job['job'] == 'start_recording' or job['job'] == 'play_playlist':
                stopjob = self.__get_stop_job__(job)
                jobs.append(stopjob)

        return jobs

    # -----------------------------------------------------------------------#
    def __get_stop_job__(self, startjob):
        """
        Build the stop job belonging to a start job.

        @type startjob: dict
        @param startjob: start job with the keys 'job', 'day', 'time', 'until'
        @rtype: dict
        @return: stop job with the keys 'job', 'day' and 'time'
        """
        job = {}
        job['job'] = 'stop_playlist' if startjob['job'] == 'play_playlist' else 'stop_recording'

        if startjob['day'] == 'all':
            job['day'] = startjob['day']
        else:
            if startjob['time'] < startjob['until']:
                job['day'] = startjob['day']
            else:
                # the job does not end on the day it started: stop on the following day
                try:
                    day = int(startjob['day'])
                    stopday = 0 if day > 5 else day+1
                    job['day'] = str(stopday)
                except (ValueError, TypeError):
                    # day is not numeric
                    job['day'] = 'all'

        job['time'] = startjob['until']
        return job

    # ------------------------------------------------------------------------------------------ #
    def start(self):
        """
        Start the event loop
        """
        # Start all watchers
        for watcher in self.watchers:
            watcher.start()

        logging.debug("{0}: started".format(self))
        try:
            self.loop.start()
        except Exception:
            self.redismessenger.send("Loop did'nt start", '0302', 'fatal', 'appstart', None, 'appinternal')
        else:
            self.redismessenger.send("Scheduler started", '0100', 'success', 'appstart', None, 'appinternal')

    # ------------------------------------------------------------------------------------------ #
    def stop(self):
        """
        Stop the event loop
        """
        self.loop.stop(pyev.EVBREAK_ALL)
        # stop and remove all watchers
        while self.watchers:
            self.watchers.pop().stop()

        self.redismessenger.send("Loop stopped", '0400', 'success', 'appstart', None, 'appinternal')

    # ------------------------------------------------------------------------------------------ #
    def signal_cb(self, loop, revents):
        """
        Signal handling on termination

        @type loop: object
        @param loop: The py_ev loop object
        @type revents: object
        @param revents: Event callbacks
        """
        self.redismessenger.send("Received stop signal", '1100', 'success', 'appstop', None, 'appinternal')
        self.stop()

    # ------------------------------------------------------------------------------------------ #
    def signal_reload(self, loop, revents):
        """
        Reload the scheduling configuration on signal SIGUSR1

        @type loop: object
        @param loop: The py_ev loop object
        @type revents: object
        @param revents: Event callbacks
        """
        self.redismessenger.send("Comba Scheduler gracefully restarted", '1200', 'success', 'appreload', None, 'appinternal')
        self.reload_config()

    # ------------------------------------------------------------------------------------------ #
    def load_playlist(self, data=None):
        """
        Load the playlist

        @param data: ignored; it is replaced by a dict holding the playlist uri
        """
        store = AuraCalendarService()
        uri = store.get_uri()
        store.start()

        # wait until the child thread returns
        store.join()

        data = {}
        data['uri'] = uri

        result = self.client.playlist_load(uri)
        if self.__check_result__(result):
            self.success('load_playlist', data, '00')
        else:
            self.error('load_playlist', data, '02')

    # ------------------------------------------------------------------------------------------ #
    def start_recording(self, data):
        """
        Start the recording
        """
        result = self.client.recorder_start()

        # store = AuraCalendarService()
        # self._preparePlaylistStore(store, datetime.datetime.now(), data)
        # uri = store.getUri()
        # store.start()

        if self.__check_result__(result):
            self.success('start_recording', result, '00')
        else:
            self.error('start_recording', result, '01')

    # ------------------------------------------------------------------------------------------ #
    def stop_recording(self, data):
        """
        Stop the recording
        """
        result = self.client.recorder_stop()

        if self.__check_result__(result):
            self.success('stop_recording', result, '00')
        else:
            self.error('stop_recording', result, '01')


class MessageTimer(threading.Timer):
    """
    threading.Timer that remembers the schedule entry it was created for.
    """

    entry = None
    debug = False
    diff = None

    def __init__(self, diff, func, param, debug=False):
        """
        @type diff: number
        @param diff: delay in seconds until ``func`` is called
        @type func: callable
        @param func: the function to call with ``param`` as arguments
        @type param: list
        @param param: arguments for ``func``; the first item is kept as ``entry``
        @type debug: boolean
        @param debug: if True, get_info() also prints the timer details
        """
        threading.Timer.__init__(self, diff, func, param)

        self.diff = diff
        self.func = func
        self.entry = param[0]
        # honour the caller's flag (this used to be hard-coded to True)
        self.debug = debug

        self.get_info()

    def get_info(self):
        """
        @rtype: string
        @return: short description of the planned call
        """
        if self.debug:
            print("MessageTimer starting @ " + str(self.entry.entry_start) + " source '" + str(self.entry.source) + "' In seconds: " + str(self.diff))
        return "Calling " + str(self.func) + " @ " + str(self.entry.entry_start)