# # engine # # Playout Daemon for autoradio project # # # Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at> # # This file is part of engine. # # engine is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # any later version. # # engine is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with engine. If not, see <http://www.gnu.org/licenses/>. # # Meta __version__ = '0.0.1' __license__ = "GNU General Public License (GPL) Version 3" __version_info__ = (0, 0, 1) __author__ = 'Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>' """ Aura Scheduler Is holding the eventqueue """ import time import simplejson import datetime import decimal import traceback import sqlalchemy import logging import threading from modules.communication.redis.messenger import RedisMessenger from modules.scheduling.calendar import AuraCalendarService from libraries.database.broadcasts import Schedule, ScheduleEntry, AuraDatabaseModel from libraries.exceptions.exception_logger import ExceptionLogger from libraries.enum.auraenumerations import ScheduleEntryType def alchemyencoder(obj): """JSON encoder function for SQLAlchemy special classes.""" if isinstance(obj, datetime.date): return obj.isoformat() elif isinstance(obj, decimal.Decimal): return float(obj) elif isinstance(obj, sqlalchemy.orm.state.InstanceState): return "" elif isinstance(obj, Schedule): return simplejson.dumps([obj._asdict()], default=alchemyencoder) else: return str(obj) # ------------------------------------------------------------------------------------------ # class AuraScheduler(ExceptionLogger, threading.Thread): """ Aura Scheduler Class Gets data from pv and importer, stores and fires events """ redismessenger = RedisMessenger() message_timer = [] job_result = {} liquidsoapcommunicator = None schedule_entries = None active_entry = None exit_event = None programme = None client = None logger = None config = None tried_fetching = 0 fetch_max = 3 def __init__(self, config): """ Constructor @type config: ConfigReader @param config: read engine.ini """ self.config = config self.logger = logging.getLogger("AuraEngine") # init threading threading.Thread.__init__(self) # init messenger.. probably not needed anymore self.redismessenger.set_channel('scheduler') self.redismessenger.set_section('execjob') # load error messages error_file = self.config.get("install_dir") + "/errormessages/scheduler_error.js" f = open(error_file) self.error_data = simplejson.load(f) f.close() # init database ? self.init_database() #self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal') # create exit event self.exit_event = threading.Event() # start loading new programm every hour self.start() # ------------------------------------------------------------------------------------------ # def init_database(self): # check if tables do exist. if not create them try: ScheduleEntry.select_all() except sqlalchemy.exc.ProgrammingError as e: errcode = e.orig.args[0] if errcode == 1146: # error for no such table x = AuraDatabaseModel() x.recreate_db() else: raise # ------------------------------------------------------------------------------------------ # def run(self): #while True: while not self.exit_event.is_set(): # set seconds to wait seconds_to_wait = int(self.config.get("fetching_frequency")) # calc next time next_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait) # write to logger self.logger.info("Fetch new programmes every " + str(seconds_to_wait) + "s started. Going to start next time " + str(next_time)) # empty database # self.logger.info("emptying database") # ScheduleEntry.truncate() # fetch new programme self.fetch_new_programme() # and wait self.exit_event.wait(seconds_to_wait) # ------------------------------------------------------------------------------------------ # def stop(self): self.exit_event.set() # ------------------------------------------------------------------------------------------ # def get_active_entry(self): now_unix = time.mktime(datetime.datetime.now().timetuple()) lastentry = None # load programme if necessary if self.programme is None: self.logger.debug("want to get active channel, but have to load programme first") self.load_programme_from_db() # get active source for entry in self.programme: # check if lastentry is set and if act entry is in the future if lastentry is not None and entry.entry_start_unix > now_unix: # return entry if so return entry # actsource = entry.source lastentry = entry return None # ------------------------------------------------------------------------------------------ # def load_programme_from_db(self, silent=False): #self.programme = ScheduleEntry.select_all() self.programme = ScheduleEntry.select_act_programme() # now in unixtime now_unix = time.mktime(datetime.datetime.now().timetuple()) # switch to check if its the first stream in loaded programme first_stream_in_programme = False # fading times fade_in_time = float(self.config.get("fade_in_time")) fade_out_time = float(self.config.get("fade_out_time")) # old entry for fading out old_entry = None for entry in self.programme: # since we get also programmes from act hour, filter these out if entry.entry_start_unix > now_unix: # when do we have to start? diff = entry.entry_start_unix - now_unix diff = diff/1000 # testing purpose self.enable_fading(fade_in_time, fade_out_time, old_entry, entry, diff) # create the activation threads and run them after <diff> seconds if entry.source.startswith("linein"): self.add_or_update_timer(diff, self.liquidsoapcommunicator.activate, [entry]) elif entry.type == ScheduleEntryType.STREAM: if first_stream_in_programme: self.liquidsoapcommunicator.next_stream_source(entry.source) first_stream_in_programme = False self.add_or_update_timer(diff, self.liquidsoapcommunicator.activate, [entry]) elif entry.type == ScheduleEntryType.FILESYSTEM: self.add_or_update_timer(diff, self.liquidsoapcommunicator.activate, [entry]) else: self.logger.warning("Scheduler cannot understand source '" + entry.source + "' from " + str(entry)) self.logger.warning(" Not setting any activation Thread!") # store the old entry for fading out old_entry = entry self.logger.warning(self.print_message_queue()) # ------------------------------------------------------------------------------------------ # def enable_fading(self, fade_in_time, fade_out_time, old_entry, new_entry, diff): # enable fading when entry types are different if old_entry is not None: if old_entry.type != new_entry.type: # enable fadeout if enabled if fade_out_time != 0 and old_entry is not None: params = [old_entry, -fade_out_time] self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_out, params) # same for fadein if fade_in_time != 0: params = [new_entry, fade_in_time] self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_in, params) # ------------------------------------------------------------------------------------------ # def add_or_update_timer(self, diff, func, parameters): length = len(parameters) if length == 2: entry = parameters[0] fadingtime = parameters[1] elif length == 1: entry = parameters[0] fadingtime = 0 # check if something is planned at given time if fadingtime < 0: planned_timer = self.is_something_planned_at_time(entry.entry_start + datetime.timedelta(seconds=fadingtime)) else: planned_timer = self.is_something_planned_at_time(entry.entry_start) # if something is planned on entry.entry_start if planned_timer: planned_entry = planned_timer.entry # check if the playlist_id's are different if planned_entry.playlist_id != entry.playlist_id: # if not stop the old timer and remove it from the list self.stop_timer(planned_timer) # and create a new one self.create_timer(diff, func, parameters) # if the playlist id's do not differ => reuse the old timer and do nothing, they are the same # if nothing is planned at given time, create a new timer else: self.create_timer(diff, func, parameters) # ------------------------------------------------------------------------------------------ # def stop_timer(self, timer): # stop timer timer.cancel() # and remove it from message queue self.message_timer.remove(timer) # ------------------------------------------------------------------------------------------ # def create_timer(self, diff, func, parameters): t = CallFunctionTimer(diff, func, parameters) self.message_timer.append(t) t.start() # ------------------------------------------------------------------------------------------ # def is_something_planned_at_time(self, given_time): for t in self.message_timer: if t.entry.entry_start == given_time: return t return False # ------------------------------------------------------------------------------------------ # def get_act_programme_as_string(self): programme_as_string = "" if self.programme is None or len(self.programme) == 0: self.fetch_new_programme() try: programme_as_string = simplejson.dumps([p._asdict() for p in self.programme], default=alchemyencoder) except Exception as e: self.logger.error("Cannot transform programme into JSON String. Reason: " + str(e)) traceback.print_exc() return programme_as_string # ------------------------------------------------------------------------------------------ # def print_message_queue(self): message_queue = "" for t in self.message_timer: message_queue += str(t)+"\n" return message_queue # ------------------------------------------------------------------------------------------ # def swap_playlist_entries(self, indexes): from_entry = None to_entry = None from_idx = indexes["from_index"] to_idx = indexes["to_index"] # find the entries for p in self.programme: if p.programme_index == int(from_idx): from_entry = p if p.programme_index == int(to_idx): to_entry = p # break out of loop, if both entries found if from_entry is not None and to_entry is not None: break # check if entries are found if from_entry is None or to_entry is None: return "From or To Entry not found!" # swap sources swap = from_entry.source from_entry.source = to_entry.source to_entry.source = swap # store to database from_entry.store(add=False, commit=False) to_entry.store(add=False, commit=True) # and return the programme with swapped entries return self.get_act_programme_as_string() # ------------------------------------------------------------------------------------------ # def delete_playlist_entry(self, index): found = False for p in self.programme: if p.programme_index == int(index): p.delete(True) self.load_programme_from_db() found = True break if not found: self.logger.warning("Nothing to delete") return self.get_act_programme_as_string() # ------------------------------------------------------------------------------------------ # def insert_playlist_entry(self, fromtime_source): fromtime = fromtime_source["fromtime"] source = fromtime_source["source"] entry = ScheduleEntry() entry.entry_start = fromtime entry.source = source entry.playlist_id = 0 entry.schedule_id = 0 entry.entry_num = ScheduleEntry.select_next_manual_entry_num() entry.store(add=True, commit=True) self.load_programme_from_db() return self.get_act_programme_as_string() # ------------------------------------------------------------------------------------------ # def fetch_new_programme(self): self.logger.info("trying to fetch new programme") if self.tried_fetching == self.fetch_max: msg = "Cannot connect to PV or Tank! No Programme loaded!" self.logger.error(msg) self.tried_fetching = 0 return msg self.tried_fetching += 1 acs = AuraCalendarService(self.config) queue = acs.get_queue() # start fetching thread acs.start() # wait for the end response = queue.get() if type(response) is dict: self.load_programme_from_db() if self.programme is not None and len(self.programme) > 0: self.tried_fetching = 0 if len(self.programme) == 0 and self.tried_fetching == self.fetch_max: self.logger.critical("Programme loaded from database has no entries!") # return self.get_act_programme_as_string() elif response.startswith("fetching_aborted"): self.logger.warning("Fetching was being aborted from AuraCalendarService! Are you connected? Reason: " + response) else: self.logger.warning("Got an unknown response from AuraCalendarService: " + response) # ------------------------------------------------------------------------------------------ # def set_next_file_for(self, playlistname): self.logger.critical("HAVE TO SET NEXT FILE FOR: " + playlistname) self.logger.critical(str(self.get_active_entry())) if playlistname == "station": file = "/var/audio/fallback/eins.zwo.bombe.mp3" elif playlistname == "timeslot": file = "/var/audio/fallback/ratm.killing.mp3" elif playlistname == "show": file = "/var/audio/fallback/weezer.hash.pipe.mp3" else: file = "" self.logger.critical("Should set next fallback file for " + playlistname + ", but this playlist is unknown!") self.logger.info("Set next fallback file for " + playlistname + ": " + file) self.redismessenger.set_next_file_for(playlistname, file) return file # ------------------------------------------------------------------------------------------ # class CallFunctionTimer(threading.Timer): logger = None param = None entry = None diff = None def __init__(self, diff, func, param): threading.Timer.__init__(self, diff, func, param) self.diff = diff self.func = func self.param = param self.entry = param[0] self.logger = logging.getLogger("AuraEngine") self.logger.debug(str(self)) # ------------------------------------------------------------------------------------------ # def __str__(self): if len(self.param) >= 2: # fading in if self.param[1] > 0: return "CallFunctionTimer starting @ " + str(self.entry.entry_start) + " fading in source '" + str(self.entry.source) + "' in seconds: " + str(self.diff) # fading out else: return "CallFunctionTimer starting @ " + str(self.entry.entry_start + datetime.timedelta(seconds=self.param[1])) + " fading out source '" + str(self.entry.source) + "' in seconds: " + str(self.diff+self.param[1]) else: return "CallFunctionTimer starting @ " + str(self.entry.entry_start) + " switching to source '" + str(self.entry.source) + "' in seconds: " + str(self.diff)