Newer
Older
#
#
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
# Meta

Gottfried Gaisbauer
committed
__version__ = '0.0.1'
__license__ = "GNU General Public License (GPL) Version 3"

Gottfried Gaisbauer
committed
__version_info__ = (0, 0, 1)
__author__ = 'Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>'
"""

Gottfried Gaisbauer
committed
Aura Scheduler
Is holding the eventqueue
"""
import time
import simplejson
import datetime

Gottfried Gaisbauer
committed
import decimal
import sqlalchemy

Gottfried Gaisbauer
committed
import logging
import threading
from modules.communication.redis.messenger import RedisMessenger

Gottfried Gaisbauer
committed
from modules.scheduling.calendar import AuraCalendarService
from libraries.database.broadcasts import Schedule, ScheduleEntry, AuraDatabaseModel
from libraries.exceptions.exception_logger import ExceptionLogger
from libraries.enum.auraenumerations import ScheduleEntryType

Gottfried Gaisbauer
committed
def alchemyencoder(obj):
"""JSON encoder function for SQLAlchemy special classes."""
if isinstance(obj, datetime.date):
return obj.isoformat()
elif isinstance(obj, decimal.Decimal):
return float(obj)
elif isinstance(obj, sqlalchemy.orm.state.InstanceState):
return ""

Gottfried Gaisbauer
committed
elif isinstance(obj, Schedule):
return simplejson.dumps([obj._asdict()], default=alchemyencoder)
else:

Gottfried Gaisbauer
committed
return str(obj)

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #
class AuraScheduler(ExceptionLogger, threading.Thread):

Gottfried Gaisbauer
committed
"""
Aura Scheduler Class
Gets data from pv and importer, stores and fires events
"""

Gottfried Gaisbauer
committed
message_timer = []

Gottfried Gaisbauer
committed
job_result = {}
liquidsoapcommunicator = None
schedule_entries = None
active_entry = None

Gottfried Gaisbauer
committed
exit_event = None

Gottfried Gaisbauer
committed
programme = None
client = None

Gottfried Gaisbauer
committed
logger = None
config = None

Gottfried Gaisbauer
committed
tried_fetching = 0

Gottfried Gaisbauer
committed
fetch_max = 3

Gottfried Gaisbauer
committed
def __init__(self, config):
"""
Constructor

Gottfried Gaisbauer
committed
@type config: ConfigReader
@param config: read engine.ini
"""

Gottfried Gaisbauer
committed
self.config = config
self.logger = logging.getLogger("AuraEngine")
# init threading
threading.Thread.__init__(self)
# init messenger.. probably not needed anymore
self.redismessenger.set_channel('scheduler')
self.redismessenger.set_section('execjob')
# load error messages

Gottfried Gaisbauer
committed
error_file = self.config.get("install_dir") + "/errormessages/scheduler_error.js"
f = open(error_file)
self.error_data = simplejson.load(f)
f.close()
# init database ?
self.init_database()

Gottfried Gaisbauer
committed
#self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal')

Gottfried Gaisbauer
committed
# create exit event

Gottfried Gaisbauer
committed
self.exit_event = threading.Event()
# start loading new programm every hour
self.start()
# ------------------------------------------------------------------------------------------ #
def init_database(self):
# check if tables do exist. if not create them
try:
ScheduleEntry.select_all()
except sqlalchemy.exc.ProgrammingError as e:
if errcode == 1146: # error for no such table
x = AuraDatabaseModel()
x.recreate_db()
# ------------------------------------------------------------------------------------------ #
def run(self):

Gottfried Gaisbauer
committed
#while True:
while not self.exit_event.is_set():
# set seconds to wait
seconds_to_wait = int(self.config.get("fetching_frequency"))
# calc next time
next_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait)
# write to logger

Gottfried Gaisbauer
committed
self.logger.info("Fetch new programmes every " + str(seconds_to_wait) + "s started. Going to start next time " + str(next_time))

Gottfried Gaisbauer
committed
# empty database
# self.logger.info("emptying database")
# ScheduleEntry.truncate()

Gottfried Gaisbauer
committed
self.fetch_new_programme()

Gottfried Gaisbauer
committed
self.exit_event.wait(seconds_to_wait)
# ------------------------------------------------------------------------------------------ #

Gottfried Gaisbauer
committed
def stop(self):
self.exit_event.set()
# ------------------------------------------------------------------------------------------ #

Gottfried Gaisbauer
committed
now_unix = time.mktime(datetime.datetime.now().timetuple())
lastentry = None
# load programme if necessary
if self.programme is None:

Gottfried Gaisbauer
committed
self.logger.debug("want to get active channel, but have to load programme first")

Gottfried Gaisbauer
committed
# get active source
for entry in self.programme:
# check if lastentry is set and if act entry is in the future

Gottfried Gaisbauer
committed
if lastentry is not None and entry.entry_start_unix > now_unix:
# return entry if so
return entry # actsource = entry.source

Gottfried Gaisbauer
committed
lastentry = entry
# ------------------------------------------------------------------------------------------ #
def load_programme_from_db(self, silent=False):
#self.programme = ScheduleEntry.select_all()
self.programme = ScheduleEntry.select_act_programme()
# now in unixtime
now_unix = time.mktime(datetime.datetime.now().timetuple())
# switch to check if its the first stream in loaded programme
first_stream_in_programme = False
# fading times
fade_in_time = float(self.config.get("fade_in_time"))
fade_out_time = float(self.config.get("fade_out_time"))
# old entry for fading out
old_entry = None
for entry in self.programme:
# since we get also programmes from act hour, filter these out
if entry.entry_start_unix > now_unix:

Gottfried Gaisbauer
committed
diff = diff/1000 # testing purpose
self.enable_fading(fade_in_time, fade_out_time, old_entry, entry, diff)

Gottfried Gaisbauer
committed
# create the activation threads and run them after <diff> seconds
self.add_or_update_timer(diff, self.liquidsoapcommunicator.activate, [entry])

Gottfried Gaisbauer
committed

Gottfried Gaisbauer
committed
elif entry.type == ScheduleEntryType.STREAM:
if first_stream_in_programme:
self.liquidsoapcommunicator.next_stream_source(entry.source)
first_stream_in_programme = False
self.add_or_update_timer(diff, self.liquidsoapcommunicator.activate, [entry])

Gottfried Gaisbauer
committed

Gottfried Gaisbauer
committed
elif entry.type == ScheduleEntryType.FILESYSTEM:
self.add_or_update_timer(diff, self.liquidsoapcommunicator.activate, [entry])

Gottfried Gaisbauer
committed

Gottfried Gaisbauer
committed
self.logger.warning("Scheduler cannot understand source '" + entry.source + "' from " + str(entry))
self.logger.warning(" Not setting any activation Thread!")
# store the old entry for fading out
old_entry = entry
self.logger.warning(self.print_message_queue())
# ------------------------------------------------------------------------------------------ #
def enable_fading(self, fade_in_time, fade_out_time, old_entry, new_entry, diff):
# enable fading when entry types are different
if old_entry is not None:
if old_entry.type != new_entry.type:
# enable fadeout if enabled
if fade_out_time != 0 and old_entry is not None:
params = [old_entry, -fade_out_time]
self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_out, params)
# same for fadein
if fade_in_time != 0:
params = [new_entry, fade_in_time]
self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_in, params)

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #
def add_or_update_timer(self, diff, func, parameters):
length = len(parameters)
if length == 2:
entry = parameters[0]
fadingtime = parameters[1]
elif length == 1:
entry = parameters[0]
fadingtime = 0
# check if something is planned at given time
if fadingtime < 0:
planned_timer = self.is_something_planned_at_time(entry.entry_start + datetime.timedelta(seconds=fadingtime))
else:
planned_timer = self.is_something_planned_at_time(entry.entry_start)

Gottfried Gaisbauer
committed
# if something is planned on entry.entry_start
if planned_timer:
planned_entry = planned_timer.entry
# check if the playlist_id's are different
if planned_entry.playlist_id != entry.playlist_id:
# if not stop the old timer and remove it from the list
self.stop_timer(planned_timer)
# and create a new one
self.create_timer(diff, func, parameters)

Gottfried Gaisbauer
committed
# if the playlist id's do not differ => reuse the old timer and do nothing, they are the same

Gottfried Gaisbauer
committed
# if nothing is planned at given time, create a new timer

Gottfried Gaisbauer
committed
else:
self.create_timer(diff, func, parameters)

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #
def stop_timer(self, timer):
# stop timer
timer.cancel()

Gottfried Gaisbauer
committed
# and remove it from message queue

Gottfried Gaisbauer
committed
self.message_timer.remove(timer)
# ------------------------------------------------------------------------------------------ #
def create_timer(self, diff, func, parameters):
t = CallFunctionTimer(diff, func, parameters)

Gottfried Gaisbauer
committed
self.message_timer.append(t)
t.start()
# ------------------------------------------------------------------------------------------ #

Gottfried Gaisbauer
committed
def is_something_planned_at_time(self, given_time):

Gottfried Gaisbauer
committed
for t in self.message_timer:

Gottfried Gaisbauer
committed
if t.entry.entry_start == given_time:

Gottfried Gaisbauer
committed
return t
return False
# ------------------------------------------------------------------------------------------ #
def get_act_programme_as_string(self):
programme_as_string = ""

Gottfried Gaisbauer
committed
if self.programme is None or len(self.programme) == 0:
self.fetch_new_programme()
programme_as_string = simplejson.dumps([p._asdict() for p in self.programme], default=alchemyencoder)

Gottfried Gaisbauer
committed
except Exception as e:
self.logger.error("Cannot transform programme into JSON String. Reason: " + str(e))
traceback.print_exc()
return programme_as_string

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #
def print_message_queue(self):
message_queue = ""
for t in self.message_timer:
message_queue += str(t)+"\n"

Gottfried Gaisbauer
committed
return message_queue
# ------------------------------------------------------------------------------------------ #
def swap_playlist_entries(self, indexes):
from_entry = None
to_entry = None
from_idx = indexes["from_index"]
to_idx = indexes["to_index"]
for p in self.programme:
if p.programme_index == int(from_idx):
from_entry = p
if p.programme_index == int(to_idx):
to_entry = p
# break out of loop, if both entries found
if from_entry is not None and to_entry is not None:
break
if from_entry is None or to_entry is None:
return "From or To Entry not found!"
swap = from_entry.source
from_entry.source = to_entry.source
to_entry.source = swap
from_entry.store(add=False, commit=False)
to_entry.store(add=False, commit=True)
# and return the programme with swapped entries
return self.get_act_programme_as_string()
# ------------------------------------------------------------------------------------------ #
def delete_playlist_entry(self, index):
found = False
for p in self.programme:
if p.programme_index == int(index):
p.delete(True)
self.load_programme_from_db()
found = True
break
if not found:

Gottfried Gaisbauer
committed
self.logger.warning("Nothing to delete")
return self.get_act_programme_as_string()

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #
def insert_playlist_entry(self, fromtime_source):
fromtime = fromtime_source["fromtime"]
source = fromtime_source["source"]
entry = ScheduleEntry()
entry.entry_start = fromtime
entry.source = source
entry.playlist_id = 0
entry.schedule_id = 0
entry.entry_num = ScheduleEntry.select_next_manual_entry_num()
entry.store(add=True, commit=True)

Gottfried Gaisbauer
committed
self.load_programme_from_db()
return self.get_act_programme_as_string()
# ------------------------------------------------------------------------------------------ #

Gottfried Gaisbauer
committed
def fetch_new_programme(self):

Gottfried Gaisbauer
committed
self.logger.info("trying to fetch new programme")
if self.tried_fetching == self.fetch_max:

Gottfried Gaisbauer
committed
msg = "Cannot connect to PV or Tank! No Programme loaded!"
self.logger.error(msg)

Gottfried Gaisbauer
committed
self.tried_fetching = 0

Gottfried Gaisbauer
committed
return msg

Gottfried Gaisbauer
committed
self.tried_fetching += 1

Gottfried Gaisbauer
committed
acs = AuraCalendarService(self.config)
queue = acs.get_queue()

Gottfried Gaisbauer
committed
# start fetching thread
acs.start()

Gottfried Gaisbauer
committed
# wait for the end
response = queue.get()

Gottfried Gaisbauer
committed
self.load_programme_from_db()

Gottfried Gaisbauer
committed
if self.programme is not None and len(self.programme) > 0:
self.tried_fetching = 0
if len(self.programme) == 0 and self.tried_fetching == self.fetch_max:
self.logger.critical("Programme loaded from database has no entries!")

Gottfried Gaisbauer
committed

Gottfried Gaisbauer
committed
# return self.get_act_programme_as_string()

Gottfried Gaisbauer
committed
elif response.startswith("fetching_aborted"):
self.logger.warning("Fetching was being aborted from AuraCalendarService! Are you connected? Reason: " + response)
else:

Gottfried Gaisbauer
committed
self.logger.warning("Got an unknown response from AuraCalendarService: " + response)
# ------------------------------------------------------------------------------------------ #

Gottfried Gaisbauer
committed
def set_next_file_for(self, playlistname):

Gottfried Gaisbauer
committed
self.logger.critical("HAVE TO SET NEXT FILE FOR: " + playlistname)
self.logger.critical(str(self.get_active_entry()))

Gottfried Gaisbauer
committed
if playlistname == "station":
file = "/var/audio/fallback/eins.zwo.bombe.mp3"
elif playlistname == "timeslot":
file = "/var/audio/fallback/ratm.killing.mp3"
elif playlistname == "show":
file = "/var/audio/fallback/weezer.hash.pipe.mp3"
else:
file = ""
self.logger.critical("Should set next fallback file for " + playlistname + ", but this playlist is unknown!")
self.logger.info("Set next fallback file for " + playlistname + ": " + file)
self.redismessenger.set_next_file_for(playlistname, file)
return file
# ------------------------------------------------------------------------------------------ #
class CallFunctionTimer(threading.Timer):

Gottfried Gaisbauer
committed
logger = None
param = None

Gottfried Gaisbauer
committed
entry = None

Gottfried Gaisbauer
committed
diff = None

Gottfried Gaisbauer
committed

Gottfried Gaisbauer
committed
def __init__(self, diff, func, param):

Gottfried Gaisbauer
committed
threading.Timer.__init__(self, diff, func, param)

Gottfried Gaisbauer
committed
self.diff = diff

Gottfried Gaisbauer
committed
self.func = func
self.param = param

Gottfried Gaisbauer
committed
self.entry = param[0]

Gottfried Gaisbauer
committed
self.logger = logging.getLogger("AuraEngine")
self.logger.debug(str(self))

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #
def __str__(self):
if len(self.param) >= 2:
# fading in
if self.param[1] > 0:
return "CallFunctionTimer starting @ " + str(self.entry.entry_start) + " fading in source '" + str(self.entry.source) + "' in seconds: " + str(self.diff)
# fading out
else:
return "CallFunctionTimer starting @ " + str(self.entry.entry_start + datetime.timedelta(seconds=self.param[1])) + " fading out source '" + str(self.entry.source) + "' in seconds: " + str(self.diff+self.param[1])
else:
return "CallFunctionTimer starting @ " + str(self.entry.entry_start) + " switching to source '" + str(self.entry.source) + "' in seconds: " + str(self.diff)