# -*- coding: utf-8 -*-
#
# scheduler.py
#
# Copyright 2018 Radio FRO <https://fro.at>, Radio Helsinki <https://helsinki.at>, Radio Orange <https://o94.at>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; Version 3 of the License
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, the license can be downloaded here:
#
# http://www.gnu.org/licenses/gpl.html

# Meta
__version__ = '0.0.1'
__license__ = "GNU General Public License (GPL) Version 3"
__version_info__ = (0, 0, 1)
__author__ = 'Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>'

"""
Aura Scheduler
Is holding the eventqueue
"""
import time
import simplejson
import datetime
import decimal
import traceback
import sqlalchemy
import logging
import threading

# Project-local libraries
from modules.communication.redis.messenger import RedisMessenger
from modules.scheduling.calendar import AuraCalendarService
from libraries.database.broadcasts import Schedule, ScheduleEntry, AuraDatabaseModel
from libraries.exceptions.exception_logger import ExceptionLogger
from libraries.enum.scheduleentrytype import ScheduleEntryType


def alchemyencoder(obj):
    """JSON encoder function for SQLAlchemy special classes.

    Intended to be passed as ``default=`` to ``simplejson.dumps``.

    @param obj: the object simplejson could not serialise on its own
    @return:    ISO string for dates/datetimes, float for Decimals, an empty
                string for SQLAlchemy instance state, a JSON string for
                Schedule objects and ``str(obj)`` for everything else
    """
    if isinstance(obj, datetime.date):
        # datetime.datetime is a subclass of datetime.date and lands here too
        return obj.isoformat()
    elif isinstance(obj, decimal.Decimal):
        return float(obj)
    elif isinstance(obj, sqlalchemy.orm.state.InstanceState):
        return ""
    elif isinstance(obj, Schedule):
        return simplejson.dumps([obj._asdict()], default=alchemyencoder)
    else:
        return str(obj)


class AuraScheduler(ExceptionLogger, threading.Thread):
    """
    Aura Scheduler Class
    Gets data from pv and importer, stores and fires events
    """
    # NOTE(review): redismessenger, message_timer and job_result are created
    # once at class level and are therefore shared by all instances.
    redismessenger = RedisMessenger()
    message_timer = []
    job_result = {}

    liquidsoapcommunicator = None
    schedule_entries = None
    active_entry = None
    exit_event = None
    programme = None
    client = None
    logger = None
    config = None

    # number of consecutive fetch attempts and the limit before giving up
    tried_fetching = 0
    fetch_max = 3

    def __init__(self, config):
        """
        Constructor

        Sets up logging and messaging, loads the error messages, makes sure
        the database tables exist and finally starts the fetching thread.

        @type    config:               ConfigReader
        @param   config:               read engine.ini
        """
        self.config = config
        self.logger = logging.getLogger("AuraEngine")

        # init threading
        threading.Thread.__init__(self)

        # init messenger.. probably not needed anymore
        self.redismessenger.set_channel('scheduler')
        self.redismessenger.set_section('execjob')

        # load schedulerconfig...
        self.schedulerconfig = self.config.get("scheduler_config_file")

        # load error messages; the context manager closes the file even if
        # parsing fails
        error_file = self.config.get("install_dir") + "/errormessages/scheduler_error.js"
        with open(error_file) as f:
            self.error_data = simplejson.load(f)

        # init database ?
        self.init_database()

        self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal')

        # create exit event
        self.exit_event = threading.Event()

        # start loading new programme periodically (see run())
        self.start()

    # ------------------------------------------------------------------------------------------ #
    def init_database(self):
        """
        Checks if the tables exist and recreates the database if they do not.

        Any ProgrammingError other than error code 1146 is re-raised.
        """
        try:
            ScheduleEntry.select_all()
        except sqlalchemy.exc.ProgrammingError as e:
            errcode = e.orig.args[0]

            if errcode == 1146:  # error for no such table
                x = AuraDatabaseModel()
                x.recreate_db()
            else:
                raise

    def run(self):
        """
        Thread body: empties the schedule table and fetches a new programme
        every "fetching_frequency" seconds until stop() is called.
        """
        # set seconds to wait
        seconds_to_wait = self.config.get("fetching_frequency")

        while not self.exit_event.is_set():
            # calc next time
            next_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait)

            # write to logger
            self.logger.info("Fetch new programmes every " + str(seconds_to_wait) + "s started. Going to start next time " + str(next_time))

            # empty database
            self.logger.info("emptying database")
            ScheduleEntry.truncate()

            # fetch new programme
            self.fetch_new_programme()

            # and wait; returns early as soon as the exit event is set
            self.exit_event.wait(seconds_to_wait)

    def stop(self):
        """
        Signals the fetching loop in run() to terminate.
        """
        self.exit_event.set()

    # ------------------------------------------------------------------------------------------ #
    def get_active_entry(self):
        """
        Returns the entry which is active right now, i.e. the last entry that
        started before the first entry lying in the future.

        Assumes self.programme is ordered by start time - TODO confirm.

        @return: the active entry, or None if no entry has started yet or
                 if there is no later entry to delimit the active one
        """
        now_unix = time.mktime(datetime.datetime.now().timetuple())
        lastentry = None

        # load programme if necessary
        if self.programme is None:
            self.logger.debug("want to get active channel, but have to load programme first")
            self.load_programme_from_db()

        # get active source
        for entry in self.programme:
            # the first entry in the future marks the end of the active one,
            # so the entry seen right before it is the one playing now
            if entry.entry_start_unix > now_unix:
                return lastentry

            lastentry = entry

        return None

    # ------------------------------------------------------------------------------------------ #
    def load_programme_from_db(self, silent=False):
        """
        Loads all schedule entries from the database and plans an activation
        timer for every entry that starts in the future.

        @param silent: currently unused
        """
        self.programme = ScheduleEntry.select_all()

        # now in unixtime
        now_unix = time.mktime(datetime.datetime.now().timetuple())

        # switch to check if its the first stream in loaded programme
        # NOTE(review): initialised with False, so the next_stream_source()
        # branch below never runs - confirm whether True was intended.
        first_stream_in_programme = False

        for entry in self.programme:
            # since we get also programmes from act hour, filter these out
            if entry.entry_start_unix > now_unix:
                # when do we have to start?
                diff = entry.entry_start_unix - now_unix

                # NOTE(review): marked "testing purpose" - shrinks every delay
                # by a factor of 10000; confirm before using in production.
                diff = diff/10000  # testing purpose

                # create the activation threads and run them after <diff> seconds
                if entry.source.startswith("linein"):
                    self.add_or_update_timer(entry, diff, self.liquidsoapcommunicator.activate)

                elif entry.type == ScheduleEntryType.STREAM:
                    if first_stream_in_programme:
                        self.liquidsoapcommunicator.next_stream_source(entry.source)
                        first_stream_in_programme = False

                    self.add_or_update_timer(entry, diff, self.liquidsoapcommunicator.activate)

                elif entry.type == ScheduleEntryType.FILESYSTEM:
                    self.add_or_update_timer(entry, diff, self.liquidsoapcommunicator.activate)

                else:
                    self.logger.warning("Scheduler cannot understand source '" + entry.source + "' from " + str(entry))
                    self.logger.warning(" Not setting any activation Thread!")

                self.logger.info(str(entry))

    # ------------------------------------------------------------------------------------------ #
    def add_or_update_timer(self, entry, diff, func):
        """
        Plans func(entry) in diff seconds. A timer already planned for the
        same start time is replaced if it belongs to another playlist and
        left untouched if it belongs to the same one.

        @param entry: the schedule entry to activate
        @param diff:  seconds until activation
        @param func:  callable invoked with the entry
        """
        # check if something is planned at given time
        planned_timer = self.is_something_planned_at_time(entry.entry_start)

        # if something is planned on entry.entry_start
        if planned_timer:
            planned_entry = planned_timer.entry

            # check if the playlist_id's are different
            if planned_entry.playlist_id != entry.playlist_id:
                # if so, stop the old timer and remove it from the list
                self.stop_timer(planned_timer)

                # and create a new one
                self.create_timer(entry, diff, func)

            # if the playlist id's do not differ => do nothing, they are the same

        # if nothing is planned at given time, create a new timer
        else:
            self.create_timer(entry, diff, func)

    # ------------------------------------------------------------------------------------------ #
    def stop_timer(self, timer):
        """
        Cancels the given timer and removes it from the message queue.
        """
        # stop timer
        timer.cancel()

        # and remove it from message queue
        self.message_timer.remove(timer)

    # ------------------------------------------------------------------------------------------ #
    def create_timer(self, entry, diff, func):
        """
        Creates, registers and starts a timer calling func(entry) in diff seconds.
        """
        t = CallFunctionTimer(diff, func, [entry])
        self.message_timer.append(t)
        t.start()

    # ------------------------------------------------------------------------------------------ #
    def is_something_planned_at_time(self, given_time):
        """
        @return: the first planned timer whose entry starts at given_time,
                 False if there is none
        """
        for t in self.message_timer:
            if t.entry.entry_start == given_time:
                return t
        return False

    # ------------------------------------------------------------------------------------------ #
    def find_entry_in_timers(self, entry):
        """
        @return: the planned timer with the same playlist id and start time
                 as the given entry, False if there is none
        """
        # check if a playlist id is already planned
        for t in self.message_timer:
            if t.entry.playlist_id == entry.playlist_id and t.entry.entry_start == entry.entry_start:
                return t
        return False

    # ------------------------------------------------------------------------------------------ #
    def get_act_programme_as_string(self):
        """
        Serialises the current programme to JSON, fetching a new programme
        first if none is loaded.

        @return: JSON string, or an empty string if serialisation failed
        """
        programme_as_string = ""

        if self.programme is None or len(self.programme) == 0:
            self.fetch_new_programme()

        try:
            programme_as_string = simplejson.dumps([p._asdict() for p in self.programme], default=alchemyencoder)
        except Exception as e:
            # best effort: log the problem and hand back the empty string
            self.logger.error("Cannot transform programme into JSON String. Reason: " + str(e))
            traceback.print_exc()

        return programme_as_string

    # ------------------------------------------------------------------------------------------ #
    def print_message_queue(self):
        """
        @return: one line of info per planned timer, each terminated by a newline
        """
        # NOTE(review): get_info() is not defined on CallFunctionTimer in the
        # part of the file visible here - confirm it exists.
        return "".join(t.get_info() + "\n" for t in self.message_timer)

    # ------------------------------------------------------------------------------------------ #
    def swap_playlist_entries(self, indexes):
        """
        Swaps the sources of two programme entries and stores both.

        @param indexes: dict with the keys "from_index" and "to_index"
        @return:        the programme as JSON string, or an error message
                        if one of the entries could not be found
        """
        from_entry = None
        to_entry = None

        # convert once instead of on every loop iteration
        from_idx = int(indexes["from_index"])
        to_idx = int(indexes["to_index"])

        # find the entries
        for p in self.programme:
            if p.programme_index == from_idx:
                from_entry = p

            if p.programme_index == to_idx:
                to_entry = p

            # break out of loop, if both entries found
            if from_entry is not None and to_entry is not None:
                break

        # check if entries are found
        if from_entry is None or to_entry is None:
            return "From or To Entry not found!"

        # swap sources
        from_entry.source, to_entry.source = to_entry.source, from_entry.source

        # store to database; only the second store commits
        from_entry.store(add=False, commit=False)
        to_entry.store(add=False, commit=True)

        # and return the programme with swapped entries
        return self.get_act_programme_as_string()

    # ------------------------------------------------------------------------------------------ #
    def delete_playlist_entry(self, index):
        """
        Deletes the programme entry with the given programme index and
        reloads the programme.

        @param index: programme index of the entry to delete
        @return:      the programme as JSON string
        """
        wanted = int(index)

        for p in self.programme:
            if p.programme_index == wanted:
                p.delete(True)
                self.load_programme_from_db()
                break
        else:
            # loop finished without a match
            self.logger.warning("Nothing to delete")

        return self.get_act_programme_as_string()

    # ------------------------------------------------------------------------------------------ #
    def insert_playlist_entry(self, fromtime_source):
        """
        Stores a manually created entry and reloads the programme.

        @param fromtime_source: dict with the keys "fromtime" and "source"
        @return:                the programme as JSON string
        """
        fromtime = fromtime_source["fromtime"]
        source = fromtime_source["source"]

        entry = ScheduleEntry()
        entry.entry_start = fromtime
        entry.source = source
        entry.playlist_id = 0
        entry.schedule_id = 0
        entry.entry_num = ScheduleEntry.select_next_manual_entry_num()
        entry.store(add=True, commit=True)

        self.load_programme_from_db()

        return self.get_act_programme_as_string()

    # ------------------------------------------------------------------------------------------ #
    def fetch_new_programme(self):
        """
        Lets AuraCalendarService fetch a new programme and, on success, loads
        it from the database. Gives up after fetch_max consecutive attempts.

        @return: an error message if the maximum number of attempts was
                 reached, otherwise None
        """
        self.logger.info("trying to fetch new programme")
        if self.tried_fetching == self.fetch_max:
            msg = "Cannot connect to PV or Tank! No Programme loaded!"
            self.logger.error(msg)
            self.tried_fetching = 0
            return msg

        self.tried_fetching += 1

        acs = AuraCalendarService(self.config)
        queue = acs.get_queue()

        # start fetching thread
        acs.start()

        # wait for the end
        response = queue.get()

        if isinstance(response, dict):
            self.load_programme_from_db()

            if self.programme:
                self.tried_fetching = 0
            elif self.tried_fetching == self.fetch_max:
                # also covers a programme of None, which the former
                # len() check could not handle
                self.logger.critical("Programme loaded from database has no entries!")

        elif isinstance(response, str) and response.startswith("fetching_aborted"):
            self.logger.warning("Fetching was being aborted from AuraCalendarService! Are you connected? Reason: " + response)
        else:
            # str() keeps the log call from failing on non-string responses
            self.logger.warning("Got an unknown response from AuraCalendarService: " + str(response))

    # ------------------------------------------------------------------------------------------ #
    def set_next_file_for(self, playlistname):
        """
        Announces the next file for the given playlist via the redis messenger.

        @param playlistname: name of the playlist
        """
        self.logger.critical("HAVE TO SET NEXT FILE FOR: " + playlistname)
        self.logger.critical(str(self.get_active_entry()))
        self.logger.critical(playlistname)

        # NOTE(review): the file is hard-coded - confirm whether it should
        # come from the configuration instead.
        self.redismessenger.set_next_file_for(playlistname, "/var/audio/blank.flac")

    # ------------------------------------------------------------------------------------------ #
    def start_recording(self, data):
        """
        Start recording

        @param data: currently unused
        """
        result = self.client.recorder_start()

        if self.__check_result__(result):
            self.success("start_recording", result, "00")
        else:
            self.error("start_recording", result, "01")

    # ------------------------------------------------------------------------------------------ #
    def stop_recording(self, data):
        """
        Stop recording

        @param data: currently unused
        """
        result = self.client.recorder_stop()

        if self.__check_result__(result):
            self.success("stop_recording", result, "00")
        else:
            self.error("stop_recording", result, "01")


class CallFunctionTimer(threading.Timer):
    """
    Timer which remembers the schedule entry it is going to activate.
    """
    logger = None
    entry = None
    debug = False
    diff = None

    def __init__(self, diff, func, param):
        """
        @param diff:  seconds until func is called
        @param func:  callable to invoke
        @param param: argument list for func; the first item is the entry
        """
        threading.Timer.__init__(self, diff, func, param)

        self.diff = diff
        self.func = func
        self.entry = param[0]

        self.logger = logging.getLogger("AuraEngine")

        msg = "MessageTimer starting @ " + str(self.entry.entry_start) + " source '" + str(self.entry.source) + "' In seconds: " + str(self.diff)
        self.logger.debug(msg)