Newer
Older
#
#
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
# Meta

Gottfried Gaisbauer
committed
__version__ = '0.0.1'
__license__ = "GNU General Public License (GPL) Version 3"

Gottfried Gaisbauer
committed
__version_info__ = (0, 0, 1)
__author__ = 'Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>'
"""

Gottfried Gaisbauer
committed
Aura Scheduler
Is holding the eventqueue
"""
import time
import simplejson
import datetime

Gottfried Gaisbauer
committed
import decimal
import sqlalchemy

Gottfried Gaisbauer
committed
import logging
import threading
# Die eigenen Bibliotheken
from modules.communication.redis.messenger import RedisMessenger

Gottfried Gaisbauer
committed
from modules.scheduling.calendar import AuraCalendarService
from libraries.database.broadcasts import Schedule, ScheduleEntry, AuraDatabaseModel
from libraries.exceptions.exception_logger import ExceptionLogger
from libraries.enum.auraenumerations import ScheduleEntryType

Gottfried Gaisbauer
committed
def alchemyencoder(obj):
"""JSON encoder function for SQLAlchemy special classes."""
if isinstance(obj, datetime.date):
return obj.isoformat()
elif isinstance(obj, decimal.Decimal):
return float(obj)
elif isinstance(obj, sqlalchemy.orm.state.InstanceState):
return ""

Gottfried Gaisbauer
committed
elif isinstance(obj, Schedule):
return simplejson.dumps([obj._asdict()], default=alchemyencoder)
else:

Gottfried Gaisbauer
committed
return str(obj)

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #
class AuraScheduler(ExceptionLogger, threading.Thread):

Gottfried Gaisbauer
committed
"""
Aura Scheduler Class
Gets data from pv and importer, stores and fires events
"""

Gottfried Gaisbauer
committed
message_timer = []

Gottfried Gaisbauer
committed
job_result = {}
liquidsoapcommunicator = None
schedule_entries = None
active_entry = None

Gottfried Gaisbauer
committed
exit_event = None

Gottfried Gaisbauer
committed
programme = None
client = None

Gottfried Gaisbauer
committed
logger = None
config = None

Gottfried Gaisbauer
committed
tried_fetching = 0

Gottfried Gaisbauer
committed
fetch_max = 3

Gottfried Gaisbauer
committed
def __init__(self, config):
"""
Constructor

Gottfried Gaisbauer
committed
@type config: ConfigReader
@param config: read engine.ini
"""

Gottfried Gaisbauer
committed
self.config = config
self.logger = logging.getLogger("AuraEngine")
# init threading
threading.Thread.__init__(self)
# init messenger.. probably not needed anymore
self.redismessenger.set_channel('scheduler')
self.redismessenger.set_section('execjob')
# load schedulerconfig...

Gottfried Gaisbauer
committed
self.schedulerconfig = self.config.get("scheduler_config_file")
# load error messages

Gottfried Gaisbauer
committed
error_file = self.config.get("install_dir") + "/errormessages/scheduler_error.js"
f = open(error_file)
self.error_data = simplejson.load(f)
f.close()
# init database ?
self.init_database()
self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal')

Gottfried Gaisbauer
committed
# create exit event

Gottfried Gaisbauer
committed
self.exit_event = threading.Event()
# start loading new programm every hour
self.start()
# ------------------------------------------------------------------------------------------ #
def init_database(self):
# check if tables do exist. if not create them
try:
ScheduleEntry.select_all()
except sqlalchemy.exc.ProgrammingError as e:
if errcode == 1146: # error for no such table
x = AuraDatabaseModel()
x.recreate_db()
# ------------------------------------------------------------------------------------------ #
def run(self):
seconds_to_wait = int(self.config.get("fetching_frequency"))

Gottfried Gaisbauer
committed
#while True:
while not self.exit_event.is_set():
# calc next time
next_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait)
# write to logger

Gottfried Gaisbauer
committed
self.logger.info("Fetch new programmes every " + str(seconds_to_wait) + "s started. Going to start next time " + str(next_time))

Gottfried Gaisbauer
committed
# empty database
# self.logger.info("emptying database")
# ScheduleEntry.truncate()

Gottfried Gaisbauer
committed
self.fetch_new_programme()

Gottfried Gaisbauer
committed
self.exit_event.wait(seconds_to_wait)
# ------------------------------------------------------------------------------------------ #

Gottfried Gaisbauer
committed
def stop(self):
self.exit_event.set()
# ------------------------------------------------------------------------------------------ #

Gottfried Gaisbauer
committed
now_unix = time.mktime(datetime.datetime.now().timetuple())
lastentry = None
# load programme if necessary
if self.programme is None:

Gottfried Gaisbauer
committed
self.logger.debug("want to get active channel, but have to load programme first")

Gottfried Gaisbauer
committed
# get active source
for entry in self.programme:
# check if lastentry is set and if act entry is in the future

Gottfried Gaisbauer
committed
if lastentry is not None and entry.entry_start_unix > now_unix:
# return lastentry if so
return entry # actsource = entry.source

Gottfried Gaisbauer
committed
lastentry = entry
# ------------------------------------------------------------------------------------------ #
def load_programme_from_db(self, silent=False):
#self.programme = ScheduleEntry.select_all()
self.programme = ScheduleEntry.select_act_programme()
# now in unixtime
now_unix = time.mktime(datetime.datetime.now().timetuple())
# switch to check if its the first stream in loaded programme
first_stream_in_programme = False
for entry in self.programme:
# since we get also programmes from act hour, filter these out
if entry.entry_start_unix > now_unix:

Gottfried Gaisbauer
committed

Gottfried Gaisbauer
committed
diff = diff/1000 # testing purpose

Gottfried Gaisbauer
committed
# create the activation threads and run them after <diff> seconds
self.add_or_update_timer(entry, diff, self.liquidsoapcommunicator.activate)

Gottfried Gaisbauer
committed

Gottfried Gaisbauer
committed
elif entry.type == ScheduleEntryType.STREAM:
if first_stream_in_programme:
self.liquidsoapcommunicator.next_stream_source(entry.source)
first_stream_in_programme = False
self.add_or_update_timer(entry, diff, self.liquidsoapcommunicator.activate)

Gottfried Gaisbauer
committed

Gottfried Gaisbauer
committed
elif entry.type == ScheduleEntryType.FILESYSTEM:
self.add_or_update_timer(entry, diff, self.liquidsoapcommunicator.activate)

Gottfried Gaisbauer
committed

Gottfried Gaisbauer
committed
self.logger.warning("Scheduler cannot understand source '" + entry.source + "' from " + str(entry))
self.logger.warning(" Not setting any activation Thread!")

Gottfried Gaisbauer
committed
self.logger.info(str(entry))

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #
def add_or_update_timer(self, entry, diff, func):
# check if something is planned at given time
planned_timer = self.is_something_planned_at_time(entry.entry_start)

Gottfried Gaisbauer
committed
# if something is planned on entry.entry_start
if planned_timer:
planned_entry = planned_timer.entry
# check if the playlist_id's are different
if planned_entry.playlist_id != entry.playlist_id:
# if not stop the old timer and remove it from the list
self.stop_timer(planned_timer)
# and create a new one

Gottfried Gaisbauer
committed
# if the playlist id's do not differ => do nothing, they are the same

Gottfried Gaisbauer
committed
# if nothing is planned at given time, create a new timer

Gottfried Gaisbauer
committed
else:

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #
def stop_timer(self, timer):
# stop timer
timer.cancel()

Gottfried Gaisbauer
committed
# and remove it from message queue

Gottfried Gaisbauer
committed
self.message_timer.remove(timer)
# ------------------------------------------------------------------------------------------ #
def create_timer(self, entry, diff, func):
t = CallFunctionTimer(diff, func, [entry])

Gottfried Gaisbauer
committed
self.message_timer.append(t)
t.start()
# ------------------------------------------------------------------------------------------ #

Gottfried Gaisbauer
committed
def is_something_planned_at_time(self, given_time):

Gottfried Gaisbauer
committed
for t in self.message_timer:

Gottfried Gaisbauer
committed
if t.entry.entry_start == given_time:

Gottfried Gaisbauer
committed
return t
return False
# ------------------------------------------------------------------------------------------ #
def find_entry_in_timers(self, entry):

Gottfried Gaisbauer
committed
# check if a playlist id is already planned

Gottfried Gaisbauer
committed
for t in self.message_timer:
if t.entry.playlist_id == entry.playlist_id and t.entry.entry_start == entry.entry_start:
return t
return False
# ------------------------------------------------------------------------------------------ #
def get_act_programme_as_string(self):
programme_as_string = ""

Gottfried Gaisbauer
committed
if self.programme is None or len(self.programme) == 0:
self.fetch_new_programme()
programme_as_string = simplejson.dumps([p._asdict() for p in self.programme], default=alchemyencoder)

Gottfried Gaisbauer
committed
except Exception as e:
self.logger.error("Cannot transform programme into JSON String. Reason: " + str(e))
traceback.print_exc()
return programme_as_string

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #
def print_message_queue(self):
message_queue = ""
for t in self.message_timer:
message_queue += t.get_info()+"\n"
return message_queue
# ------------------------------------------------------------------------------------------ #
def swap_playlist_entries(self, indexes):
from_entry = None
to_entry = None
from_idx = indexes["from_index"]
to_idx = indexes["to_index"]
for p in self.programme:
if p.programme_index == int(from_idx):
from_entry = p
if p.programme_index == int(to_idx):
to_entry = p
# break out of loop, if both entries found
if from_entry is not None and to_entry is not None:
break
if from_entry is None or to_entry is None:
return "From or To Entry not found!"
swap = from_entry.source
from_entry.source = to_entry.source
to_entry.source = swap
from_entry.store(add=False, commit=False)
to_entry.store(add=False, commit=True)
# and return the programme with swapped entries
return self.get_act_programme_as_string()
# ------------------------------------------------------------------------------------------ #
def delete_playlist_entry(self, index):
found = False
for p in self.programme:
if p.programme_index == int(index):
p.delete(True)
self.load_programme_from_db()
found = True
break
if not found:

Gottfried Gaisbauer
committed
self.logger.warning("Nothing to delete")
return self.get_act_programme_as_string()

Gottfried Gaisbauer
committed
# ------------------------------------------------------------------------------------------ #
def insert_playlist_entry(self, fromtime_source):
fromtime = fromtime_source["fromtime"]
source = fromtime_source["source"]
entry = ScheduleEntry()
entry.entry_start = fromtime
entry.source = source
entry.playlist_id = 0
entry.schedule_id = 0
entry.entry_num = ScheduleEntry.select_next_manual_entry_num()
entry.store(add=True, commit=True)

Gottfried Gaisbauer
committed
self.load_programme_from_db()
return self.get_act_programme_as_string()
# ------------------------------------------------------------------------------------------ #

Gottfried Gaisbauer
committed
def fetch_new_programme(self):

Gottfried Gaisbauer
committed
self.logger.info("trying to fetch new programme")
if self.tried_fetching == self.fetch_max:

Gottfried Gaisbauer
committed
msg = "Cannot connect to PV or Tank! No Programme loaded!"
self.logger.error(msg)

Gottfried Gaisbauer
committed
self.tried_fetching = 0

Gottfried Gaisbauer
committed
return msg

Gottfried Gaisbauer
committed
self.tried_fetching += 1

Gottfried Gaisbauer
committed
acs = AuraCalendarService(self.config)
queue = acs.get_queue()

Gottfried Gaisbauer
committed
# start fetching thread
acs.start()

Gottfried Gaisbauer
committed
# wait for the end
response = queue.get()

Gottfried Gaisbauer
committed
self.load_programme_from_db()

Gottfried Gaisbauer
committed
if self.programme is not None and len(self.programme) > 0:
self.tried_fetching = 0
if len(self.programme) == 0 and self.tried_fetching == self.fetch_max:
self.logger.critical("Programme loaded from database has no entries!")

Gottfried Gaisbauer
committed

Gottfried Gaisbauer
committed
# return self.get_act_programme_as_string()

Gottfried Gaisbauer
committed
elif response.startswith("fetching_aborted"):
self.logger.warning("Fetching was being aborted from AuraCalendarService! Are you connected? Reason: " + response)
else:

Gottfried Gaisbauer
committed
self.logger.warning("Got an unknown response from AuraCalendarService: " + response)
# ------------------------------------------------------------------------------------------ #

Gottfried Gaisbauer
committed
def set_next_file_for(self, playlistname):

Gottfried Gaisbauer
committed
self.logger.critical("HAVE TO SET NEXT FILE FOR: " + playlistname)
self.logger.critical(str(self.get_active_entry()))

Gottfried Gaisbauer
committed
if playlistname == "station":
file = "/var/audio/fallback/eins.zwo.bombe.mp3"
elif playlistname == "timeslot":
file = "/var/audio/fallback/ratm.killing.mp3"
elif playlistname == "show":
file = "/var/audio/fallback/weezer.hash.pipe.mp3"
else:
file = ""
self.logger.critical("Should set next fallback file for " + playlistname + ", but this playlist is unknown!")
self.logger.info("Set next fallback file for " + playlistname + ": " + file)
self.redismessenger.set_next_file_for(playlistname, file)
return file
# ------------------------------------------------------------------------------------------ #
class CallFunctionTimer(threading.Timer):

Gottfried Gaisbauer
committed
logger = None

Gottfried Gaisbauer
committed
entry = None

Gottfried Gaisbauer
committed
debug = False
diff = None

Gottfried Gaisbauer
committed

Gottfried Gaisbauer
committed
def __init__(self, diff, func, param):

Gottfried Gaisbauer
committed
threading.Timer.__init__(self, diff, func, param)

Gottfried Gaisbauer
committed
self.diff = diff

Gottfried Gaisbauer
committed
self.func = func
self.entry = param[0]

Gottfried Gaisbauer
committed
self.logger = logging.getLogger("AuraEngine")

Gottfried Gaisbauer
committed

Gottfried Gaisbauer
committed
msg = "CallFunctionTimer starting @ " + str(self.entry.entry_start) + " source '" + str(self.entry.source) + "' In seconds: " + str(self.diff)
self.logger.debug(msg)