Skip to content
Snippets Groups Projects
Commit 97b13ddc authored by David Trattnig's avatar David Trattnig
Browse files

Refactoring to remove Redis. #39

parent 9906da35
No related branches found
No related tags found
No related merge requests found
Showing
with 101 additions and 1675 deletions
...@@ -6,7 +6,7 @@ stages: ...@@ -6,7 +6,7 @@ stages:
before_script: before_script:
- apt-get -qq update - apt-get -qq update
- apt-cache search libmariadb - apt-cache search libmariadb
- apt-get install -y python3-virtualenv virtualenv opam redis-server redis-tools libev4 libev-dev libsndfile1 quelcom # mariadb-server libmariadbclient-dev - apt-get install -y python3-virtualenv virtualenv opam libev4 libev-dev libsndfile1 quelcom # mariadb-server libmariadbclient-dev
- /usr/bin/virtualenv venv - /usr/bin/virtualenv venv
- . venv/bin/activate - . venv/bin/activate
# - opam init --disable-sandboxing -y # - opam init --disable-sandboxing -y
......
...@@ -7,7 +7,6 @@ LABEL maintainer="David Trattnig <david.trattnig@subsquare.at>" ...@@ -7,7 +7,6 @@ LABEL maintainer="David Trattnig <david.trattnig@subsquare.at>"
RUN apt-get update && apt-get -y install \ RUN apt-get update && apt-get -y install \
apt-utils \ apt-utils \
redis-server \
ffmpeg \ ffmpeg \
quelcom \ quelcom \
build-essential \ build-essential \
......
...@@ -16,10 +16,6 @@ db_pass="---SECRET--PASSWORD---" ...@@ -16,10 +16,6 @@ db_pass="---SECRET--PASSWORD---"
db_host="localhost" db_host="localhost"
db_charset="utf8" db_charset="utf8"
[redis]
redis_host="localhost"
redis_port=6379
redis_db=0
[monitoring] [monitoring]
mail_server="mail.example.com" mail_server="mail.example.com"
......
...@@ -16,10 +16,6 @@ db_pass="---SECRET--PASSWORD---" ...@@ -16,10 +16,6 @@ db_pass="---SECRET--PASSWORD---"
db_host="127.0.0.1" db_host="127.0.0.1"
db_charset="utf8" db_charset="utf8"
[redis]
redis_host="127.0.0.1"
redis_port=6379
redis_db=0
[monitoring] [monitoring]
mail_server="mail.example.com" mail_server="mail.example.com"
......
...@@ -16,10 +16,6 @@ db_pass="---SECRET--PASSWORD---" ...@@ -16,10 +16,6 @@ db_pass="---SECRET--PASSWORD---"
db_host="localhost" db_host="localhost"
db_charset="utf8" db_charset="utf8"
[redis]
redis_host="localhost"
redis_port=6379
redis_db=0
[monitoring] [monitoring]
mail_server="mail.example.com" mail_server="mail.example.com"
......
...@@ -53,7 +53,6 @@ class AuraEngine: ...@@ -53,7 +53,6 @@ class AuraEngine:
AuraEngine does the following: AuraEngine does the following:
1. Initialize the engine and scheduler 1. Initialize the engine and scheduler
2. Initialize Redis
3. Start Liquidsoap in a separate thread which connects to the engine 3. Start Liquidsoap in a separate thread which connects to the engine
""" """
...@@ -86,38 +85,30 @@ class AuraEngine: ...@@ -86,38 +85,30 @@ class AuraEngine:
""" """
from modules.scheduling.scheduler import AuraScheduler from modules.scheduling.scheduler import AuraScheduler
from modules.core.engine import Engine from modules.core.engine import Engine
from modules.cli.redis.adapter import ServerRedisAdapter
# If Liquidsoap should be started automatically # If Liquidsoap should be started automatically
self.lqs_startup = lqs_startup self.lqs_startup = lqs_startup
# Check if the database has to be re-created # Check if the database has to be re-created
if self.config.get("recreate_db") is not None: if self.config.get("recreate_db") is not None:
AuraScheduler(self.config, None, None) AuraScheduler.init_database()
# Create scheduler and Liquidsoap communicator # Create scheduler and Liquidsoap communicator
self.engine = Engine(self.config) self.engine = Engine()
self.scheduler = AuraScheduler(self.config, self.engine, self.on_initialized)
# Sleep needed, because the socket is created too slowly by Liquidsoap
# Create the Redis adapter # time.sleep(1)
self.messenger = ServerRedisAdapter(self.config)
self.messenger.scheduler = self.scheduler # def on_initialized():
self.messenger.engine = self.engine # """
# Called when the engine is initialized, before the Liquidsoap connection is established."
# And finally wait for redis message / start listener thread # """
self.messenger.start() # self.logger.info(SU.green("Engine Core initialized - Waiting for Liquidsoap connection ..."))
# if self.lqs_startup:
# self.start_lqs(False, False)
# else:
# self.logger.info(SU.yellow("Please note, Liquidsoap needs to be started manually."))
def on_initialized(self):
"""
Called when the engine is initialized, before the Liquidsoap connection is established."
"""
self.logger.info(SimpleUtil.green("Engine Core initialized - Waiting for Liquidsoap connection ..."))
if self.lqs_startup:
self.start_lqs(False, False)
else:
self.logger.info(SimpleUtil.yellow("Please note, Liquidsoap needs to be started manually."))
def start_lqs(self, debug_output, verbose_output): def start_lqs(self, debug_output, verbose_output):
......
#!/bin/sh
''''which python3.8 >/dev/null 2>&1 && exec python3.8 "$0" "$@" # '''
''''which python3.7 >/dev/null 2>&1 && exec python3.7 "$0" "$@" # '''
''''exec echo "Error: Snaaakey Python, where are you?" # '''
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import sys
import redis
from argparse import ArgumentParser
from modules.cli.padavan import Padavan
from modules.base.exceptions import PlaylistException
from modules.base.config import AuraConfig
class Guru():
"""
Command Line Interface (CLI) for Aura Engine.
"""
# config_path = "%s/configuration/engine.ini" % Path(__file__).parent.absolute()
config = AuraConfig()
parser = None
args = None
# ------------------------------------------------------------------------------------------ #
def __init__(self):
self.init_argument_parser()
self.handle_arguments()
def handle_arguments(self):
if self.args.stoptime:
start = time.time()
if not self.args.quiet:
print("Guru thinking...")
try:
p = Padavan(self.args, self.config)
p.meditate()
except PlaylistException as pe:
# typically there is no next file found
if not self.args.quiet:
print(pe)
else:
print("")
exit(4)
except redis.exceptions.TimeoutError:
print("Timeout when waiting for redis message. Is AURA daemon running? Exiting...")
exit(3)
if not self.args.quiet:
print("...result: ")
if p.stringreply != "":
#print(p.stringreply)
if p.stringreply[len(p.stringreply)-1] == "\n":
print(p.stringreply[0:len(p.stringreply) - 1])
else:
print(p.stringreply[0:len(p.stringreply)])
if self.args.stoptime:
end = time.time()
exectime = end-start
print("execution time: "+str(exectime)+"s")
def init_argument_parser(self):
try:
self.create_parser()
self.args = self.parser.parse_args()
except (ValueError, TypeError) as e:
if self.parser is not None:
self.parser.print_help()
print()
print(e)
exit(1)
def create_parser(self):
self.parser = ArgumentParser()
# options
self.parser.add_argument("-sep", "--stop-execution-time", action="store_true", dest="stoptime", default=False, help="Prints the execution time at the end of the skript")
self.parser.add_argument("-q", "--quiet", action="store_true", dest="quiet", default=False, help="Just the result will outputed to stout")
self.parser.add_argument("-rd", "--recreate-database", action="store_true", dest="recreatedb", default=False, help="Do you want to recreate the database?")
# getter
self.parser.add_argument("-pcs", "--print-connection-status", action="store_true", dest="get_connection_status", default=False, help="Prints the status of the connection to liquidsoap, pv and tank")
self.parser.add_argument("-gam", "--get-active-mixer", action="store_true", dest="mixer_channels_selected",default=False, help="Which mixer channels are selected?")
self.parser.add_argument("-pms", "--print-mixer-status", action="store_true", dest="mixer_status", default=False, help="Prints all mixer sources and their states")
self.parser.add_argument("-pap", "--print-act-programme", action="store_true", dest="get_act_programme", default=False, help="Prints the actual Programme, the controller holds")
self.parser.add_argument("-s", "--status", action="store_true", dest="get_status", default=False, help="Returns the Engine Status as JSON")
# liquid manipulation
self.parser.add_argument("-am", "--select-mixer", action="store", dest="select_mixer", default=-1, metavar="MIXERNAME", help="Which mixer should be activated?")
self.parser.add_argument("-dm", "--de-select-mixer", action="store", dest="deselect_mixer", default=-1, metavar="MIXERNAME", help="Which mixer should be activated?")
self.parser.add_argument("-vm", "--volume", action="store", dest="set_volume", default=0, metavar=("MIXERNUM", "VOLUME"), nargs=2, help="Set volume of a mixer source", type=int)
# shutdown server
self.parser.add_argument("-sd", "--shutdown", action="store_true", dest="shutdown", default=False, help="Shutting down aura server")
# playlist in/output
self.parser.add_argument("-fnp", "--fetch-new-programmes", action="store_true", dest="fetch_new_programme", default=False, help="Fetch new programmes from api_steering_calendar in engine.ini")
self.parser.add_argument("-pmq", "--print-message-queue", action="store_true", dest="print_message_queue", default=False, help="Prints message queue")
# send a redis message
self.parser.add_argument("-rm", "--redis-message", action="store", dest="redis_message", default=False, metavar=("CHANNEL", "MESSAGE"), nargs=2, help="Send a redis message to the Listeners")
# calls from liquidsoap
self.parser.add_argument("-gnf", "--get-next-file-for", action="store", dest="get_file_for", default=False, metavar="PLAYLISTTYPE", help="For which type you wanna GET a next audio file?")
self.parser.add_argument("-snf", "--set-next-file-for", action="store", dest="set_file_for", default=False, metavar=("PLAYLISTTYPE", "FILE"), nargs=2, help="For which type you wanna SET a next audio file?")
self.parser.add_argument("-np", "--now-playing", action="store_true", dest="now_playing", default=False, help="Which source is now playing")
self.parser.add_argument("-ip", "--init-player", action="store_true", dest="init_player", default=False, help="Reset liquidsoap volume and mixer activations?")
self.parser.add_argument("-ts", "--on_play", action="store", dest="on_play", default=False, metavar="INFO", help="Event handling when some entry started playing")
if len(sys.argv) == 1:
raise ValueError("No Argument passed!")
def valid_playlist_entry(argument):
from datetime import datetime
try:
index = int(argument[0])
fromtime = datetime.strptime(argument[1], "%Y-%m-%d")
source = argument[2]
return index, fromtime, source
except:
msg = "Not a valid date: '{0}'.".format(argument[0])
print(msg)
raise
# # ## ## ## ## ## # #
# # ENTRY FUNCTION # #
# # ## ## ## ## ## # #
def main():
Guru()
# # ## ## ## ## ## ## # #
# # End ENTRY FUNCTION # #
# # ## ## ## ## ## ## # #
if __name__ == "__main__":
main()
...@@ -51,6 +51,3 @@ class LQConnectionError(Exception): ...@@ -51,6 +51,3 @@ class LQConnectionError(Exception):
class LQStreamException(Exception): class LQStreamException(Exception):
pass pass
class RedisConnectionException(Exception):
pass
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
from modules.cli.redis.channels import RedisChannel
from modules.base.utils import TerminalColors
from modules.cli.redis.adapter import ClientRedisAdapter, ServerRedisAdapter
from modules.base.models import AuraDatabaseModel
class Padavan:
args = None
config = None
ss = None
zmqclient = None
redisclient = None
stringreply = ""
# ------------------------------------------------------------------------------------------ #
def __init__(self, args, config):
self.args = args
self.config = config
# ------------------------------------------------------------------------------------------ #
def meditate(self):
if self.args.fetch_new_programme:
self.fetch_new_programme()
elif self.args.mixer_channels_selected:
self.mixer_channels_selected()
elif self.args.mixer_status:
self.mixer_status()
# elif self.args.get_act_programme:
# self.get_act_programme()
elif self.args.get_status:
self.get_status()
elif self.args.get_connection_status:
self.get_connection_status()
elif self.args.shutdown:
self.shutdown()
elif self.args.redis_message:
self.redis_message(self.args.redis_message[0], self.args.redis_message[1])
elif self.args.select_mixer != -1:
self.select_mixer(self.args.select_mixer)
elif self.args.deselect_mixer != -1:
self.select_mixer(self.args.deselect_mixer, False)
elif self.args.set_volume:
self.set_volume(self.args.set_volume[0], self.args.set_volume[1])
elif self.args.print_message_queue:
self.print_message_queue()
elif self.args.get_file_for:
self.get_next_file(self.args.get_file_for)
elif self.args.set_file_for:
self.set_next_file(self.args.set_file_for[0], self.args.set_file_for[1])
elif self.args.now_playing:
print("")
elif self.args.init_player:
self.init_player()
elif self.args.on_play:
self.on_play(self.args.on_play)
elif self.args.recreatedb:
self.recreatedb()
# else:
# raise Exception("")
# init liquid => faster exec time, when loading at runtime just what is needed
# ------------------------------------------------------------------------------------------ #
def init_liquidsoap_communication(self):
# import
from modules.core.engine import SoundSystem
# init liquidsoap communication
self.ss = SoundSystem(self.config)
# enable connection
self.ss.enable_transaction()
# ------------------------------------------------------------------------------------------ #
def destroy_liquidsoap_communication(self):
# enable connection
self.ss.disable_transaction()
# ------------------------------------------------------------------------------------------ #
def init_redis_communication(self, with_server=False):
self.redisclient = ClientRedisAdapter(self.config)
if with_server:
self.redisserver = ServerRedisAdapter(self.config)
# ------------------------------------------------------------------------------------------ #
def send_redis(self, channel, message):
self.init_redis_communication()
self.redisclient.publish(channel, message)
# ------------------------------------------------------------------------------------------ #
def send_and_wait_redis(self, channel, message, reply_channel):
self.init_redis_communication(True)
self.redisclient.publish(channel, message)
return self.redisserver.listen_for_one_message(reply_channel.value)
# ------------------------------------------------------------------------------------------ #
def shutdown(self):
self.send_redis("aura", "shutdown")
self.stringreply = "Shutdown message sent!"
# ------------------------------------------------------------------------------------------ #
def fetch_new_programme(self):
json_reply = self.send_and_wait_redis("aura", "fetch_new_programme", RedisChannel.FNP_REPLY)
if json_reply != "":
actprogramme = json.loads(json_reply)
# self.print_programme(actprogramme)
else:
print("No programme fetched")
# ------------------------------------------------------------------------------------------ #
# def get_act_programme(self):
# json_reply = self.send_and_wait_redis("aura", "get_act_programme", RedisChannel.GAP_REPLY)
# actprogramme = json.loads(json_reply)
# self.print_programme(actprogramme)
def get_status(self):
"""
Retrieves the Engine's status information.
"""
json_reply = self.send_and_wait_redis("aura", "get_status", RedisChannel.GS_REPLY)
# status = json.loads(json_reply)
self.stringreply = json_reply
# ------------------------------------------------------------------------------------------ #
def get_connection_status(self):
json_reply = self.send_and_wait_redis("aura", "get_connection_status", RedisChannel.GCS_REPLY)
connection_status = json.loads(json_reply)
self.print_connection_status(connection_status)
# ------------------------------------------------------------------------------------------ #
# def print_programme(self, programme):
# cnt = 1
# for show in programme:
# for entry in show["playlist"]:
# self.stringreply += str(cnt) + \
# " --- schedule id #" + str(show["schedule_id"]) + "." + str(entry["entry_num"]) + \
# " - show: " + show["show_name"] + \
# " - starts @ " + entry["entry_start"] + \
# " - plays " + str(entry["source"]) + "\n"
# cnt = cnt + 1
# ------------------------------------------------------------------------------------------ #
def print_connection_status(self, connection_status):
if connection_status["pv"]:
self.stringreply = "Connection to pv: " + TerminalColors.GREEN.value + " " + str(connection_status["pv"]) + TerminalColors.ENDC.value
else:
self.stringreply = "Connection to pv: " + TerminalColors.RED.value + " " + str(connection_status["pv"]) + TerminalColors.ENDC.value
if connection_status["db"]:
self.stringreply += "\nConnection to db: " + TerminalColors.GREEN.value + " " + str(connection_status["db"]) + TerminalColors.ENDC.value
else:
self.stringreply += "\nConnection to db: " + TerminalColors.RED.value + " " + str(connection_status["db"]) + TerminalColors.ENDC.value
if connection_status["lqs"]:
self.stringreply += "\nConnection to lqs: " + TerminalColors.GREEN.value + " " + str(connection_status["lqs"]) + TerminalColors.ENDC.value
else:
self.stringreply += "\nConnection to lqs: " + TerminalColors.RED.value + " " + str(connection_status["lqs"]) + TerminalColors.ENDC.value
if connection_status["lqsr"]:
self.stringreply += "\nConnection to lqsr: " + TerminalColors.GREEN.value + " " + str(connection_status["lqsr"]) + TerminalColors.ENDC.value
else:
self.stringreply += "\nConnection to lqsr: " + TerminalColors.RED.value + " " + str(connection_status["lqsr"]) + TerminalColors.ENDC.value
if connection_status["tank"]:
self.stringreply += "\nConnection to tank: " + TerminalColors.GREEN.value + " " + str(connection_status["tank"]) + TerminalColors.ENDC.value
else:
self.stringreply += "\nConnection to tank: " + TerminalColors.RED.value + " " + str(connection_status["tank"]) + TerminalColors.ENDC.value
if connection_status["redis"]:
self.stringreply += "\nConnection to redis: " + TerminalColors.GREEN.value + " " + str(connection_status["redis"]) + TerminalColors.ENDC.value
else:
self.stringreply += "\nConnection to redis: " + TerminalColors.RED.value + " " + str(connection_status["redis"]) + TerminalColors.ENDC.value
# ------------------------------------------------------------------------------------------ #
def init_player(self):
"""
Initializes the player on Liquidsaop startup.
"""
self.stringreply = self.send_and_wait_redis("aura", "init_player", RedisChannel.IP_REPLY)
def on_play(self, info):
"""
Event handler to be called when some entry started playing.
"""
self.stringreply = self.send_and_wait_redis("aura", "on_play " + info, RedisChannel.GNF_REPLY)
# ------------------------------------------------------------------------------------------ #
def recreatedb(self):
print("YOU WILL GET PROBLEMS DUE TO DATABASE BLOCKING IF aura.py IS RUNNING! NO CHECKS IMPLEMENTED SO FAR!")
x = AuraDatabaseModel()
x.recreate_db()
self.stringreply = "Database recreated!"
# ------------------------------------------------------------------------------------------ #
def redis_message(self, channel, message):
self.send_redis(channel, message)
self.stringreply = "Message '"+message+"' sent to channel '"+channel+"'"
# ------------------------------------------------------------------------------------------ #
def print_message_queue(self):
self.stringreply = self.send_and_wait_redis("aura", "print_message_queue", RedisChannel.PMQ_REPLY)
# LIQUIDSOAP #
# ------------------------------------------------------------------------------------------ #
def select_mixer(self, mixername, activate=True):
# init lqs
self.init_liquidsoap_communication()
# select mixer and return the feedback
self.stringreply = self.ss.channel_activate(mixername, activate)
# disable connection
self.destroy_liquidsoap_communication()
# ------------------------------------------------------------------------------------------ #
def set_volume(self, mixernumber, volume):
# init lqs and enable comm
self.init_liquidsoap_communication()
self.stringreply = self.ss.channel_volume(mixernumber, volume)
# disable connection
self.destroy_liquidsoap_communication()
# ------------------------------------------------------------------------------------------ #
def mixer_channels_selected(self):
self.init_liquidsoap_communication()
am = self.ss.mixer_channels_selected()
if len(am) == 0:
self.destroy_liquidsoap_communication()
raise Exception("Guru recognized a problem: No active source!!!")
self.stringreply = str(am)
# disable connection
self.destroy_liquidsoap_communication()
# ------------------------------------------------------------------------------------------ #
def mixer_status(self):
self.init_liquidsoap_communication()
status = self.ss.mixer_status()
for k, v in status.items():
self.stringreply += "source: " + k + "\t status: " + v + "\n"
# disable connection
self.destroy_liquidsoap_communication()
# REDIS #
# ------------------------------------------------------------------------------------------ #
def get_next_file(self, type):
# redis = RedisMessenger()
# next_file = redis.get_next_file_for(type)
# if next_file == "":
# next_file = "/var/audio/blank.flac"
# self.stringreply = next_file
#self.send_redis("aura", "set_next_file " + type)
next_file = self.send_and_wait_redis("aura", "get_next_file " + type, RedisChannel.GNF_REPLY)
self.stringreply = next_file
# ------------------------------------------------------------------------------------------ #
def set_next_file(self, type, file):
#from modules.cli.redis.messenger import RedisMessenger
#redis = RedisMessenger()
#redis.set_next_file_for(type, file)
self.send_redis("aura", "set_next_file " + type + " " + file)
self.stringreply = "Set "+file+" for fallback '"+type+"'"
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import time
import json
from datetime import datetime
from threading import Event
import threading
import redis
from modules.cli.redis.messenger import RedisMessenger
from modules.cli.redis.statestore import RedisStateStore
# from modules.communication.connection_tester import ConnectionTester
from modules.base.exceptions import RedisConnectionException
from modules.cli.redis.channels import RedisChannel
from modules.base.utils import TerminalColors
# ------------------------------------------------------------------------------------------ #
class ServerRedisAdapter(threading.Thread, RedisMessenger):
debug = False
pubsub = None
config = None
redisdb = None
channel = ""
scheduler = None
redisclient = None
# connection_tester = None
engine = None
socket = None
# ------------------------------------------------------------------------------------------ #
def __init__(self, config):
threading.Thread.__init__(self)
RedisMessenger.__init__(self, config)
# init
#threading.Thread.__init__ (self)
self.config = config
self.shutdown_event = Event()
self.channel = RedisChannel.STANDARD.value
self.section = ''
self.rstore = RedisStateStore(config)
self.errnr = '00'
self.components = {'controller':'01', 'scheduling':'02', 'playd':'03', 'recorder':'04', 'helpers':'09'}
self.fromMail = ''
self.adminMails = ''
self.can_send = None
self.redisclient = ClientRedisAdapter(config)
# self.connection_tester = ConnectionTester()
# ------------------------------------------------------------------------------------------ #
def run(self):
self.redisdb = redis.Redis(host=self.config.get("redis_host"), port=self.config.get("redis_port"), db=self.config.get("redis_db"))
self.pubsub = self.redisdb.pubsub()
self.pubsub.subscribe(self.channel)
self.logger.debug(TerminalColors.YELLOW.value + "waiting for REDIS message on channel " + self.channel + TerminalColors.ENDC.value)
# listener loop
for item in self.pubsub.listen():
if item["type"] == "subscribe":
continue
self.logger.debug(TerminalColors.YELLOW.value + "received REDIS message: " + TerminalColors.ENDC.value + str(item))
item["channel"] = self.decode_if_needed(item["channel"])
item["data"] = self.decode_if_needed(item["data"])
try:
self.work(item)
except RedisConnectionException as rce:
self.logger.error(str(rce))
if not self.shutdown_event.is_set():
self.logger.debug(TerminalColors.YELLOW.value + "waiting for REDIS message on channel " + self.channel + TerminalColors.ENDC.value)
self.pubsub.unsubscribe()
if not self.shutdown_event.is_set():
self.logger.warning("unsubscribed from " + self.channel + " and finished")
# ------------------------------------------------------------------------------------------ #
def decode_if_needed(self, val):
if isinstance(val, bytes):
return val.decode("utf-8")
return val
# ------------------------------------------------------------------------------------------ #
def listen_for_one_message(self, channel, socket_timeout=2):
self.redisdb = redis.Redis(host=self.config.get("redis_host"), port=self.config.get("redis_port"), db=self.config.get("redis_db"), socket_timeout=socket_timeout)
self.pubsub = self.redisdb.pubsub()
self.pubsub.subscribe(channel)
try:
self.logger.debug("I am listening on channel '"+channel+"' for "+str(socket_timeout)+" seconds")
for item in self.pubsub.listen():
it = self.receive_message(item)
if it is not None:
break
except redis.exceptions.TimeoutError as te:
raise te
return item["data"]
# ------------------------------------------------------------------------------------------ #
def receive_message(self, item):
if item["type"] == "subscribe":
self.logger.info("i am subscribed to channel " + item["channel"].decode("utf-8"))
return None
item["channel"] = item["channel"].decode("utf-8")
if isinstance(item["data"], bytes):
item["data"] = item["data"].decode("utf-8")
self.pubsub.unsubscribe()
return item
# ------------------------------------------------------------------------------------------ #
def work(self, item):
if item["data"] == "fetch_new_programme":
self.execute(RedisChannel.FNP_REPLY.value, self.scheduler.fetch_new_programme)
# self.execute(RedisChannel.FNP_REPLY.value, self.scheduler.get_act_programme_as_string)
elif item["data"] == "shutdown":
self.terminate()
elif item["data"] == "init_player":
self.execute(RedisChannel.IP_REPLY.value, self.engine.init_player)
# elif item["data"] == "get_act_programme":
# self.execute(RedisChannel.GAP_REPLY.value, self.scheduler.get_act_programme_as_string)
elif item["data"] == "get_status":
def get_status_string():
status = "No monitoring plugin available!"
if "monitor" in self.engine.plugins:
status = self.engine.plugins["monitor"].get_status()
return json.dumps(status)
self.execute(RedisChannel.GS_REPLY.value, get_status_string)
# elif item["data"] == "get_connection_status":
# self.execute(RedisChannel.GCS_REPLY.value, self.connection_tester.get_connection_status)
elif item["data"] == "print_message_queue":
self.execute(RedisChannel.PMQ_REPLY.value, self.scheduler.print_message_queue)
elif item["data"].find("set_next_file") >= 0:
playlist = item["data"].split()[1]
playlist = playlist[0:len(playlist)-8]
self.execute(RedisChannel.SNF_REPLY.value, self.scheduler.set_next_file_for, playlist)
elif item["data"].find("get_next_file") >= 0:
playlist = item["data"].split()[1]
#playlist = playlist[0:len(playlist)-8]
self.execute(RedisChannel.GNF_REPLY.value, self.scheduler.get_next_file_for, playlist)
elif item["data"].find("on_play") >= 0:
source = item["data"].split("on_play ")[1]
self.execute(RedisChannel.TS_REPLY.value, self.scheduler.engine.player.on_play, source)
elif item["data"] == "recreate_db":
self.execute(RedisChannel.RDB_REPLY.value, self.scheduler.recreate_database)
elif item["data"] == "status":
return True
else:
raise RedisConnectionException("ServerRedisAdapter Cannot understand command: " + item["data"])
# ------------------------------------------------------------------------------------------ #
def execute(self, channel, f, param1=None, param2=None, param3=None):
if param1 != None:
if param2 != None:
if param3 != None:
reply = f(param1, param2, param3)
else:
reply = f(param1, param2)
else:
reply = f(param1)
else:
reply = f()
if reply is None:
reply = ""
# sometimes the sender is faster than the receiver. redis messages would be lost
time.sleep(0.1)
self.logger.debug(TerminalColors.YELLOW.value + "replying REDIS message " + TerminalColors.ENDC.value + reply + TerminalColors.YELLOW.value + " on channel " + channel + TerminalColors.ENDC.value)
# publish
self.redisclient.publish(channel, reply)
# ------------------------------------------------------------------------------------------ #
def join_comm(self):
try:
while self.is_alive():
self.logger.debug(str(datetime.now())+" joining")
self.join()
self.logger.warning("join out")
except (KeyboardInterrupt, SystemExit):
# Dem Server den Shutdown event setzen
# server.shutdown_event.set()
# Der Server wartet auf Eingabe
# Daher einen Client initiieren, der eine Nachricht schickt
self.halt()
sys.exit('Terminated')
# ------------------------------------------------------------------------------------------ #
def halt(self):
"""
Stop the server
"""
if self.shutdown_event.is_set():
return
self.shutdown_event.set()
try:
self.socket.unbind("tcp://"+self.ip+":"+self.port)
except:
pass
# self.socket.close()
# ------------------------------------------------------------------------------------------ #
def send(self, message):
"""
Send a message to the client
:param message: string
"""
# FIXME Review logic
if not self.can_send:
self.logger.debug("sending a "+str(len(message))+" long message via REDIS.")
self.socket.send(message.encode("utf-8"))
self.can_send = False
else:
self.logger.warning("cannot send message via REDIS: "+str(message))
def terminate(self):
"""
Called when thread is stopped or a signal to terminate is received.
"""
self.shutdown_event.set()
self.scheduler.terminate()
self.pubsub.close()
self.logger.info("Shutdown event received. Bye bye ...")
# ------------------------------------------------------------------------------------------ #
class ClientRedisAdapter(RedisMessenger):
def __init__(self, config):
RedisMessenger.__init__(self, config)
# ------------------------------------------------------------------------------------------ #
def publish(self, channel, message):
if type(channel) == RedisChannel:
channel = channel.value
self.rstore.publish(channel, message)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from enum import Enum
class RedisChannel(Enum):
STANDARD = "aura"
DPE_REPLY = "delete_playlist_entry_reply"
FNP_REPLY = "fetch_new_programme_reply"
# GAP_REPLY = "get_act_programme_reply"
GS_REPLY = "get_status_reply"
GCS_REPLY = "get_connection_status_reply"
GNF_REPLY = "get_next_file_reply"
IPE_REPLY = "insert_playlist_entry_reply"
IP_REPLY = "init_player_reply"
TS_REPLY = "track_service_reply"
MPE_REPLY = "move_playlist_entry_reply"
PMQ_REPLY = "print_message_queue_reply"
RDB_REPLY = "recreate_database_reply"
SNF_REPLY = "get_next_file_reply"
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from modules.cli.redis.statestore import RedisStateStore
from modules.cli.redis.channels import RedisChannel
"""
Send and receive redis messages
"""
# ------------------------------------------------------------------------------------------ #
class RedisMessenger():
logger = None
rstore = None
# ------------------------------------------------------------------------------------------ #
def __init__(self, config):
super(RedisMessenger, self).__init__()
"""
Constructor
"""
self.logger = logging.getLogger("AuraEngine")
self.channel = RedisChannel.STANDARD
self.section = ''
self.rstore = RedisStateStore(config)
self.errnr = '00'
self.components = {'controller':'01', 'scheduling':'02', 'playd':'03', 'recorder':'04', 'helpers':'09'}
self.fromMail = ''
self.adminMails = ''
# ------------------------------------------------------------------------------------------ #
def set_channel(self, channel):
"""
Einen "Kanal" setzen - zb scheduling
@type channel: string
@param channel: Kanal/Name der Komponente
"""
self.channel = channel
if channel in self.components:
self.errnr = self.components[channel]
self.rstore.set_channel(channel)
# ------------------------------------------------------------------------------------------ #
def set_section(self, section):
"""
Einen Sektion / Gültigkeitsbereich der Meldung setzen - zb internal
@type section: string
@param section: Gültigkeitsbereich
"""
self.section = section
# # ------------------------------------------------------------------------------------------ #
# def set_mail_addresses(self, fromMail, adminMails):
# """
# Einen Sektion / Gültigkeitsbereich der Meldung setzen - zb internal
# @type section: string
# @param section: Gültigkeitsbereich
# """
# self.fromMail = fromMail
# self.adminMails = adminMails
# # ------------------------------------------------------------------------------------------ #
# def send(self, message, code, level, job, value='', section=''):
# """
# Eine Message senden
# @type message: string
# @param message: menschenverständliche Nachricht
# @type code: string
# @param code: Fehlercode - endet mit 00 bei Erfolg
# @type level: string
# @param level: Error-Level - info, warning, error, fatal
# @type job: string
# @param job: Name der ausgeführten Funktion
# @type value: string
# @param value: Ein Wert
# @type section: string
# @param section: Globale Sektion überschreiben
# """
# section = self.section if section == '' else section
# self.time = str(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f'))
# self.utime = time.time()
# state = {'message':message.strip().replace("'","\\'"), 'code':self.errnr + str(code),'job':job,'value':value}
# self.rstore.set_section(section)
# self.rstore.store(level, state)
# if level == 'info' or level == 'success':
# self.logger.info(message)
# elif level == 'warning':
# self.logger.warning(message)
# elif level == 'error':
# self.logger.error(message)
# self.send_admin_mail(level, message, state)
# elif level == 'fatal':
# self.logger.critical(message)
# self.send_admin_mail(level, message, state)
# # ------------------------------------------------------------------------------------------ #
# def say_alive(self):
# """
# Soll alle 20 Sekunden von den Komponenten ausgeführt werden,
# um zu melden, dass sie am Leben sind
# """
# self.rstore.set_alive_state()
# # ------------------------------------------------------------------------------------------ #
# def get_alive_state(self, channel):
# """
# Live State abfragen
# @type channel: string
# @param channel: Channel/Komponente
# """
# return self.rstore.get_alive_state(channel)
# # ------------------------------------------------------------------------------------------ #
# def set_state(self, name, value, expires=None, channel=None):
# """
# Kündigt einen Event an
# @type name: string
# @param name: Name des state
# @type value: string
# @param value: Wert
# @type channel: string
# @param channel: Kanal (optional)
# """
# if not channel:
# channel = self.channel
# self.rstore.set_state(name, value, expires, channel)
# # ------------------------------------------------------------------------------------------ #
# def queue_add_event(self, name, eventtime, value, channel=None):
# """
# Kündigt einen Event an
# @type name: string
# @param name: der Name des Events
# @type eventtime: string|datetime.datetime
# @param eventtime: Datum und Zeit des events
# @type value: dict
# @param value: Werte
# @type channel: string
# @param channel: Kanal (optional)
# """
# if not channel:
# channel = self.channel
# if type(eventtime) == type(str()):
# eventtime_str = datetime.datetime.strptime(eventtime[0:16].replace(' ','T'), "%Y-%m-%dT%H:%M").strftime("%Y-%m-%dT%H:%M")
# elif type(eventtime) is datetime.datetime:
# eventtime_str = eventtime.strftime("%Y-%m-%dT%H:%M")
# else:
# raise TypeError('eventtime must be a datetime.date or a string, not a %s' % type(eventtime))
# self.rstore.queue_add_event(eventtime_str, name, value, channel)
# # ------------------------------------------------------------------------------------------ #
# def queue_remove_events(self, name, channel=None):
# """
# Löscht Events
# @type name: string
# @param name: der Name des Events
# @type channel: string
# @param channel: Kanal (optional)
# """
# if not channel:
# channel = self.channel
# self.rstore.queue_remove_events(name, channel)
# # ------------------------------------------------------------------------------------------ #
# def fire_event(self, name, value, channel=None):
# """
# Feuert einen Event
# @type name: string
# @param name: der Name des Events
# @type value: dict
# @param value: Werte
# @type channel: string
# @param channel: Kanal (optional)
# """
# if not channel:
# channel = self.channel
# self.rstore.fire_event(name, value, channel)
# # ------------------------------------------------------------------------------------------ #
# def get_event_queue(self, name=None, channel=None):
# """
# Holt events eines Kanals
# @type channel: string
# @param channel: Kanal (optional)
# @rtype: list
# @return: Liste der Events
# """
# queue = self.rstore.get_event_queue(name, channel)
# return queue
# # ------------------------------------------------------------------------------------------ #
# def get_events(self, name=None, channel=None):
# """
# Holt events eines Kanals
# @type channel: string
# @param channel: Kanal (optional)
# @rtype: list
# @return: Liste der Events
# """
# events = self.rstore.get_events(name, channel)
# return events
# # ------------------------------------------------------------------------------------------ #
# def get_event(self, name=None, channel=None):
# """
# Holt event eines Kanals
# @type channel: string
# @param channel: Kanal (optional)
# @rtype: dict
# @return: Event
# """
# events = self.rstore.get_events(name, channel)
# result = False
# if events:
# result = events.pop(0)
# return result
# # ------------------------------------------------------------------------------------------ #
# def send_admin_mail(self, level, message, state):
# """
# Sendent mail an Admin(s),
# @type message: string
# @param message: Die Message
# @type state: dict
# @param state: Der State
# @return result
# """
# # FIXME Make Mailer functional: Invalid constructor
# if self.fromMail and self.adminMails:
# #subject = "Possible comba problem on job " + state['job'] + " - " + level
# mailmessage = "Hi Admin,\n comba reports a possible problem\n\n"
# mailmessage = mailmessage + level + "!\n"
# mailmessage = mailmessage + message + "\n\n"
# mailmessage = mailmessage + "Additional information:\n"
# mailmessage = mailmessage + "##################################################\n"
# mailmessage = mailmessage + "Job:\t" + state['job'] + "\n"
# mailmessage = mailmessage + "Code:\t" + state['code'] + "\n"
# mailmessage = mailmessage + "Value:\t" + str(state['value']) + "\n"
# #mailer = AuraMailer(self.adminMails, self.fromMail)
# #mailer.send_admin_mail(subject, mailmessage)
# else:
# return False
# # ------------------------------------------------------------------------------------------ #
# def receive(self):
# """
# Bisher wird nichts empfangen
# """
# return ""
# # ------------------------------------------------------------------------------------------ #
# def get_next_file_for(self, playlisttype):
# next = self.rstore.db.get('next_'+playlisttype+'file')
# if next is None:
# next = b""
# return next.decode('utf-8')
# ------------------------------------------------------------------------------------------ #
# def on_play(self, info):
# result = self.rstore.db.get('on_play')
# if result is None:
# result = b""
# return result.decode('utf-8')
# ------------------------------------------------------------------------------------------ #
# def set_next_file_for(self, playlisttype, file):
# self.rstore.db.set("next_" + playlisttype + "file", file)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import datetime
import json
import re
import uuid
import redis
class RedisStateStore(object):
"""Store and get Reports from redis"""
def __init__(self, config, **redis_kwargs):
"""The default connection parameters are: host='localhost', port=6379, db=0"""
self.db = redis.Redis(host=config.get("redis_host"), port=config.get("redis_port"), db=config.get("redis_db"))
self.channel = '*'
self.section = '*'
self.separator = '_'
self.daily = False
# ------------------------------------------------------------------------------------------ #
def set_channel(self, channel):
"""
Kanal setzen
@type channel: string
@param channel: Kanal
"""
self.channel = channel
# ------------------------------------------------------------------------------------------ #
def set_section(self, section):
"""
Sektion setzen
@type section: string
@param section: Sektion
"""
self.section = section
# ------------------------------------------------------------------------------------------ #
def set_alive_state(self):
"""
Alive Funktion - alle 20 Sekunden melden, dass man noch am Leben ist
"""
self.set_state('alive', 'Hi', 21)
# ------------------------------------------------------------------------------------------ #
def get_alive_state(self, channel):
"""
Alive Status eines Channels ermitteln
@type channel: string
@param channel: der Channel
@rtype: string/None
@return: Ein String, oder None, bei negativem Ergebnis
"""
return self.get_state('alive', channel)
# ------------------------------------------------------------------------------------------ #
def set_state(self, name, value, expires=None, channel=None):
"""
Setzt einen Status
@type name: string
@param name: Name des state
@type value: string
@param value: Wert
@type channel: string
@param channel: Kanal (optional)
"""
if not channel:
channel = self.channel
key = self.__create_key__(channel + 'State', name)
if value == "":
self.db.delete(key)
else:
# publish on channel
message = json.dumps({'eventname':name, 'value': value})
self.db.publish(channel + 'Publish', message)
# store in database
self.db.set(key, value)
if(expires):
self.db.expire(key, 21)
# ------------------------------------------------------------------------------------------ #
def get_state(self, name, channel):
"""
Holt einen Status
@type name: string
@param name: Name des state
@type channel: string
@param channel: Kanal (optional)
"""
key = self.__create_key__(channel + 'State', name)
return self.db.get(key)
# ------------------------------------------------------------------------------------------ #
def queue_add_event(self, eventtime, name, value, channel=None):
"""
Kündigt einen Event an
@type eventtime: string
@param eventtime: Datum und Zeit des events
@type name: string
@param name: Name des Events
@type value: dict
@param value: Werte
@type channel: string
@param channel: Kanal (optional)
"""
timeevent = datetime.datetime.strptime(eventtime[0:16],"%Y-%m-%dT%H:%M")
expire = int(time.mktime(timeevent.timetuple()) - time.time()) + 60
self.__set_event__(name, eventtime, value, 'Evqueue', 'evqueue', expire, channel)
# ------------------------------------------------------------------------------------------ #
def queue_remove_events(self, name=None, channel=None):
"""
Löscht Events
@type name: string
@param name: Name des Events
@type channel: string
@param channel: Kanal (optional)
"""
query = channel + 'Evqueue_' if channel else '*Evqueue_'
query = query + '*_' + name if name else query + '*_*'
keys = self.db.keys(query)
for delkey in keys:
self.db.delete(delkey)
# ------------------------------------------------------------------------------------------ #
def fire_event(self, name, value, channel=None):
"""
Feuert einen Event
@type name: string
@param name: Name des Events
@type value: dict
@param value: Werte
@type channel: string
@param channel: Kanal (optional)
"""
eventtime = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M")
self.__set_event__(name, eventtime, value, 'Event', 'events', 60, channel)
# ------------------------------------------------------------------------------------------ #
def __set_event__(self, name, eventtime, value, type, namespace, expire, channel=None):
"""
Feuert einen Event
@type eventtime: string
@param eventtime: Datum und Zeit des events
@type value: dict
@param value: Werte
@type channel: string
@param channel: Kanal (optional)
"""
if not channel:
channel = self.channel
timeevent = datetime.datetime.strptime(eventtime[0:16],"%Y-%m-%dT%H:%M")
key = self.__create_key__(channel + type, eventtime, name)
value['starts'] = eventtime[0:16]
value['eventchannel'] = channel
value['eventname'] = name
self.db.hset(key, namespace, value)
self.db.expire(key, expire)
# ------------------------------------------------------------------------------------------ #
def get_event_queue(self, name=None, channel=None):
"""
Holt events eines Kanals
@type channel: string
@param channel: Kanal (optional)
@rtype: list
@return: Liste der Events
"""
query = channel + 'Evqueue_' if channel else '*Evqueue_'
query = query + '*_' + name if name else query + '*_*'
keys = self.db.keys(query)
keys.sort()
entries = self.__get_entries__(keys, 'evqueue')
return entries
# ------------------------------------------------------------------------------------------ #
def get_events(self, name=None, channel=None):
"""
Holt events eines Kanals
@type channel: string
@param channel: Kanal (optional)
@rtype: list
@return: Liste der Events
"""
query = channel + 'Event_' if channel else '*Event_'
query = query + '*_' + name if name else query + '*_*'
keys = self.db.keys(query)
keys.sort()
entries = self.__get_entries__(keys, 'events')
return entries
# ------------------------------------------------------------------------------------------ #
def get_next_event(self, name=None, channel=None):
"""
Holt den aktuellsten Event
@type channel: string
@param channel: Kanal (optional)
@rtype: dict/boolean
@return: ein Event oder False
"""
events = self.get_event_queue(name, channel)
if len(events) > 0:
result = events.pop(0)
else:
result = False
return result
# ------------------------------------------------------------------------------------------ #
def store(self, level, value):
"""
Hash speichern
@type level: string
@param level: der errorlevel
@type value: dict
@param value: Werte als dict
"""
microtime = str(time.time())
value['microtime'] = microtime
value['level'] = level
key = self.__create_key__(self.channel, self.section, level, microtime, str(uuid.uuid1()))
self.db.hset(key, self.channel, value)
self.db.expire(key, 864000)
# ------------------------------------------------------------------------------------------ #
def __get_keys__(self, level ='*'):
"""
Redis-Keys nach Suchkriterium ermitteln
@type level: string
@param level: einen Errorlevel filtern
@rtype: list
@return: Die Keys auf die das Suchkriterium zutrifft
"""
key = self.__create_key__(self.channel, self.section, level)
microtime = str(time.time())
search = microtime[0:4] + '*' if self.daily else '*'
return self.db.keys(key + self.separator + '*')
# ------------------------------------------------------------------------------------------ #
def __create_key__(self, *args):
"""
Key erschaffen - beliebig viele Argumente
@rtype: string
@return: Der key
"""
return self.separator.join(args)
def get_entries(self, level ='*'):
"""
Liste von Hashs nach Suchkriterium erhalten
@type level: string
@param level: einen Errorlevel filtern
@rtype: list
@return: Redis Hashs
"""
def tsort(x,y):
if float(x.split('_',4)[3]) > float(y.split('_',4)[3]):
return 1
elif float(x.split('_',4)[3]) < float(y.split('_',4)[3]):
return -1
else:
return 0
keys = self.__get_keys__(level)
keys.sort(tsort)
entries = self.__get_entries__(keys, self.channel)
entries = sorted(entries, key=lambda k: k['microtime'], reverse=True)
return entries
# ------------------------------------------------------------------------------------------ #
def __get_entries__(self, keys, channel):
entries = []
for key in keys:
entry = self.db.hget(key,channel)
entry = json.dumps(entry.decode('utf-8'))
if not (entry is None):
try:
entry = entry.decode('utf-8').replace('None','"None"')
entry = re.sub("########[^]]*########", lambda x:x.group(0).replace('\"','').replace('\'',''),entry.replace("\\\"","########").replace("\\'","++++++++").replace("'",'"').replace('u"','"').replace('"{','{').replace('}"','}')).replace("########","\"")
entry = json.loads(entry)
entry['key'] = key
entries.append(entry)
except:
pass
return entries
# ------------------------------------------------------------------------------------------ #
def publish(self, channel, message):
subscriber_count = self.db.execute_command('PUBSUB', 'NUMSUB', channel)
if channel.lower().find("reply") < 0 and subscriber_count[1] == 0:
raise Exception("No subscriber! Is Aura daemon running?")
self.db.publish(channel, message)
...@@ -41,9 +41,10 @@ class EngineControlInterface: ...@@ -41,9 +41,10 @@ class EngineControlInterface:
config = None config = None
logger = None logger = None
engine = None engine = None
event_dispatcher = None
sci = None sci = None
def __init__(self, engine): def __init__(self, engine, event_dispatcher):
""" """
Constructor Constructor
...@@ -51,10 +52,12 @@ class EngineControlInterface: ...@@ -51,10 +52,12 @@ class EngineControlInterface:
config (AuraConfig): Engine configuration config (AuraConfig): Engine configuration
logger (AuraLogger): The logger logger (AuraLogger): The logger
""" """
self.engine = engine
self.config = AuraConfig.config() self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine") self.logger = logging.getLogger("AuraEngine")
self.sci = SocketControlInterface.get_instance(engine)
self.logger.info(SU.yellow(f"[ECI] Engine Control Interface starting ...")) self.logger.info(SU.yellow(f"[ECI] Engine Control Interface starting ..."))
self.sci = SocketControlInterface.get_instance(event_dispatcher)
def terminate(self): def terminate(self):
...@@ -79,10 +82,10 @@ class SocketControlInterface: ...@@ -79,10 +82,10 @@ class SocketControlInterface:
config = None config = None
logger = None logger = None
server = None server = None
engine = None event_dispatcher = None
def __init__(self, engine): def __init__(self, event_dispatcher):
""" """
Constructor Constructor
...@@ -96,28 +99,22 @@ class SocketControlInterface: ...@@ -96,28 +99,22 @@ class SocketControlInterface:
SocketControlInterface.instance = self SocketControlInterface.instance = self
self.config = AuraConfig.config() self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine") self.logger = logging.getLogger("AuraEngine")
self.engine = engine self.event_dispatcher = event_dispatcher
host = "127.0.0.1" host = "127.0.0.1"
thread = Thread(target = self.run, args = (self.logger, host)) thread = Thread(target = self.run, args = (self.logger, host))
thread.start() thread.start()
@staticmethod @staticmethod
def get_instance(engine): def get_instance(event_dispatcher):
""" """
Returns the Singleton. Returns the Singleton.
""" """
if not SocketControlInterface.instance: if not SocketControlInterface.instance:
SocketControlInterface.instance = SocketControlInterface(engine) SocketControlInterface.instance = SocketControlInterface(event_dispatcher)
return SocketControlInterface.instance return SocketControlInterface.instance
def attach(self, engine):
"""
Attaches the engine to pass events to.
"""
self.engine = engine
def run(self, logger, host): def run(self, logger, host):
""" """
...@@ -164,7 +161,7 @@ class SocketControlInterface: ...@@ -164,7 +161,7 @@ class SocketControlInterface:
meta_data = data["data"] meta_data = data["data"]
meta_data["duration"] = data["track_duration"] meta_data["duration"] = data["track_duration"]
logger.debug(SU.yellow(f"[ECI] Executing action: "+SocketControlInterface.ACTION_ON_METADATA)) logger.debug(SU.yellow(f"[ECI] Executing action: "+SocketControlInterface.ACTION_ON_METADATA))
self.engine.event_dispatcher.on_metadata(data["data"]) self.event_dispatcher.on_metadata(data["data"])
logger.info(SU.yellow(f"[ECI] Event '{SocketControlInterface.ACTION_ON_METADATA}' issued successfully")) logger.info(SU.yellow(f"[ECI] Event '{SocketControlInterface.ACTION_ON_METADATA}' issued successfully"))
else: else:
logger.error(SU.red("[ECI] Unknown action: " + data["action"])) logger.error(SU.red("[ECI] Unknown action: " + data["action"]))
......
...@@ -25,13 +25,13 @@ from threading import Thread ...@@ -25,13 +25,13 @@ from threading import Thread
import meta import meta
from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil as SU from modules.base.utils import SimpleUtil as SU
from modules.base.exceptions import LQConnectionError, InvalidChannelException, LQStreamException, \ from modules.base.exceptions import LQConnectionError, InvalidChannelException, LQStreamException, \
LoadSourceException LoadSourceException
from modules.core.resources import ResourceClass, ResourceUtil from modules.core.resources import ResourceClass, ResourceUtil
from modules.core.channels import ChannelType, TransitionType, LiquidsoapResponse, \ from modules.core.channels import ChannelType, TransitionType, LiquidsoapResponse, \
EntryPlayState, ResourceType, ChannelRouter EntryPlayState, ResourceType, ChannelRouter
from modules.core.startup import StartupThread
from modules.core.events import EngineEventDispatcher from modules.core.events import EngineEventDispatcher
from modules.core.control import EngineControlInterface from modules.core.control import EngineControlInterface
from modules.core.mixer import Mixer, MixerType from modules.core.mixer import Mixer, MixerType
...@@ -46,52 +46,48 @@ class Engine(): ...@@ -46,52 +46,48 @@ class Engine():
""" """
instance = None instance = None
engine_time_offset = 0.0 engine_time_offset = 0.0
logger = None logger = None
eci = None eci = None
channels = None channels = None
channel_router = None channel_router = None
scheduler = None scheduler = None
event_dispatcher = None event_dispatcher = None
is_liquidsoap_running = False
plugins = None plugins = None
connector = None connector = None
def __init__(self, config):
def __init__(self):
""" """
Constructor Constructor
Args:
config (AuraConfig): The configuration
""" """
if Engine.instance: if Engine.instance:
raise Exception("Engine is already running!") raise Exception("Engine is already running!")
Engine.instance = self Engine.instance = self
self.config = config self.logger = logging.getLogger("AuraEngine")
self.plugins = dict() self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine") Engine.engine_time_offset = self.config.get("lqs_delay_offset")
self.eci = EngineControlInterface(self)
self.plugins = dict()
self.is_active() # TODO Check if it makes sense to move it to the boot-phase
self.channel_router = ChannelRouter(self.config, self.logger) self.channel_router = ChannelRouter(self.config, self.logger)
Engine.engine_time_offset = self.config.get("lqs_delay_offset") self.start()
def start(self): def start(self):
""" """
Starts the engine. Called when the connection to the sound-system implementation Starts the engine. Called when the connection to the sound-system implementation
has been established. has been established.
""" """
self.event_dispatcher = EngineEventDispatcher(self, self.scheduler) self.event_dispatcher = EngineEventDispatcher(self)
self.eci = EngineControlInterface(self, self.event_dispatcher)
# Sleep needed, because the socket is created too slowly by Liquidsoap
time.sleep(1)
self.player = Player(self.config, self.event_dispatcher) self.player = Player(self.config, self.event_dispatcher)
self.is_liquidsoap_running = True
self.event_dispatcher.on_initialized() self.event_dispatcher.on_initialized()
while not self.is_connected():
self.logger.info(SU.yellow("Waiting for Liquidsoap to be running ..."))
time.sleep(2)
self.logger.info(SU.green("Engine Core ------[ connected ]-------- Liquidsoap")) self.logger.info(SU.green("Engine Core ------[ connected ]-------- Liquidsoap"))
self.event_dispatcher.on_boot() self.event_dispatcher.on_boot()
self.logger.info(EngineSplash.splash_screen("Engine Core", meta.__version__)) self.logger.info(EngineSplash.splash_screen("Engine Core", meta.__version__))
self.event_dispatcher.on_ready() self.event_dispatcher.on_ready()
...@@ -103,35 +99,20 @@ class Engine(): ...@@ -103,35 +99,20 @@ class Engine():
# #
def init_player(self): def is_connected(self):
""" """
Initializes the LiquidSoap Player after startup of the engine. Checks if there's a valid connection to Liquidsoap.
Returns:
(String): Message that the player is started.
"""
t = StartupThread(self)
t.start()
return "Engine Core startup done!"
def is_active(self):
"""
Checks if Liquidsoap is running
""" """
has_connection = False
try: try:
self.uptime() self.uptime()
self.is_liquidsoap_running = True has_connection = True
except LQConnectionError as e: except LQConnectionError as e:
self.logger.info("Liquidsoap is not running so far") self.logger.info("Liquidsoap is not running so far")
self.is_liquidsoap_running = False
except Exception as e: except Exception as e:
self.logger.error("Cannot check if Liquidsoap is running. Reason: " + str(e)) self.logger.error("Cannot check if Liquidsoap is running. Reason: " + str(e))
self.is_liquidsoap_running = False
return self.is_liquidsoap_running return has_connection
def engine_state(self): def engine_state(self):
...@@ -154,7 +135,9 @@ class Engine(): ...@@ -154,7 +135,9 @@ class Engine():
""" """
Retrieves the uptime of Liquidsoap. Retrieves the uptime of Liquidsoap.
""" """
self.player.connector.enable_transaction()
data = self.player.connector.send_lqc_command("uptime", "") data = self.player.connector.send_lqc_command("uptime", "")
self.player.connector.disable_transaction()
return data return data
...@@ -359,21 +342,10 @@ class Player: ...@@ -359,21 +342,10 @@ class Player:
self.connector.disable_transaction() self.connector.disable_transaction()
Thread(target=clean_up).start() Thread(target=clean_up).start()
# Filesystem meta-changes trigger the event via Liquidsoap # Filesystem meta-changes trigger the event via Liquidsoap, so only
# issue event for LIVE and STREAM:
if not entry.channel in ChannelType.QUEUE.channels: if not entry.channel in ChannelType.QUEUE.channels:
self.on_play(entry) self.event_dispatcher.on_play(entry)
def on_play(self, source):
"""
Event Handler which is called by the soundsystem implementation (i.e. Liquidsoap)
when some entry is actually playing.
Args:
source (String): The `Entry` or URI or of the media source currently being played
"""
self.event_dispatcher.on_play(source)
......
...@@ -20,15 +20,13 @@ ...@@ -20,15 +20,13 @@
import logging import logging
import datetime import datetime
from threading import Thread from threading import Thread
from modules.base.config import AuraConfig from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil as SU from modules.base.utils import SimpleUtil as SU
from modules.base.mail import AuraMailer from modules.base.mail import AuraMailer
from modules.plugins.monitor import AuraMonitor from modules.plugins.monitor import AuraMonitor
from modules.plugins.trackservice import TrackServiceHandler
from modules.plugins.trackservice import TrackServiceHandler
class EventBinding(): class EventBinding():
...@@ -45,7 +43,11 @@ class EventBinding(): ...@@ -45,7 +43,11 @@ class EventBinding():
dispatcher = None dispatcher = None
instance = None instance = None
def __init__(self, dispatcher, instance): def __init__(self, dispatcher, instance):
"""
Constructor
"""
self.dispatcher = dispatcher self.dispatcher = dispatcher
self.instance = instance self.instance = instance
...@@ -80,16 +82,15 @@ class EngineEventDispatcher(): ...@@ -80,16 +82,15 @@ class EngineEventDispatcher():
monitor = None monitor = None
def __init__(self, engine, scheduler): def __init__(self, engine):
""" """
Initialize EventDispatcher Constructor
""" """
self.subscriber_registry = dict() self.subscriber_registry = dict()
self.logger = logging.getLogger("AuraEngine") self.logger = logging.getLogger("AuraEngine")
self.config = AuraConfig.config() self.config = AuraConfig.config()
self.mailer = AuraMailer(self.config) self.mailer = AuraMailer(self.config)
self.engine = engine self.engine = engine
self.scheduler = scheduler
binding = self.attach(AuraMonitor) binding = self.attach(AuraMonitor)
binding.subscribe("on_boot") binding.subscribe("on_boot")
...@@ -105,7 +106,7 @@ class EngineEventDispatcher(): ...@@ -105,7 +106,7 @@ class EngineEventDispatcher():
def attach(self, clazz): def attach(self, clazz):
""" """
Creates an intance of the given Class. Creates an instance of the given Class.
""" """
instance = clazz(self.engine) instance = clazz(self.engine)
return EventBinding(self, instance) return EventBinding(self, instance)
...@@ -145,16 +146,18 @@ class EngineEventDispatcher(): ...@@ -145,16 +146,18 @@ class EngineEventDispatcher():
def on_initialized(self): def on_initialized(self):
""" """
Called when the engine is initialized e.g. connected to Liquidsoap. Called when the engine is initialized, just before
""" """
self.logger.debug("on_initialized(..)") self.logger.debug("on_initialized(..)")
self.scheduler.on_initialized() from modules.scheduling.scheduler import AuraScheduler
self.scheduler = AuraScheduler(self.engine)
self.call_event("on_initialized", None) self.call_event("on_initialized", None)
def on_boot(self): def on_boot(self):
""" """
Called when the engine is starting up. This happens after the initialization step. Called when the engine is starting up. This happens after the initialization step.
Connection to Liquidsoap should be available here.
""" """
self.logger.debug("on_boot(..)") self.logger.debug("on_boot(..)")
self.call_event("on_boot", None) self.call_event("on_boot", None)
......
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import threading
from modules.base.exceptions import NoActiveScheduleException
from modules.base.utils import SimpleUtil as SU
class StartupThread(threading.Thread):
"""
StartupThread class.
Boots the engine and starts playing the current schedule.
"""
logger = None
active_entry = None
engine = None
def __init__(self, engine):
"""
Initialize the thread.
"""
threading.Thread.__init__(self)
self.logger = logging.getLogger("AuraEngine")
self.engine = engine
def run(self):
"""
Boots the soundsystem.
"""
try:
self.engine.start()
except NoActiveScheduleException as e:
self.logger.info("Nothing scheduled at startup time. Please check if there are follow-up schedules.")
except Exception as e:
self.logger.error(SU.red("Error while initializing the soundsystem: " + str(e)), e)
...@@ -138,10 +138,3 @@ output_source = fallback_three ...@@ -138,10 +138,3 @@ output_source = fallback_three
# enable socket functions # enable socket functions
%include "serverfunctions.liq" %include "serverfunctions.liq"
########################
# start initialization #
########################
system('#{list.assoc(default="", "install_dir", ini)}/guru.py --init-player --quiet')
...@@ -30,7 +30,6 @@ from enum import Enum ...@@ -30,7 +30,6 @@ from enum import Enum
from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_BROADCAST from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_BROADCAST
import meta import meta
from modules.cli.redis.adapter import ClientRedisAdapter
from modules.base.config import AuraConfig from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil as SU from modules.base.utils import SimpleUtil as SU
from modules.base.mail import AuraMailer from modules.base.mail import AuraMailer
...@@ -85,7 +84,6 @@ class AuraMonitor: ...@@ -85,7 +84,6 @@ class AuraMonitor:
self.status = dict() self.status = dict()
self.status["engine"] = dict() self.status["engine"] = dict()
self.status["lqs"] = dict() self.status["lqs"] = dict()
self.status["redis"] = dict()
self.status["api"] = dict() self.status["api"] = dict()
self.status["api"]["steering"] = dict() self.status["api"]["steering"] = dict()
self.status["api"]["tank"] = dict() self.status["api"]["tank"] = dict()
...@@ -172,7 +170,6 @@ class AuraMonitor: ...@@ -172,7 +170,6 @@ class AuraMonitor:
try: try:
if self.status["lqs"]["active"] \ if self.status["lqs"]["active"] \
and self.status["lqs"]["mixer"]["in_filesystem_0"] \ and self.status["lqs"]["mixer"]["in_filesystem_0"] \
and self.status["redis"]["active"] \
and self.status["audio_source"]["exists"]: and self.status["audio_source"]["exists"]:
self.status["engine"]["status"] = MonitorResponseCode.OK.value self.status["engine"]["status"] = MonitorResponseCode.OK.value
...@@ -245,10 +242,8 @@ class AuraMonitor: ...@@ -245,10 +242,8 @@ class AuraMonitor:
Refreshes the vital status info which are required for the engine to survive. Refreshes the vital status info which are required for the engine to survive.
""" """
self.engine.player.connector.enable_transaction() self.engine.player.connector.enable_transaction()
self.status["lqs"]["active"] = self.engine.is_active() self.status["lqs"]["active"] = self.engine.is_connected()
self.engine.player.connector.disable_transaction() self.engine.player.connector.disable_transaction()
self.status["redis"]["active"] = self.validate_redis_connection()
self.status["audio_source"] = self.validate_directory(self.config.get("audio_source_folder")) self.status["audio_source"] = self.validate_directory(self.config.get("audio_source_folder"))
# After first update start the Heartbeat Monitor # After first update start the Heartbeat Monitor
...@@ -320,19 +315,6 @@ class AuraMonitor: ...@@ -320,19 +315,6 @@ class AuraMonitor:
return True return True
def validate_redis_connection(self):
"""
Checks if the connection to Redis is successful.
"""
try:
cra = ClientRedisAdapter(self.config)
cra.publish("aura", "status")
except:
return False
return True
def validate_directory(self, dir_path): def validate_directory(self, dir_path):
""" """
Checks if a given directory is existing and holds content Checks if a given directory is existing and holds content
......
...@@ -168,8 +168,6 @@ class TrackServiceHandler(): ...@@ -168,8 +168,6 @@ class TrackServiceHandler():
""" """
current_playlist = self.engine.scheduler.get_active_playlist() current_playlist = self.engine.scheduler.get_active_playlist()
(past_timeslot, current_timeslot, next_timeslot) = self.playlog.get_timeslots() (past_timeslot, current_timeslot, next_timeslot) = self.playlog.get_timeslots()
# next_timeslot = self.engine.scheduler.get_next_timeslots(1)
# if next_timeslot: next_timeslot = next_timeslot[0]
data = dict() data = dict()
data["engine_source"] = self.config.get("api_engine_number") data["engine_source"] = self.config.get("api_engine_number")
...@@ -241,7 +239,31 @@ class Playlog: ...@@ -241,7 +239,31 @@ class Playlog:
self.engine = engine self.engine = engine
self.history = deque([None, None, None]) self.history = deque([None, None, None])
self.current_timeslot = {} self.current_timeslot = {}
self.set_timeslot(None) self.init_timeslot(None)
def init_timeslot(self, next_timeslot=None):
"""
Initializes the timeslot.
"""
data = {}
self.assign_fallback_playlist(data, None)
data["schedule_id"] = -1
data["show_id"] = -1
data["show_name"] = ""
if self.previous_timeslot:
data["schedule_start"] = self.previous_timeslot.get("schedule_end")
else:
data["schedule_start"] = datetime.now()
if next_timeslot:
data["schedule_end"] = next_timeslot.schedule_end
else:
# Fake the end, because the timeslot is actually not existing
data["schedule_end"] = datetime.now() + timedelta(hours=1)
def set_timeslot(self, timeslot): def set_timeslot(self, timeslot):
...@@ -280,21 +302,7 @@ class Playlog: ...@@ -280,21 +302,7 @@ class Playlog:
# Defaults for a not existing timeslot # Defaults for a not existing timeslot
else: else:
self.assign_fallback_playlist(data, None) self.init_timeslot(next_timeslot)
data["schedule_id"] = -1
data["show_id"] = -1
data["show_name"] = ""
if self.previous_timeslot:
data["schedule_start"] = self.previous_timeslot.get("schedule_end")
else:
data["schedule_start"] = datetime.now()
if next_timeslot:
data["schedule_end"] = next_timeslot.schedule_end
else:
# Fake the end, because the timeslot is actually not existing
data["schedule_end"] = datetime.now() + timedelta(hours=1)
# A valid following timeslot is available # A valid following timeslot is available
if next_timeslot: if next_timeslot:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment