Newer
Older
#
#
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
# Meta

Gottfried Gaisbauer
committed
# Module metadata.
__version_info__ = (0, 0, 1)
# Derive the dotted string from the tuple so the two can never drift apart.
__version__ = '.'.join(str(part) for part in __version_info__)
__license__ = "GNU General Public License (GPL) Version 3"
__author__ = 'Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>'
import datetime
import decimal
import json
import logging
import threading
import time
import traceback
from operator import attrgetter

import sqlalchemy

from modules.communication.redis.messenger import RedisMessenger
from modules.scheduling.calendar import AuraCalendarService
from libraries.database.broadcasts import Schedule, Playlist, AuraDatabaseModel
from libraries.exceptions.exception_logger import ExceptionLogger
from libraries.enum.auraenumerations import ScheduleEntryType, TimerType

Gottfried Gaisbauer
committed
def alchemyencoder(obj):
    """
    JSON encoder function for SQLAlchemy special classes.

    Has the shape of a `default` callback for `json.dumps` and is used as
    such inside this function when a `Schedule` is encoded.

    Args:
        obj: The object to be converted into something JSON can represent

    Returns:
        The ISO formatted string for dates, a float for decimals, an empty
        string for SQLAlchemy instance states, a JSON string for a `Schedule`
        and `str(obj)` for everything else.
    """
    # `datetime.datetime` is a subclass of `datetime.date`, so both end up here
    if isinstance(obj, datetime.date):
        return obj.isoformat()
    if isinstance(obj, decimal.Decimal):
        return float(obj)
    if isinstance(obj, sqlalchemy.orm.state.InstanceState):
        return ""
    if isinstance(obj, Schedule):
        return json.dumps([obj._asdict()], default=alchemyencoder)
    return str(obj)

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #
class AuraScheduler(ExceptionLogger, threading.Thread):
    """
    Aura Scheduler Class

    - Gets data from Steering and Tanks
    - Stores and fires events for LiquidSoap

    Attributes:
        config (AuraConfig):                    Holds the Engine Configuration
        logger:                                 The logger
        exit_event(threading.Event):            Used to exit the thread if requested
        liquidsoapcommunicator:                 Stores the connection to LiquidSoap
        last_successful_fetch (datetime):       Stores the last time a fetch from Steering/Tank was successful

        programme:                              The current radio programme to be played as defined in the local engine database
        active_entry(Show, Track):              This is a Tuple consisting of the currently played `Show` and `Track`
        message_timer(Array<threading.Timer>):  The message queue of tracks to be played
    """

    # NOTE(review): `job_result` and `message_timer` are mutable class-level
    # defaults and therefore shared by every instance that does not rebind them.
    job_result = {}
    config = None
    logger = None
    exit_event = None
    last_successful_fetch = None
    programme = None
    active_entry = None
    message_timer = []
    #schedule_entries = None
    client = None

Gottfried Gaisbauer
committed
def __init__(self, config):
    """
    Constructor

    Sets up logger, error messages, database and the Redis messenger and
    starts the scheduler thread right away.

    Args:
        config (AuraConfig):    Reads the engine configuration
    """
    self.config = config
    self.logger = logging.getLogger("AuraEngine")

    # Use per-instance containers. The class-level defaults are mutable and
    # would otherwise be shared between all scheduler instances.
    self.job_result = {}
    self.message_timer = []

    self.init_error_messages()
    self.init_database()

    self.redismessenger = RedisMessenger(config)

    # init threading
    threading.Thread.__init__(self)

    # init messenger.. probably not needed anymore
    self.redismessenger.set_channel('scheduler')
    self.redismessenger.set_section('execjob')
    #self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal')

    self.exit_event = threading.Event()

    # Start thread to load new programme info periodically (see `run()`)
    self.start()
def run(self):
    """
    Called when thread is started via `start()`. It calls `self.fetch_new_programme()`
    periodically depending on the `fetching_frequency` defined in the engine configuration.

    The loop ends as soon as `self.exit_event` is set.
    """
    while not self.exit_event.is_set():
        seconds_to_wait = int(self.config.get("fetching_frequency"))
        next_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait)
        # `next_time` is a point in time, not a number of seconds
        self.logger.info("Fetch new programmes every %ss. Next fetching at %s." % (str(seconds_to_wait), str(next_time)))

        # empty database
        # self.logger.info("emptying database")
        # ScheduleEntry.truncate()

        self.fetch_new_programme()

        # Sleeps for the fetching interval, but wakes up early when the event gets set
        self.exit_event.wait(seconds_to_wait)
#
#   PUBLIC METHODS
#
def get_active_entry(self):
    """
    Retrieves the current `Show` and `Track` tuple being played.
    Externally called via `LiquidSoapCommunicator`.

    Returns:
        (Show, Entry):  The show and track currently being played, or
                        `(None, None)` if no active entry is set.
    """
    # now_unix = time.mktime(datetime.datetime.now().timetuple())
    # lastentry = None

    # # Load programme if necessary
    # if self.programme is None:
    #     self.logger.info("Next track requested: Need to load programme from database first.")
    #     self.load_programme_from_db()

    # # Get the entry currently being played
    # for show in self.programme:
    #     for entry in show.playlist:
    #         # check if lastentry is set and if act entry is in the future
    #         if lastentry is not None and entry.start_unix > now_unix:
    #             # return entry if so
    #             return (show,entry) # actsource = entry.source
    #         lastentry = entry
    # return None, None

    # FIXME active_entry logic
    if not self.active_entry:
        self.logger.warning("No active entry set! Is currently nothing or a fallback playing?")
        return (None, None)
    return self.active_entry

Gottfried Gaisbauer
committed
def get_act_programme_as_string(self):
    """
    Fetches the latest programme and returns it as `String`.
    Also used by `ServerRedisAdapter`.

    Return:
        (String):       Programme as JSON, or an empty string if it could
                        not be serialised

    Raises:
        Nothing is propagated from the conversion: a failure to serialise
        the programme is logged and an empty string is returned.
    """
    programme_as_string = ""

    # Only fetch when there is nothing to serialise yet. The serialisation
    # itself must run for a loaded programme, not just for an empty one.
    if self.programme is None or len(self.programme) == 0:
        self.fetch_new_programme()

    try:
        programme_as_string = json.dumps([p._asdict() for p in self.programme], default=alchemyencoder)
    # FIXME Change to more specific exception
    except Exception as e:
        self.logger.error("Cannot transform programme into JSON String. Reason: " + str(e))
        traceback.print_exc()

    return programme_as_string
def print_message_queue(self):
    """
    Prints the current message queue i.e. tracks in the queue to be played.

    The timers in `self.message_timer` are logged ordered by their `diff`
    attribute, one per line. A warning is logged if the queue is empty.
    """
    messages = sorted(self.message_timer, key=attrgetter('diff'))

    if not messages:
        self.logger.warning("There's nothing in the Message Queue!")
        return

    message_queue = "".join(str(msg) + "\n" for msg in messages)
    self.logger.info("Message Queue: " + message_queue)

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
def set_next_file_for(self, playlistname):
    """
    Determines the next fallback file for the given playlist and hands it
    over to the Redis messenger.

    Valid playlist names are:
        * station
        * timeslot
        * show

    Args:
        playlistname (String):  Name of the fallback playlist

    Returns:
        (String):   Path of the file which was set, or an empty string
                    if the playlist name is unknown.
    """
    # TODO Replace the hard-coded test file with a real lookup per playlist
    test_file = "/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3"
    known_playlists = ("station", "timeslot", "show")

    self.logger.critical("HAVE TO <SET> NEXT FILE FOR: " + playlistname)
    self.logger.critical(str(self.get_active_entry()))

    if playlistname in known_playlists:
        next_file = test_file
    else:
        next_file = ""
        self.logger.critical("Should set next fallback file for " + playlistname + ", but this playlist is unknown!")

    # The (possibly empty) file is published for unknown playlists as well
    self.logger.info("Set next fallback file for " + playlistname + ": " + next_file)
    self.redismessenger.set_next_file_for(playlistname, next_file)
    return next_file
def get_next_file_for(self, fallbackname):
    """
    Evaluates the next fallback file to be played for a given fallback-type.

    Valid fallback-types are:
        * timeslot
        * show
        * station

    Args:
        fallbackname (String):  The fallback-type

    Returns:
        (String):   Absolute path to the file to be played as a fallback,
                    or an empty string if the fallback-type is unknown.
    """
    # TODO Replace the hard-coded test file with a real lookup per fallback-type
    test_file = "/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3"
    known_fallbacks = ("timeslot", "show", "station")

    self.logger.critical("HAVE TO <GET> NEXT FILE FOR: " + fallbackname)
    (show, entry) = self.get_active_entry()
    self.logger.critical(str(show) + " " + str(entry))

    if fallbackname in known_fallbacks:
        next_file = test_file
    else:
        next_file = ""
        self.logger.critical("Should get next fallback file for " + fallbackname + ", but this fallback type is unknown!")

    #set_next_file_thread = SetNextFile(fallbackname, show)
    #set_next_file_thread.start()

    self.logger.info("Got next fallback file for '" + fallbackname + "': " + next_file)
    # self.redismessenger.set_next_file_for(playlistname, file)
    return next_file
#
# PRIVATE METHODS
#
def fetch_new_programme(self):
    """
    Fetch the latest programme from `AuraCalendarService`.
    In case no programme is successfully returned, it is tried
    to retrieve the programme from Engine's database.

    `self.last_successful_fetch` is updated when a non-empty programme was
    fetched and keeps its previous value otherwise. The programme is
    (re)loaded from the database in any case.
    """
    aborted_prefix = "fetching_aborted"

    self.logger.info("Trying to fetch new program...")

    acs = AuraCalendarService(self.config)
    queue = acs.get_queue()
    acs.start()             # start fetching thread
    response = queue.get()  # wait for the end

    # Reset last successful fetch state, remembering the old value so it
    # can be restored if this fetch does not succeed
    previous_fetch = self.last_successful_fetch
    self.last_successful_fetch = None

    if response is None:
        self.logger.warning("Trying to load programme from Engine Database, because AuraCalendarService returned an empty response.")
    elif isinstance(response, list):
        self.programme = response
        if response:
            self.last_successful_fetch = datetime.datetime.now()
        else:
            self.logger.critical("Programme fetched from Steering/Tank has no entries!")
        # return self.get_act_programme_as_string()
    elif isinstance(response, str) and response.startswith(aborted_prefix):
        # Everything after the prefix is the reason reported by the service
        self.logger.warning("Trying to load programme from database, because fetching was being aborted from AuraCalendarService! Reason: " + response[len(aborted_prefix):])
    else:
        self.logger.warning("Trying to load programme from database, because i got an unknown response from AuraCalendarService: " + str(response))

    # Keep the timestamp of a fresh successful fetch. Only fall back to the
    # previous one if this fetch failed.
    if self.last_successful_fetch is None:
        self.last_successful_fetch = previous_fetch

    self.load_programme_from_db()
def load_programme_from_db(self):
    """
    Loads the programme from Engine's database and enables
    them via `self.enable_entries(..)`. After that, the
    current message queue is printed to the console.

    For every schedule the playlist and its show, timeslot and station
    fallback playlists are loaded and attached to the schedule.
    """
    self.programme = Schedule.select_act_programme()

    if not self.programme:
        self.logger.critical("Could not load programme from database. We are in big trouble my friend!")
        return

    planned_entries = []

    for schedule in self.programme:
        # playlist to play
        schedule.playlist = Playlist.select_playlist(schedule.playlist_id)
        # show fallback is played when playlist fails
        schedule.showfallback = Playlist.select_playlist(schedule.show_fallback_id)
        # timeslot fallback is played when show fallback fails
        schedule.timeslotfallback = Playlist.select_playlist(schedule.timeslot_fallback_id)
        # station fallback is played when timeslot fallback fails
        schedule.stationfallback = Playlist.select_playlist(schedule.station_fallback_id)

        # A schedule without a playlist contributes nothing instead of crashing
        planned_entries.extend(schedule.playlist or ())

    self.enable_entries(planned_entries)
    self.print_message_queue()

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #
def enable_entries(self, playlist):
    """
    Creates the timers for the upcoming entries of the given playlists.

    Currently only the first element of `playlist` is processed (see FIXME).
    Entries whose computed end marker lies in the past are skipped.

    Args:
        playlist (list):    Playlists to be planned. Each needs a `start_unix`
                            attribute and `entries` carrying a `duration`.
    """
    if not playlist:
        # Nothing to plan; indexing the first element below would fail
        self.logger.warning("No playlist entries to enable.")
        return

    # now in unixtime
    now_unix = time.mktime(datetime.datetime.now().timetuple())

    # old entry for fading out
    old_entry = None

    # FIXME Correct timing behaviour
    # NOTE(review): `duration / 1000000 / 60` is added to a unix timestamp;
    # the units involved should be confirmed.
    time_marker = playlist[0].start_unix

    for entry in playlist[0].entries:
        track_len = (entry.duration / 1000000 / 60)
        time_marker += track_len

        # since we get also programmes from the past, filter these out
        if time_marker > now_unix:
            diff = time_marker - now_unix
            diff = diff/100 # testing purpose

            # enable the timers
            self.enable_timer(diff, entry, old_entry)

        # store the old entry for fading out
        old_entry = entry
# ------------------------------------------------------------------------------------------ #

Gottfried Gaisbauer
committed
def enable_timer(self, diff, entry, old_entry):
    """
    Plans the activation of an entry and the fading belonging to it.

    Args:
        diff:       Seconds from now until the entry gets activated
        entry:      The entry to be activated; the timer is stored in
                    its `switchtimer` attribute
        old_entry:  The entry preceding `entry` (may be `None`), passed
                    on to `self.enable_fading(..)`
    """
    # create the activation threads and run them after <diff> seconds
    self.logger.critical("ENABLING SWITCHTIMER FOR " + str(entry))
    entry.switchtimer = self.add_or_update_timer(diff, self.liquidsoapcommunicator.activate, [entry])
    self.enable_fading(diff, entry, old_entry)
# ------------------------------------------------------------------------------------------ #

Gottfried Gaisbauer
committed
def enable_fading(self, diff, new_entry, old_entry):
    """
    Creates the fade-out timer for the old entry and the fade-in timer
    for the new entry.

    The fade-out is only planned if there is an old entry and its `type`
    differs from the type of the new entry. It starts `fade_out_time`
    (engine configuration) seconds before `diff`.

    Args:
        diff:       Seconds from now until the new entry starts
        new_entry:  The entry to fade in; the timer is stored in its
                    `fadeintimer` attribute
        old_entry:  The entry to fade out (may be `None`); the timer is
                    stored in its `fadeouttimer` attribute
    """
    # fading times
    fade_out_time = float(self.config.get("fade_out_time"))

    # enable fading when entry types are different
    if old_entry is not None and old_entry.type != new_entry.type:
        #self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_out, [old_entry])
        # Never schedule into the past if the fade is longer than the remaining time
        fade_out_at = max(0.0, diff - fade_out_time)
        old_entry.fadeouttimer = self.create_timer(fade_out_at, self.liquidsoapcommunicator.fade_out, [old_entry], fadeout=True)
        self.logger.critical("ENABLING FADEOUTTIMER FOR " + str(old_entry))

    # same for fadein except old_entry can be None
    # NOTE(review): the fade-in is planned unconditionally here - confirm
    # that it should not depend on the entry types as well.
    #self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_in, [new_entry])
    new_entry.fadeintimer = self.create_timer(diff, self.liquidsoapcommunicator.fade_in, [new_entry], fadein=True)
    self.logger.critical("ENABLING FADEINTIMER FOR " + str(new_entry))

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #
def add_or_update_timer(self, diff, func, parameters):
    """
    Creates a switch timer for an entry, replacing or reusing a timer
    which is already planned for the same time.

    Args:
        diff:               Seconds from now until `func` is called
        func:               The function to be called by the timer
        parameters (list):  Arguments for `func`; the first element is
                            the entry the timer belongs to

    Returns:
        The newly created timer, or the already planned timer if it
        belongs to the same playlist as the given entry.
    """
    entry = parameters[0]
    planned_timer = self.is_something_planned_at_time(entry.schedule_start)

    # if nothing is planned at given time, create a new timer
    if not planned_timer:
        return self.create_timer(diff, func, parameters, switcher=True)

    # something is planned: check if the playlist_id's are different
    if planned_timer.entry.playlist_id != entry.playlist_id:
        # if so, stop the old timer and remove it from the list
        self.stop_timer(planned_timer)
        # and create a new one
        return self.create_timer(diff, func, parameters, switcher=True)

    # if the playlist id's do not differ => reuse the old timer and do nothing, they are the same
    return planned_timer

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #
def stop_timer(self, timer):
    """
    Cancels a timer together with the fade-in and fade-out timers of its
    entry and removes all of them from the message queue.

    Args:
        timer:  The timer to be stopped; its `entry` attribute may carry
                `fadeintimer` and `fadeouttimer`
    """
    # stop timer
    timer.cancel()

    # The fade timers are only set once fading was enabled for the entry,
    # so tolerate entries which do not have them (yet)
    for attribute in ("fadeintimer", "fadeouttimer"):
        fade_timer = getattr(timer.entry, attribute, None)
        if fade_timer is not None:
            fade_timer.cancel()
            if fade_timer in self.message_timer:
                self.message_timer.remove(fade_timer)

    # and remove it from message queue
    if timer in self.message_timer:
        self.message_timer.remove(timer)

    self.logger.critical("REMOVED TIMER for " + str(timer.entry))

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #

Gottfried Gaisbauer
committed
def create_timer(self, diff, func, parameters, fadein=False, fadeout=False, switcher=False):
    """
    Creates and starts a `CallFunctionTimer` and adds it to the message queue.

    Args:
        diff:               Seconds from now until `func` is called
        func:               The function to be called by the timer
        parameters (list):  Arguments for `func`
        fadein (bool):      The timer fades in a source
        fadeout (bool):     The timer fades out a source
        switcher (bool):    The timer switches to a source

    Returns:
        (CallFunctionTimer):    The started timer

    Raises:
        (Exception):    If not exactly one of `fadein`, `fadeout` and
                        `switcher` is set
    """
    # Exactly one of the three purposes has to be chosen
    if [bool(fadein), bool(fadeout), bool(switcher)].count(True) != 1:
        raise Exception("You have to call me with either fadein=true, fadeout=true or switcher=True")

    t = CallFunctionTimer(diff, func, parameters, fadein, fadeout, switcher)
    self.message_timer.append(t)
    t.start()
    return t

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #

Gottfried Gaisbauer
committed
def is_something_planned_at_time(self, given_time):
    """
    Looks for a timer in the message queue whose entry starts at the given time.

    Args:
        given_time:     The time to be compared with the `entry_start` of
                        the entries of all queued timers

    Returns:
        The first matching timer, or `False` if there is none.
    """
    # NOTE(review): `add_or_update_timer` passes `entry.schedule_start`, while
    # this compares against `entry_start` - confirm both refer to the same time.
    for t in self.message_timer:
        if t.entry.entry_start == given_time:
            return t
    return False
def init_error_messages(self):
    """
    Load error messages

    Reads `<install_dir>/errormessages/scheduler_error.js` as JSON and
    stores the result in `self.error_data`.
    """
    error_file = self.config.get("install_dir") + "/errormessages/scheduler_error.js"

    # The context manager closes the file even if parsing the JSON fails
    with open(error_file) as f:
        self.error_data = json.load(f)

Gottfried Gaisbauer
committed
def init_database(self):
    """
    Initializes the database.

    Recreates the database if `recreate_db` is set in the configuration.
    Otherwise the tables are created only if selecting the playlists
    fails because a table does not exist.

    Raises:
        sqlalchemy.exc.ProgrammingError:    In case the DB model is invalid
    """
    if self.config.get("recreate_db") is not None:
        AuraDatabaseModel.recreate_db(systemexit=True)

    # Check if tables exists, if not create them
    try:
        Playlist.select_all()
    except sqlalchemy.exc.ProgrammingError as e:
        errcode = e.orig.args[0]

        if errcode != 1146: # Error for no such table
            raise

        x = AuraDatabaseModel()
        x.recreate_db()

Gottfried Gaisbauer
committed
def stop(self):
    """
    Called when thread is stopped.

    Sets `self.exit_event`, which ends the loop in `run()`.
    """
    self.exit_event.set()

Gottfried Gaisbauer
committed
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
# ------------------------------------------------------------------------------------------ #
class SetNextFile(threading.Thread):
    """
    Thread which detects the next fallback file for a show.

    Attributes:
        fallbackname (String):  One of `show`, `timeslot` or `station`
        show:                   The show carrying the fallback playlists
                                (`showfallback`, `timeslotfallback`, `stationfallback`)
    """
    fallbackname = None
    show = None

    def __init__(self, fallbackname, show):
        """
        Constructor

        Args:
            fallbackname (String):  The fallback-type to detect a file for
            show:                   The show the fallback belongs to
        """
        threading.Thread.__init__(self)
        self.fallbackname = fallbackname
        self.show = show

    def run(self):
        """
        Picks the fallback playlist matching `self.fallbackname` and hands
        it over to `detect_next_file_for(..)`. Unknown names are ignored.
        """
        if self.fallbackname == "show":
            self.detect_next_file_for(self.show.showfallback)
        elif self.fallbackname == "timeslot":
            # was misspelled as "timeslow", so this branch could never match
            self.detect_next_file_for(self.show.timeslotfallback)
        elif self.fallbackname == "station":
            self.detect_next_file_for(self.show.stationfallback)

    def detect_next_file_for(self, playlist):
        """
        Placeholder: currently always returns an empty string.

        Args:
            playlist:   The fallback playlist to pick a file from
        """
        return ""
        #if playlist.startswith("pool"):
        #    self.find_next_file_in_pool(playlist)

    #def find_next_file_in_pool(self, pool):
    #    return ""
# ------------------------------------------------------------------------------------------ #
class CallFunctionTimer(threading.Timer):
    """
    Timer which calls a function for an entry after a given number of seconds.

    Exactly one of `fadein`, `fadeout` and `switcher` describes what the
    timer is used for.

    Attributes:
        logger:             The logger
        param (list):       Arguments passed to the function
        entry:              The entry the timer belongs to (first element of `param`)
        diff:               Seconds until the function is called
        fadein (bool):      The timer fades in a source
        fadeout (bool):     The timer fades out a source
        switcher (bool):    The timer switches to a source
    """
    logger = None
    param = None
    entry = None
    diff = None
    fadein = False
    fadeout = False
    switcher = False

    def __init__(self, diff, func, param, fadein=False, fadeout=False, switcher=False):
        """
        Constructor

        Args:
            diff:               Seconds until `func` is called
            func:               The function to be called
            param (list):       Arguments for `func`; the first one is the entry
            fadein (bool):      The timer fades in a source
            fadeout (bool):     The timer fades out a source
            switcher (bool):    The timer switches to a source

        Raises:
            (Exception):    If not exactly one of `fadein`, `fadeout` and
                            `switcher` is set
        """
        # Validate first, so no half-initialised timer is left behind
        if [bool(fadein), bool(fadeout), bool(switcher)].count(True) != 1:
            raise Exception("You have to create me with either fadein=true, fadeout=true or switcher=True")

        threading.Timer.__init__(self, diff, func, param)

        self.diff = diff
        self.func = func
        self.param = param
        self.entry = param[0]
        self.fadein = fadein
        self.fadeout = fadeout
        self.switcher = switcher

        self.logger = logging.getLogger("AuraEngine")
        self.logger.debug(str(self))

    # ------------------------------------------------------------------------------------------ #
    def __str__(self):
        """
        Returns a description of when the timer fires and what it does.
        """
        if self.fadein:
            action = "fading in"
        elif self.fadeout:
            action = "fading out"
        elif self.switcher:
            action = "switching to"
        else:
            return "CORRUPTED CallFunctionTimer around! How can that be?"

        # The quote around the entry is closed again
        return "CallFunctionTimer starting in " + str(self.diff) + "s " + action + " source '" + str(self.entry) + "'"