Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • dev-old
  • dev-old-david
  • develop
  • lars-tests
  • master
  • master-old
  • topic/filesystem-fallbacks
  • topic/tank_connection
  • topic/tank_connection_david
  • user/equinox/docker
10 results

Target

Select target project
  • aura/engine
  • hermannschwaerzler/engine
  • sumpfralle/aura-engine
3 results
Select Git revision
  • 122-synchronized-ci
  • feat-use-docker-main-tag
  • fix-aura-sysuser
  • fix-broken-pipe-153
  • fix-docker-release
  • fix-push-latest-with-tag
  • fix-streamchannel-retries
  • gitlab-templates
  • improve-test-coverage-137
  • improve-test-coverage-143
  • main
  • orm-less-scheduling
  • remove-mailer
  • update-changelog-alpha3
  • virtual-timeslots-131
  • 1.0.0-alpha1
  • 1.0.0-alpha2
  • 1.0.0-alpha3
  • 1.0.0-alpha4
  • 1.0.0-alpha5
20 results
Show changes
Showing
with 4684 additions and 0 deletions
#
# engine
#
# Playout Daemon for autoradio project
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
import sys
import time
import redis
import logging
import threading
from datetime import datetime
from threading import Event
from modules.communication.redis.messenger import RedisMessenger
from modules.communication.redis.statestore import RedisStateStore
# from modules.communication.connection_tester import ConnectionTester
from modules.base.exceptions import RedisConnectionException
from modules.base.enum import RedisChannel, FallbackType
from modules.base.utils import TerminalColors
# ------------------------------------------------------------------------------------------ #
class ServerRedisAdapter(threading.Thread, RedisMessenger):
debug = False
pubsub = None
config = None
redisdb = None
channel = ""
scheduler = None
redisclient = None
# connection_tester = None
soundsystem = None
socket = None
# ------------------------------------------------------------------------------------------ #
def __init__(self, config):
threading.Thread.__init__(self)
RedisMessenger.__init__(self, config)
# init
#threading.Thread.__init__ (self)
self.config = config
self.shutdown_event = Event()
self.channel = RedisChannel.STANDARD.value
self.section = ''
self.rstore = RedisStateStore(config)
self.errnr = '00'
self.components = {'controller':'01', 'scheduling':'02', 'playd':'03', 'recorder':'04', 'helpers':'09'}
self.fromMail = ''
self.adminMails = ''
self.can_send = None
self.redisclient = ClientRedisAdapter(config)
# self.connection_tester = ConnectionTester()
# ------------------------------------------------------------------------------------------ #
def run(self):
self.redisdb = redis.Redis(host=self.config.get("redis_host"), port=self.config.get("redis_port"), db=self.config.get("redis_db"))
self.pubsub = self.redisdb.pubsub()
self.pubsub.subscribe(self.channel)
self.logger.debug(TerminalColors.YELLOW.value + "waiting for REDIS message on channel " + self.channel + TerminalColors.ENDC.value)
# listener loop
for item in self.pubsub.listen():
if item["type"] == "subscribe":
continue
self.logger.debug(TerminalColors.YELLOW.value + "received REDIS message: " + TerminalColors.ENDC.value + str(item))
item["channel"] = self.decode_if_needed(item["channel"])
item["data"] = self.decode_if_needed(item["data"])
try:
self.work(item)
except RedisConnectionException as rce:
self.logger.error(str(rce))
if not self.shutdown_event.is_set():
self.logger.debug(TerminalColors.YELLOW.value + "waiting for REDIS message on channel " + self.channel + TerminalColors.ENDC.value)
self.pubsub.unsubscribe()
if not self.shutdown_event.is_set():
self.logger.warning("unsubscribed from " + self.channel + " and finished")
# ------------------------------------------------------------------------------------------ #
def decode_if_needed(self, val):
if isinstance(val, bytes):
return val.decode("utf-8")
return val
# ------------------------------------------------------------------------------------------ #
def listen_for_one_message(self, channel, socket_timeout=2):
self.redisdb = redis.Redis(host=self.config.get("redis_host"), port=self.config.get("redis_port"), db=self.config.get("redis_db"), socket_timeout=socket_timeout)
self.pubsub = self.redisdb.pubsub()
self.pubsub.subscribe(channel)
try:
self.logger.debug("I am listening on channel '"+channel+"' for "+str(socket_timeout)+" seconds")
for item in self.pubsub.listen():
it = self.receive_message(item)
if it is not None:
break
except redis.exceptions.TimeoutError as te:
raise te
return item["data"]
# ------------------------------------------------------------------------------------------ #
def receive_message(self, item):
if item["type"] == "subscribe":
self.logger.info("i am subscribed to channel " + item["channel"].decode("utf-8"))
return None
item["channel"] = item["channel"].decode("utf-8")
if isinstance(item["data"], bytes):
item["data"] = item["data"].decode("utf-8")
self.pubsub.unsubscribe()
return item
# ------------------------------------------------------------------------------------------ #
def work(self, item):
if item["data"] == "fetch_new_programme":
#self.execute(RedisChannel.FNP_REPLY.value, self.scheduler.fetch_new_programme)
self.execute(RedisChannel.FNP_REPLY.value, self.scheduler.get_act_programme_as_string)
elif item["data"] == "shutdown":
self.terminate()
elif item["data"] == "init_player":
self.execute(RedisChannel.IP_REPLY.value, self.soundsystem.init_player)
elif item["data"] == "get_act_programme":
self.execute(RedisChannel.GAP_REPLY.value, self.scheduler.get_act_programme_as_string)
# elif item["data"] == "get_connection_status":
# self.execute(RedisChannel.GCS_REPLY.value, self.connection_tester.get_connection_status)
elif item["data"] == "print_message_queue":
self.execute(RedisChannel.PMQ_REPLY.value, self.scheduler.print_message_queue)
elif item["data"].find("set_next_file") >= 0:
playlist = item["data"].split()[1]
playlist = playlist[0:len(playlist)-8]
self.execute(RedisChannel.SNF_REPLY.value, self.scheduler.set_next_file_for, playlist)
elif item["data"].find("get_next_file") >= 0:
playlist = item["data"].split()[1]
#playlist = playlist[0:len(playlist)-8]
self.execute(RedisChannel.GNF_REPLY.value, self.scheduler.get_next_file_for, playlist)
elif item["data"].find("on_play") >= 0:
source = item["data"].split("on_play ")[1]
self.execute(RedisChannel.TS_REPLY.value, self.scheduler.soundsystem.on_play, source)
elif item["data"] == "recreate_db":
self.execute(RedisChannel.RDB_REPLY.value, self.scheduler.recreate_database)
elif item["data"] == "status":
return True
else:
raise RedisConnectionException("ServerRedisAdapter Cannot understand command: " + item["data"])
# ------------------------------------------------------------------------------------------ #
def execute(self, channel, f, param1=None, param2=None, param3=None):
if param1 != None:
if param2 != None:
if param3 != None:
reply = f(param1, param2, param3)
else:
reply = f(param1, param2)
else:
reply = f(param1)
else:
reply = f()
if reply is None:
reply = ""
# sometimes the sender is faster than the receiver. redis messages would be lost
time.sleep(0.1)
self.logger.debug(TerminalColors.YELLOW.value + "replying REDIS message " + TerminalColors.ENDC.value + reply + TerminalColors.YELLOW.value + " on channel " + channel + TerminalColors.ENDC.value)
# publish
self.redisclient.publish(channel, reply)
# ------------------------------------------------------------------------------------------ #
def join_comm(self):
try:
while self.is_alive():
self.logger.debug(str(datetime.now())+" joining")
self.join()
self.logger.warning("join out")
except (KeyboardInterrupt, SystemExit):
# Dem Server den Shutdown event setzen
# server.shutdown_event.set()
# Der Server wartet auf Eingabe
# Daher einen Client initiieren, der eine Nachricht schickt
self.halt()
sys.exit('Terminated')
# ------------------------------------------------------------------------------------------ #
def halt(self):
"""
Stop the server
"""
if self.shutdown_event.is_set():
return
self.shutdown_event.set()
try:
self.socket.unbind("tcp://"+self.ip+":"+self.port)
except:
pass
# self.socket.close()
# ------------------------------------------------------------------------------------------ #
def send(self, message):
"""
Send a message to the client
:param message: string
"""
# FIXME Review logic
if not self.can_send:
self.logger.debug("sending a "+str(len(message))+" long message via REDIS.")
self.socket.send(message.encode("utf-8"))
self.can_send = False
else:
self.logger.warning("cannot send message via REDIS: "+str(message))
def terminate(self):
"""
Called when thread is stopped or a signal to terminate is received.
"""
self.shutdown_event.set()
self.scheduler.terminate()
self.pubsub.close()
self.logger.info("Shutdown event received. Bye bye ...")
# ------------------------------------------------------------------------------------------ #
class ClientRedisAdapter(RedisMessenger):
def __init__(self, config):
RedisMessenger.__init__(self, config)
# ------------------------------------------------------------------------------------------ #
def publish(self, channel, message):
if type(channel) == RedisChannel:
channel = channel.value
self.rstore.publish(channel, message)
#
# engine
#
# Playout Daemon for autoradio project
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
import time
import logging
import datetime
from modules.communication.redis.statestore import RedisStateStore
from modules.communication.mail import AuraMailer
from modules.base.exceptions import PlaylistException
from modules.base.enum import RedisChannel
from modules.base.logger import AuraLogger
"""
Send and receive redis messages
"""
# ------------------------------------------------------------------------------------------ #
class RedisMessenger():
logger = None
rstore = None
# ------------------------------------------------------------------------------------------ #
def __init__(self, config):
super(RedisMessenger, self).__init__()
"""
Constructor
"""
self.logger = logging.getLogger("AuraEngine")
self.channel = RedisChannel.STANDARD
self.section = ''
self.rstore = RedisStateStore(config)
self.errnr = '00'
self.components = {'controller':'01', 'scheduling':'02', 'playd':'03', 'recorder':'04', 'helpers':'09'}
self.fromMail = ''
self.adminMails = ''
# ------------------------------------------------------------------------------------------ #
def set_channel(self, channel):
"""
Einen "Kanal" setzen - zb scheduling
@type channel: string
@param channel: Kanal/Name der Komponente
"""
self.channel = channel
if channel in self.components:
self.errnr = self.components[channel]
self.rstore.set_channel(channel)
# ------------------------------------------------------------------------------------------ #
def set_section(self, section):
"""
Einen Sektion / Gültigkeitsbereich der Meldung setzen - zb internal
@type section: string
@param section: Gültigkeitsbereich
"""
self.section = section
# # ------------------------------------------------------------------------------------------ #
# def set_mail_addresses(self, fromMail, adminMails):
# """
# Einen Sektion / Gültigkeitsbereich der Meldung setzen - zb internal
# @type section: string
# @param section: Gültigkeitsbereich
# """
# self.fromMail = fromMail
# self.adminMails = adminMails
# # ------------------------------------------------------------------------------------------ #
# def send(self, message, code, level, job, value='', section=''):
# """
# Eine Message senden
# @type message: string
# @param message: menschenverständliche Nachricht
# @type code: string
# @param code: Fehlercode - endet mit 00 bei Erfolg
# @type level: string
# @param level: Error-Level - info, warning, error, fatal
# @type job: string
# @param job: Name der ausgeführten Funktion
# @type value: string
# @param value: Ein Wert
# @type section: string
# @param section: Globale Sektion überschreiben
# """
# section = self.section if section == '' else section
# self.time = str(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f'))
# self.utime = time.time()
# state = {'message':message.strip().replace("'","\\'"), 'code':self.errnr + str(code),'job':job,'value':value}
# self.rstore.set_section(section)
# self.rstore.store(level, state)
# if level == 'info' or level == 'success':
# self.logger.info(message)
# elif level == 'warning':
# self.logger.warning(message)
# elif level == 'error':
# self.logger.error(message)
# self.send_admin_mail(level, message, state)
# elif level == 'fatal':
# self.logger.critical(message)
# self.send_admin_mail(level, message, state)
# # ------------------------------------------------------------------------------------------ #
# def say_alive(self):
# """
# Soll alle 20 Sekunden von den Komponenten ausgeführt werden,
# um zu melden, dass sie am Leben sind
# """
# self.rstore.set_alive_state()
# # ------------------------------------------------------------------------------------------ #
# def get_alive_state(self, channel):
# """
# Live State abfragen
# @type channel: string
# @param channel: Channel/Komponente
# """
# return self.rstore.get_alive_state(channel)
# # ------------------------------------------------------------------------------------------ #
# def set_state(self, name, value, expires=None, channel=None):
# """
# Kündigt einen Event an
# @type name: string
# @param name: Name des state
# @type value: string
# @param value: Wert
# @type channel: string
# @param channel: Kanal (optional)
# """
# if not channel:
# channel = self.channel
# self.rstore.set_state(name, value, expires, channel)
# # ------------------------------------------------------------------------------------------ #
# def queue_add_event(self, name, eventtime, value, channel=None):
# """
# Kündigt einen Event an
# @type name: string
# @param name: der Name des Events
# @type eventtime: string|datetime.datetime
# @param eventtime: Datum und Zeit des events
# @type value: dict
# @param value: Werte
# @type channel: string
# @param channel: Kanal (optional)
# """
# if not channel:
# channel = self.channel
# if type(eventtime) == type(str()):
# eventtime_str = datetime.datetime.strptime(eventtime[0:16].replace(' ','T'), "%Y-%m-%dT%H:%M").strftime("%Y-%m-%dT%H:%M")
# elif type(eventtime) is datetime.datetime:
# eventtime_str = eventtime.strftime("%Y-%m-%dT%H:%M")
# else:
# raise TypeError('eventtime must be a datetime.date or a string, not a %s' % type(eventtime))
# self.rstore.queue_add_event(eventtime_str, name, value, channel)
# # ------------------------------------------------------------------------------------------ #
# def queue_remove_events(self, name, channel=None):
# """
# Löscht Events
# @type name: string
# @param name: der Name des Events
# @type channel: string
# @param channel: Kanal (optional)
# """
# if not channel:
# channel = self.channel
# self.rstore.queue_remove_events(name, channel)
# # ------------------------------------------------------------------------------------------ #
# def fire_event(self, name, value, channel=None):
# """
# Feuert einen Event
# @type name: string
# @param name: der Name des Events
# @type value: dict
# @param value: Werte
# @type channel: string
# @param channel: Kanal (optional)
# """
# if not channel:
# channel = self.channel
# self.rstore.fire_event(name, value, channel)
# # ------------------------------------------------------------------------------------------ #
# def get_event_queue(self, name=None, channel=None):
# """
# Holt events eines Kanals
# @type channel: string
# @param channel: Kanal (optional)
# @rtype: list
# @return: Liste der Events
# """
# queue = self.rstore.get_event_queue(name, channel)
# return queue
# # ------------------------------------------------------------------------------------------ #
# def get_events(self, name=None, channel=None):
# """
# Holt events eines Kanals
# @type channel: string
# @param channel: Kanal (optional)
# @rtype: list
# @return: Liste der Events
# """
# events = self.rstore.get_events(name, channel)
# return events
# # ------------------------------------------------------------------------------------------ #
# def get_event(self, name=None, channel=None):
# """
# Holt event eines Kanals
# @type channel: string
# @param channel: Kanal (optional)
# @rtype: dict
# @return: Event
# """
# events = self.rstore.get_events(name, channel)
# result = False
# if events:
# result = events.pop(0)
# return result
# # ------------------------------------------------------------------------------------------ #
# def send_admin_mail(self, level, message, state):
# """
# Sendent mail an Admin(s),
# @type message: string
# @param message: Die Message
# @type state: dict
# @param state: Der State
# @return result
# """
# # FIXME Make Mailer functional: Invalid constructor
# if self.fromMail and self.adminMails:
# #subject = "Possible comba problem on job " + state['job'] + " - " + level
# mailmessage = "Hi Admin,\n comba reports a possible problem\n\n"
# mailmessage = mailmessage + level + "!\n"
# mailmessage = mailmessage + message + "\n\n"
# mailmessage = mailmessage + "Additional information:\n"
# mailmessage = mailmessage + "##################################################\n"
# mailmessage = mailmessage + "Job:\t" + state['job'] + "\n"
# mailmessage = mailmessage + "Code:\t" + state['code'] + "\n"
# mailmessage = mailmessage + "Value:\t" + str(state['value']) + "\n"
# #mailer = AuraMailer(self.adminMails, self.fromMail)
# #mailer.send_admin_mail(subject, mailmessage)
# else:
# return False
# # ------------------------------------------------------------------------------------------ #
# def receive(self):
# """
# Bisher wird nichts empfangen
# """
# return ""
# # ------------------------------------------------------------------------------------------ #
# def get_next_file_for(self, playlisttype):
# next = self.rstore.db.get('next_'+playlisttype+'file')
# if next is None:
# next = b""
# return next.decode('utf-8')
# ------------------------------------------------------------------------------------------ #
# def on_play(self, info):
# result = self.rstore.db.get('on_play')
# if result is None:
# result = b""
# return result.decode('utf-8')
# ------------------------------------------------------------------------------------------ #
# def set_next_file_for(self, playlisttype, file):
# self.rstore.db.set("next_" + playlisttype + "file", file)
#
# engine
#
# Playout Daemon for autoradio project
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
import redis
import time
import datetime
import json
import re
import uuid
class RedisStateStore(object):
"""Store and get Reports from redis"""
def __init__(self, config, **redis_kwargs):
"""The default connection parameters are: host='localhost', port=6379, db=0"""
self.db = redis.Redis(host=config.get("redis_host"), port=config.get("redis_port"), db=config.get("redis_db"))
self.channel = '*'
self.section = '*'
self.separator = '_'
self.daily = False
# ------------------------------------------------------------------------------------------ #
def set_channel(self, channel):
"""
Kanal setzen
@type channel: string
@param channel: Kanal
"""
self.channel = channel
# ------------------------------------------------------------------------------------------ #
def set_section(self, section):
"""
Sektion setzen
@type section: string
@param section: Sektion
"""
self.section = section
# ------------------------------------------------------------------------------------------ #
def set_alive_state(self):
"""
Alive Funktion - alle 20 Sekunden melden, dass man noch am Leben ist
"""
self.set_state('alive', 'Hi', 21)
# ------------------------------------------------------------------------------------------ #
def get_alive_state(self, channel):
"""
Alive Status eines Channels ermitteln
@type channel: string
@param channel: der Channel
@rtype: string/None
@return: Ein String, oder None, bei negativem Ergebnis
"""
return self.get_state('alive', channel)
# ------------------------------------------------------------------------------------------ #
def set_state(self, name, value, expires=None, channel=None):
"""
Setzt einen Status
@type name: string
@param name: Name des state
@type value: string
@param value: Wert
@type channel: string
@param channel: Kanal (optional)
"""
if not channel:
channel = self.channel
key = self.__create_key__(channel + 'State', name)
if value == "":
self.db.delete(key)
else:
# publish on channel
message = json.dumps({'eventname':name, 'value': value})
self.db.publish(channel + 'Publish', message)
# store in database
self.db.set(key, value)
if(expires):
self.db.expire(key, 21)
# ------------------------------------------------------------------------------------------ #
def get_state(self, name, channel):
"""
Holt einen Status
@type name: string
@param name: Name des state
@type channel: string
@param channel: Kanal (optional)
"""
key = self.__create_key__(channel + 'State', name)
return self.db.get(key)
# ------------------------------------------------------------------------------------------ #
def queue_add_event(self, eventtime, name, value, channel=None):
"""
Kündigt einen Event an
@type eventtime: string
@param eventtime: Datum und Zeit des events
@type name: string
@param name: Name des Events
@type value: dict
@param value: Werte
@type channel: string
@param channel: Kanal (optional)
"""
timeevent = datetime.datetime.strptime(eventtime[0:16],"%Y-%m-%dT%H:%M")
expire = int(time.mktime(timeevent.timetuple()) - time.time()) + 60
self.__set_event__(name, eventtime, value, 'Evqueue', 'evqueue', expire, channel)
# ------------------------------------------------------------------------------------------ #
def queue_remove_events(self, name=None, channel=None):
"""
Löscht Events
@type name: string
@param name: Name des Events
@type channel: string
@param channel: Kanal (optional)
"""
query = channel + 'Evqueue_' if channel else '*Evqueue_'
query = query + '*_' + name if name else query + '*_*'
keys = self.db.keys(query)
for delkey in keys:
self.db.delete(delkey)
# ------------------------------------------------------------------------------------------ #
def fire_event(self, name, value, channel=None):
"""
Feuert einen Event
@type name: string
@param name: Name des Events
@type value: dict
@param value: Werte
@type channel: string
@param channel: Kanal (optional)
"""
eventtime = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M")
self.__set_event__(name, eventtime, value, 'Event', 'events', 60, channel)
# ------------------------------------------------------------------------------------------ #
def __set_event__(self, name, eventtime, value, type, namespace, expire, channel=None):
"""
Feuert einen Event
@type eventtime: string
@param eventtime: Datum und Zeit des events
@type value: dict
@param value: Werte
@type channel: string
@param channel: Kanal (optional)
"""
if not channel:
channel = self.channel
timeevent = datetime.datetime.strptime(eventtime[0:16],"%Y-%m-%dT%H:%M")
key = self.__create_key__(channel + type, eventtime, name)
value['starts'] = eventtime[0:16]
value['eventchannel'] = channel
value['eventname'] = name
self.db.hset(key, namespace, value)
self.db.expire(key, expire)
# ------------------------------------------------------------------------------------------ #
def get_event_queue(self, name=None, channel=None):
"""
Holt events eines Kanals
@type channel: string
@param channel: Kanal (optional)
@rtype: list
@return: Liste der Events
"""
query = channel + 'Evqueue_' if channel else '*Evqueue_'
query = query + '*_' + name if name else query + '*_*'
keys = self.db.keys(query)
keys.sort()
entries = self.__get_entries__(keys, 'evqueue')
return entries
# ------------------------------------------------------------------------------------------ #
def get_events(self, name=None, channel=None):
"""
Holt events eines Kanals
@type channel: string
@param channel: Kanal (optional)
@rtype: list
@return: Liste der Events
"""
query = channel + 'Event_' if channel else '*Event_'
query = query + '*_' + name if name else query + '*_*'
keys = self.db.keys(query)
keys.sort()
entries = self.__get_entries__(keys, 'events')
return entries
# ------------------------------------------------------------------------------------------ #
def get_next_event(self, name=None, channel=None):
"""
Holt den aktuellsten Event
@type channel: string
@param channel: Kanal (optional)
@rtype: dict/boolean
@return: ein Event oder False
"""
events = self.get_event_queue(name, channel)
if len(events) > 0:
result = events.pop(0)
else:
result = False
return result
# ------------------------------------------------------------------------------------------ #
def store(self, level, value):
"""
Hash speichern
@type level: string
@param level: der errorlevel
@type value: dict
@param value: Werte als dict
"""
microtime = str(time.time())
value['microtime'] = microtime
value['level'] = level
key = self.__create_key__(self.channel, self.section, level, microtime, str(uuid.uuid1()))
self.db.hset(key, self.channel, value)
self.db.expire(key, 864000)
# ------------------------------------------------------------------------------------------ #
def __get_keys__(self, level ='*'):
"""
Redis-Keys nach Suchkriterium ermitteln
@type level: string
@param level: einen Errorlevel filtern
@rtype: list
@return: Die Keys auf die das Suchkriterium zutrifft
"""
key = self.__create_key__(self.channel, self.section, level)
microtime = str(time.time())
search = microtime[0:4] + '*' if self.daily else '*'
return self.db.keys(key + self.separator + '*')
# ------------------------------------------------------------------------------------------ #
def __create_key__(self, *args):
"""
Key erschaffen - beliebig viele Argumente
@rtype: string
@return: Der key
"""
return self.separator.join(args)
def get_entries(self, level ='*'):
"""
Liste von Hashs nach Suchkriterium erhalten
@type level: string
@param level: einen Errorlevel filtern
@rtype: list
@return: Redis Hashs
"""
def tsort(x,y):
if float(x.split('_',4)[3]) > float(y.split('_',4)[3]):
return 1
elif float(x.split('_',4)[3]) < float(y.split('_',4)[3]):
return -1
else:
return 0
keys = self.__get_keys__(level)
keys.sort(tsort)
entries = self.__get_entries__(keys, self.channel)
entries = sorted(entries, key=lambda k: k['microtime'], reverse=True)
return entries
# ------------------------------------------------------------------------------------------ #
def __get_entries__(self, keys, channel):
entries = []
for key in keys:
entry = self.db.hget(key,channel)
entry = json.dumps(entry.decode('utf-8'))
if not (entry is None):
try:
entry = entry.decode('utf-8').replace('None','"None"')
entry = re.sub("########[^]]*########", lambda x:x.group(0).replace('\"','').replace('\'',''),entry.replace("\\\"","########").replace("\\'","++++++++").replace("'",'"').replace('u"','"').replace('"{','{').replace('}"','}')).replace("########","\"")
entry = json.loads(entry)
entry['key'] = key
entries.append(entry)
except:
pass
return entries
# ------------------------------------------------------------------------------------------ #
def publish(self, channel, message):
subscriber_count = self.db.execute_command('PUBSUB', 'NUMSUB', channel)
if channel.lower().find("reply") < 0 and subscriber_count[1] == 0:
raise Exception("No subscriber! Is Aura daemon running?")
self.db.publish(channel, message)
#
# engine
#
# Playout Daemon for autoradio project
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
import time
import logging
import json
from modules.communication.liquidsoap.playerclient import LiquidSoapPlayerClient
# from modules.communication.liquidsoap.recorderclient import LiquidSoapRecorderClient
from modules.core.startup import StartupThread
from modules.core.state import PlayerStateService
from modules.communication.mail import AuraMailer
from modules.base.enum import ChannelType, Channel, TransitionType
from modules.base.utils import TerminalColors, SimpleUtil
from modules.base.exceptions import LQConnectionError, InvalidChannelException, NoActiveEntryException
class SoundSystem():
"""
SoundSystem Class
Uses LiquidSoapClient, but introduces more complex commands, transactions and error handling.
"""
client = None
logger = None
transaction = 0
channels = None
scheduler = None
#error_data = None #FIXME Can be removed
auramailer = None
is_liquidsoap_running = False
connection_attempts = 0
disable_logging = False
fade_in_active = False
fade_out_active = False
# Active Channel & Entry Handling
active_channel_type = None
active_channel = None
player_state = None
# active_entries = None
def __init__(self, config):
"""
Initializes the communicator by establishing a Socket connection
to Liquidsoap.
Args:
config (AuraConfig): The configuration
"""
self.config = config
self.logger = logging.getLogger("AuraEngine")
self.client = LiquidSoapPlayerClient(config, "engine.sock")
# self.lqcr = LiquidSoapRecorderClient(config, "record.sock")
self.auramailer = AuraMailer(self.config)
self.is_active()
# Initialize Default Channels
self.active_channel = {
ChannelType.FILESYSTEM: Channel.FILESYSTEM_A,
ChannelType.STREAM: Channel.STREAM_A,
ChannelType.LIVE: Channel.LIVE_0
}
# self.active_entries = {}
self.player_state = PlayerStateService(config)
def start(self):
"""
Starts the soundsystem.
"""
# Sleep needed, because the socket is created too slowly by Liquidsoap
time.sleep(1)
self.enable_transaction()
time.sleep(1)
self.mixer_start()
# Setting init params like a blank file
install_dir = self.config.get("install_dir")
channel = self.active_channel[ChannelType.FILESYSTEM]
self.playlist_push(channel, install_dir + "/configuration/blank.flac")
self.disable_transaction()
self.is_liquidsoap_running = True
self.logger.info(SimpleUtil.green("Engine Core ------[ connected ]-------- Liquidsoap"))
def is_ready(self):
"""
Returns `True` if the soundsystem is connected to Liquidsoap and is ready to be used.
"""
return self.is_liquidsoap_running
#
# MIXER : GENERAL
#
def mixer_start(self):
# Reset channels and reload them
channels = self.reload_channels()
# For all available channels
for c in channels:
# Set volume to zero
self.channel_volume(c, "0")
# And activate this channel
self.channel_activate(c, True)
# ------------------------------------------------------------------------------------------ #
# def set_volume(self, mixernumber, volume):
# #return self.client.command("mixer", 'volume', mixernumber, str(volume))
# return self.__send_lqc_command__(self.client, "mixer", "volume", mixernumber, volume)
# ------------------------------------------------------------------------------------------ #
def get_active_mixer(self):
"""
get active mixer in liquidsoap server
:return:
"""
activeinputs = []
# enable more control over the connection
self.enable_transaction()
inputs = self.get_all_channels()
cnt = 0
for input in inputs:
status = self.__get_mixer_status__(cnt)
if "selected=true" in status:
activeinputs.append(input)
cnt = cnt + 1
self.disable_transaction()
return activeinputs
# ------------------------------------------------------------------------------------------ #
def get_mixer_status(self):
inputstate = {}
self.enable_transaction()
inputs = self.get_all_channels()
cnt = 0
for input in inputs:
inputstate[input] = self.__get_mixer_status__(cnt)
cnt = cnt + 1
self.disable_transaction()
return inputstate
# ------------------------------------------------------------------------------------------ #
def get_mixer_volume(self, channel):
return False
# ------------------------------------------------------------------------------------------ #
def __get_mixer_status__(self, mixernumber):
return self.__send_lqc_command__(self.client, "mixer", "status", mixernumber)
#
# MIXER : CHANNELS
#
# FIXME Currently not used, except for test class
# def get_active_channel(self):
# """
# Retrieves the active channel from programme.
# Returns:
# (String): The channel type, empty string if no channel is active.
# """
# active_entry = self.scheduler.get_active_entry()
# if active_entry is None:
# return ""
# return active_entry.channel
# ------------------------------------------------------------------------------------------ #
def play(self, entry, transition):
"""
Plays a new `Entry`. In case of a new schedule (or some intented, immediate transition),
a clean channel is selected and transitions between old and new channel is performed.
Args:
entry (PlaylistEntry): The audio source to be played
transition (TransitionType): The type of transition to use e.g. fade-out.
queue (Boolean): If `True` the entry is queued if the `ChannelType` does allow so;
otherwise a new channel of the same type is activated
Raises:
(LQConnectionError): In case connecting to LiquidSoap isn't possible
"""
try:
self.enable_transaction()
# channel = self.active_channel[entry.type]
# prev_channel = channel
# already_active = False
#FIXME
# queue=False
# if self.active_channel_type == entry.type:
# msg = SimpleUtil.pink("Channel type %s already active!" % str(entry.type))
# self.logger.info(msg)
# already_active = True
self.player_state.set_active_entry(entry)
entry.channel = self.channel_swap(entry.type)
# entry.channel = channel
# PLAYLIST
if entry.type == ChannelType.FILESYSTEM:
# if not queue:
self.playlist_push(entry.channel, entry.filename)
# STREAM
elif entry.type == ChannelType.STREAM:
self.set_http_url(entry.channel, entry.source)
self.http_start_stop(entry.channel, True)
# LIVE
else:
# TODO Select correct LINE-OUT channels as per entry
pass
# if not already_active:
# self.channel_transition(prev_channel, channel, entry.volume, 0)
# Assign selected channel
# Move channel volume all the way up
if transition == TransitionType.FADE:
self.fade_in(entry)
else:
self.channel_volume(entry.channel, entry.volume)
# Update active channel and type
#self.active_channel_type = entry.type
self.active_channel[entry.type] = entry.channel
self.disable_transaction()
except LQConnectionError:
# we already caught and handled this error in __send_lqc_command__,
# but we do not want to execute this function further and pass the exception
pass
def on_play(self, source):
"""
Event Handler which is called by soundsystem implementation (i.e. Liquidsoap)
when some entry is actually playing.
"""
self.logger.info(SimpleUtil.pink("Source '%s' started playing" % source))
try:
self.player_state.store_trackservice_entry(source)
except NoActiveEntryException:
self.logger.warn(SimpleUtil.red("Currently there's nothing playing!"))
def stop(self, entry, transition):
"""
Stops the currently playing entry.
Args:
entry (Entry): The entry to stop playing
transition (TransitionType): The type of transition to use e.g. fade-out.
"""
try:
self.enable_transaction()
if not entry.channel:
self.logger.warn("Trying to stop entry %s, but it has no channel assigned" % entry)
return
if transition == TransitionType.FADE:
self.fade_out(entry)
else:
self.channel_volume(entry.channel, 0)
# self.playlist_clear(entry.channel)
self.logger.info(SimpleUtil.pink("Stopped channel '%s' for entry %s" % (entry.channel, entry)))
self.disable_transaction()
except LQConnectionError:
# we already caught and handled this error in __send_lqc_command__,
# but we do not want to execute this function further and pass the exception
pass
# def channel_transition(self, source_channel, target_channel, target_volume=100, transition_type=0):
# # Default: target_channel = 100% volume, source_channel = 0% volume
# if transition_type == 0:
# # Set volume of channel
# self.channel_volume(target_channel, target_volume)
# # Mute source channel
# if target_channel != source_channel:
# self.channel_volume(source_channel, 0)
# # Set other channels to zero volume
# # others = self.all_inputs_but(target_channel)
# # self.logger.info("Setting Volume=0 for channels: %s" % str(others))
# # for o in others:
# # self.channel_volume(o, 0)
def channel_swap(self, channel_type):
active_channel = self.active_channel[channel_type]
channel = None
msg = None
if channel_type == ChannelType.FILESYSTEM:
if active_channel == Channel.FILESYSTEM_A:
channel = Channel.FILESYSTEM_B
msg = "Swapped filesystem channel from A > B"
else:
channel = Channel.FILESYSTEM_A
msg = "Swapped filesystem channel from B > A"
elif channel_type == ChannelType.STREAM:
if active_channel == Channel.STREAM_A:
channel = Channel.STREAM_B
msg = "Swapped stream channel from A > B"
else:
channel = Channel.STREAM_A
msg = "Swapped stream channel from B > A"
if msg: self.logger.info(SimpleUtil.pink(msg))
# self.active_channel[channel_type] = channel
return channel
# ------------------------------------------------------------------------------------------ #
def all_inputs_but(self, input_type):
try:
activemixer_copy = self.get_all_channels().copy()
activemixer_copy.remove(input_type)
except ValueError as e:
self.logger.error("Requested channel (" + input_type + ") not in channellist. Reason: " + str(e))
except AttributeError:
self.logger.critical("Channellist is None")
return activemixer_copy
# ------------------------------------------------------------------------------------------ #
def get_all_channels(self):
if self.channels is None or len(self.channels) == 0:
self.channels = self.__send_lqc_command__(self.client, "mixer", "inputs")
return self.channels
# ------------------------------------------------------------------------------------------ #
def reload_channels(self):
self.channels = None
return self.get_all_channels()
# ------------------------------------------------------------------------------------------ #
def channel_activate(self, channel, activate):
channels = self.get_all_channels()
try:
index = channels.index(channel)
if len(channel) < 1:
self.logger.critical("Cannot activate channel. There are no channels!")
else:
message = self.__send_lqc_command__(self.client, "mixer", "select", index, activate)
return message
except Exception as e:
self.logger.critical("Ran into exception when activating channel. Reason: " + str(e))
# ------------------------------------------------------------------------------------------ #
def channel_volume(self, channel, volume):
"""
Set volume of a channel
Args:
channel (Channel): The channel
volume (Integer) Volume between 0 and 100
"""
channel = str(channel)
try:
if str(volume) == "100":
channels = self.get_all_channels()
index = channels.index(channel)
else:
channels = self.get_all_channels()
index = channels.index(channel)
except ValueError as e:
msg = SimpleUtil.red("Cannot set volume of channel " + channel + " to " + str(volume) + "!. Reason: " + str(e))
self.logger.error(msg)
self.logger.info("Available channels: %s" % str(channels))
return
try:
if len(channel) < 1:
msg = SimpleUtil.red("Cannot set volume of channel " + channel + " to " + str(volume) + "! There are no channels.")
self.logger.warning(msg)
else:
message = self.__send_lqc_command__(self.client, "mixer", "volume", str(index), str(int(volume)))
if not self.disable_logging:
if message.find('volume=' + str(volume) + '%'):
self.logger.info(SimpleUtil.pink("Set volume of channel '%s' to %s" % (channel, str(volume))))
else:
msg = SimpleUtil.red("Setting volume of channel " + channel + " gone wrong! Liquidsoap message: " + message)
self.logger.warning(msg)
return message
except AttributeError as e: #(LQConnectionError, AttributeError):
self.disable_transaction(force=True)
msg = SimpleUtil.red("Ran into exception when setting volume of channel " + channel + ". Reason: " + str(e))
self.logger.error(msg)
#
# Channel Type - Stream
#
def stream_start(self, url):
try:
self.enable_transaction()
self.__send_lqc_command__(self.client, "http", "url", url)
self.__send_lqc_command__(self.client, "http", "start")
self.disable_transaction()
except LQConnectionError:
# we already caught and handled this error in __send_lqc_command__, but we do not want to execute this function further
pass
def stream_stop(self, url):
try:
self.enable_transaction()
self.__send_lqc_command__(self.client, "http", "start")
self.disable_transaction()
except LQConnectionError:
# we already caught and handled this error in __send_lqc_command__, but we do not want to execute this function further
pass
def http_start_stop(self, start):
if start:
cmd = "start"
else:
cmd = "stop"
try:
self.enable_transaction()
self.__send_lqc_command__(self.client, "http", cmd)
self.disable_transaction()
except LQConnectionError:
# we already caught and handled this error in __send_lqc_command__, but we do not want to execute this function further
pass
# ------------------------------------------------------------------------------------------ #
def set_http_url(self, uri):
return self.__send_lqc_command__(self.client, "http", "url", uri)
#
# Channel Type - Playlist
#
# FIXME
# def playlist_activate(self, playlist, cue_in=0.0):
# """
# Activates a new Playlist.
# Args:
# new_entry (Playlist): The playlist to be played
# cue_in (Float): Start/cue-time of track (For some reason Liquidsoap doesn't acknowledge this yet)
# Raises:
# (LQConnectionError): In case connecting to LiquidSoap isn't possible
# """
# # Grab the actual active entry
# # active_entry = self.scheduler.get_active_entry()
# # Set default channel, if no previous track is available
# current_channel = self.active_channel[ChannelType.FILESYSTEM]
# # if active_entry:
# # current_channel = active_entry.channel
# try:
# # FIXME clearing creates some serious timing issues
# # To activate this feature we'd need some more sophisticated
# # Liquidsoap logic, such as >= 2 filesystem channels and
# # possiblities to pause pre-queued channels or cleaning them
# # after each completed schedule.
# # self.enable_transaction()
# # #if active_entry:
# # #self.fade_out(active_entry)
# # res = self.playlist_clear(current_channel)
# # self.logger.info("Clear Queue Response: "+res)
# # self.disable_transaction()
# self.enable_transaction()
# self.reload_channels()
# # self.fade_in(playlist.entries[0])
# # FIXME rework
# for new_entry in playlist.entries:
# if current_channel == new_entry.channel:
# self.activate_same_channel(new_entry, cue_in)
# else:
# self.activate_different_channel(new_entry, cue_in, current_channel)
# current_channel = new_entry.channel
# self.disable_transaction()
# # self.logger.critical("FIXME: Implement TrackService")
# #self.scheduler.update_track_service(new_entry)
# except LQConnectionError:
# # we already caught and handled this error in __send_lqc_command__,
# # but we do not want to execute this function further and pass the exception
# pass
def playlist_push(self, channel, uri):
"""
Adds an filesystem URI to the given `ChannelType.FILESYSTEM` channel.
Args:
uri (String): The URI of the file
Returns:
LiquidSoap Response
"""
if channel not in ChannelType.FILESYSTEM.channels:
raise InvalidChannelException
self.logger.info(SimpleUtil.pink("playlist.push('%s', '%s'" % (channel, uri)))
return self.__send_lqc_command__(self.client, channel, "playlist_push", uri)
def playlist_seek(self, channel, seconds_to_seek):
"""
Forwards the player of the given `ChannelType.FILESYSTEM` channel by (n) seconds.
Args:
seconds_to_seeks (Float): The seconds to skip
"""
if channel not in ChannelType.FILESYSTEM.channels:
raise InvalidChannelException
return self.__send_lqc_command__(self.client, channel, "playlist_seek", str(seconds_to_seek))
def playlist_clear(self, channel):
"""
Removes all tracks currently queued in the given `ChannelType.FILESYSTEM` channel.
"""
if channel not in ChannelType.FILESYSTEM.channels:
raise InvalidChannelException
self.logger.info(SimpleUtil.pink("Clearing filesystem queue '%s'!" % channel))
return self.__send_lqc_command__(self.client, channel, "playlist_clear")
#
# Fading
#
def fade_in(self, entry):
"""
Performs a fade-in for the given `entry` to the `entry.volume` loudness
at channel `entry.channel`.
"""
try:
fade_in_time = float(self.config.get("fade_in_time"))
if fade_in_time > 0:
self.fade_in_active = True
target_volume = entry.volume
step = fade_in_time / target_volume
msg = "Starting to fading-in '%s'. Step is %ss and target volume is %s." % \
(entry.channel, str(step), str(target_volume))
self.logger.info(SimpleUtil.pink(msg))
# Enable logging, which might have been disabled in a previous fade-out
self.disable_logging = True
self.client.disable_logging = True
for i in range(target_volume):
self.channel_volume(entry.channel.value, i + 1)
time.sleep(step)
msg = "Finished with fading-in '%s'." % entry.channel
self.logger.info(SimpleUtil.pink(msg))
self.fade_in_active = False
if not self.fade_out_active:
self.disable_logging = False
self.client.disable_logging = False
except LQConnectionError as e:
self.logger.critical(str(e))
return True
def fade_out(self, entry):
"""
Performs a fade-out for the given `entry` at channel `entry.channel`.
"""
try:
fade_out_time = float(self.config.get("fade_out_time"))
if fade_out_time > 0:
step = abs(fade_out_time) / entry.volume
msg = "Starting to fading-out '%s'. Step is %ss." % (entry.channel, str(step))
self.logger.info(SimpleUtil.pink(msg))
# Disable logging... it is going to be enabled again after fadein and -out is finished
self.disable_logging = True
self.client.disable_logging = True
for i in range(entry.volume):
self.channel_volume(entry.channel.value, entry.volume-i-1)
time.sleep(step)
msg = "Finished with fading-out '%s'" % entry.channel
self.logger.info(SimpleUtil.pink(msg))
# Enable logging again
self.fade_out_active = False
if not self.fade_in_active:
self.disable_logging = False
self.client.disable_logging = False
except LQConnectionError as e:
self.logger.critical(str(e))
return True
#
# Recording
#
# ------------------------------------------------------------------------------------------ #
def recorder_stop(self):
self.enable_transaction()
for i in range(5):
if self.config.get("rec_" + str(i)) == "y":
self.__send_lqc_command__(self.client, "recorder_" + str(i), "stop")
self.disable_transaction()
# ------------------------------------------------------------------------------------------ #
def recorder_start(self, num=-1):
if not self.is_liquidsoap_running:
if num==-1:
msg = "Want to start recorder, but LiquidSoap is not running"
else:
msg = "Want to start recorder " + str(num) + ", but LiquidSoap is not running"
self.logger.warning(msg)
return False
self.enable_transaction()
if num == -1:
self.recorder_start_all()
else:
self.recorder_start_one(num)
self.disable_transaction()
# ------------------------------------------------------------------------------------------ #
def recorder_start_all(self):
if not self.is_liquidsoap_running:
self.logger.warning("Want to start all recorder, but LiquidSoap is not running")
return False
self.enable_transaction()
for i in range(5):
self.recorder_start_one(i)
self.disable_transaction()
# ------------------------------------------------------------------------------------------ #
def recorder_start_one(self, num):
if not self.is_liquidsoap_running:
return False
if self.config.get("rec_" + str(num)) == "y":
returnvalue = self.__send_lqc_command__(self.client, "recorder", str(num), "status")
if returnvalue == "off":
self.__send_lqc_command__(self.client, "recorder", str(num), "start")
# ------------------------------------------------------------------------------------------ #
def get_recorder_status(self):
self.enable_transaction(self.client)
recorder_state = self.__send_lqc_command__(self.client, "record", "status")
self.disable_transaction(self.client)
return recorder_state
#
# Basic Methods
#
def init_player(self):
"""
Initializes the LiquidSoap Player after startup of the engine.
Returns:
(String): Message that the player is started.
"""
t = StartupThread(self)
t.start()
return "Engine Core startup done!"
# ------------------------------------------------------------------------------------------ #
def __send_lqc_command__(self, lqs_instance, namespace, command, *args):
"""
Ein Kommando an Liquidsoap senden
@type lqs_instance: object
@param lqs_instance: Instance of LiquidSoap Client
@type namespace: string
@param namespace: Namespace of function
@type command: string
@param command: Function name
@type args: list
@param args: List of parameters
@rtype: string
@return: Response from LiquidSoap
"""
try:
if not self.disable_logging:
if namespace == "recorder":
self.logger.debug("LiquidSoapCommunicator is calling " + str(namespace) + "_" + str(command) + "." + str(args))
else:
if command == "":
self.logger.debug("LiquidSoapCommunicator is calling " + str(namespace) + str(args))
else:
self.logger.debug("LiquidSoapCommunicator is calling " + str(namespace) + "." + str(command) + str(args))
# call wanted function ...
# FIXME REFACTOR all calls in a common way
if command in ["playlist_push", "playlist_seek", "playlist_clear"]:
func = getattr(lqs_instance, command)
result = func(str(namespace), *args)
else:
func = getattr(lqs_instance, namespace)
result = func(command, *args)
if not self.disable_logging:
self.logger.debug("LiquidSoapCommunicator got response " + str(result))
self.connection_attempts = 0
return result
except LQConnectionError as e:
self.logger.error("Connection Error when sending " + str(namespace) + "." + str(command) + str(args))
if self.try_to_reconnect():
time.sleep(0.2)
self.connection_attempts += 1
if self.connection_attempts < 5:
# reconnect
self.__open_conn(self.client)
self.logger.info("Trying to resend " + str(namespace) + "." + str(command) + str(args))
# grab return value
retval = self.__send_lqc_command__(lqs_instance, namespace, command, *args)
# disconnect
self.__close_conn(self.client)
# return the val
return retval
else:
if command == "":
msg = "Rethrowing Exception while trying to send " + str(namespace) + str(args)
else:
msg = "Rethrowing Exception while trying to send " + str(namespace) + "." + str(command) + str(args)
self.logger.info(msg)
self.disable_transaction(socket=self.client, force=True)
raise e
else:
# also store when was last admin mail sent with which content...
# FIXME implement admin mail sending
self.logger.critical("SEND ADMIN MAIL AT THIS POINT")
raise e
def is_active(self):
"""
Checks if Liquidsoap is running
"""
try:
self.uptime()
self.is_liquidsoap_running = True
except LQConnectionError as e:
self.logger.info("Liquidsoap is not running so far")
self.is_liquidsoap_running = False
except Exception as e:
self.logger.error("Cannot check if Liquidsoap is running. Reason: " + str(e))
self.is_liquidsoap_running = False
return self.is_liquidsoap_running
def engine_state(self):
"""
Retrieves the state of all inputs and outputs.
"""
state = self.__send_lqc_command__(self.client, "engine", "state")
return state
def liquidsoap_help(self):
"""
Retrieves the Liquidsoap help.
"""
data = self.__send_lqc_command__(self.client, "help", "")
if not data:
self.logger.warning("Could not get Liquidsoap's help")
else:
self.logger.debug("Got Liquidsoap's help")
return data
def version(self):
"""
Get the version of Liquidsoap.
"""
data = self.__send_lqc_command__(self.client, "version", "")
self.logger.debug("Got Liquidsoap's version")
return data
def uptime(self):
"""
Retrieves the uptime of Liquidsoap.
"""
data = self.__send_lqc_command__(self.client, "uptime", "")
self.logger.debug("Got Liquidsoap's uptime")
return data
#
# Connection and Transaction Handling
#
# ------------------------------------------------------------------------------------------ #
def try_to_reconnect(self):
self.enable_transaction()
return self.transaction > 0
# ------------------------------------------------------------------------------------------ #
def enable_transaction(self, socket=None):
# set socket to playout if nothing else is given
if socket is None:
socket = self.client
self.transaction = self.transaction + 1
self.logger.debug(TerminalColors.WARNING.value + "Enabling transaction! cnt: " + str(self.transaction) + TerminalColors.ENDC.value)
if self.transaction > 1:
return
try:
self.__open_conn(socket)
except FileNotFoundError:
self.disable_transaction(socket=socket, force=True)
msg = "socket file " + socket.socket_path + " not found. Is liquidsoap running?"
self.logger.critical(TerminalColors.RED.value + msg + TerminalColors.ENDC.value)
self.auramailer.send_admin_mail("CRITICAL Exception when connecting to Liquidsoap", msg)
# ------------------------------------------------------------------------------------------ #
def disable_transaction(self, socket=None, force=False):
if not force:
# nothing to disable
if self.transaction == 0:
return
# decrease transaction counter
self.transaction = self.transaction - 1
# debug msg
self.logger.debug(TerminalColors.WARNING.value + "DISabling transaction! cnt: " + str(self.transaction) + TerminalColors.ENDC.value)
# return if connection is still needed
if self.transaction > 0:
return
else:
self.logger.debug(TerminalColors.WARNING.value + "Forcefully DISabling transaction! " + TerminalColors.ENDC.value)
# close conn and set transactioncounter to 0
self.__close_conn(socket)
self.transaction = 0
# ------------------------------------------------------------------------------------------ #
def __open_conn(self, socket):
# already connected
if self.transaction > 1:
return
self.logger.debug(TerminalColors.GREEN.value + "LiquidSoapCommunicator opening conn" + TerminalColors.ENDC.value)
# try to connect
socket.connect()
# ------------------------------------------------------------------------------------------ #
def __close_conn(self, socket):
# set socket to playout
if socket is None:
socket = self.client
# do not disconnect if a transaction is going on
if self.transaction > 0:
return
# say bye
socket.byebye()
# debug msg
self.logger.debug(TerminalColors.BLUE.value + "LiquidSoapCommunicator closed conn" + TerminalColors.ENDC.value)
#
# Aura Engine
#
# Copyright (C) 2020 David Trattnig <david.trattnig@subsquare.at>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import urllib
import logging
import json
import os.path
import threading
from os import path
from enum import Enum
from socket import socket, AF_INET, SOCK_DGRAM
from time import time, ctime, sleep
import meta
from modules.communication.redis.adapter import ClientRedisAdapter
class MonitorResponseCode(Enum):
OK = "OK"
INVALID_STATE = "INVALID-STATE"
class Monitoring:
"""
Engine Monitoring is in charge of:
- Checking the overall status of all components and external API endpoints
- Checking the vital parts, which are minimal requirements for running the engine
- Sending a heartbeat to a defined server via socket
"""
logger = None
soundsystem = None
status = None
heartbeat_server = None
heartbeat_port = None
heartbeat_frequency = None
heartbeat_socket = None
heartbeat_running = None
def __init__(self, config, soundsystem):
"""
Initialize Monitoring
"""
self.logger = logging.getLogger("AuraEngine")
self.config = config
self.soundsystem = soundsystem
self.status = dict()
self.status["engine"] = dict()
self.status["soundsystem"] = dict()
self.status["api"] = dict()
self.status["api"]["steering"] = dict()
self.status["api"]["tank"] = dict()
self.status["api"]["engine"] = dict()
# Heartbeat settings
self.heartbeat_running = False
self.heartbeat_server = config.get("heartbeat_server")
self.heartbeat_port = config.get("heartbeat_port")
self.heartbeat_frequency = config.get("heartbeat_frequency")
self.heartbeat_socket = socket(AF_INET, SOCK_DGRAM)
#
# PUBLIC METHODS
#
def get_status(self):
"""
Retrieves the current monitoring status.
"""
return self.status
def has_valid_status(self, update_vitality_only):
"""
Checks if the current status is valid to run engine. By default it
does not request new status information, rather using the cached one.
To request new data either call `get_status()` before or use the
`update_vital` parameter.
Args:
update_vitality_only (Boolean): Refreshes only the vital parts required for the heartbeat
"""
if update_vitality_only:
self.update_vitality_status()
else:
self.update_status()
try:
if self.status["soundsystem"]["active"] \
and self.status["soundsystem"]["mixer"]["in_filesystem_0"] \
and self.status["redis_ready"] \
and self.status["audio_store"]["exists"]:
self.status["engine"]["status"] = MonitorResponseCode.OK.value
return True
else:
self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value
return False
except Exception as e:
self.logger.error("Exception while validating engine status: " + str(e))
self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value
return False
#
# PRIVATE METHODS
#
def update_status(self):
"""
Requests the current status of all components
"""
self.status["engine"]["version"] = meta.__version__
self.soundsystem.enable_transaction(self.soundsystem.client)
self.status["soundsystem"]["version"] = self.soundsystem.version()
self.status["soundsystem"]["uptime"] = self.soundsystem.uptime()
self.status["soundsystem"]["io"] = self.get_io_state()
self.status["soundsystem"]["mixer"] = self.soundsystem.get_mixer_status()
#self.status["soundsystem"]["recorder"] = self.soundsystem.get_recorder_status()
self.soundsystem.disable_transaction(self.soundsystem.client)
self.status["api"]["steering"]["url"] = self.config.get("api_steering_status")
self.status["api"]["steering"]["available"] = self.validate_url_connection(self.config.get("api_steering_status"))
self.status["api"]["tank"]["url"] = self.config.get("api_tank_status")
self.status["api"]["tank"]["available"] = self.validate_url_connection(self.config.get("api_tank_status"))
self.status["api"]["tank"]["status"] = self.get_url_response(self.config.get("api_tank_status"))
self.status["api"]["engine"]["url"] = self.config.get("exposed_api_url")
self.status["api"]["engine"]["available"] = self.validate_url_connection(self.config.get("exposed_api_url"))
self.update_vitality_status()
def update_vitality_status(self):
"""
Refreshes the vital status info which are required for the engine to survive.
"""
self.soundsystem.enable_transaction(self.soundsystem.client)
self.status["soundsystem"]["active"] = self.soundsystem.is_active()
self.soundsystem.disable_transaction(self.soundsystem.client)
self.status["redis_ready"] = self.validate_redis_connection()
self.status["audio_store"] = self.validate_directory(self.config.get("audiofolder"))
# After first update start the Heartbeat Monitior
if not self.heartbeat_running:
self.heartbeat_running = True
if self.config.get("heartbeat_frequency") > 0:
self.heartbeat()
def heartbeat(self):
"""
Every `heartbeat_frequency` seconds the current vitality status is checked. If it's okay,
as heartbeat is sent to the configured server.
"""
if self.has_valid_status(True):
self.heartbeat_socket.sendto(b"OK", (self.heartbeat_server, self.heartbeat_port))
threading.Timer(self.config.get("heartbeat_frequency"), self.heartbeat).start()
def get_io_state(self):
"""
Retrieves all input and outputs provided by the engine.
"""
ios = self.soundsystem.engine_state()
try:
ios = ios.replace('"connected":', '"connected": ""')
ios = json.loads(ios, strict=False)
return ios
except Exception as e:
self.logger.warn("Got invalid JSON from soundsystem - " + str(e))
return MonitorResponseCode.INVALID_STATE.value
def validate_url_connection(self, url):
"""
Checks if connection to passed URL is successful.
"""
try:
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
response.read()
except Exception:
return False
return True
def validate_redis_connection(self):
"""
Checks if the connection to Redis is successful.
"""
try:
cra = ClientRedisAdapter(self.config)
cra.publish("aura", "status")
except:
return False
return True
def validate_directory(self, dir_path):
"""
Checks if a given directory is existing and holds content
"""
status = dict()
status["exists"] = path.exists(dir_path) and os.path.isdir(dir_path)
status["has_content"] = False
if status["exists"]:
status["has_content"] = any([True for _ in os.scandir(dir_path)])
return status
def get_url_response(self, url):
"""
Fetches JSON data from the given URL.
Args:
url (String): The API endpoint to call
Returns:
(dict[]): A Python object representing the JSON structure
"""
data = None
try:
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
data = response.read()
return json.loads(data, strict=False)
except (urllib.error.URLError, IOError, ValueError) as e:
self.logger.error("Error while connecting to URL '%s' - %s" % (url, e))
return MonitorResponseCode.INVALID_STATE.value
#
# Aura Engine
#
# Copyright (C) 2017-2020
# David Trattnig <david.trattnig@subsquare.at>
# Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import logging
import datetime
import threading
import meta
import json
from modules.base.exceptions import NoActiveScheduleException, EngineMalfunctionException
from modules.base.enum import Channel, ChannelType
from modules.base.utils import TerminalColors, SimpleUtil, EngineUtil
from modules.core.monitor import Monitoring
class StartupThread(threading.Thread):
"""
StartupThread class.
Boots the mixer and starts playing the current schedule.
"""
logger = None
active_entry = None
soundsystem = None
scheduler = None
monitoring = None
def __init__(self, soundsystem):
"""
Initialize the thread.
"""
threading.Thread.__init__(self)
self.logger = logging.getLogger("AuraEngine")
self.soundsystem = soundsystem
self.scheduler = soundsystem.scheduler
self.monitoring = Monitoring(soundsystem.config, soundsystem)
def run(self):
"""
Boots the soundsystem.
"""
try:
self.soundsystem.start()
is_valid = self.monitoring.has_valid_status(False)
status = self.monitoring.get_status()
self.logger.info("Status Monitor:\n%s" % json.dumps(status, indent=4))
if not is_valid:
self.logger.info("Engine Status: " + SimpleUtil.red(status["engine"]["status"]))
raise EngineMalfunctionException
else:
self.logger.info("Engine Status: " + SimpleUtil.green("OK"))
self.logger.info(EngineUtil.engine_info("Engine Core", meta.__version__))
self.scheduler.on_ready()
except NoActiveScheduleException as e:
self.logger.info("Nothing scheduled at startup time. Please check if there are follow-up schedules.")
except Exception as e:
self.logger.error(SimpleUtil.red("Error while initializing the sound-system: " + str(e)))
#
# Aura Engine
#
# Copyright (C) 2020 David Trattnig <david.trattnig@subsquare.at>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from collections import deque
from modules.base.exceptions import NoActiveEntryException
from modules.base.utils import SimpleUtil
from modules.database.model import SingleEntry, SingleEntryMetaData, PlaylistEntry, PlaylistEntryMetaData, TrackService
class PlayerStateService:
"""
PlayerStateService keeps a short history of currently playing entries. It stores the recent
active entries to a local cache `entry_history` being able to manage concurrently playing entries.
It also is in charge of storing relevant meta information of the currently playing entry to
the TrackService table.
"""
config = None
logger = None
entry_history = None
def __init__(self, config):
"""
Constructor
Args:
config (AuraConfig): Holds the engine configuration
"""
self.config = config
self.logger = logging.getLogger("AuraEngine")
self.entry_history = deque([None, None, None])
#
# PUBLIC METHODS
#
def set_active_entry(self, entry):
"""
Saves the currently playing entry to the local cache.
"""
self.entry_history.pop()
self.entry_history.appendleft(entry)
self.logger.info("Active entry history:\n"+str(self.entry_history))
def get_active_entry(self):
"""
Retrieves the currently playing `Entry` from the local cache.
"""
return self.entry_history[0]
def store_trackservice_entry(self, source):
"""
Stores the given entry the Track Service.
Args:
source (String): The URI of the currently playing source
Raises:
(NoActiveEntryException): In case currently nothing is playing
"""
active_entry = self.get_active_entry()
if not active_entry:
raise NoActiveEntryException
if active_entry.filename == source:
trackservice = TrackService(active_entry)
trackservice.store(add=True, commit=True)
active_entry.trackservice_id = trackservice.id
active_entry.store(add=False, commit=True)
self.logger.info("Stored active entry '%s' to TrackService as '%s'" % (active_entry, trackservice))
else:
msg = "Active entry source '%s' != '%s' activated source." % (active_entry.filename, source)
self.logger.critical(SimpleUtil.red(msg))
# def adapt_trackservice_title(self, filename):
# """
# Updates the track-service entry with the info from a fallback track/playlist.
# """
# liquidsoap_offset = int(self.config.lqs_delay_offset)
# scheduled_entry = self.get_active_entry(liquidsoap_offset)
# entry = SingleEntry()
# meta = SingleEntryMetaData()
# # # Validate artist and title
# # if not title:
# # title = self.config.get("fallback_title_not_available")
# # Create Entry
# entry.filename = filename
# entry.duration = self.fallback_manager.get_track_duration(filename)
# if not entry.duration:
# self.logger.critical("Entry %s has no duration! This may cause malfunction of some engine services." % (str(entry)))
# # Create track service log for local station fallback (type=4)
# trackservice = TrackService(entry, 4)
# trackservice.store(add=True, commit=True)
# entry.store(add=True, commit=True)
# # Create Meta
# meta.artist = "----FIXME"
# meta.album = ""
# meta.title = "----TODO"
# meta.single_entry_id = entry.id
# meta.store(add=True, commit=True)
# # Reference each other
# entry.meta_data_id = meta.id
# entry.trackservice_id = trackservice.id
# entry.store(add=False, commit=True)
# msg = "Track Service active track '%s' updated with fallback source '%s - %s'!" % (scheduled_entry, meta.artist, meta.title)
# self.logger.info(msg)
# return msg
#
# PRIVATE METHODS
#
#
# engine
#
# Playout Daemon for autoradio project
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
import sys
import time
import logging
import datetime
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import orm, func, BigInteger, Boolean, Column, DateTime, Integer, String, ForeignKey, ForeignKeyConstraint
from sqlalchemy.orm import relationship
from sqlalchemy.sql.expression import false, true
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
from modules.base.enum import Channel, ChannelType
from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil, EngineUtil
# Init Config
config = AuraConfig()
# Initialize DB Model and session
engine = sa.create_engine(config.get_database_uri())
Base = declarative_base()
Base.metadata.bind = engine
class DB():
session = orm.scoped_session(orm.sessionmaker())(bind=engine)
Model = Base
class AuraDatabaseModel():
"""
AuraDataBaseModel.
Holding all tables and relationships for the engine.
"""
logger = None
def __init__(self):
"""
Constructor.
"""
self.logger = logging.getLogger("AuraEngine")
def store(self, add=False, commit=False):
"""
Store to the database
"""
if add:
DB.session.add(self)
if commit:
DB.session.commit()
def delete(self, commit=False):
"""
Delete from the database
"""
DB.session.delete(self)
if commit:
DB.session.commit()
def _asdict(self):
return self.__dict__
@staticmethod
def recreate_db(systemexit = False):
"""
Re-creates the database for developments purposes.
"""
manualschedule = Schedule()
manualschedule.schedule_id = 0
manualschedule.show_name = "Manual Show"
Base.metadata.drop_all()
Base.metadata.create_all()
# self.logger.debug("inserting manual scheduling possibility and fallback trackservice schedule")
# DB.session.add(manualschedule)
# db.session.add(fallback_trackservice_schedule)
# self.logger.debug("all created. commiting...")
DB.session.commit()
if systemexit:
sys.exit(0)
#
# SCHEDULES & PLAYLISTS
#
class Schedule(DB.Model, AuraDatabaseModel):
"""
One specific Schedule for a show on a timeslot.
Holding references to playlists and fallback-playlists.
"""
__tablename__ = 'schedule'
# Primary keys
id = Column(Integer, primary_key=True, autoincrement=True)
schedule_start = Column(DateTime, unique=True, index=True)
schedule_end = Column(DateTime, unique=True, index=True)
schedule_id = Column(Integer, unique=True)
show_id = Column(Integer)
show_name = Column(String(256))
show_hosts = Column(String(256))
funding_category = Column(String(256))
comment = Column(String(512))
languages = Column(String(256))
type = Column(String(256))
category = Column(String(256))
topic = Column(String(256))
musicfocus = Column(String(256))
is_repetition = Column(Boolean())
playlist_id = Column(Integer) #, ForeignKey("playlist.playlist_id"))
schedule_fallback_id = Column(Integer)
show_fallback_id = Column(Integer)
station_fallback_id = Column(Integer)
playlist = relationship("Playlist",
primaryjoin="and_(Schedule.schedule_start==Playlist.schedule_start, Schedule.playlist_id==Playlist.playlist_id, Schedule.show_name==Playlist.show_name)",
back_populates="schedule")
schedule_fallback = relationship("Playlist",
primaryjoin="and_(Schedule.schedule_start==Playlist.schedule_start, Schedule.schedule_fallback_id==Playlist.playlist_id, Schedule.show_name==Playlist.show_name)",
back_populates="schedule")
show_fallback = relationship("Playlist",
primaryjoin="and_(Schedule.schedule_start==Playlist.schedule_start, Schedule.show_fallback_id==Playlist.playlist_id, Schedule.show_name==Playlist.show_name)",
back_populates="schedule")
station_fallback = relationship("Playlist",
primaryjoin="and_(Schedule.schedule_start==Playlist.schedule_start, Schedule.station_fallback_id==Playlist.playlist_id, Schedule.show_name==Playlist.show_name)",
back_populates="schedule")
@staticmethod
def select_show_on_datetime(datetime):
return DB.session.query(Schedule).filter(Schedule.schedule_start == datetime).first()
@staticmethod
def select_programme(date_from=datetime.date.today()):
"""
Select all schedules starting from `date_from` or from today if no
parameter is passed.
Args:
date_from (datetime): Select schedules from this date and time on
Returns:
([Schedule]): List of schedules
"""
schedules = DB.session.query(Schedule).\
filter(Schedule.schedule_start >= date_from).\
order_by(Schedule.schedule_start).all()
return schedules
@staticmethod
def select_upcoming(n):
"""
Selects the (`n`) upcoming schedules.
"""
now = datetime.datetime.now()
DB.session.commit() # Required since independend session is used.
schedules = DB.session.query(Schedule).\
filter(Schedule.schedule_start > str(now)).\
order_by(Schedule.schedule_start.asc()).limit(n).all()
return schedules
@hybrid_property
def start_unix(self):
"""
Start time of the schedule in UNIX time.
"""
return time.mktime(self.schedule_start.timetuple())
@hybrid_property
def end_unix(self):
"""
End time of the schedule in UNIX time.
"""
return time.mktime(self.schedule_end.timetuple())
def as_dict(self):
"""
Returns the schedule as a dictionary for serialization.
"""
playlist = self.playlist
return {
"schedule_id": self.schedule_id,
"schedule_start": self.schedule_start.isoformat(),
"schedule_end": self.schedule_end.isoformat(),
"topic": self.topic,
"musicfocus": self.musicfocus,
"funding_category": self.funding_category,
"is_repetition": self.is_repetition,
"category": self.category,
"languages": self.languages,
"comment": self.comment,
"playlist_id": self.playlist_id,
"schedule_fallback_id": self.schedule_fallback_id,
"show_fallback_id": self.show_fallback_id,
"station_fallback_id": self.station_fallback_id,
"show": {
"name": self.show_name,
"type": self.type,
"host": self.show_hosts
},
"playlist": playlist
}
def __str__(self):
"""
String representation of the object.
"""
time_start = SimpleUtil.fmt_time(self.start_unix)
time_end = SimpleUtil.fmt_time(self.end_unix)
return "ID#%s [Show: %s, ShowID: %s | %s - %s ]" % (str(self.schedule_id), self.show_name, str(self.show_id), time_start, time_end)
class Playlist(DB.Model, AuraDatabaseModel):
"""
The playlist containing playlist entries.
"""
__tablename__ = 'playlist'
# pk,fk
artificial_id = Column(Integer, primary_key=True)
schedule_start = Column(DateTime, ForeignKey("schedule.schedule_start"))
# relationships
schedule = relationship("Schedule", uselist=False, back_populates="playlist")
entries = relationship("PlaylistEntry", back_populates="playlist")
# data
playlist_id = Column(Integer, autoincrement=False) # , ForeignKey("schedule.playlist_id"))
show_name = Column(String(256))
fallback_type = Column(Integer)
entry_count = Column(Integer)
@staticmethod
def select_all():
"""
Fetches all entries
"""
all_entries = DB.session.query(Playlist).filter(Playlist.fallback_type == 0).all()
cnt = 0
for entry in all_entries:
entry.programme_index = cnt
cnt = cnt + 1
return all_entries
@staticmethod
def select_playlist_for_schedule(datetime, playlist_id):
"""
Retrieves the playlist for the given schedule identified by `start_date` and `playlist_id`
Args:
start_date (datetime): Date and time when the playlist is scheduled
playlist_id (Integer): The ID of the playlist
Returns:
(Playlist): The playlist, if existing for schedule
Raises:
Exception: In case there a inconsistent database state, such es multiple playlists for given date/time.
"""
playlist = None
playlists = DB.session.query(Playlist).filter(Playlist.schedule_start == datetime).all()
# FIXME There are unknown issues with the native SQL query by ID
# playlists = DB.session.query(Playlist).filter(Playlist.schedule_start == datetime and Playlist.playlist_id == playlist_id).all()
for p in playlists:
if p.playlist_id == playlist_id:
playlist = p
# if playlists and len(playlists) > 1:
# raise Exception("Inconsistent Database State: Multiple playlists for given schedule '%s' and playlist id#%d available!" % (str(datetime), playlist_id))
# if not playlists:
# return None
# return playlists[0]
return playlist
@staticmethod
def select_playlist(playlist_id):
"""
Retrieves all paylists for that given playlist ID.
Args:
playlist_id (Integer): The ID of the playlist
Returns:
(Array<Playlist>): An array holding the playlists
"""
return DB.session.query(Playlist).filter(Playlist.playlist_id == playlist_id).order_by(Playlist.schedule_start).all()
@hybrid_property
def start_unix(self):
"""
Start time of the playlist in UNIX time.
"""
return time.mktime(self.schedule_start.timetuple())
@hybrid_property
def end_unix(self):
"""
End time of the playlist in UNIX time.
"""
return time.mktime(self.schedule_start.timetuple()) + self.duration
@hybrid_property
def duration(self):
"""
Returns the total length of the playlist in seconds.
Returns:
(Integer): Length in seconds
"""
total = 0
for entry in self.entries:
total += entry.duration
return total
def as_dict(self):
"""
Returns the playlist as a dictionary for serialization.
"""
entries = []
for e in self.entries:
entries.append(e.as_dict())
playlist = {
"playlist_id": self.playlist_id,
"fallback_type": self.fallback_type,
"entry_count": self.entry_count,
"entries": entries
}
return playlist
def __str__(self):
"""
String representation of the object.
"""
time_start = SimpleUtil.fmt_time(self.start_unix)
time_end = SimpleUtil.fmt_time(self.end_unix)
return "ID#%s [items: %s | %s - %s]" % (str(self.playlist_id), str(self.entry_count), str(time_start), str(time_end))
class PlaylistEntry(DB.Model, AuraDatabaseModel):
"""
Playlist entries are the individual items of a playlist such as audio files.
"""
__tablename__ = 'playlist_entry'
# primary keys
artificial_id = Column(Integer, primary_key=True)
# foreign keys
artificial_playlist_id = Column(Integer, ForeignKey("playlist.artificial_id"))
entry_num = Column(Integer) # , primary_key=True)
uri = Column(String(1024))
duration = Column(BigInteger)
filename = Column(String(1024))
entry_start = Column(DateTime)
queue_state = None # Assigned when entry is about to be queued
channel = None # Assigned when entry is actually played
# relationships
playlist = relationship("Playlist", uselist=False, back_populates="entries")
meta_data = relationship("PlaylistEntryMetaData", uselist=False, back_populates="entry")
@staticmethod
def select_playlistentry_for_playlist(artificial_playlist_id, entry_num):
return DB.session.query(PlaylistEntry).filter(PlaylistEntry.artificial_playlist_id == artificial_playlist_id, PlaylistEntry.entry_num == entry_num).first()
@hybrid_property
def entry_end(self):
return self.entry_start + datetime.timedelta(seconds=self.duration)
@hybrid_property
def start_unix(self):
return time.mktime(self.entry_start.timetuple())
@hybrid_property
def end_unix(self):
return time.mktime(self.entry_end.timetuple())
@hybrid_property
def volume(self):
return 100 # FIXME Make DB Column
@hybrid_property
def type(self):
return EngineUtil.get_channel_type(self.uri)
def get_prev_entries(self):
"""
Retrieves all previous entries as part of the current entry's playlist.
Returns:
(List): List of PlaylistEntry
"""
prev_entries = []
for entry in self.playlist.entries:
if entry.entry_start < self.entry_start:
prev_entries.append(entry)
return prev_entries
def get_next_entries(self, schedule_sensitive=True):
"""
Retrieves all following entries as part of the current entry's playlist.
Args:
schedule_sensitive (Boolean): If `True` entries which start after \
the end of the schedule are excluded
Returns:
(List): List of PlaylistEntry
"""
next_entries = []
for entry in self.playlist.entries:
if entry.entry_start > self.entry_start:
if schedule_sensitive:
if entry.entry_start < self.playlist.schedule.schedule_end:
next_entries.append(entry)
else:
next_entries.append(entry)
return next_entries
def as_dict(self):
"""
Returns the entry as a dictionary for serialization.
"""
if self.meta_data:
return {
"id": self.artificial_id,
"duration": self.duration,
"artist": self.meta_data.artist,
"album": self.meta_data.album,
"title": self.meta_data.title
}
return None
def __str__(self):
"""
String representation of the object.
"""
time_start = SimpleUtil.fmt_time(self.start_unix)
time_end = SimpleUtil.fmt_time(self.end_unix)
track = self.filename[-25:]
return "PlaylistEntry #%s [%s - %s | %ssec | Track: ...%s]" % (str(self.artificial_id), time_start, time_end, self.duration, track)
class PlaylistEntryMetaData(DB.Model, AuraDatabaseModel):
"""
Metadata for a playlist entry such as the artist and track name.
"""
__tablename__ = "playlist_entry_metadata"
artificial_id = Column(Integer, primary_key=True)
artificial_entry_id = Column(Integer, ForeignKey("playlist_entry.artificial_id"))
artist = Column(String(256))
title = Column(String(256))
album = Column(String(256))
entry = relationship("PlaylistEntry", uselist=False, back_populates="meta_data")
@staticmethod
def select_metadata_for_entry(artificial_playlistentry_id):
return DB.session.query(PlaylistEntry).filter(PlaylistEntryMetaData.artificial_entry_id == artificial_playlistentry_id).first()
#
# TRACK SERVICE
#
class TrackService(DB.Model, AuraDatabaseModel):
"""
TrackService holding track-service items consisting of
"""
__tablename__ = 'trackservice'
# Primary keys
id = Column(Integer, primary_key=True, autoincrement=True)
# Foreign keys
track_start = Column(DateTime)
track_end = Column(DateTime) # Currently not used, maybe later for timing checks and multi-entry avoidance.
artificial_schedule_id = Column(Integer, ForeignKey("schedule.id"))
artificial_playlist_entry_id = Column(Integer, ForeignKey("playlist_entry.artificial_id"), nullable=True)
single_entry_id = Column(Integer, ForeignKey("single_entry.id"), nullable=True)
# Data
schedule = relationship("Schedule", foreign_keys=[artificial_schedule_id], lazy="joined")
playlist_entry = relationship("PlaylistEntry", primaryjoin="and_(TrackService.artificial_playlist_entry_id==PlaylistEntry.artificial_id)", lazy="joined")
single_entry = relationship("SingleEntry", foreign_keys=[single_entry_id], lazy="joined")
fallback_type = Column(Integer, default=0)
def __init__(self, entry, fallback_type=0):
"""
Initializes a trackservice entry based on a playlist entry.
"""
self.track_start = datetime.datetime.now()
# if entry.duration:
# self.track_end = self.track_start + datetime.timedelta(seconds=entry.duration)
self.fallback_type = fallback_type
if fallback_type < 4:
self.schedule_start = entry.playlist.schedule_start
self.artificial_playlist_entry_id = entry.artificial_id
self.playlist_entry = entry
self.schedule = entry.playlist.schedule
else:
self.single_entry = entry
@hybrid_property
def track(self):
"""
Retrieves the track information as a dictionary.
Depending on possible fallback scenarios either `playlist_entry` or `single_entry` is used as a basis:
- Scenario 1: No fallback, all info is gathered via the playlist entry
- Scenario 2: Fallback-type > 0, info is also gathered via the defined playlist entry
- Scenario 3: This type of fallback didn't get scheduled; a single entry is played
"""
if self.playlist_entry:
return self.playlist_entry.as_dict()
elif self.single_entry:
return self.single_entry.as_dict()
else:
return None
@hybrid_property
def show(self):
"""
Retrieves show information based on the related schedule. If no schedule
is available (e.g. when the engine is in a fallback state), then the default
show properties from `AuraConfig` are returned.
"""
show_info = {}
if self.schedule:
show_info["name"] = self.schedule.show_name
show_info["type"] = self.schedule.type
show_info["host"] = self.schedule.show_hosts
elif self.fallback_type == 4:
show_info["name"] = config.get("fallback_show_name")
show_info["type"] = config.get("fallback_show_type")
show_info["host"] = config.get("fallback_show_host")
return show_info
@staticmethod
def select_one(id):
"""
Select one specific track-service item by ID.
"""
DB.session.commit() # Required since independend session is used.
track = DB.session.query(TrackService).filter(TrackService.id == id).first()
return track
@staticmethod
def select_current():
"""
Selects the currently playing track.
"""
now = datetime.datetime.now()
DB.session.commit() # Required since independend session is used.
track = DB.session.query(TrackService).\
filter(TrackService.track_start <= str(now)).\
order_by(TrackService.track_start.desc()).first()
return track
@staticmethod
def select_last_hours(n):
"""
Selects the tracks playing in the past (`n`) hours.
"""
last_hours = datetime.datetime.today() - datetime.timedelta(hours=n)
DB.session.commit() # Required since independend session is used.
tracks = DB.session.query(TrackService).filter(TrackService.track_start >= str(last_hours)).all()
for track in tracks:
track = TrackService.select_one(track.id)
return tracks
@staticmethod
def select_by_day(day):
"""
Select the track-service items for a day.
"""
day_plus_one = day + datetime.timedelta(days=1)
DB.session.commit() # Required since independend session is used.
tracks = DB.session.query(TrackService).\
filter(TrackService.track_start >= str(day), TrackService.track_start < str(day_plus_one)).\
order_by(TrackService.track_start.desc()).all()
res = []
for item in tracks:
if item.track: res.append(item)
return res
@staticmethod
def select_by_range(from_day, to_day):
"""
Selects the track-service items for a day range.
"""
DB.session.commit()
tracks = DB.session.query(TrackService).filter(TrackService.track_start >= str(from_day),
TrackService.track_start < str(to_day)).all()
return tracks
def __str__(self):
"""
Convert to String.
"""
return "TrackID: #%s [track_start: %s, artificial_playlist_entry_id: %s]" % (str(self.id), str(self.track_start), str(self.artificial_playlist_entry_id))
class SingleEntry(DB.Model, AuraDatabaseModel):
"""
An entry played in case of e.g. a local fallback or custom programming without a playlist nor schedule.
"""
__tablename__ = 'single_entry'
# Primary keys
id = Column(Integer, primary_key=True)
# Relationships
trackservice_id = Column(Integer) #, ForeignKey("trackservice.id"))
meta_data_id = Column(Integer) #, ForeignKey("trackservice.id"))
trackservice = relationship("TrackService", uselist=False, back_populates="single_entry")
meta_data = relationship("SingleEntryMetaData", uselist=False, back_populates="entry")
# Data
uri = Column(String(1024))
duration = Column(BigInteger)
filename = Column(String(1024))
entry_start = Column(DateTime)
@hybrid_property
def entry_end(self):
return self.entry_start + datetime.timedelta(seconds=self.duration)
@hybrid_property
def start_unix(self):
return time.mktime(self.entry_start.timetuple())
@hybrid_property
def end_unix(self):
return time.mktime(self.entry_start.timetuple()) + self.duration
@hybrid_property
def volume(self):
return 100
@hybrid_property
def type(self):
return EngineUtil.get_channel_type(self.uri)
@hybrid_property
def channel(self):
type = EngineUtil.get_channel_type(self.uri)
if type == ChannelType.FILESYSTEM:
return Channel.FILESYSTEM_A
elif type == ChannelType.STREAM:
return Channel.STREAM_A
else:
return "foo:bar"
#FIXME Extend & finalize!!
def as_dict(self):
"""
Returns the entry as a dictionary for serialization.
"""
if self.meta_data:
return {
"duration": self.duration,
"artist": self.meta_data.artist,
"album": self.meta_data.album,
"title": self.meta_data.title
}
return None
def __str__(self):
"""
String representation of the object.
"""
time_start = SimpleUtil.fmt_time(self.start_unix)
time_end = SimpleUtil.fmt_time(self.end_unix)
track = self.filename[-25:]
return "SingleEntry #%s [%s - %s | %ssec | Track: ...%s]" % (str(self.id), time_start, time_end, self.duration, track)
class SingleEntryMetaData(DB.Model, AuraDatabaseModel):
"""
Metadata for a autonomous entry such as the artist and track name.
"""
__tablename__ = "single_entry_metadata"
id = Column(Integer, primary_key=True)
single_entry_id = Column(Integer, ForeignKey("single_entry.id"))
artist = Column(String(256))
title = Column(String(256))
album = Column(String(256))
entry = relationship("SingleEntry", uselist=False, back_populates="meta_data")
@staticmethod
def select_metadata_for_entry(single_entry_id):
return DB.session.query(SingleEntry).filter(SingleEntryMetaData.id == single_entry_id).first()
#
# LEGACY CLASSES
#
# ------------------------------------------------------------------------------------------ #
# class Schedule(DB.Model, AuraDatabaseModel):
# """
# One specific Schedule for a show on a timeslot
# """
# __tablename__ = 'schedule'
#
# # primary and foreign keys
# schedule_start = Column(DateTime, primary_key=True)
#
# schedule_end = Column(DateTime)
# schedule_id = Column(Integer) #, primary_key=True, autoincrement=False)
# show_id = Column(Integer) # well, in fact not needed..
# show_name = Column(String(256))
# show_hosts = Column(String(256))
# funding_category = Column(String(256))
# comment = Column(String(512))
# languages = Column(String(256))
# type = Column(String(256))
# category = Column(String(256))
# topic = Column(String(256))
# musicfocus = Column(String(256))
#
# is_repetition = Column(Boolean())
#
# playlist_id = Column(Integer, ForeignKey("playlist.playlist_id"))
# timeslot_fallback_id = Column(Integer)
# show_fallback_id = Column(Integer)
# station_fallback_id = Column(Integer)
#
# playlist = relationship("Playlist", foreign_keys=[playlist_id], lazy="joined")
# # timeslot_fallback = relationship("Playlist", foreign_keys=[timeslot_fallback_id], lazy="joined")
# # show_fallback = relationship("Playlist", foreign_keys=[show_fallback_id], lazy="joined")
# # station_fallback = relationship("Playlist", foreign_keys=[station_fallback_id], lazy="joined")
#
# @staticmethod
# def select_all():
# # fetching all entries
# all_entries = DB.session.query(Schedule).filter().order_by(Schedule.schedule_start).all()
# return all_entries
#
# @staticmethod
# def select_by_id(id):
# entry = DB.session.query(Schedule).filter(Schedule.schedule_id == id).first()
# return entry
# @staticmethod
# def select_act_programme():
# #DB.session.query(Schedule).filter
# # fetching all from today to ..
# today = datetime.date.today()
# all_entries = DB.session.query(Schedule).filter(Schedule.schedule_start >= today).order_by(Schedule.schedule_start).all()
#
# return all_entries
#
#
# @staticmethod
# def drop_the_future(timedelta):
# then = datetime.datetime.now() + timedelta
#
# # is this really necessary?
# future_entries = DB.session.query(Schedule).filter(Schedule.schedule_start > then)
# for e in future_entries:
# e.delete()
# DB.session.commit()
#
# def get_length(self):
# sec1 = int(datetime.datetime.strptime(self.start[0:16].replace(" ", "T"), "%Y-%m-%dT%H:%M").strftime("%s"))
# sec2 = int(datetime.datetime.strptime(self.end[0:16].replace(" ", "T"), "%Y-%m-%dT%H:%M").strftime("%s"))
# len = sec2 - sec1
# return len
#
# # ------------------------------------------------------------------------------------------ #
# def __str__(self):
# return "ScheduleID: #" + str(self.schedule_id) + " Showname: " + self.show_name + " starts @ " + str(self.schedule_start)
# ------------------------------------------------------------------------------------------ #
# ------------------------------------------------------------------------------------------ #
#class PlaylistEntry(DB.Model, AuraDatabaseModel):
# __tablename__ = 'playlist_entry'
#
# # primary and foreign keys
# playlist_id = Column(Integer, ForeignKey("playlist.playlist_id"), primary_key=True, nullable=False, autoincrement=True)
# entry_num = Column(Integer, primary_key=True, nullable=False, autoincrement=False)
#
# uri = Column(String(1024))
#
# filename = ""
# cleansource = ""
# cleanprotocol = ""
# type = None
# fadeintimer = None
# fadeouttimer = None
# switchtimer = None
#
# meta_data = relationship("PlaylistEntryMetaData", primaryjoin="and_(PlaylistEntry.playlist_id==PlaylistEntryMetaData.playlist_id, PlaylistEntry.entry_num==PlaylistEntryMetaData.entry_num)", lazy="joined")
#
# # normal constructor
# def __init__(self, **kwargs):
# super(PlaylistEntry, self).__init__(**kwargs)
# self.calc_unix_times()
# self.define_clean_source()
#
# # constructor like - called from sqlalchemy
# @orm.reconstructor
# def reconstructor(self):
# self.calc_unix_times()
# self.define_clean_source()
# self.set_entry_type()
#
# def define_clean_source(self):
# if self.uri is None:
# return None
#
# if self.uri.startswith("http"):
# self.cleanprotocol = self.uri[:7]
# self.cleansource = self.uri
#
# elif self.uri.startswith("linein"):
# self.cleanprotocol = self.uri[:9]
# self.cleansource = self.uri[9:]
#
# elif self.uri.startswith("pool") or self.uri.startswith("file") or self.uri.startswith("live"):
# self.cleanprotocol = self.uri[:7]
# self.cleansource = self.uri[7:]
#
# elif self.uri.startswith("playlist"):
# self.cleanprotocol = self.uri[:11]
# self.cleansource = self.uri[11:]
#
# else:
# self.logger.error("Unknown source protocol")
#
# def set_entry_type(self):
# if self.uri.startswith("http"):
# self.type = ScheduleEntryType.STREAM
# if self.uri.startswith("pool") or self.uri.startswith("playlist") or self.uri.startswith("file"):
# self.type = ScheduleEntryType.FILESYSTEM
# if self.uri.startswith("live") or self.uri.startswith("linein"):
# if self.cleansource == "0":
# self.type = ScheduleEntryType.LIVE_0
# elif self.cleansource == "1":
# self.type = ScheduleEntryType.LIVE_1
# elif self.cleansource == "2":
# self.type = ScheduleEntryType.LIVE_2
# elif self.cleansource == "3":
# self.type = ScheduleEntryType.LIVE_3
# elif self.cleansource == "4":
# self.type = ScheduleEntryType.LIVE_4
# def calc_unix_times(self):
# if self.entry_start is not None:
# self.entry_start_unix = time.mktime(self.entry_start.timetuple())
#
#
#
# # ------------------------------------------------------------------------------------------ #
# @staticmethod
# def select_all():
# # fetching all entries
# all_entries = DB.session.query(Playlist).filter(Playlist.fallback_type == 0).order_by(Playlist.entry_start).all()
#
# cnt = 0
# for entry in all_entries:
# entry.programme_index = cnt
# cnt = cnt + 1
#
# return all_entries
#
# @staticmethod
# def select_act_programme(include_act_playing = True):
# # fetching all from today to ..
# today = datetime.date.today()
# all_entries = DB.session.query(Playlist).filter(Playlist.entry_start >= today, Playlist.fallback_type == 0).order_by(Playlist.entry_start).all()
#
# cnt = 0
# for entry in all_entries:
# entry.programme_index = cnt
# cnt = cnt + 1
#
# return all_entries
#
# # ------------------------------------------------------------------------------------------ #
# @staticmethod
# def truncate():
# all_entries = DB.session.query(Playlist).filter().order_by(Playlist.entry_start).all()
#
# for a in all_entries:
# a.delete()
# DB.session.commit()
#
# # ------------------------------------------------------------------------------------------ #
# @staticmethod
# def select_next_manual_entry_num():
#
# max_manual_entry_num = DB.session.query(func.max(Playlist.entry_num)).filter(Playlist.schedule_id == 0).first()
#
# if max_manual_entry_num[0] is None:
# return 0
# else:
# return int(max_manual_entry_num[0])+1
#
# # ------------------------------------------------------------------------------------------ #
# @staticmethod
# def select_upcoming(datefrom=datetime.datetime.now()):
# upcomingtracks = DB.session.query(Playlist).filter(Playlist.entry_start > datefrom).order_by(Playlist.entry_start).all()
# return upcomingtracks
#
# # ------------------------------------------------------------------------------------------ #
# @staticmethod
# def select_one(playlist_id, entry_num):
# return DB.session.query(Playlist).filter(Playlist.playlist_id == playlist_id, Playlist.entry_num == entry_num).first()
#
# # ------------------------------------------------------------------------------------------ #
# @staticmethod
# def select_one_playlist_entry_for_show(schedule_id, playlist_type, entry_num):
# return DB.session.query(Playlist).filter(Playlist.schedule_id == schedule_id, Playlist.fallback_type == playlist_type, Playlist.entry_num == entry_num).first()
#
# # ------------------------------------------------------------------------------------------ #
# @staticmethod
# def select_playlist(playlist_id):
# return DB.session.query(Playlist).filter(Playlist.playlist_id == playlist_id).order_by(Playlist.entry_start).all()
#
# @staticmethod
# def drop_the_future(timedelta):
# then = datetime.datetime.now() + timedelta
# #DB.session.delete(ScheduleEntry).filter(ScheduleEntry.entry_start >= then)
#
# # is this really necessary?
# future_entries = DB.session.query(Playlist).filter(Playlist.entry_start > then)
# for e in future_entries:
# e.delete()
# DB.session.commit()
#
# def getChannel(self):
# if self.type == self.type.FILESYSTEM:
# return "fs"
#
# if self.type == self.type.LIVE_0 or self.type == self.type.LIVE_1 or self.type == self.type.LIVE_2 or self.type == self.type.LIVE_3 or self.type == self.type.LIVE_4:
# return "aura_linein_"+self.cleansource # .cleanprotocol[8]
#
# if self.type == self.type.STREAM:
# return "http"
#
#
# # ------------------------------------------------------------------------------------------ #
# def __str__(self):
# return "Showentry starts @ " + str(self.entry_start) + " and plays " + self.source
# class ScheduleEntryFile(DB.Model, AuraDatabaseModel):
# __tablename__ = 'schedule_entry_file'
#
# # primary and foreign keys
# file_id = Column(Integer, primary_key=True, nullable=False, autoincrement=False)
# playlist_id = Column(Integer) #, ForeignKey("schedule_entry.playlist_id")) # primary_key=True, nullable=False, autoincrement=False)
# entry_num = Column(Integer) # , ForeignKey("schedule_entry.entry_num")) # primary_key=True, nullable=False, autoincrement=False)
#
# ForeignKeyConstraint(["playlist_id", "entry_num"], ["schedule_entry.playlist_id", "schedule_entry.entry_num"])
#
# show = Column(String(512))
# size = Column(Integer)
# duration = Column(Integer)
#
# class ScheduleEntryFileMetaData(DB.Model, AuraDatabaseModel):
# __tablename__ = "schedule_entry_file_metadata"
#
# metadata_id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
# file_id = Column(Integer, ForeignKey("schedule_entry_file.file_id"))
#
# artist = Column(String(256))
# title = Column(String(256))
# album = Column(String(256))
#
# # ------------------------------------------------------------------------------------------ #
# class TrackService(DB.Model, AuraDatabaseModel):
# __tablename__ = 'trackservice'
#
# trackservice_id = Column(Integer, primary_key=True, autoincrement=True)
# schedule_entry_id = Column(Integer, ForeignKey("schedule_entry.id"))
# playlist_id = Column(Integer, nullable=False)
# entry_num = Column(Integer, nullable=False)
#
# source = Column(String(255), nullable=False)
# start = Column(DateTime, nullable=False, default=func.now())
# __table_args__ = (
# ForeignKeyConstraint(['playlist_id', 'entry_num'], ['schedule_entry.playlist_id', 'schedule_entry.entry_num']),
# )
# schedule_entry = relationship("ScheduleEntry", primaryjoin="and_(TrackService.playlist_id==ScheduleEntry.playlist_id, TrackService.entry_num==ScheduleEntry.entry_num)", lazy="joined")
#schedule = relationship("Schedule", foreign_keys=[schedule_id], lazy="joined")
# trackservice_entry = relationship("ScheduleEntry", foreign_keys=[playlist_id, entry_num], lazy="joined")
# schedule_entry = relationship("ScheduleEntry", primaryjoin="and_(TrackService.schedule_entry_id==ScheduleEntry.id)", lazy="joined")
#
# @staticmethod
# # ------------------------------------------------------------------------------------------ #
# def select_one(trackservice_id):
# return DB.session.query(TrackService).filter(TrackService.trackservice_id == trackservice_id).first()
#
# @staticmethod
# # ------------------------------------------------------------------------------------------ #
# def select_by_day(day):
# day_plus_one = day + datetime.timedelta(days=1)
# tracks = DB.session.query(TrackService).filter(TrackService.start >= str(day), TrackService.start < str(day_plus_one)).all()
# return tracks
#
# @staticmethod
# # ------------------------------------------------------------------------------------------ #
# def select_by_range(from_day, to_day):
# tracks = DB.session.query(TrackService).filter(TrackService.start >= str(from_day),
# TrackService.start < str(to_day)).all()
# return tracks
#
# # ------------------------------------------------------------------------------------------ #
# def __str__(self):
# return "TrackServiceID: #" + str(self.trackservice_id) + " playlist_id: " + str(self.playlist_id) + " started @ " + str(self.start) + " and played " + self.source
# ------------------------------------------------------------------------------------------ #
# class TrackServiceSchedule(db.Model, AuraDatabaseModel):
# """
# Trackservice is tracking every schedule.
# """
# __tablename__ = 'trackservice_schedule'
#
# # primary and foreign keys
# ts_schedule_id = Column(Integer, primary_key=True, autoincrement=True)
# schedule_id = Column(Integer, ForeignKey("schedule.schedule_id"))
#
# schedule = relationship("Schedule", foreign_keys=[schedule_id], lazy="joined")
#
# # ------------------------------------------------------------------------------------------ #
# @staticmethod
# def select_one(schedule_id):
# # damn BAND-AID
# # db.session.commit()
#
# return db.session.query(ScheduleEntry).filter(TrackServiceSchedule.schedule_id == schedule_id).first()
#
# # ------------------------------------------------------------------------------------------ #
# class TrackServiceScheduleEntry(db.Model, AuraDatabaseModel):
# """
# And a schedule can have multiple entries
# """
# __tablename__ = 'trackservice_entry'
#
# # primary and foreign keys. the foreign keys here can be null, because of fallback stuff
# ts_entry_id = Column(Integer, primary_key=True, autoincrement=True)
# ts_schedule_id = Column(Integer, ForeignKey("trackservice_schedule.ts_schedule_id"), nullable=True)
# playlist_id = Column(Integer, nullable=True)
# entry_num = Column(Integer, nullable=True)
#
# fallback = Column(Boolean, default=False)
# fallback_start = Column(DateTime, nullable=True, default=None)
# source = Column(String(256), nullable=True, default=None)
#
# # foreign key definitions
# __table_args__ = (
# ForeignKeyConstraint(['playlist_id', 'entry_num'], ['schedule_entry.playlist_id', 'schedule_entry.entry_num']),
# )
#
# trackservice_schedule = relationship("TrackServiceSchedule", foreign_keys=[ts_schedule_id], lazy="joined")
# #trackservice_entry = relationship("ScheduleEntry", foreign_keys=[playlist_id, entry_num], lazy="joined")
# trackservice_entry = relationship("ScheduleEntry", primaryjoin="and_(TrackServiceScheduleEntry.playlist_id==ScheduleEntry.playlist_id, TrackServiceScheduleEntry.entry_num==ScheduleEntry.entry_num)" , lazy="joined")
#
# @staticmethod
# def select_all():
# return db.session.query(TrackServiceScheduleEntry).filter().all()
# AuraDatabaseModel.recreate_db(systemexit=True)
__author__ = 'gg'
#!/usr/bin/liquidsoap
#
# engine
#
# Playout Daemon for autoradio project
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
icecast_vorbis_metadata = false
inputs = ref []
# load settings from ini file
%include "settings.liq"
# include some functions
%include "library.liq"
# include fallback functions
%include "fallback.liq"
#################
# create inputs #
#################
# enable play from filesystem
%include "in_filesystem.liq"
# enable stream overtakes
%include "in_stream.liq"
# enabled line in from soundcard
%include "in_soundcard.liq"
# fill the mixer
mixer = mix(id="mixer", list.append([input_filesystem_0, input_filesystem_1, input_http_0, input_http_1], !inputs))
# mixer = mix(id="mixer", list.append([input_filesystem_0, input_filesystem_1, input_filesystem_2, input_filesystem_3, input_filesystem_4, input_http_0, input_http_1], !inputs))
# output source with fallbacks
stripped_stream = strip_blank(id='strip_blank', track_sensitive=false, max_blank=fallback_max_blank, min_noise=fallback_min_noise, threshold=fallback_threshold, mixer)
# enable fallback
# output_source = fallback(id="fallback", track_sensitive=false, [stripped_stream, timeslot_fallback, show_fallback, mksafe(station_fallback)])
#output_source = fallback(id="fallback", track_sensitive=false, [stripped_stream, timeslot_fallback, show_fallback, mksafe(station_fallback)])
output_source = mksafe(stripped_stream)
ignore(timeslot_fallback)
ignore(show_fallback)
ignore(station_fallback)
##################
# create outputs #
##################
# create soundcard output
%include "out_soundcard.liq"
# recording output
%include "out_filesystem.liq"
# stream output
%include "out_stream.liq"
# enable socket functions
%include "serverfunctions.liq"
########################
# start initialization #
########################
system('#{list.assoc(default="", "install_dir", ini)}/guru.py --init-player --quiet')
#
# engine
#
# Playout Daemon for autoradio project
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
# Crossfade between tracks,
# taking the respective volume levels
# into account in the choice of the
# transition.
# @category Source / Track Processing
# @param ~start_next Crossing duration, if any.
# @param ~fade_in Fade-in duration, if any.
# @param ~fade_out Fade-out duration, if any.
# @param ~width Width of the volume analysis window.
# @param ~conservative Always prepare for
# a premature end-of-track.
# @param s The input source.
def crossfade (~start_next=5.,~fade_in=3.,
~fade_out=3., ~width=2.,
~conservative=false,s)
high = -20.
medium = -32.
margin = 4.
fade.out = fade.out(type="sin",duration=fade_out)
fade.in = fade.in(type="sin",duration=fade_in)
add = fun (a,b) -> add(normalize=false,[b,a])
log = log(label="crossfade")
def transition(a,b,ma,mb,sa,sb)
list.iter(fun(x)->
log(level=4,"Before: #{x}"),ma)
list.iter(fun(x)->
log(level=4,"After : #{x}"),mb)
if
# If A and B and not too loud and close,
# fully cross-fade them.
a <= medium and
b <= medium and
abs(a - b) <= margin
then
log("Transition: crossed, fade-in, fade-out.")
add(fade.out(sa),fade.in(sb))
elsif
# If B is significantly louder than A,
# only fade-out A.
# We don't want to fade almost silent things,
# ask for >medium.
b >= a + margin and a >= medium and b <= high
then
log("Transition: crossed, fade-out.")
add(fade.out(sa),sb)
elsif
# Do not fade if it's already very low.
b >= a + margin and a <= medium and b <= high
then
log("Transition: crossed, no fade-out.")
add(sa,sb)
elsif
# Opposite as the previous one.
a >= b + margin and b >= medium and a <= high
then
log("Transition: crossed, fade-in.")
add(sa,fade.in(sb))
# What to do with a loud end and
# a quiet beginning ?
# A good idea is to use a jingle to separate
# the two tracks, but that's another story.
else
# Otherwise, A and B are just too loud
# to overlap nicely, or the difference
# between them is too large and
# overlapping would completely mask one
# of them.
log("No transition: just sequencing.")
sequence([sa, sb])
end
end
cross(width=width, duration=start_next,
conservative=conservative,
transition,s)
end
# Custom crossfade to deal with jingles.
# def smarter_crossfade (~start_next=5.,~fade_in=3.,~fade_out=3.,
# ~default=(fun (a,b) -> sequence([a, b])),
# ~high=-15., ~medium=-32., ~margin=4.,
# ~width=2.,~conservative=false,s)
# fade.out = fade.out(type="sin",duration=fade_out)
# fade.in = fade.in(type="sin",duration=fade_in)
# add = fun (a,b) -> add(normalize=false,[b, a])
# log = log(label="smarter_crossfade")
# def transition(a,b,ma,mb,sa,sb)
# list.iter(fun(x)-> log(level=4,"Before: #{x}"),ma)
# list.iter(fun(x)-> log(level=4,"After : #{x}"),mb)
# if ma["type"] == "jingles" or mb["type"] == "jingles" then
# log("Old or new file is a jingle: sequenced transition.")
# sequence([sa, sb])
# elsif
# # If A and B are not too loud and close, fully cross-fade them.
# a <= medium and b <= medium and abs(a - b) <= margin
# then
# log("Old <= medium, new <= medium and |old-new| <= margin.")
# log("Old and new source are not too loud and close.")
# log("Transition: crossed, fade-in, fade-out.")
# add(fade.out(sa),fade.in(sb))
# elsif
# # If B is significantly louder than A, only fade-out A.
# # We don't want to fade almost silent things, ask for >medium.
# b >= a + margin and a >= medium and b <= high
# then
# log("new >= old + margin, old >= medium and new <= high.")
# log("New source is significantly louder than old one.")
# log("Transition: crossed, fade-out.")
# add(fade.out(sa),sb)
# elsif
# # Opposite as the previous one.
# a >= b + margin and b >= medium and a <= high
# then
# log("old >= new + margin, new >= medium and old <= high")
# log("Old source is significantly louder than new one.")
# log("Transition: crossed, fade-in.")
# add(sa,fade.in(sb))
# elsif
# # Do not fade if it's already very low.
# b >= a + margin and a <= medium and b <= high
# then
# log("new >= old + margin, old <= medium and new <= high.")
# log("Do not fade if it's already very low.")
# log("Transition: crossed, no fade.")
# add(sa,sb)
# # What to do with a loud end and a quiet beginning ?
# # A good idea is to use a jingle to separate the two tracks,
# # but that's another story.
# else
# # Otherwise, A and B are just too loud to overlap nicely,
# # or the difference between them is too large and overlapping would
# # completely mask one of them.
# log("No transition: using default.")
# default(sa, sb)
# end
# end
# #smart_cross(width=width, duration=start_next, conservative=conservative, transition, s)
# smart_crossfade(duration=start_next, fade_in=fade_in, fade_out=fade_out, width=width, conservative=conservative, transition, s)
# end
# create a pool
def fallback_create(~skip=true, name, requestor)
log("Creating channel #{name}")
# Create the request.dynamic source
# Set conservative to true to queue several songs in advance
#source = request.dynamic(conservative=true, length=50., id="pool_"^name, requestor, timeout=60.)
source = request.dynamic(length=50., id="pool_"^name, requestor, timeout=60.)
# Apply normalization using replaygain information
source = amplify(1., override="replay_gain", source)
# Skip blank when asked to
source =
if skip then
skip_blank(max_blank=fallback_max_blank, min_noise=fallback_min_noise, threshold=fallback_threshold, source)
else
source
end
# Tell the system when a new track is played
def do_meta(meta) =
filename = meta["filename"]
# artist = meta["artist"]
# title = meta["title"]
system('#{list.assoc(default="", "install_dir", ini)}/guru.py --on_play "#{filename}"')
end
source = on_metadata(do_meta, source)
log("channel created")
# Finally apply a smart crossfading
#smarter_crossfade(source)
crossfade(source)
end
def create_dynamic_playlist(next)
request.create(list.hd(default="", next))
end
def create_playlist() =
log("requesting next song for PLAYLIST")
result = get_process_lines('#{list.assoc(default="", "install_dir", ini)}/guru.py --get-next-file-for "playlist" --quiet')
create_dynamic_playlist(result)
end
def create_station_fallback() =
result = get_process_lines('#{list.assoc(default="", "install_dir", ini)}/guru.py --get-next-file-for station --quiet')
log("next song for STATION fallback is: #{result}")
create_dynamic_playlist(result)
end
def create_show_fallback() =
result = get_process_lines('#{list.assoc(default="", "install_dir", ini)}/guru.py --get-next-file-for show --quiet')
log("next song for SHOW fallback is: #{result}")
create_dynamic_playlist(result)
end
def create_timeslot_fallback() =
result = get_process_lines('#{list.assoc(default="", "install_dir", ini)}/guru.py --get-next-file-for timeslot --quiet')
log("next song for TIMESLOT fallback is: #{result}")
create_dynamic_playlist(result)
end
# create fallbacks
timeslot_fallback = fallback_create(skip=true, "timeslot_fallback", create_timeslot_fallback)
station_fallback = fallback_create(skip=true, "station_fallback", create_station_fallback)
show_fallback = fallback_create(skip=true, "show_fallback", create_show_fallback)
\ No newline at end of file
__author__ = 'michel'
import os
import sys
#!/bin/bash
pack_int(){ printf "%08X\n" $1 | sed 's/\([0-9A-F]\{2\}\)\([0-9A-F]\{2\}\)\([0-9A-F]\{2\}\)\([0-9A-F]\{2\}\)/\\\\\\x\4\\\\\\x\3\\\\\\x\2\\\\\\x\1/I' | xargs printf; }
pack_short(){ printf "%04X\n" $1 | sed 's/\([0-9A-F]\{2\}\)\([0-9A-F]\{2\}\)/\\\\\\x\2\\\\\\x\1/I' | xargs printf; }
duration=1800
if [[ $# -eq 1 ]]; then
duration=$1
fi
channels=2
bps=16
sample=44100
Subchunk1Size=18
Subchunk2Size=$(echo "$duration*$sample*$channels*$bps/8" | bc)
ChunkSize=$((20 + $Subchunk1Size + $Subchunk2Size))
echo -n RIFF
pack_int $ChunkSize
echo -n "WAVEfmt "
pack_int $Subchunk1Size
pack_short 1
pack_short $channels
pack_int $sample
pack_int $((bps/8 * channels * sample))
pack_short $((bps/8 * channels))
pack_short $bps
pack_short 0
echo -n data
pack_int $Subchunk2Size
dd if=/dev/zero bs=1 count=$Subchunk2Size 2>/dev/null
#
# engine
#
# Playout Daemon for autoradio project
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
input_filesystem_0 = request.equeue(id="in_filesystem_0")
input_filesystem_1 = request.equeue(id="in_filesystem_1")
# input_filesystem_2 = request.equeue(id="in_filesystem_2")
# input_filesystem_3 = request.equeue(id="in_filesystem_3")
# input_filesystem_4 = request.equeue(id="in_filesystem_4")
#input_fs = cue_cut(mksafe(request.equeue(id="fs")))
# Update Trackservice
def do_meta_filesystem(meta) =
filename = meta["filename"]
# artist = meta["artist"]
# title = meta["title"]
system('#{list.assoc(default="", "install_dir", ini)}/guru.py --on_play "#{filename}"')
end
input_filesystem_0 = on_metadata(id="in_filesystem_0", do_meta_filesystem, input_filesystem_0)
input_filesystem_1 = on_metadata(id="in_filesystem_1", do_meta_filesystem, input_filesystem_1)
# def clear_queue(s) =
# ret = server.execute("fs.queue")
# #ret = request.equeue(id="fs")
# ret = list.hd(default="",ret)
# ret = string.split(separator=" ",ret)
# #print("input FS.list: #{ret}")
# list.iter(fun(x) -> begin
# print("IGNORE: #{x}")
# ignore(server.execute("fs.ignore #{x}"))
# end, ret)
# res = source.skip(s)
# #(0.5)
# print("SKIP RES: #{res}")
# res = source.skip(s)
# print("SKIP RES: #{res}")
# end
# server.register(namespace="fs",
# description="Flush queue and stop request source.",
# usage="stop",
# "stop",
# fun (s) -> begin clear_queue(input_fs) "Done." end)
def clear_items(ns) =
ret = server.execute("#{source.id(ns)}.primary_queue")
ret = list.hd(default="", ret)
if ret == "" then
log("Queue cleared.")
(-1.)
else
log("There are still items in the queue, trying skip ...")
source.skip(ns)
(0.1)
end
end
def clear_queue(ns) =
add_timeout(fast=false, 0.5, {clear_items(ns)})
end
# Clear Queue 0
server.register(namespace=source.id(input_filesystem_0),
description="Clear all items of the filesystem Queue A.",
usage="clear",
"clear_filesystem_0",
fun (s) -> begin clear_queue(input_filesystem_0) "Done." end)
# Clear Queue 1
server.register(namespace=source.id(input_filesystem_1),
description="Clear all items of the filesystem Queue B.",
usage="clear",
"clear_filesystem_1",
fun (s) -> begin clear_queue(input_filesystem_1) "Done." end)
#
# engine
#
# Playout Daemon for autoradio project
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
if a0_in != "" then
# we can ignore the result, since it is stored in the list 'inputs'
set_input(a0_in, "linein_0")
end
if a1_in != "" then
ignore(set_input(a1_in, "linein_1"))
end
if a2_in != "" then
ignore(set_input(a2_in, "linein_2"))
end
if a3_in != "" then
ignore(set_input(a3_in, "linein_3"))
end
if a4_in != "" then
ignore(set_input(a4_in, "linein_4"))
# input_4 = ref output.dummy(blank())
# set_input(input_4, a4_in, "linein_4")
# inputs := list.append([!input_4], !inputs)
end
\ No newline at end of file
#
# engine
#
# Playout Daemon for autoradio project
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
# this is overwritten as soon as a streamovertake is programmed, but liquidsoap needs it to initialize this input
#starturl = "http://stream.fro.at/fro-128.ogg"
#starturl = "http://trance.out.airtime.pro:8000/trance_a"
starturl = "http://chill.out.airtime.pro:8000/chill_a"
input_http_0 = input.http(id="in_http_0", buffer=10.0, max=60.0, timeout=60.0, starturl)
input_http_1 = input.http(id="in_http_1", buffer=10.0, max=60.0, timeout=60.0, starturl)
# Route input stream to an dummy output to avoid buffer-overrun messages
output.dummy(id="SPAM_STREAM_OUTPUT_0", fallible=true, input_http_0)
output.dummy(id="SPAM_STREAM_OUTPUT_1", fallible=true, input_http_1)
\ No newline at end of file
#
# engine
#
# Playout Daemon for autoradio project
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
#####################
# stream to icecast #
#####################
def stream_to_icecast(id, encoding, bitrate, host, port, pass, mount_point, url, description, genre, user, stream, streamnumber, connected, name, channels) =
source = ref stream
def on_error(msg)
connected := "false"
log(msg)
5.
end
def on_connect()
connected := "true"
log("Successfully connected to stream_#{streamnumber}")
end
stereo = (int_of_string(channels) >= 2)
user_ref = ref user
if user == "" then
user_ref := "source"
end
# dumbass liquidsoap cannot handle one output definition for mono and stereo
output_icecast_mono = output.icecast(id = id, host = host, port = port, password = pass, mount = mount_point, fallible = true, url = url, description = description, name = name, genre = genre, user = !user_ref, on_error = on_error, on_connect = on_connect)
output_icecast_stereo = output.icecast(id = id, host = host, port = port, password = pass, mount = mount_point, fallible = true, url = url, description = description, name = name, genre = genre, user = !user_ref, on_error = on_error, on_connect = on_connect)
# %ifencoder %aac
# if encoding == "aac" then
# log("ENABLING AAC to ICECAST")
# %include "outgoing_streams/aac.liq"
# end
# %endif
#
# %ifencoder %flac
# if encoding == "flac" then
# log("ENABLING FLAC to ICECAST")
# %include "outgoing_streams/flac.liq"
# end
# %endif
if encoding == "mp3" then
log("ENABLING Mp3 to ICECAST")
%include "outgoing_streams/mp3.liq"
end
if encoding == "ogg" then
log("ENABLING OGG to ICECAST")
%include "outgoing_streams/ogg.liq"
end
# %ifencoder %opus
# if encoding == "opus" then
# log("ENABLING OPUS to ICECAST")
# %include "outgoing_streams/opus.liq"
# end
# %endif
end
###########
# line in #
###########
def set_input(device, name) =
if use_alsa == true then
alsa_in = input.alsa(id=name, device=a0_in, clock_safe=false, bufferize = false)
inputs := list.append([alsa_in], !inputs)
elsif use_jack == true then
jack_in = input.jack(id=name, clock_safe=false)
inputs := list.append([jack_in], !inputs)
else
pulse_in = input.pulseaudio(id=name, client="AuraEngine Line IN")
inputs := list.append([pulse_in], !inputs)
end
end
############
# line out #
############
def get_output(source, device, name) =
if device != "" then
if use_alsa == true then
log("--- Set ALSA Output ---")
if device == "default" then
output.alsa(id="lineout", bufferize = false, source)
else
output.alsa(id=name, device=device, bufferize = false, source)
end
elsif use_jack == true then
log("--- Set JACK AUDIO Output ---")
output.jack(id=name, source)
else
log("--- Set PULSE AUDIO Output ---")
output.pulseaudio(id=name, client="AuraEngine Line OUT", source)
end
else
log("OUTPUT DUMMY")
output.dummy(id=name^"_DUMMY", blank())
end
end
########################
# record to filesystem #
########################
# shows current file and how many bytes were written so far
def currecording(recfile)
if recfile != "" then
bytes_written = list.hd(default="", get_process_lines("echo $(($(stat -c%s "^recfile^")))"))
"#{recfile}, #{bytes_written}B"
else
""
end
end
def start_recorder(folder, duration, encoding, bitrate, channels, filenamepattern, is_recording, stream, recorder_number) =
source = ref stream
stereo = (int_of_string(channels) >= 2)
# define on_start, on_close (good case) and on_stop (error case)
recfile = ref ''
def on_start()
is_recording := true
recfile := list.hd(default="", get_process_lines("date +#{filenamepattern}"))
end
def on_close(filename)
is_recording := false
recfile := list.hd(default="", get_process_lines("date +#{filenamepattern}"))
end
def on_stop()
is_recording := false
end
# register server function
server.register(namespace="recorder_"^recorder_number, description="Show current file.", usage="curfile", "curfile", fun (s) -> currecording(!recfile) )
# dumbass liquidsoap cannot handle one output definition for mono and stereo
output_filesystem_mono = output.file(id="recorder_"^recorder_number, perm = 0o664, on_start=on_start, on_close=on_close, on_stop=on_stop, reopen_when={ int_of_float(gettimeofday()/60.) mod duration == 0 })
output_filesystem_stereo = output.file(id="recorder_"^recorder_number, perm = 0o664, on_start=on_start, on_close=on_close, on_stop=on_stop, reopen_when={ int_of_float(gettimeofday()/60.) mod duration == 0 })
# %ifencoder %aac
# if encoding == "aac" then
# log("ENABLING aac recorder to filesystem")
# %include "outgoing_recordings/aac.liq"
# end
# %endif
# %ifencoder %flac
if encoding == "flac" then
log("ENABLING flac recorder to filesystem")
%include "outgoing_recordings/flac.liq"
end
# %endif
# %ifencoder %mp3
if encoding == "mp3" then
log("ENABLING mp3 recorder to filesystem")
%include "outgoing_recordings/mp3.liq"
end
# %endif
# %ifencoder %vorbis
if encoding == "ogg" then
log("ENABLING ogg recorder to filesystem")
%include "outgoing_recordings/ogg.liq"
end
# %endif
# %ifencoder %opus
if encoding == "opus" then
log("ENABLING opus recorder to filesystem")
%include "outgoing_recordings/opus.liq"
end
# %endif
# %ifencoder %wav
if encoding == "wav" then
log("ENABLING wav recorder to filesystem")
%include "outgoing_recordings/wav.liq"
end
# %endif
end
#
# engine
#
# Playout Daemon for autoradio project
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
r0_enable = list.assoc(default="", "rec_0", ini) == "y"
r1_enable = list.assoc(default="", "rec_1", ini) == "y"
r2_enable = list.assoc(default="", "rec_2", ini) == "y"
r3_enable = list.assoc(default="", "rec_3", ini) == "y"
r4_enable = list.assoc(default="", "rec_4", ini) == "y"
r0_folder = list.assoc(default="", "rec_0_folder", ini)
r0_duration = int_of_string(list.assoc(default="", "rec_0_duration", ini))
r0_encoding = list.assoc(default="", "rec_0_encoding", ini)
r0_bitrate = int_of_string(list.assoc(default="", "rec_0_bitrate", ini))
r0_channels = list.assoc(default="", "rec_0_channels", ini)
r0_filenamepattern = r0_folder^"/%Y-%m-%d/%Y-%m-%d-%H-%M."^r0_encoding
r1_folder = list.assoc(default="", "rec_1_folder", ini)
r1_duration = int_of_string(list.assoc(default="", "rec_1_duration", ini))
r1_encoding = list.assoc(default="", "rec_1_encoding", ini)
r1_bitrate = int_of_string(list.assoc(default="", "rec_1_bitrate", ini))
r1_channels = list.assoc(default="", "rec_1_channels", ini)
r1_filenamepattern = r1_folder^"/%Y-%m-%d/%Y-%m-%d-%H-%M."^r1_encoding
r2_folder = list.assoc(default="", "rec_2_folder", ini)
r2_duration = int_of_string(list.assoc(default="", "rec_2_duration", ini))
r2_encoding = list.assoc(default="", "rec_2_encoding", ini)
r2_bitrate = int_of_string(list.assoc(default="", "rec_2_bitrate", ini))
r2_channels = list.assoc(default="", "rec_2_channels", ini)
r2_filenamepattern = r2_folder^"/%Y-%m-%d/%Y-%m-%d-%H-%M."^r2_encoding
r3_folder = list.assoc(default="", "rec_3_folder", ini)
r3_duration = int_of_string(list.assoc(default="", "rec_3_duration", ini))
r3_encoding = list.assoc(default="", "rec_3_encoding", ini)
r3_bitrate = int_of_string(list.assoc(default="", "rec_3_bitrate", ini))
r3_channels = list.assoc(default="", "rec_3_channels", ini)
r3_filenamepattern = r3_folder^"/%Y-%m-%d/%Y-%m-%d-%H-%M."^r3_encoding
r4_folder = list.assoc(default="", "rec_4_folder", ini)
r4_duration = int_of_string(list.assoc(default="", "rec_4_duration", ini))
r4_encoding = list.assoc(default="", "rec_4_encoding", ini)
r4_bitrate = int_of_string(list.assoc(default="", "rec_4_bitrate", ini))
r4_channels = list.assoc(default="", "rec_4_channels", ini)
r4_filenamepattern = r4_folder^"/%Y-%m-%d/%Y-%m-%d-%H-%M."^r4_encoding
r0_is_recording = ref false
r1_is_recording = ref false
r2_is_recording = ref false
r3_is_recording = ref false
r4_is_recording = ref false
if r0_enable == true then
# enable recording status for that recorder
server.register(namespace="out_filesystem_0", "recording", fun (s) -> begin if !r0_is_recording == false then "false" else "true" end end)
# start the recorder
start_recorder(r0_folder, r0_duration, r0_encoding, r0_bitrate, r0_channels, r0_filenamepattern, r0_is_recording, output_source, "0")
end
if r1_enable == true then
server.register(namespace="out_filesystem_1", "recording", fun (s) -> begin if !r1_is_recording == false then "false" else "true" end end)
start_recorder(r1_folder, r1_duration, r1_encoding, r1_bitrate, r1_channels, r1_filenamepattern, r1_is_recording, output_source, "1")
end
if r2_enable == true then
server.register(namespace="out_filesystem_2", "recording", fun (s) -> begin if !r2_is_recording == false then "false" else "true" end end)
start_recorder(r2_folder, r2_duration, r2_encoding, r2_bitrate, r2_channels, r2_filenamepattern, r2_is_recording, output_source, "2")
end
if r3_enable == true then
server.register(namespace="out_filesystem_3", "recording", fun (s) -> begin if !r3_is_recording == false then "false" else "true" end end)
start_recorder(r3_folder, r3_duration, r3_encoding, r3_bitrate, r3_channels, r3_filenamepattern, r3_is_recording, output_source, "3")
end
if r4_enable == true then
server.register(namespace="out_filesystem_4", "recording", fun (s) -> begin if !r4_is_recording == false then "false" else "true" end end)
start_recorder(r4_folder, r4_duration, r4_encoding, r4_bitrate, r4_channels, r4_filenamepattern, r4_is_recording, output_source, "4")
end
\ No newline at end of file
#
# engine
#
# Playout Daemon for autoradio project
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
if a0_out != "" then
ignore(get_output(output_source, a0_out, "lineout_0"))
end
if a1_out != "" then
ignore(get_output(output_source, a1_out, "lineout_1"))
end
if a2_out != "" then
ignore(get_output(output_source, a2_out, "lineout_2"))
end
if a3_out != "" then
ignore(get_output(output_source, a3_out, "lineout_3"))
end
if a4_out != "" then
ignore(get_output(output_source, a4_out, "lineout_4"))
#output_4 = ref output.dummy(blank())
#get_output(output_4, output_source, a4_out, "lineout_4")
#output_4 := get_output(output_source, a4_out, "lineout_4")
#get_output(output_source, a4_out, "aura_lineout_4")
end
#
# engine
#
# Playout Daemon for autoradio project
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
# Output streaming settings
# What a mess...
s0_enable = list.assoc(default="", "stream_0", ini) == "y"
s1_enable = list.assoc(default="", "stream_1", ini) == "y"
s2_enable = list.assoc(default="", "stream_2", ini) == "y"
s3_enable = list.assoc(default="", "stream_3", ini) == "y"
s4_enable = list.assoc(default="", "stream_4", ini) == "y"
s0_encoding = list.assoc(default="", "stream_0_encoding", ini)
s0_bitrate = int_of_string(list.assoc(default="", "stream_0_bitrate", ini))
s0_host = list.assoc(default="", "stream_0_host", ini)
s0_port = int_of_string(list.assoc(default="", "stream_0_port", ini))
s0_user = list.assoc(default="", "stream_0_user", ini)
s0_pass = list.assoc(default="", "stream_0_password", ini)
s0_mount = list.assoc(default="", "stream_0_mountpoint", ini)
s0_url = list.assoc(default="", "stream_0_displayurl", ini)
s0_desc = list.assoc(default="", "stream_0_description", ini)
s0_genre = list.assoc(default="", "stream_0_genre", ini)
s0_name = list.assoc(default="", "stream_0_name", ini)
s0_channels = list.assoc(default="", "stream_0_channels", ini)
s1_encoding = list.assoc(default="", "stream_1_encoding", ini)
s1_bitrate = int_of_string(list.assoc(default="", "stream_1_bitrate", ini))
s1_host = list.assoc(default="", "stream_1_host", ini)
s1_port = int_of_string(list.assoc(default="", "stream_1_port", ini))
s1_user = list.assoc(default="", "stream_1_user", ini)
s1_pass = list.assoc(default="", "stream_1_password", ini)
s1_mount = list.assoc(default="", "stream_1_mountpoint", ini)
s1_url = list.assoc(default="", "stream_1_displayurl", ini)
s1_desc = list.assoc(default="", "stream_1_description", ini)
s1_genre = list.assoc(default="", "stream_1_genre", ini)
s1_name = list.assoc(default="", "stream_1_name", ini)
s1_channels = list.assoc(default="", "stream_1_channels", ini)
s2_encoding = list.assoc(default="", "stream_2_encoding", ini)
s2_bitrate = int_of_string(list.assoc(default="", "stream_2_bitrate", ini))
s2_host = list.assoc(default="", "stream_2_host", ini)
s2_port = int_of_string(list.assoc(default="", "stream_2_port", ini))
s2_user = list.assoc(default="", "stream_2_user", ini)
s2_pass = list.assoc(default="", "stream_2_password", ini)
s2_mount = list.assoc(default="", "stream_2_mountpoint", ini)
s2_url = list.assoc(default="", "stream_2_displayurl", ini)
s2_desc = list.assoc(default="", "stream_2_description", ini)
s2_genre = list.assoc(default="", "stream_2_genre", ini)
s2_name = list.assoc(default="", "stream_2_name", ini)
s2_channels = list.assoc(default="", "stream_2_channels", ini)
s3_encoding = list.assoc(default="", "stream_3_encoding", ini)
s3_bitrate = int_of_string(list.assoc(default="", "stream_3_bitrate", ini))
s3_host = list.assoc(default="", "stream_3_host", ini)
s3_port = int_of_string(list.assoc(default="", "stream_3_port", ini))
s3_user = list.assoc(default="", "stream_3_user", ini)
s3_pass = list.assoc(default="", "stream_3_password", ini)
s3_mount = list.assoc(default="", "stream_3_mountpoint", ini)
s3_url = list.assoc(default="", "stream_3_displayurl", ini)
s3_desc = list.assoc(default="", "stream_3_description", ini)
s3_genre = list.assoc(default="", "stream_3_genre", ini)
s3_name = list.assoc(default="", "stream_3_name", ini)
s3_channels = list.assoc(default="", "stream_3_channels", ini)
s4_encoding = list.assoc(default="", "stream_4_encoding", ini)
s4_bitrate = int_of_string(list.assoc(default="", "stream_4_bitrate", ini))
s4_host = list.assoc(default="", "stream_4_host", ini)
s4_port = int_of_string(list.assoc(default="", "stream_4_port", ini))
s4_user = list.assoc(default="", "stream_4_user", ini)
s4_pass = list.assoc(default="", "stream_4_password", ini)
s4_mount = list.assoc(default="", "stream_4_mountpoint", ini)
s4_url = list.assoc(default="", "stream_4_displayurl", ini)
s4_desc = list.assoc(default="", "stream_4_description", ini)
s4_genre = list.assoc(default="", "stream_4_genre", ini)
s4_name = list.assoc(default="", "stream_4_name", ini)
s4_channels = list.assoc(default="", "stream_4_channels", ini)
s0_connected = ref ''
s1_connected = ref ''
s2_connected = ref ''
s3_connected = ref ''
s4_connected = ref ''
if s0_enable == true then
# enable connection status for that stream
server.register(namespace="out_http_0", "connected", fun (s) -> begin !s0_connected end)
# aaand stream
stream_to_icecast("out_http_0", s0_encoding, s0_bitrate, s0_host, s0_port, s0_pass, s0_mount, s0_url, s0_desc, s0_genre, s0_user, output_source, "0", s0_connected, s0_name, s0_channels)
end
if s1_enable == true then
server.register(namespace="out_http_1", "connected", fun (s) -> begin !s1_connected end)
stream_to_icecast("out_http_1", s1_encoding, s1_bitrate, s1_host, s1_port, s1_pass, s1_mount, s1_url, s1_desc, s1_genre, s1_user, output_source, "1", s1_connected, s1_name, s1_channels)
end
if s2_enable == true then
server.register(namespace="out_http_2", "connected", fun (s) -> begin !s2_connected end)
stream_to_icecast("out_http_2", s2_encoding, s2_bitrate, s2_host, s2_port, s2_pass, s2_mount, s2_url, s2_desc, s2_genre, s2_user, output_source, "2", s2_connected, s2_name, s2_channels)
end
if s3_enable == true then
server.register(namespace="out_http_3", "connected", fun (s) -> begin !s3_connected end)
stream_to_icecast("out_http_3", s3_encoding, s3_bitrate, s3_host, s3_port, s3_pass, s3_mount, s3_url, s3_desc, s3_genre, s3_user, output_source, "3", s3_connected, s3_name, s3_channels)
end
if s4_enable == true then
server.register(namespace="out_http_4", "connected", fun (s) -> begin !s4_connected end)
stream_to_icecast("out_http_4", s4_encoding, s4_bitrate, s4_host, s4_port, s4_pass, s4_mount, s4_url, s4_desc, s4_genre, s4_user, output_source, "4", s4_connected, s4_name, s4_channels)
end