# -*- coding: utf-8 -*-
#
# scheduler.py
#
# Copyright 2018 Radio FRO <https://fro.at>, Radio Helsinki <https://helsinki.at>, Radio Orange <https://o94.at>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; Version 3 of the License
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, the license can be downloaded here:
#
# http://www.gnu.org/licenses/gpl.html
"""
Aura Scheduler
Is holding the eventqueue
"""

# Meta
__version__ = '0.0.1'
__version_info__ = (0, 0, 1)
__license__ = "GNU General Public License (GPL) Version 3"
__author__ = 'Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>'
import datetime
import decimal
import logging
import threading
import time
import traceback

import simplejson
import sqlalchemy

# Project-local libraries
from modules.communication.redis.messenger import RedisMessenger
from modules.scheduling.calendar import AuraCalendarService
from libraries.database.broadcasts import Schedule, ScheduleEntry, AuraDatabaseModel
from libraries.exceptions.exception_logger import ExceptionLogger
from libraries.enum.scheduleentrytype import ScheduleEntryType
def alchemyencoder(obj):
    """
    Fallback serializer passed as ``default=`` to ``simplejson.dumps``.

    Conversion rules, checked in this order:

    * ``datetime.date`` (and therefore ``datetime.datetime``) -> ISO 8601 string
    * ``decimal.Decimal`` -> ``float``
    * SQLAlchemy ``InstanceState`` -> empty string
    * ``Schedule`` -> JSON string of a one-element list holding ``obj._asdict()``
    * anything else -> ``str(obj)``

    @type  obj: object
    @param obj: the value the JSON encoder could not serialize on its own
    @return:    a JSON-serializable replacement for ``obj``
    """
    # Guard clauses; the order matters because the checks are evaluated lazily.
    if isinstance(obj, datetime.date):
        return obj.isoformat()

    if isinstance(obj, decimal.Decimal):
        return float(obj)

    if isinstance(obj, sqlalchemy.orm.state.InstanceState):
        return ""

    if isinstance(obj, Schedule):
        # Nested objects of the schedule go through this very function again.
        as_list = [obj._asdict()]
        return simplejson.dumps(as_list, default=alchemyencoder)

    return str(obj)
class AuraScheduler(ExceptionLogger, threading.Thread):
    """
    Aura Scheduler Class
    Gets data from pv and importer, stores and fires events

    The scheduler is a thread that starts itself at the end of ``__init__``.
    Its ``run`` loop fetches the programme, sleeps and repeats. For every
    programme entry that lies in the future a ``CallFunctionTimer`` is kept in
    ``message_timer``.

    NOTE(review): the reviewed copy of this class had lost its indentation and
    several source lines. Every reconstructed line is marked with its own
    ``NOTE(review)`` below and must be checked against the repository.
    """

    # Every timer delay is divided by this value (marked "testing purpose" in
    # the original code), so timers fire far earlier than the real start time.
    TESTING_TIME_DIVISOR = 50000
    # Seconds run() sleeps between two programme fetches.
    FETCH_INTERVAL_SECONDS = 3600

    redismessenger = RedisMessenger()
    # Not assigned anywhere in this class; load_programme_from_db() calls
    # methods on it, so it has to be set from outside before entries are scheduled.
    liquidsoapcommunicator = None
    schedule_entries = None
    # Class-level fallbacks only: __init__ gives each instance its own
    # containers so timers and results are not shared between instances.
    message_timer = []
    job_result = {}
    programme = None
    client = None
    active_entry = None
    logger = None
    config = None

    def __init__(self, config):
        """
        Constructor

        Reads the scheduler settings, loads the error messages, makes sure the
        database tables exist, announces the start via redis and starts the
        scheduler thread.

        @type  config: ConfigReader
        @param config: read engine.ini
        """
        self.config = config
        self.logger = logging.getLogger("AuraEngine")

        # per-instance containers instead of the shared class-level ones
        self.message_timer = []
        self.job_result = {}

        # init threading
        threading.Thread.__init__(self)

        # init messenger.. probably not needed anymore
        self.redismessenger.set_channel('scheduler')
        self.redismessenger.set_section('execjob')

        # load schedulerconfig...
        self.schedulerconfig = self.config.get("scheduler_config_file")

        # load error messages; the context manager closes the file again
        error_file = self.config.get("install_dir") + "/errormessages/scheduler_error.js"
        with open(error_file) as error_fp:
            self.error_data = simplejson.load(error_fp)

        # make sure the tables exist
        self.init_database()

        self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal')

        # start loading a new programme periodically (see run())
        self.start()

    # ------------------------------------------------------------------------------------------ #
    def init_database(self):
        """
        Check whether the tables exist and create them if they do not.

        A ``ProgrammingError`` with code 1146 ("no such table") triggers
        ``AuraDatabaseModel.recreate_db()``. Any other ``ProgrammingError`` is
        logged instead of being dropped silently.
        """
        try:
            ScheduleEntry.select_all()
        except sqlalchemy.exc.ProgrammingError as e:
            errcode = e.orig.args[0]

            if errcode == 1146:  # error for no such table
                model = AuraDatabaseModel()
                model.recreate_db()
            else:
                self.logger.error("Unexpected database error while checking tables: %s", e)

    # ------------------------------------------------------------------------------------------ #
    def run(self):
        """
        Thread body: fetch the programme, sleep, repeat forever.
        """
        while True:
            self.fetch_new_programme()
            time.sleep(self.FETCH_INTERVAL_SECONDS)

    # ------------------------------------------------------------------------------------------ #
    @staticmethod
    def _now_unix():
        """Return the current local time as unix timestamp with whole-second resolution."""
        return time.mktime(datetime.datetime.now().timetuple())

    # ------------------------------------------------------------------------------------------ #
    def get_active_entry(self):
        """
        Walk through the programme and return the first entry that starts in
        the future and is not the very first entry of the programme.

        NOTE(review): the ``def`` line of this method and the statement that
        loads the programme were missing in the reviewed source. The name
        ``get_active_entry`` and the call to ``load_programme_from_db()`` are
        reconstructions - confirm both.

        @return: the matching entry or None if there is none
        """
        now_unix = self._now_unix()
        lastentry = None

        # load programme if necessary
        if self.programme is None:
            self.logger.debug("want to get active channel, but have to load programme first")
            self.load_programme_from_db()

        # get active source
        for entry in self.programme:
            # check if lastentry is set and if act entry is in the future
            if lastentry is not None and entry.entry_start_unix > now_unix:
                # NOTE(review): the original comment said "return lastentry",
                # but the code returns the upcoming entry. Behaviour is kept
                # unchanged - confirm which one callers expect.
                return entry

            lastentry = entry

        return None

    # ------------------------------------------------------------------------------------------ #
    def load_programme_from_db(self, silent=False):
        """
        Load the programme and plan an activation timer for every entry that
        starts in the future.

        NOTE(review): the following lines were missing in the reviewed source
        and are reconstructions - confirm each of them:
          * the assignment of ``self.programme``
          * the computation of ``diff``
          * the condition of the first branch (``linein`` sources)
          * the ``else:`` in front of the two warnings

        @type  silent: bool
        @param silent: not used by this method
        """
        # NOTE(review): reconstructed line
        self.programme = ScheduleEntry.select_all()

        self.logger.info("i am the scheduler and i am holding the following stuff")

        # now in unixtime
        now_unix = self._now_unix()

        # switch to check if its the first stream in loaded programme
        # NOTE(review): this starts as False and is never set to True, so
        # next_stream_source() below is never reached. Left unchanged because
        # enabling it would trigger a new call - confirm the intent.
        first_stream_in_programme = False

        activate = self.liquidsoapcommunicator.activate if self.programme else None

        for entry in self.programme:
            # since we get also programmes from act hour, filter these out
            if entry.entry_start_unix <= now_unix:
                continue

            # seconds until the entry starts (NOTE(review): reconstructed line)
            diff = entry.entry_start_unix - now_unix
            diff = diff / self.TESTING_TIME_DIVISOR  # testing purpose

            # create the activation threads and run them after <diff> seconds
            if entry.source.startswith("linein"):  # NOTE(review): reconstructed condition
                self.add_or_update_timer(entry, diff, activate)

            elif entry.type == ScheduleEntryType.STREAM:
                if first_stream_in_programme:
                    self.liquidsoapcommunicator.next_stream_source(entry.source)
                    first_stream_in_programme = False

                self.add_or_update_timer(entry, diff, activate)

            elif entry.type == ScheduleEntryType.FILESYSTEM:
                self.add_or_update_timer(entry, diff, activate)

            else:
                self.logger.warning("Scheduler cannot understand source '" + entry.source + "' from " + str(entry))
                self.logger.warning(" Not setting any activation Thread!")

            self.logger.info(str(entry))

    # ------------------------------------------------------------------------------------------ #
    def add_or_update_timer(self, entry, diff, func):
        """
        Make sure exactly one timer is planned for the start time of ``entry``.

        * nothing planned at that time          -> create a timer
        * planned with a different playlist id  -> replace the timer
        * planned with the same playlist id     -> keep the existing timer

        NOTE(review): both ``create_timer`` calls were missing in the reviewed
        source and were reconstructed from the surrounding comments.

        @param entry: the schedule entry to activate
        @param diff:  delay in seconds until ``func`` is called
        @param func:  callable that receives ``entry`` as its only argument
        """
        # check if something is planned at given time
        planned_timer = self.is_something_planned_at_time(entry.entry_start)

        if not planned_timer:
            # nothing is planned at given time => create a new timer
            self.create_timer(entry, diff, func)
            return

        # something is planned on entry.entry_start
        if planned_timer.entry.playlist_id != entry.playlist_id:
            # different playlist => stop the old timer, remove it from the list ...
            self.stop_timer(planned_timer)
            # ... and create a new one
            self.create_timer(entry, diff, func)

        # if the playlist id's do not differ => do nothing, they are the same

    # ------------------------------------------------------------------------------------------ #
    def stop_timer(self, timer):
        """
        Cancel ``timer`` and remove it from ``message_timer``.

        @raise ValueError: if ``timer`` is not part of ``message_timer``
        """
        # stop timer
        timer.cancel()

        # and remove it from message queue
        self.message_timer.remove(timer)

    # ------------------------------------------------------------------------------------------ #
    def create_timer(self, entry, diff, func):
        """
        Create a timer that calls ``func(entry)`` after ``diff`` seconds,
        remember it in ``message_timer`` and start it.
        """
        t = CallFunctionTimer(diff, func, [entry])
        self.message_timer.append(t)
        t.start()

    # ------------------------------------------------------------------------------------------ #
    def is_something_planned_at_time(self, given_time):
        """
        @return: the first timer whose entry starts at ``given_time``, else False
        """
        for t in self.message_timer:
            if t.entry.entry_start == given_time:
                return t

        return False

    # ------------------------------------------------------------------------------------------ #
    def find_entry_in_timers(self, entry):
        """
        Check if a playlist id is already planned for the start time of ``entry``.

        @return: the first timer with the same playlist id and start time, else False
        """
        for t in self.message_timer:
            if t.entry.playlist_id == entry.playlist_id and t.entry.entry_start == entry.entry_start:
                return t

        return False

    # ------------------------------------------------------------------------------------------ #
    def _programme_as_json(self):
        """
        Serialize the currently held programme without fetching anything.

        @return: JSON string, or "" if the programme cannot be serialized
        """
        programme_as_string = ""

        try:
            programme_as_string = simplejson.dumps([p._asdict() for p in self.programme], default=alchemyencoder)
        except Exception as e:
            self.logger.error("Cannot transform programme into JSON String. Reason: " + str(e))
            traceback.print_exc()

        return programme_as_string

    # ------------------------------------------------------------------------------------------ #
    def get_act_programme_as_string(self):
        """
        Return the programme as JSON string. If no programme is loaded or it is
        empty, one fetch is attempted first.

        NOTE(review): the ``try:`` belonging to the ``except`` was missing in
        the reviewed source. It is placed around the serialization only, so an
        exception raised while fetching reaches the caller - confirm.

        @return: JSON string, or "" if the programme cannot be serialized
        """
        if not self.programme:
            # The fetch serializes with _programme_as_json() and never calls
            # back into this method, so an empty programme can no longer
            # cause an endless fetch/serialize recursion.
            self.fetch_new_programme()

        return self._programme_as_json()

    # ------------------------------------------------------------------------------------------ #
    def print_message_queue(self):
        """
        @return: the ``get_info()`` text of every planned timer, one per line
        """
        message_queue = ""

        for t in self.message_timer:
            message_queue += t.get_info() + "\n"

        return message_queue

    # ------------------------------------------------------------------------------------------ #
    def swap_playlist_entries(self, indexes):
        """
        Swap the sources of two programme entries and store both entries.

        @type  indexes: dict
        @param indexes: holds the programme indexes as "from_index" and "to_index"
        @return: the programme as JSON string, or an error text if one of the
                 entries was not found
        """
        from_entry = None
        to_entry = None
        from_idx = indexes["from_index"]
        to_idx = indexes["to_index"]

        for p in self.programme:
            if p.programme_index == int(from_idx):
                from_entry = p

            if p.programme_index == int(to_idx):
                to_entry = p

            # break out of loop, if both entries found
            if from_entry is not None and to_entry is not None:
                break

        if from_entry is None or to_entry is None:
            return "From or To Entry not found!"

        from_entry.source, to_entry.source = to_entry.source, from_entry.source

        # only the second store commits, so both changes end up in one commit
        from_entry.store(add=False, commit=False)
        to_entry.store(add=False, commit=True)

        # and return the programme with swapped entries
        return self.get_act_programme_as_string()

    # ------------------------------------------------------------------------------------------ #
    def delete_playlist_entry(self, index):
        """
        Delete the programme entry with the given programme index and reload
        the programme.

        @param index: programme index of the entry, anything ``int()`` accepts
        @return: the programme as JSON string
        """
        found = False

        for p in self.programme:
            if p.programme_index == int(index):
                p.delete(True)
                # rebinds self.programme; safe because the loop is left right away
                self.load_programme_from_db()
                found = True
                break

        if not found:
            self.logger.warning("Nothing to delete")

        return self.get_act_programme_as_string()

    # ------------------------------------------------------------------------------------------ #
    def insert_playlist_entry(self, fromtime_source):
        """
        Store a manually created entry and reload the programme.

        @type  fromtime_source: dict
        @param fromtime_source: holds the start time as "fromtime" and the source as "source"
        @return: the programme as JSON string
        """
        entry = ScheduleEntry()
        entry.entry_start = fromtime_source["fromtime"]
        entry.source = fromtime_source["source"]
        entry.playlist_id = 0
        entry.schedule_id = 0
        entry.entry_num = ScheduleEntry.select_next_manual_entry_num()
        entry.store(add=True, commit=True)

        self.load_programme_from_db()

        return self.get_act_programme_as_string()

    # ------------------------------------------------------------------------------------------ #
    def fetch_new_programme(self):
        """
        Start an ``AuraCalendarService``, wait for its answer and reload the
        programme if the answer is "fetching_finished".

        @return: the programme as JSON string, or None for any other answer
        """
        acs = AuraCalendarService(self.config)
        queue = acs.get_queue()

        # start fetching thread
        acs.start()

        # wait for the end
        response = queue.get()

        if response == "fetching_finished":
            self.load_programme_from_db()
            return self._programme_as_json()

        # str() keeps the warning from raising on a non-string response
        self.logger.warning("Got an unknown response from AuraCalendarService: " + str(response))
        return None

    # ------------------------------------------------------------------------------------------ #
    def set_next_file_for(self, playlistname):
        """
        Print the playlist name and return an empty string.
        """
        print(playlistname)
        return ""

    # ------------------------------------------------------------------------------------------ #
    def _report_recorder_result(self, job, result):
        """
        Report ``result`` of a recorder job via ``success`` (code "00") or
        ``error`` (code "01"), depending on ``__check_result__``.
        """
        if self.__check_result__(result):
            self.success(job, result, "00")
        else:
            self.error(job, result, "01")

    # ------------------------------------------------------------------------------------------ #
    def start_recording(self, data):
        """
        Start recording

        @param data: not used by this method
        """
        result = self.client.recorder_start()
        self._report_recorder_result("start_recording", result)

    # ------------------------------------------------------------------------------------------ #
    def stop_recording(self, data):
        """
        Stop recording

        @param data: not used by this method
        """
        result = self.client.recorder_stop()
        self._report_recorder_result("stop_recording", result)
class CallFunctionTimer(threading.Timer):
    """
    A ``threading.Timer`` that keeps a reference to the entry it belongs to.

    Besides what ``threading.Timer`` stores itself, the delay, the callable and
    the first element of the argument list are kept as ``diff``, ``func`` and
    ``entry``.
    """
    logger = None
    entry = None
    debug = False
    diff = None

    def __init__(self, diff, func, param):
        """
        Constructor

        @type  diff:  number
        @param diff:  delay in seconds, handed to ``threading.Timer`` as interval
        @type  func:  callable
        @param func:  function the timer calls with ``param`` as arguments
        @type  param: list
        @param param: arguments for ``func``; the first element is kept as ``entry``
        """
        super().__init__(diff, func, param)

        self.logger = logging.getLogger("AuraEngine")
        self.entry = param[0]
        self.func = func
        self.diff = diff

        # announce the timer on debug level
        announcement = "".join([
            "MessageTimer starting @ ", str(self.entry.entry_start),
            " source '", str(self.entry.source),
            "' In seconds: ", str(self.diff),
        ])
        self.logger.debug(announcement)