#
#  engine
#
#  Playout Daemon for autoradio project
#
#
#  Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
#  This file is part of engine.
#
#  engine is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  any later version.
#
#  engine is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with engine. If not, see <http://www.gnu.org/licenses/>.
#

# Meta
__version__ = '0.0.1'
__license__ = "GNU General Public License (GPL) Version 3"
__version_info__ = (0, 0, 1)
__author__ = 'Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>'

"""
Aura Scheduler
Is holding the eventqueue
"""
import time
import json
import datetime
import decimal
import traceback
import sqlalchemy

import logging
import threading

from operator import attrgetter

from modules.communication.redis.messenger import RedisMessenger
from modules.scheduling.calendar import AuraCalendarService
from libraries.database.broadcasts import Schedule, Playlist, AuraDatabaseModel
from libraries.exceptions.exception_logger import ExceptionLogger
from libraries.enum.auraenumerations import ScheduleEntryType, TimerType


def alchemyencoder(obj):
    """JSON encoder function for SQLAlchemy special classes.

    Meant to be passed as ``default=`` to ``json.dumps``.

    @param obj: the object ``json.dumps`` could not serialize itself
    @return:    ISO string for dates, float for decimals, "" for SQLAlchemy
                instance state, a nested JSON string for a Schedule and
                ``str(obj)`` for everything else
    """
    if isinstance(obj, datetime.date):
        return obj.isoformat()
    elif isinstance(obj, decimal.Decimal):
        return float(obj)
    elif isinstance(obj, sqlalchemy.orm.state.InstanceState):
        return ""
    elif isinstance(obj, Schedule):
        return json.dumps([obj._asdict()], default=alchemyencoder)
    else:
        return str(obj)


def _exactly_one_flag(*flags):
    """Return True when exactly one of the given flags is truthy."""
    return sum(1 for flag in flags if flag) == 1


# ------------------------------------------------------------------------------------------ #
class AuraScheduler(ExceptionLogger, threading.Thread):
    """
    Aura Scheduler Class

    Gets data from pv and importer, stores and fires events
    """
    redismessenger = None
    # NOTE(review): class-level list, i.e. shared by all AuraScheduler instances
    message_timer = []
    job_result = {}

    # stores the conn to liquidsoap
    liquidsoapcommunicator = None
    # stores the last time when a fetch from pv/tank gone right
    last_successful_fetch = None

    schedule_entries = None
    active_entry = None
    exit_event = None
    programme = None
    client = None
    logger = None
    config = None

    def __init__(self, config):
        """
        Constructor

        Initialises the database, the redis messenger and the logger, loads the
        scheduler error messages and finally starts the scheduler thread.

        @type    config:               ConfigReader
        @param   config:               read engine.ini
        """
        self.config = config

        # init database ?
        self.init_database()

        self.redismessenger = RedisMessenger(config)
        self.logger = logging.getLogger("AuraEngine")

        # init threading
        threading.Thread.__init__(self)

        # init messenger.. probably not needed anymore
        self.redismessenger.set_channel('scheduler')
        self.redismessenger.set_section('execjob')

        # load error messages; 'with' closes the file even if parsing fails
        error_file = self.config.get("install_dir") + "/errormessages/scheduler_error.js"
        with open(error_file) as f:
            self.error_data = json.load(f)

        #self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal')

        # create exit event
        self.exit_event = threading.Event()

        # start loading new programm every hour
        self.start()

    # ------------------------------------------------------------------------------------------ #
    def init_database(self):
        """
        Recreates the database if "recreate_db" is configured, otherwise makes
        sure the tables exist and creates them if they are missing.

        Any ProgrammingError other than error code 1146 is re-raised.
        """
        if self.config.get("recreate_db") is not None:
            AuraDatabaseModel.recreate_db(systemexit=True)

        # check if tables do exist. if not create them
        try:
            Playlist.select_all()
        except sqlalchemy.exc.ProgrammingError as e:
            errcode = e.orig.args[0]

            if errcode == 1146:  # error for no such table
                x = AuraDatabaseModel()
                x.recreate_db()
            else:
                raise

    # ------------------------------------------------------------------------------------------ #
    def run(self):
        """
        Thread body: fetches a new programme, then waits "fetching_frequency"
        seconds (read from config) and repeats until stop() sets the exit event.
        """
        while not self.exit_event.is_set():
            # set seconds to wait
            seconds_to_wait = int(self.config.get("fetching_frequency"))

            # calc next time
            next_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait)

            # write to logger
            self.logger.info("Fetch new programmes every " + str(seconds_to_wait) + "s started. Going to start next time " + str(next_time))

            # empty database
            # self.logger.info("emptying database")
            # ScheduleEntry.truncate()

            # fetch new programme
            self.fetch_new_programme()

            # and wait (returns early as soon as the exit event is set)
            self.exit_event.wait(seconds_to_wait)

    # ------------------------------------------------------------------------------------------ #
    def stop(self):
        """Signals the run() loop to terminate."""
        self.exit_event.set()

    # ------------------------------------------------------------------------------------------ #
    def get_active_entry(self):
        """
        Walks the programme (loading it from the database first if necessary)
        and returns the first entry starting in the future that is preceded by
        at least one other entry.

        @return: tuple (show, entry) or (None, None) if nothing matches
        """
        now_unix = time.mktime(datetime.datetime.now().timetuple())
        lastentry = None

        # load programme if necessary
        if self.programme is None:
            self.logger.debug("want to get active channel, but have to load programme first")
            self.load_programme_from_db()

        # the database load may leave the programme unset => nothing to iterate
        # NOTE(review): this returns the upcoming entry, not 'lastentry' - confirm intent
        for show in self.programme or []:
            for entry in show.playlist:
                # check if lastentry is set and if act entry is in the future
                if lastentry is not None and entry.entry_start_unix > now_unix:
                    # return entry if so
                    return (show, entry)  # actsource = entry.source

                lastentry = entry

        return None, None

    # ------------------------------------------------------------------------------------------ #
    def load_programme_from_db(self, silent=False):
        """
        Loads the actual programme from the database, attaches the playlist and
        the three fallback playlists to every schedule and enables the timers
        for all playlist entries.

        @param silent: currently unused
        """
        self.programme = Schedule.select_act_programme()

        if self.programme is None or len(self.programme) == 0:
            self.logger.critical("Could not load programme from database. We are in big trouble my friend!")
            return

        planned_entries = []

        for schedule in self.programme:
            # playlist to play
            schedule.playlist = Playlist.select_playlist(schedule.playlist_id)
            # show fallback is played when playlist fails
            schedule.showfallback = Playlist.select_playlist(schedule.show_fallback_id)
            # timeslot fallback is played when show fallback fails
            schedule.timeslotfallback = Playlist.select_playlist(schedule.timeslot_fallback_id)
            # station fallback is played when timeslot fallback fails
            schedule.stationfallback = Playlist.select_playlist(schedule.station_fallback_id)

            planned_entries.extend(schedule.playlist)

        self.enable_entries(planned_entries)

        self.logger.warning(self.print_message_queue())

    # ------------------------------------------------------------------------------------------ #
    def enable_entries(self, playlist):
        """
        Enables the timers for every entry of the given list which starts in
        the future.

        @param playlist: iterable of entries providing 'entry_start_unix'
        """
        # now in unixtime
        now_unix = time.mktime(datetime.datetime.now().timetuple())

        # old entry for fading out
        old_entry = None

        for entry in playlist:
            # since we get also programmes from the past, filter these out
            if entry.entry_start_unix > now_unix:
                # when do we have to start?
                diff = entry.entry_start_unix - now_unix

                # NOTE(review): timers fire 100 times too early because of this
                diff = diff/100  # testing purpose

                # enable the three timer
                self.enable_timer(diff, entry, old_entry)

            # store the old entry for fading out
            old_entry = entry

    def enable_timer(self, diff, entry, old_entry):
        """
        Creates (or reuses) the switch timer for an entry and enables fading.

        @param diff:      seconds until the entry has to be activated
        @param entry:     the entry to activate
        @param old_entry: the entry before 'entry', may be None
        """
        # create the activation threads and run them after <diff> seconds
        self.logger.critical("ENABLING SWITCHTIMER FOR " + str(entry))
        entry.switchtimer = self.add_or_update_timer(diff, self.liquidsoapcommunicator.activate, [entry])

        self.enable_fading(diff, entry, old_entry)

    # ------------------------------------------------------------------------------------------ #
    def enable_fading(self, diff, new_entry, old_entry):
        """
        Creates a fade out timer for the old entry and a fade in timer for the
        new entry when their types differ. Without an old entry only the fade
        in timer is created.

        @param diff:      seconds until the new entry starts
        @param new_entry: the entry to fade in
        @param old_entry: the entry to fade out, may be None
        """
        # fading times
        fade_out_time = float(self.config.get("fade_out_time"))

        # enable fading when entry types are different
        if old_entry is not None:
            if old_entry.type != new_entry.type:
                #self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_out, [old_entry])
                old_entry.fadeouttimer = self.create_timer(diff-fade_out_time, self.liquidsoapcommunicator.fade_out, [old_entry], fadeout=True)
                self.logger.critical("ENABLING FADEOUTTIMER FOR " + str(old_entry))

        # same for fadein except old_entry can be None
        # (the None check has to come first, otherwise '.type' raises an AttributeError)
        if old_entry is None or old_entry.type != new_entry.type:
            #self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_in, [new_entry])
            new_entry.fadeintimer = self.create_timer(diff, self.liquidsoapcommunicator.fade_in, [new_entry], fadein=True)
            self.logger.critical("ENABLING FADEINTIMER FOR " + str(new_entry))

    # ------------------------------------------------------------------------------------------ #
    def add_or_update_timer(self, diff, func, parameters):
        """
        Returns the switch timer for parameters[0]. An already planned timer
        for the same start time is reused if it belongs to the same playlist,
        otherwise it is stopped and replaced by a new one.

        @param diff:       seconds until the timer fires
        @param func:       function called by the timer
        @param parameters: argument list for func, first item is the entry
        @return:           the new or the reused timer
        """
        entry = parameters[0]
        planned_timer = self.is_something_planned_at_time(entry.entry_start)

        # if nothing is planned at given time, create a new timer
        if not planned_timer:
            return self.create_timer(diff, func, parameters, switcher=True)

        # if the playlist id's do not differ => reuse the old timer and do nothing, they are the same
        if planned_timer.entry.playlist_id == entry.playlist_id:
            return planned_timer

        # playlist changed => stop the old timer, remove it from the list and create a new one
        self.stop_timer(planned_timer)
        return self.create_timer(diff, func, parameters, switcher=True)

    # ------------------------------------------------------------------------------------------ #
    def stop_timer(self, timer):
        """
        Cancels the given timer together with the fade timers of its entry and
        removes all of them from the message queue.

        @param timer: the CallFunctionTimer to stop
        """
        # stop timer
        timer.cancel()

        # an entry only carries fade timers if enable_fading created them
        for fade_timer in (getattr(timer.entry, "fadeintimer", None),
                           getattr(timer.entry, "fadeouttimer", None)):
            if fade_timer is not None:
                fade_timer.cancel()
                self._forget_timer(fade_timer)

        # and remove it from message queue
        self._forget_timer(timer)

        self.logger.critical("REMOVED TIMER for " + str(timer.entry))

    def _forget_timer(self, timer):
        """Removes a timer from the message queue, ignoring already removed ones."""
        if timer in self.message_timer:
            self.message_timer.remove(timer)

    # ------------------------------------------------------------------------------------------ #
    def create_timer(self, diff, func, parameters, fadein=False, fadeout=False, switcher=False):
        """
        Creates and starts a CallFunctionTimer and stores it in the message
        queue.

        @param diff:       seconds until the timer fires
        @param func:       function called by the timer
        @param parameters: argument list for func, first item is the entry
        @param fadein:     timer fades an entry in
        @param fadeout:    timer fades an entry out
        @param switcher:   timer switches to an entry
        @return:           the started timer
        @raise Exception:  if not exactly one of fadein/fadeout/switcher is set
        """
        if not _exactly_one_flag(fadein, fadeout, switcher):
            raise Exception("You have to call me with either fadein=true, fadeout=true or switcher=True")

        t = CallFunctionTimer(diff, func, parameters, fadein, fadeout, switcher)
        self.message_timer.append(t)
        t.start()
        return t

    # ------------------------------------------------------------------------------------------ #
    def is_something_planned_at_time(self, given_time):
        """
        Looks for a timer whose entry starts at the given time.

        @param given_time: value compared against 'entry.entry_start'
        @return:           the first matching timer or False
        """
        for t in self.message_timer:
            if t.entry.entry_start == given_time:
                return t
        return False

    # ------------------------------------------------------------------------------------------ #
    def get_act_programme_as_string(self):
        """
        Serializes the programme into a JSON string, fetching it first when
        nothing is loaded yet.

        @return: JSON string, or "" if the programme could not be serialized
        """
        programme_as_string = ""

        if self.programme is None or len(self.programme) == 0:
            self.fetch_new_programme()

        try:
            programme_as_string = json.dumps([p._asdict() for p in self.programme], default=alchemyencoder)
        except Exception as e:
            # best effort: report the problem and hand back the empty string
            self.logger.error("Cannot transform programme into JSON String. Reason: " + str(e))
            traceback.print_exc()

        return programme_as_string

    # ------------------------------------------------------------------------------------------ #
    def print_message_queue(self):
        """
        @return: all planned timers sorted by 'diff', one per line
        """
        return "".join(str(t) + "\n" for t in sorted(self.message_timer, key=attrgetter('diff')))

    # ------------------------------------------------------------------------------------------ #
    def fetch_new_programme(self):
        """
        Starts an AuraCalendarService, waits for its response on the service's
        queue and takes over the programme when a list is delivered. In every
        other case (None, abort message, unknown or empty response) the
        programme is loaded from the database instead.
        """
        self.logger.info("trying to fetch new programme")

        acs = AuraCalendarService(self.config)
        queue = acs.get_queue()

        # start fetching thread
        acs.start()

        # wait for the end
        response = queue.get()

        # reset, but remember the old value to restore it when fetching failed
        lsf = self.last_successful_fetch
        self.last_successful_fetch = None

        if response is None:
            self.logger.warning("Trying to load programme from database, because i got an EMPTY (None) response from AuraCalendarService.")
        elif isinstance(response, list):
            self.programme = response

            if self.programme:
                self.last_successful_fetch = datetime.datetime.now()
            else:
                self.logger.critical("Programme fetched from pv/tank has no entries!")

            # return self.get_act_programme_as_string()
        elif isinstance(response, str) and response.startswith("fetching_aborted"):
            self.logger.warning("Trying to load programme from database, because fetching was being aborted from AuraCalendarService! Reason: " + response[16:])
        else:
            # str() keeps a non-string response from breaking the concatenation
            self.logger.warning("Trying to load programme from database, because i got an unknown response from AuraCalendarService: " + str(response))

        # if somehow the programme could not be fetched => try to load it from database
        if self.last_successful_fetch is None:
            self.last_successful_fetch = lsf
            self.load_programme_from_db()

    # ------------------------------------------------------------------------------------------ #
    def set_next_file_for(self, playlistname):
        """
        Picks the next fallback file for the given playlist and passes it to
        the redis messenger.

        @param playlistname: "station", "timeslot" or "show"
        @return:             the file path, "" for an unknown playlist
        """
        self.logger.critical("HAVE TO SET NEXT FILE FOR: " + playlistname)
        self.logger.critical(str(self.get_active_entry()))

        # NOTE(review): hard coded files
        fallback_files = {
            "station": "/var/audio/fallback/eins.zwo.bombe.flac",
            "timeslot": "/var/audio/fallback/ratm.killing.flac",
            "show": "/var/audio/fallback/weezer.hash.pipe.flac",
        }

        file = fallback_files.get(playlistname)
        if file is None:
            file = ""
            self.logger.critical("Should set next fallback file for " + playlistname + ", but this playlist is unknown!")

        self.logger.info("Set next fallback file for " + playlistname + ": " + file)
        self.redismessenger.set_next_file_for(playlistname, file)
        return file

    # ------------------------------------------------------------------------------------------ #
    def get_next_file_for(self, fallbackname):
        """
        Returns the next fallback file for the given fallback.

        @param fallbackname: "station", "timeslot" or "show"
        @return:             the file path, "" for an unknown fallback
        """
        self.logger.critical("HAVE TO SET NEXT FILE FOR: " + fallbackname)
        (show, entry) = self.get_active_entry()
        self.logger.critical(str(show) + " " + str(entry))

        # NOTE(review): hard coded files inside a developer home directory
        fallback_files = {
            "station": "/home/david/Code/aura/engine/testing/content/1.flac",
            "timeslot": "/home/david/Code/aura/engine/testing/content/2.flac",
            "show": "/home/david/Code/aura/engine/testing/content/3.flac",
        }

        file = fallback_files.get(fallbackname)
        if file is None:
            file = ""
            self.logger.critical("Should set next fallback file for " + fallbackname + ", but this playlist is unknown!")

        #set_next_file_thread = SetNextFile(fallbackname, show)
        #set_next_file_thread.start()

        # self.logger.info("Set next fallback file for " + playlistname + ": " + file)
        # self.redismessenger.set_next_file_for(playlistname, file)
        return file


# ------------------------------------------------------------------------------------------ #
class SetNextFile(threading.Thread):
    """
    Thread detecting the next file of one of the fallback playlists of a show.
    """
    fallbackname = None
    show = None

    def __init__(self, fallbackname, show):
        """
        @param fallbackname: "show", "timeslot" or "station"
        @param show:         object carrying the showfallback, timeslotfallback
                             and stationfallback playlists
        """
        threading.Thread.__init__(self)
        self.fallbackname = fallbackname
        self.show = show

    def run(self):
        """Detects the next file for the fallback playlist selected by name."""
        if self.fallbackname == "show":
            self.detect_next_file_for(self.show.showfallback)
        elif self.fallbackname == "timeslot":
            self.detect_next_file_for(self.show.timeslotfallback)
        elif self.fallbackname == "station":
            self.detect_next_file_for(self.show.stationfallback)

    def detect_next_file_for(self, playlist):
        """
        Placeholder, not implemented yet.

        @return: always ""
        """
        return ""
        #if playlist.startswith("pool"):
        #    self.find_next_file_in_pool(playlist)

    #def find_next_file_in_pool(self, pool):
    #    return ""


# ------------------------------------------------------------------------------------------ #
class CallFunctionTimer(threading.Timer):
    """
    Timer calling a function after a delay which also remembers the entry it
    was created for and whether it fades in, fades out or switches.
    """
    logger = None
    param = None
    entry = None
    diff = None
    fadein = False
    fadeout = False
    switcher = False

    def __init__(self, diff, func, param, fadein=False, fadeout=False, switcher=False):
        """
        @param diff:      seconds until func is called
        @param func:      function to call
        @param param:     argument list for func, first item is the entry
        @param fadein:    timer fades an entry in
        @param fadeout:   timer fades an entry out
        @param switcher:  timer switches to an entry
        @raise Exception: if not exactly one of fadein/fadeout/switcher is set
        """
        threading.Timer.__init__(self, diff, func, param)

        if not _exactly_one_flag(fadein, fadeout, switcher):
            raise Exception("You have to create me with either fadein=true, fadeout=true or switcher=True")

        self.diff = diff
        self.func = func
        self.param = param
        self.entry = param[0]
        self.fadein = fadein
        self.fadeout = fadeout
        self.switcher = switcher

        self.logger = logging.getLogger("AuraEngine")
        self.logger.debug(str(self))

    # ------------------------------------------------------------------------------------------ #
    def __str__(self):
        """Describes the delay, the kind of the timer and its entry."""
        if self.fadein:
            return "CallFunctionTimer starting in " + str(self.diff) + "s fading in source '" + str(self.entry)
        elif self.fadeout:
            return "CallFunctionTimer starting in " + str(self.diff) + "s fading out source '" + str(self.entry)
        elif self.switcher:
            return "CallFunctionTimer starting in " + str(self.diff) + "s switching to source '" + str(self.entry)
        else:
            return "CORRUPTED CallFunctionTimer around! How can that be?"