#
#
#
#
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
# Meta

# Module metadata (package version, license and author).
__version__ = '0.0.1'
__license__ = "GNU General Public License (GPL) Version 3"
__version_info__ = (0, 0, 1)
__author__ = 'Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>'

"""
Aura Scheduler
Holds the event queue
"""
import datetime
import decimal
import logging
import threading
import time
import traceback
from operator import attrgetter

import simplejson
import sqlalchemy

from libraries.database.broadcasts import Schedule, ScheduleEntry, AuraDatabaseModel
from libraries.enum.auraenumerations import ScheduleEntryType, TimerType
from libraries.exceptions.exception_logger import ExceptionLogger
from modules.communication.redis.messenger import RedisMessenger
from modules.scheduling.calendar import AuraCalendarService

def alchemyencoder(obj):
    """JSON encoder function for SQLAlchemy special classes.

    Intended to be passed as ``default=`` to ``simplejson.dumps``.

    @type  obj: object
    @param obj: the value the JSON encoder could not serialise on its own
    @return: ISO formatted string for dates/datetimes, a float for
             ``decimal.Decimal``, an empty string for SQLAlchemy
             ``InstanceState`` objects, a JSON string (a one-element list
             holding ``obj._asdict()``) for ``Schedule`` objects and
             ``str(obj)`` for everything else
    """
    # datetime.datetime is a subclass of datetime.date, so both end up here
    if isinstance(obj, datetime.date):
        return obj.isoformat()

    if isinstance(obj, decimal.Decimal):
        return float(obj)

    # internal SQLAlchemy bookkeeping object, serialised as an empty string
    if isinstance(obj, sqlalchemy.orm.state.InstanceState):
        return ""

    if isinstance(obj, Schedule):
        return simplejson.dumps([obj._asdict()], default=alchemyencoder)

    return str(obj)

# ------------------------------------------------------------------------------------------ #
class AuraScheduler(ExceptionLogger, threading.Thread):
    """
    Aura Scheduler Class
    Gets data from pv and importer, stores and fires events

    The scheduler is a thread: every ``fetching_frequency`` seconds (read
    from the config) it fetches the programme, loads it from the database
    and creates ``CallFunctionTimer`` objects for the planned entries.
    """

    # Hard-coded fallback audio files, keyed by the fallback level name.
    FALLBACK_FILES = {
        "station": "/var/audio/fallback/eins.zwo.bombe.flac",
        "timeslot": "/var/audio/fallback/ratm.killing.flac",
        "show": "/var/audio/fallback/weezer.hash.pipe.flac",
    }

    # Class-level declarations; message_timer and job_result are rebound per
    # instance in __init__ so that instances do not share mutable state.
    message_timer = []
    job_result = {}
    liquidsoapcommunicator = None  # never assigned in this class; used by the timer callbacks
    schedule_entries = None
    active_entry = None
    exit_event = None
    programme = None
    client = None
    logger = None
    config = None
    tried_fetching = 0  # consecutive fetch attempts since the last reset
    fetch_max = 3       # attempts after which fetch_new_programme gives up once

    def __init__(self, config):
        """
        Constructor. Loads the error messages, checks the database and
        starts the scheduler thread.

        @type    config:               ConfigReader
        @param   config:               read engine.ini
        """
        self.config = config
        self.redismessenger = RedisMessenger(config)
        self.logger = logging.getLogger("AuraEngine")

        # per-instance containers (the class-level ones would be shared)
        self.message_timer = []
        self.job_result = {}

        # init threading
        threading.Thread.__init__(self)

        # init messenger.. probably not needed anymore
        self.redismessenger.set_channel('scheduler')
        self.redismessenger.set_section('execjob')

        # load error messages
        error_file = self.config.get("install_dir") + "/errormessages/scheduler_error.js"
        with open(error_file) as f:
            self.error_data = simplejson.load(f)

        # make sure the tables exist
        self.init_database()

        # create exit event; it has to exist before run() is entered
        self.exit_event = threading.Event()

        # start the periodic programme fetching (see run)
        self.start()

    # ------------------------------------------------------------------------------------------ #
    def init_database(self):
        """
        Check if the tables do exist. If not, create them.

        A ``ProgrammingError`` with driver error code 1146 (no such table)
        triggers ``AuraDatabaseModel.recreate_db``. Any other
        ``ProgrammingError`` is logged and not re-raised.
        """
        try:
            ScheduleEntry.select_all()
        except sqlalchemy.exc.ProgrammingError as e:
            # the DBAPI exception wrapped by SQLAlchemy carries the driver's
            # error code as first argument
            orig_args = getattr(getattr(e, "orig", None), "args", ())
            errcode = orig_args[0] if orig_args else None

            if errcode == 1146:  # error for no such table
                x = AuraDatabaseModel()
                x.recreate_db()
            else:
                self.logger.error("Unexpected database error while checking tables: " + str(e))

    # ------------------------------------------------------------------------------------------ #
    def run(self):
        """
        Thread main loop: fetch the programme, then wait
        ``fetching_frequency`` seconds or until ``stop`` is called.
        """
        while not self.exit_event.is_set():
            # set seconds to wait
            seconds_to_wait = int(self.config.get("fetching_frequency"))

            # calc next time
            next_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait)

            # write to logger
            self.logger.info("Fetch new programmes every " + str(seconds_to_wait) + "s started. Going to start next time " + str(next_time))

            self.fetch_new_programme()

            # returns early as soon as the exit event is set
            self.exit_event.wait(seconds_to_wait)

    # ------------------------------------------------------------------------------------------ #
    def stop(self):
        """Signal the main loop in ``run`` to terminate."""
        self.exit_event.set()

    # ------------------------------------------------------------------------------------------ #
    def get_active_entry(self):
        """
        Walk through the loaded programme and return the first entry that
        starts in the future, provided an earlier entry has been seen.

        @return: tuple ``(show, entry)``, or None if no such entry exists
        """
        # NOTE(review): the ``def`` line and the statement that loads the
        # programme were missing in the reviewed source. The method name is
        # taken from the ``self.get_active_entry()`` calls below; loading via
        # load_programme_from_db() is an assumption - please confirm.
        now_unix = time.mktime(datetime.datetime.now().timetuple())
        lastentry = None

        # load programme if necessary
        if self.programme is None:
            self.logger.debug("want to get active channel, but have to load programme first")
            self.load_programme_from_db()

        # get active source
        for show in self.programme or []:
            for entry in show.playlist:
                # check if lastentry is set and if act entry is in the future
                if lastentry is not None and entry.entry_start_unix > now_unix:
                    # NOTE(review): this returns the upcoming entry, not
                    # lastentry (the one that already started) - confirm intent
                    return (show, entry)

                lastentry = entry

        return None

    # ------------------------------------------------------------------------------------------ #
    def load_programme_from_db(self, silent=False):
        """
        Load the programme and its playlists from the database and create
        the timers for all entries of the regular playlists.

        @type  silent: boolean
        @param silent: currently unused
        """
        self.programme = Schedule.select_act_programme()
        planned_entries = []

        for schedule in self.programme or []:
            # playlist to play
            schedule.playlist = ScheduleEntry.select_playlist(schedule.playlist_id)
            # show fallback is played when playlist fails
            schedule.showfallback = ScheduleEntry.select_playlist(schedule.show_fallback_id)
            # timeslot fallback is played when show fallback fails
            schedule.timeslotfallback = ScheduleEntry.select_playlist(schedule.timeslot_fallback_id)
            # station fallback is played when timeslot fallback fails
            schedule.stationfallback = ScheduleEntry.select_playlist(schedule.station_fallback_id)

            planned_entries.extend(schedule.playlist)

        self.enable_entries(planned_entries)
        self.logger.warning(self.print_message_queue())

    # ------------------------------------------------------------------------------------------ #
    def enable_entries(self, playlist):
        """
        Create the timers for every entry of ``playlist`` that starts in
        the future.

        @type  playlist: list
        @param playlist: entries providing ``entry_start_unix``
        """
        # now in unixtime
        now_unix = time.mktime(datetime.datetime.now().timetuple())
        # old entry for fading out
        old_entry = None

        for entry in playlist:
            # since we get also programmes from the past, filter these out
            # NOTE(review): the filter and the computation of diff were
            # missing in the reviewed source and are reconstructed here
            if entry.entry_start_unix > now_unix:
                # seconds until the entry has to start
                diff = entry.entry_start_unix - now_unix

                # NOTE(review): shortens every delay by a factor of 100;
                # marked "testing purpose" in the original - remove for
                # production use
                diff = diff/100 # testing purpose

                # enable the three timer
                self.enable_timer(diff, entry, old_entry)

            # store the old entry for fading out
            old_entry = entry

    # ------------------------------------------------------------------------------------------ #
    def enable_timer(self, diff, entry, old_entry):
        """
        Create the switch timer and the fading timers for ``entry``.

        @param diff:      seconds until ``entry`` has to be activated
        @param entry:     the entry to activate
        @param old_entry: the entry preceding ``entry`` (may be None)
        """
        # create the activation threads and run them after <diff> seconds
        self.logger.critical("ENABLING SWITCHTIMER FOR " + str(entry))
        entry.switchtimer = self.add_or_update_timer(diff, self.liquidsoapcommunicator.activate, [entry])
        self.enable_fading(diff, entry, old_entry)

    # ------------------------------------------------------------------------------------------ #
    def enable_fading(self, diff, new_entry, old_entry):
        """
        Create fade out/fade in timers when the entry type changes.

        @param diff:      seconds until ``new_entry`` starts
        @param new_entry: the entry to fade in
        @param old_entry: the entry to fade out (may be None)
        """
        # NOTE(review): unlike the switch timer, fading timers are created
        # unconditionally on every call.

        # fading times
        fade_out_time = float(self.config.get("fade_out_time"))

        # without a predecessor there is nothing to fade out, but the new
        # entry still gets faded in
        type_changes = old_entry is None or old_entry.type != new_entry.type

        # enable fading when entry types are different
        if old_entry is not None and type_changes:
            # the fade out has to be finished when the new entry starts
            old_entry.fadeouttimer = self.create_timer(diff-fade_out_time, self.liquidsoapcommunicator.fade_out, [old_entry], fadeout=True)
            self.logger.critical("ENABLING FADEOUTTIMER FOR " + str(old_entry))

        # same for fadein except old_entry can be None
        if type_changes:
            new_entry.fadeintimer = self.create_timer(diff, self.liquidsoapcommunicator.fade_in, [new_entry], fadein=True)
            self.logger.critical("ENABLING FADEINTIMER FOR " + str(new_entry))

    # ------------------------------------------------------------------------------------------ #
    def add_or_update_timer(self, diff, func, parameters):
        """
        Create a switch timer for ``parameters[0]`` unless a timer for the
        same start time and the same playlist already exists.

        @param diff:       seconds until the timer fires
        @param func:       function called by the timer
        @param parameters: argument list for ``func``; first item is the entry
        @return: the newly created timer or the reused, already planned one
        """
        entry = parameters[0]
        planned_timer = self.is_something_planned_at_time(entry.entry_start)

        # if nothing is planned at given time, create a new timer
        if not planned_timer:
            return self.create_timer(diff, func, parameters, switcher=True)

        # something is planned on entry.entry_start: if the playlist ids
        # differ, stop the old timer and create a new one
        if planned_timer.entry.playlist_id != entry.playlist_id:
            self.stop_timer(planned_timer)
            return self.create_timer(diff, func, parameters, switcher=True)

        # the playlist ids do not differ => reuse the old timer
        return planned_timer

    # ------------------------------------------------------------------------------------------ #
    def stop_timer(self, timer):
        """
        Cancel ``timer`` and the fading timers of its entry and remove
        them from the message queue.

        @param timer: the CallFunctionTimer to stop
        """
        # stop timer
        timer.cancel()

        # stop the fading timers belonging to the same entry, if any
        for attribute in ("fadeintimer", "fadeouttimer"):
            fading_timer = getattr(timer.entry, attribute, None)
            if fading_timer is not None:
                fading_timer.cancel()
                self._forget_timer(fading_timer)

        # and remove it from message queue
        self._forget_timer(timer)

        self.logger.critical("REMOVED TIMER for " + str(timer.entry))

    # ------------------------------------------------------------------------------------------ #
    def _forget_timer(self, timer):
        """Remove ``timer`` from the message queue; ignore if it is not queued."""
        try:
            self.message_timer.remove(timer)
        except ValueError:
            pass

    # ------------------------------------------------------------------------------------------ #
    def create_timer(self, diff, func, parameters, fadein=False, fadeout=False, switcher=False):
        """
        Create a CallFunctionTimer, append it to the message queue and
        start it.

        @param diff:       seconds until the timer fires
        @param func:       function called by the timer
        @param parameters: argument list for ``func``; first item is the entry
        @param fadein:     the timer fades in a source
        @param fadeout:    the timer fades out a source
        @param switcher:   the timer switches to a source
        @return: the started timer
        @raise Exception: unless exactly one of fadein, fadeout and switcher is set
        """
        if sum(map(bool, (fadein, fadeout, switcher))) != 1:
            raise Exception("You have to call me with either fadein=true, fadeout=true or switcher=True")

        t = CallFunctionTimer(diff, func, parameters, fadein, fadeout, switcher)
        self.message_timer.append(t)
        t.start()

        return t

    # ------------------------------------------------------------------------------------------ #
    def is_something_planned_at_time(self, given_time):
        """
        Search the message queue for a timer whose entry starts at
        ``given_time``.

        @param given_time: value compared against ``entry.entry_start``
        @return: the first matching timer, False if there is none
        """
        for t in self.message_timer:
            if t.entry.entry_start == given_time:
                return t

        return False

    # ------------------------------------------------------------------------------------------ #
    def get_act_programme_as_string(self):
        """
        Serialise the actual programme to JSON. Fetches the programme
        first if none is loaded.

        @return: JSON string, empty string if the serialisation failed
        """
        programme_as_string = ""

        if self.programme is None or len(self.programme) == 0:
            self.fetch_new_programme()

        try:
            programme_as_string = simplejson.dumps([p._asdict() for p in self.programme], default=alchemyencoder)
        except Exception as e:
            self.logger.error("Cannot transform programme into JSON String. Reason: " + str(e))
            traceback.print_exc()

        return programme_as_string

    # ------------------------------------------------------------------------------------------ #
    def print_message_queue(self):
        """
        @return: string with one line per queued timer, sorted by ``diff``
        """
        return "".join(str(t) + "\n" for t in sorted(self.message_timer, key=attrgetter('diff')))

    # ------------------------------------------------------------------------------------------ #
    def fetch_new_programme(self):
        """
        Let AuraCalendarService fetch the programme, wait for its answer
        and load the programme from the database on success.

        After ``fetch_max`` consecutive attempts one call gives up, resets
        the counter and returns an error message.

        @return: error message if the attempt limit was reached, else None
        """
        self.logger.info("trying to fetch new programme")

        if self.tried_fetching == self.fetch_max:
            msg = "Cannot connect to PV or Tank! No Programme loaded!"
            self.logger.error(msg)
            self.tried_fetching = 0
            return msg

        self.tried_fetching += 1

        acs = AuraCalendarService(self.config)
        queue = acs.get_queue()

        # start fetching thread
        acs.start()

        # wait for the end
        response = queue.get()

        # NOTE(review): the condition of the success branch was missing in
        # the reviewed source; "fetching_finished" is an assumption - verify
        # against AuraCalendarService
        if response == "fetching_finished":
            self.load_programme_from_db()

            if self.programme:
                self.tried_fetching = 0
            elif self.tried_fetching == self.fetch_max:
                self.logger.critical("Programme loaded from database has no entries!")
        elif isinstance(response, str) and response.startswith("fetching_aborted"):
            self.logger.warning("Fetching was being aborted from AuraCalendarService! Reason: " + response)
        else:
            self.logger.warning("Got an unknown response from AuraCalendarService: " + str(response))

    # ------------------------------------------------------------------------------------------ #
    def set_next_file_for(self, playlistname):
        """
        Pick the next fallback file for ``playlistname`` and hand it to
        the redis messenger.

        @type  playlistname: string
        @param playlistname: "station", "timeslot" or "show"
        @return: path of the fallback file, empty string for unknown names
        """
        self.logger.critical("HAVE TO SET NEXT FILE FOR: " + playlistname)
        self.logger.critical(str(self.get_active_entry()))

        filename = self.FALLBACK_FILES.get(playlistname)
        if filename is None:
            filename = ""
            self.logger.critical("Should set next fallback file for " + playlistname + ", but this playlist is unknown!")

        self.logger.info("Set next fallback file for " + playlistname + ": " + filename)
        # the messenger is called for unknown names too, with an empty file
        self.redismessenger.set_next_file_for(playlistname, filename)

        return filename

    # ------------------------------------------------------------------------------------------ #
    def get_next_file_for(self, fallbackname):
        """
        Return the next fallback file for ``fallbackname``.

        @type  fallbackname: string
        @param fallbackname: "station", "timeslot" or "show"
        @return: path of the fallback file, empty string for unknown names
        """
        self.logger.critical("HAVE TO SET NEXT FILE FOR: " + fallbackname)

        # get_active_entry returns None when nothing is found
        active = self.get_active_entry()
        (show, entry) = active if active is not None else (None, None)
        self.logger.critical(str(show) + " " + str(entry))

        filename = self.FALLBACK_FILES.get(fallbackname)
        if filename is None:
            filename = ""
            self.logger.critical("Should set next fallback file for " + fallbackname + ", but this playlist is unknown!")

        # TODO: detect the file in a SetNextFile(fallbackname, show) thread
        return filename
# ------------------------------------------------------------------------------------------ #
class SetNextFile(threading.Thread):
    """
    Thread that detects the next file for one fallback level of a show.
    """

    fallbackname = None
    show = None

    def __init__(self, fallbackname, show):
        """
        @type  fallbackname: string
        @param fallbackname: "show", "timeslot" or "station"
        @param show:         object providing the attributes showfallback,
                             timeslotfallback and stationfallback
        """
        threading.Thread.__init__(self)
        self.fallbackname = fallbackname
        self.show = show

    def run(self):
        """Dispatch to detect_next_file_for with the matching fallback playlist."""
        if self.fallbackname == "show":
            self.detect_next_file_for(self.show.showfallback)
        elif self.fallbackname == "timeslot":
            self.detect_next_file_for(self.show.timeslotfallback)
        elif self.fallbackname == "station":
            self.detect_next_file_for(self.show.stationfallback)
        # any other name is ignored

    def detect_next_file_for(self, playlist):
        """
        Placeholder, not implemented yet.

        @param playlist: the fallback playlist to pick a file from
        @return: always an empty string
        """
        return ""
# ------------------------------------------------------------------------------------------ #
class CallFunctionTimer(threading.Timer):
    """
    Timer that calls a function after ``diff`` seconds and remembers what
    it is used for: fading in, fading out or switching to a source.
    """

    logger = None
    param = None
    entry = None
    diff = None
    fadein = False
    fadeout = False
    switcher = False

    def __init__(self, diff, func, param, fadein=False, fadeout=False, switcher=False):
        """
        @param diff:     seconds until ``func`` is called
        @param func:     the function to call
        @param param:    argument list for ``func``; the first item is
                         stored as ``entry``
        @param fadein:   the timer fades in a source
        @param fadeout:  the timer fades out a source
        @param switcher: the timer switches to a source
        @raise Exception: unless exactly one of fadein, fadeout and switcher is set
        """
        # validate before the underlying thread object is set up
        if sum(map(bool, (fadein, fadeout, switcher))) != 1:
            raise Exception("You have to create me with either fadein=true, fadeout=true or switcher=True")

        threading.Timer.__init__(self, diff, func, param)

        self.diff = diff
        self.func = func
        self.param = param
        self.entry = param[0]

        self.fadein = fadein
        self.fadeout = fadeout
        self.switcher = switcher

        self.logger = logging.getLogger("AuraEngine")
        self.logger.debug(str(self))

    # ------------------------------------------------------------------------------------------ #
    def __str__(self):
        """
        @return: description of the delay, the purpose and the entry of this timer
        """
        if self.fadein:
            action = "fading in"
        elif self.fadeout:
            action = "fading out"
        elif self.switcher:
            action = "switching to"
        else:
            return "CORRUPTED CallFunctionTimer around! How can that be?"

        return "CallFunctionTimer starting in " + str(self.diff) + "s " + action + " source '" + str(self.entry) + "'"