From 97b13ddc684be3c71569fc59747c8b3d851a2d61 Mon Sep 17 00:00:00 2001 From: David Trattnig <david.trattnig@o94.at> Date: Wed, 28 Oct 2020 19:47:28 +0100 Subject: [PATCH] Refactoring to remove Redis. #39 --- .gitlab-ci.yml | 2 +- Dockerfile | 1 - configuration/sample-development.engine.ini | 4 - configuration/sample-docker.engine.ini | 4 - configuration/sample-production.engine.ini | 4 - engine-core.py | 39 +-- guru.py | 168 ---------- modules/base/exceptions.py | 3 - modules/cli/padavan.py | 319 ------------------- modules/cli/redis/adapter.py | 286 ----------------- modules/cli/redis/channels.py | 39 --- modules/cli/redis/messenger.py | 307 ------------------ modules/cli/redis/statestore.py | 326 -------------------- modules/core/control.py | 27 +- modules/core/engine.py | 86 ++---- modules/core/events.py | 31 +- modules/core/startup.py | 59 ---- modules/liquidsoap/engine.liq | 7 - modules/plugins/monitor.py | 20 +- modules/plugins/trackservice.py | 44 +-- modules/scheduling/scheduler.py | 66 ++-- requirements.txt | 1 - test/connection_tester.py | 20 -- 23 files changed, 131 insertions(+), 1732 deletions(-) delete mode 100755 guru.py delete mode 100644 modules/cli/padavan.py delete mode 100644 modules/cli/redis/adapter.py delete mode 100644 modules/cli/redis/channels.py delete mode 100644 modules/cli/redis/messenger.py delete mode 100644 modules/cli/redis/statestore.py delete mode 100644 modules/core/startup.py diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index bca0a12d..dbfd0a3a 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -6,7 +6,7 @@ stages: before_script: - apt-get -qq update - apt-cache search libmariadb - - apt-get install -y python3-virtualenv virtualenv opam redis-server redis-tools libev4 libev-dev libsndfile1 quelcom # mariadb-server libmariadbclient-dev + - apt-get install -y python3-virtualenv virtualenv opam libev4 libev-dev libsndfile1 quelcom # mariadb-server libmariadbclient-dev - /usr/bin/virtualenv venv - . venv/bin/activate # - opam init --disable-sandboxing -y diff --git a/Dockerfile b/Dockerfile index 243fb820..0c9323ab 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,7 +7,6 @@ LABEL maintainer="David Trattnig <david.trattnig@subsquare.at>" RUN apt-get update && apt-get -y install \ apt-utils \ - redis-server \ ffmpeg \ quelcom \ build-essential \ diff --git a/configuration/sample-development.engine.ini b/configuration/sample-development.engine.ini index 990ada0f..b3de4e33 100644 --- a/configuration/sample-development.engine.ini +++ b/configuration/sample-development.engine.ini @@ -16,10 +16,6 @@ db_pass="---SECRET--PASSWORD---" db_host="localhost" db_charset="utf8" -[redis] -redis_host="localhost" -redis_port=6379 -redis_db=0 [monitoring] mail_server="mail.example.com" diff --git a/configuration/sample-docker.engine.ini b/configuration/sample-docker.engine.ini index 06a6dc36..bdd0a188 100644 --- a/configuration/sample-docker.engine.ini +++ b/configuration/sample-docker.engine.ini @@ -16,10 +16,6 @@ db_pass="---SECRET--PASSWORD---" db_host="127.0.0.1" db_charset="utf8" -[redis] -redis_host="127.0.0.1" -redis_port=6379 -redis_db=0 [monitoring] mail_server="mail.example.com" diff --git a/configuration/sample-production.engine.ini b/configuration/sample-production.engine.ini index f5981e7b..bf58f432 100644 --- a/configuration/sample-production.engine.ini +++ b/configuration/sample-production.engine.ini @@ -16,10 +16,6 @@ db_pass="---SECRET--PASSWORD---" db_host="localhost" db_charset="utf8" -[redis] -redis_host="localhost" -redis_port=6379 -redis_db=0 [monitoring] mail_server="mail.example.com" diff --git a/engine-core.py b/engine-core.py index ae996997..5224196f 100755 --- a/engine-core.py +++ b/engine-core.py @@ -53,7 +53,6 @@ class AuraEngine: AuraEngine does the following: 1. Initialize the engine and scheduler - 2. Initialize Redis 3. Start Liquidsoap in a separate thread which connects to the engine """ @@ -86,38 +85,30 @@ class AuraEngine: """ from modules.scheduling.scheduler import AuraScheduler from modules.core.engine import Engine - from modules.cli.redis.adapter import ServerRedisAdapter # If Liquidsoap should be started automatically self.lqs_startup = lqs_startup # Check if the database has to be re-created if self.config.get("recreate_db") is not None: - AuraScheduler(self.config, None, None) + AuraScheduler.init_database() # Create scheduler and Liquidsoap communicator - self.engine = Engine(self.config) - self.scheduler = AuraScheduler(self.config, self.engine, self.on_initialized) - - # Create the Redis adapter - self.messenger = ServerRedisAdapter(self.config) - self.messenger.scheduler = self.scheduler - self.messenger.engine = self.engine - - # And finally wait for redis message / start listener thread - self.messenger.start() - - + self.engine = Engine() + + # Sleep needed, because the socket is created too slowly by Liquidsoap + # time.sleep(1) + + # def on_initialized(): + # """ + # Called when the engine is initialized, before the Liquidsoap connection is established." + # """ + # self.logger.info(SU.green("Engine Core initialized - Waiting for Liquidsoap connection ...")) + # if self.lqs_startup: + # self.start_lqs(False, False) + # else: + # self.logger.info(SU.yellow("Please note, Liquidsoap needs to be started manually.")) - def on_initialized(self): - """ - Called when the engine is initialized, before the Liquidsoap connection is established." - """ - self.logger.info(SimpleUtil.green("Engine Core initialized - Waiting for Liquidsoap connection ...")) - if self.lqs_startup: - self.start_lqs(False, False) - else: - self.logger.info(SimpleUtil.yellow("Please note, Liquidsoap needs to be started manually.")) def start_lqs(self, debug_output, verbose_output): diff --git a/guru.py b/guru.py deleted file mode 100755 index f0b8e981..00000000 --- a/guru.py +++ /dev/null @@ -1,168 +0,0 @@ -#!/bin/sh -''''which python3.8 >/dev/null 2>&1 && exec python3.8 "$0" "$@" # ''' -''''which python3.7 >/dev/null 2>&1 && exec python3.7 "$0" "$@" # ''' -''''exec echo "Error: Snaaakey Python, where are you?" # ''' - -# -# Aura Engine (https://gitlab.servus.at/aura/engine) -# -# Copyright (C) 2017-2020 - The Aura Engine Team. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -import time -import sys - -import redis - -from argparse import ArgumentParser - -from modules.cli.padavan import Padavan -from modules.base.exceptions import PlaylistException -from modules.base.config import AuraConfig - - -class Guru(): - """ - Command Line Interface (CLI) for Aura Engine. - """ - # config_path = "%s/configuration/engine.ini" % Path(__file__).parent.absolute() - config = AuraConfig() - parser = None - args = None - - # ------------------------------------------------------------------------------------------ # - def __init__(self): - self.init_argument_parser() - self.handle_arguments() - - def handle_arguments(self): - if self.args.stoptime: - start = time.time() - - if not self.args.quiet: - print("Guru thinking...") - - try: - p = Padavan(self.args, self.config) - p.meditate() - except PlaylistException as pe: - # typically there is no next file found - if not self.args.quiet: - print(pe) - else: - print("") - exit(4) - except redis.exceptions.TimeoutError: - print("Timeout when waiting for redis message. Is AURA daemon running? Exiting...") - exit(3) - - if not self.args.quiet: - print("...result: ") - - if p.stringreply != "": - #print(p.stringreply) - if p.stringreply[len(p.stringreply)-1] == "\n": - print(p.stringreply[0:len(p.stringreply) - 1]) - else: - print(p.stringreply[0:len(p.stringreply)]) - - if self.args.stoptime: - end = time.time() - exectime = end-start - print("execution time: "+str(exectime)+"s") - - def init_argument_parser(self): - - try: - self.create_parser() - self.args = self.parser.parse_args() - - except (ValueError, TypeError) as e: - if self.parser is not None: - self.parser.print_help() - print() - print(e) - exit(1) - - def create_parser(self): - self.parser = ArgumentParser() - - # options - self.parser.add_argument("-sep", "--stop-execution-time", action="store_true", dest="stoptime", default=False, help="Prints the execution time at the end of the skript") - self.parser.add_argument("-q", "--quiet", action="store_true", dest="quiet", default=False, help="Just the result will outputed to stout") - self.parser.add_argument("-rd", "--recreate-database", action="store_true", dest="recreatedb", default=False, help="Do you want to recreate the database?") - - # getter - self.parser.add_argument("-pcs", "--print-connection-status", action="store_true", dest="get_connection_status", default=False, help="Prints the status of the connection to liquidsoap, pv and tank") - self.parser.add_argument("-gam", "--get-active-mixer", action="store_true", dest="mixer_channels_selected",default=False, help="Which mixer channels are selected?") - self.parser.add_argument("-pms", "--print-mixer-status", action="store_true", dest="mixer_status", default=False, help="Prints all mixer sources and their states") - self.parser.add_argument("-pap", "--print-act-programme", action="store_true", dest="get_act_programme", default=False, help="Prints the actual Programme, the controller holds") - self.parser.add_argument("-s", "--status", action="store_true", dest="get_status", default=False, help="Returns the Engine Status as JSON") - - # liquid manipulation - self.parser.add_argument("-am", "--select-mixer", action="store", dest="select_mixer", default=-1, metavar="MIXERNAME", help="Which mixer should be activated?") - self.parser.add_argument("-dm", "--de-select-mixer", action="store", dest="deselect_mixer", default=-1, metavar="MIXERNAME", help="Which mixer should be activated?") - self.parser.add_argument("-vm", "--volume", action="store", dest="set_volume", default=0, metavar=("MIXERNUM", "VOLUME"), nargs=2, help="Set volume of a mixer source", type=int) - - # shutdown server - self.parser.add_argument("-sd", "--shutdown", action="store_true", dest="shutdown", default=False, help="Shutting down aura server") - - # playlist in/output - self.parser.add_argument("-fnp", "--fetch-new-programmes", action="store_true", dest="fetch_new_programme", default=False, help="Fetch new programmes from api_steering_calendar in engine.ini") - self.parser.add_argument("-pmq", "--print-message-queue", action="store_true", dest="print_message_queue", default=False, help="Prints message queue") - - # send a redis message - self.parser.add_argument("-rm", "--redis-message", action="store", dest="redis_message", default=False, metavar=("CHANNEL", "MESSAGE"), nargs=2, help="Send a redis message to the Listeners") - - # calls from liquidsoap - self.parser.add_argument("-gnf", "--get-next-file-for", action="store", dest="get_file_for", default=False, metavar="PLAYLISTTYPE", help="For which type you wanna GET a next audio file?") - self.parser.add_argument("-snf", "--set-next-file-for", action="store", dest="set_file_for", default=False, metavar=("PLAYLISTTYPE", "FILE"), nargs=2, help="For which type you wanna SET a next audio file?") - self.parser.add_argument("-np", "--now-playing", action="store_true", dest="now_playing", default=False, help="Which source is now playing") - - self.parser.add_argument("-ip", "--init-player", action="store_true", dest="init_player", default=False, help="Reset liquidsoap volume and mixer activations?") - self.parser.add_argument("-ts", "--on_play", action="store", dest="on_play", default=False, metavar="INFO", help="Event handling when some entry started playing") - - if len(sys.argv) == 1: - raise ValueError("No Argument passed!") - - -def valid_playlist_entry(argument): - from datetime import datetime - - try: - - index = int(argument[0]) - fromtime = datetime.strptime(argument[1], "%Y-%m-%d") - source = argument[2] - return index, fromtime, source - except: - msg = "Not a valid date: '{0}'.".format(argument[0]) - print(msg) - raise - - -# # ## ## ## ## ## # # -# # ENTRY FUNCTION # # -# # ## ## ## ## ## # # -def main(): - Guru() -# # ## ## ## ## ## ## # # -# # End ENTRY FUNCTION # # -# # ## ## ## ## ## ## # # - - -if __name__ == "__main__": - main() diff --git a/modules/base/exceptions.py b/modules/base/exceptions.py index ec60c36c..0495625e 100644 --- a/modules/base/exceptions.py +++ b/modules/base/exceptions.py @@ -51,6 +51,3 @@ class LQConnectionError(Exception): class LQStreamException(Exception): pass -class RedisConnectionException(Exception): - pass - diff --git a/modules/cli/padavan.py b/modules/cli/padavan.py deleted file mode 100644 index e566c44e..00000000 --- a/modules/cli/padavan.py +++ /dev/null @@ -1,319 +0,0 @@ -# -# Aura Engine (https://gitlab.servus.at/aura/engine) -# -# Copyright (C) 2017-2020 - The Aura Engine Team. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -import json - -from modules.cli.redis.channels import RedisChannel -from modules.base.utils import TerminalColors -from modules.cli.redis.adapter import ClientRedisAdapter, ServerRedisAdapter -from modules.base.models import AuraDatabaseModel - - -class Padavan: - args = None - config = None - - ss = None - zmqclient = None - redisclient = None - stringreply = "" - - # ------------------------------------------------------------------------------------------ # - def __init__(self, args, config): - self.args = args - self.config = config - - # ------------------------------------------------------------------------------------------ # - def meditate(self): - if self.args.fetch_new_programme: - self.fetch_new_programme() - - elif self.args.mixer_channels_selected: - self.mixer_channels_selected() - - elif self.args.mixer_status: - self.mixer_status() - - # elif self.args.get_act_programme: - # self.get_act_programme() - - elif self.args.get_status: - self.get_status() - - elif self.args.get_connection_status: - self.get_connection_status() - - elif self.args.shutdown: - self.shutdown() - - elif self.args.redis_message: - self.redis_message(self.args.redis_message[0], self.args.redis_message[1]) - - elif self.args.select_mixer != -1: - self.select_mixer(self.args.select_mixer) - - elif self.args.deselect_mixer != -1: - self.select_mixer(self.args.deselect_mixer, False) - - elif self.args.set_volume: - self.set_volume(self.args.set_volume[0], self.args.set_volume[1]) - - elif self.args.print_message_queue: - self.print_message_queue() - - elif self.args.get_file_for: - self.get_next_file(self.args.get_file_for) - - elif self.args.set_file_for: - self.set_next_file(self.args.set_file_for[0], self.args.set_file_for[1]) - - elif self.args.now_playing: - print("") - - elif self.args.init_player: - self.init_player() - - elif self.args.on_play: - self.on_play(self.args.on_play) - - elif self.args.recreatedb: - self.recreatedb() - - # else: - # raise Exception("") - - # init liquid => faster exec time, when loading at runtime just what is needed - # ------------------------------------------------------------------------------------------ # - def init_liquidsoap_communication(self): - # import - from modules.core.engine import SoundSystem - - # init liquidsoap communication - self.ss = SoundSystem(self.config) - # enable connection - self.ss.enable_transaction() - - # ------------------------------------------------------------------------------------------ # - def destroy_liquidsoap_communication(self): - # enable connection - self.ss.disable_transaction() - - # ------------------------------------------------------------------------------------------ # - def init_redis_communication(self, with_server=False): - self.redisclient = ClientRedisAdapter(self.config) - - if with_server: - self.redisserver = ServerRedisAdapter(self.config) - - # ------------------------------------------------------------------------------------------ # - def send_redis(self, channel, message): - self.init_redis_communication() - self.redisclient.publish(channel, message) - - # ------------------------------------------------------------------------------------------ # - def send_and_wait_redis(self, channel, message, reply_channel): - self.init_redis_communication(True) - self.redisclient.publish(channel, message) - return self.redisserver.listen_for_one_message(reply_channel.value) - - # ------------------------------------------------------------------------------------------ # - def shutdown(self): - self.send_redis("aura", "shutdown") - self.stringreply = "Shutdown message sent!" - - # ------------------------------------------------------------------------------------------ # - def fetch_new_programme(self): - json_reply = self.send_and_wait_redis("aura", "fetch_new_programme", RedisChannel.FNP_REPLY) - if json_reply != "": - actprogramme = json.loads(json_reply) - # self.print_programme(actprogramme) - else: - print("No programme fetched") - - # ------------------------------------------------------------------------------------------ # - # def get_act_programme(self): - # json_reply = self.send_and_wait_redis("aura", "get_act_programme", RedisChannel.GAP_REPLY) - # actprogramme = json.loads(json_reply) - # self.print_programme(actprogramme) - - - def get_status(self): - """ - Retrieves the Engine's status information. - """ - json_reply = self.send_and_wait_redis("aura", "get_status", RedisChannel.GS_REPLY) - # status = json.loads(json_reply) - self.stringreply = json_reply - - - # ------------------------------------------------------------------------------------------ # - def get_connection_status(self): - json_reply = self.send_and_wait_redis("aura", "get_connection_status", RedisChannel.GCS_REPLY) - connection_status = json.loads(json_reply) - self.print_connection_status(connection_status) - - # ------------------------------------------------------------------------------------------ # - # def print_programme(self, programme): - # cnt = 1 - # for show in programme: - # for entry in show["playlist"]: - # self.stringreply += str(cnt) + \ - # " --- schedule id #" + str(show["schedule_id"]) + "." + str(entry["entry_num"]) + \ - # " - show: " + show["show_name"] + \ - # " - starts @ " + entry["entry_start"] + \ - # " - plays " + str(entry["source"]) + "\n" - # cnt = cnt + 1 - - # ------------------------------------------------------------------------------------------ # - def print_connection_status(self, connection_status): - - if connection_status["pv"]: - self.stringreply = "Connection to pv: " + TerminalColors.GREEN.value + " " + str(connection_status["pv"]) + TerminalColors.ENDC.value - else: - self.stringreply = "Connection to pv: " + TerminalColors.RED.value + " " + str(connection_status["pv"]) + TerminalColors.ENDC.value - - if connection_status["db"]: - self.stringreply += "\nConnection to db: " + TerminalColors.GREEN.value + " " + str(connection_status["db"]) + TerminalColors.ENDC.value - else: - self.stringreply += "\nConnection to db: " + TerminalColors.RED.value + " " + str(connection_status["db"]) + TerminalColors.ENDC.value - - if connection_status["lqs"]: - self.stringreply += "\nConnection to lqs: " + TerminalColors.GREEN.value + " " + str(connection_status["lqs"]) + TerminalColors.ENDC.value - else: - self.stringreply += "\nConnection to lqs: " + TerminalColors.RED.value + " " + str(connection_status["lqs"]) + TerminalColors.ENDC.value - - if connection_status["lqsr"]: - self.stringreply += "\nConnection to lqsr: " + TerminalColors.GREEN.value + " " + str(connection_status["lqsr"]) + TerminalColors.ENDC.value - else: - self.stringreply += "\nConnection to lqsr: " + TerminalColors.RED.value + " " + str(connection_status["lqsr"]) + TerminalColors.ENDC.value - - if connection_status["tank"]: - self.stringreply += "\nConnection to tank: " + TerminalColors.GREEN.value + " " + str(connection_status["tank"]) + TerminalColors.ENDC.value - else: - self.stringreply += "\nConnection to tank: " + TerminalColors.RED.value + " " + str(connection_status["tank"]) + TerminalColors.ENDC.value - - if connection_status["redis"]: - self.stringreply += "\nConnection to redis: " + TerminalColors.GREEN.value + " " + str(connection_status["redis"]) + TerminalColors.ENDC.value - else: - self.stringreply += "\nConnection to redis: " + TerminalColors.RED.value + " " + str(connection_status["redis"]) + TerminalColors.ENDC.value - - # ------------------------------------------------------------------------------------------ # - - - def init_player(self): - """ - Initializes the player on Liquidsaop startup. - """ - self.stringreply = self.send_and_wait_redis("aura", "init_player", RedisChannel.IP_REPLY) - - - - def on_play(self, info): - """ - Event handler to be called when some entry started playing. - """ - - self.stringreply = self.send_and_wait_redis("aura", "on_play " + info, RedisChannel.GNF_REPLY) - - # ------------------------------------------------------------------------------------------ # - def recreatedb(self): - print("YOU WILL GET PROBLEMS DUE TO DATABASE BLOCKING IF aura.py IS RUNNING! NO CHECKS IMPLEMENTED SO FAR!") - x = AuraDatabaseModel() - x.recreate_db() - self.stringreply = "Database recreated!" - - # ------------------------------------------------------------------------------------------ # - def redis_message(self, channel, message): - self.send_redis(channel, message) - self.stringreply = "Message '"+message+"' sent to channel '"+channel+"'" - - # ------------------------------------------------------------------------------------------ # - def print_message_queue(self): - self.stringreply = self.send_and_wait_redis("aura", "print_message_queue", RedisChannel.PMQ_REPLY) - - # LIQUIDSOAP # - - # ------------------------------------------------------------------------------------------ # - def select_mixer(self, mixername, activate=True): - # init lqs - self.init_liquidsoap_communication() - - # select mixer and return the feedback - self.stringreply = self.ss.channel_activate(mixername, activate) - - # disable connection - self.destroy_liquidsoap_communication() - - # ------------------------------------------------------------------------------------------ # - def set_volume(self, mixernumber, volume): - # init lqs and enable comm - self.init_liquidsoap_communication() - self.stringreply = self.ss.channel_volume(mixernumber, volume) - # disable connection - self.destroy_liquidsoap_communication() - - # ------------------------------------------------------------------------------------------ # - def mixer_channels_selected(self): - self.init_liquidsoap_communication() - am = self.ss.mixer_channels_selected() - - if len(am) == 0: - self.destroy_liquidsoap_communication() - raise Exception("Guru recognized a problem: No active source!!!") - - self.stringreply = str(am) - - # disable connection - self.destroy_liquidsoap_communication() - - # ------------------------------------------------------------------------------------------ # - def mixer_status(self): - self.init_liquidsoap_communication() - - status = self.ss.mixer_status() - - for k, v in status.items(): - self.stringreply += "source: " + k + "\t status: " + v + "\n" - - # disable connection - self.destroy_liquidsoap_communication() - - # REDIS # - - # ------------------------------------------------------------------------------------------ # - def get_next_file(self, type): -# redis = RedisMessenger() -# next_file = redis.get_next_file_for(type) -# if next_file == "": -# next_file = "/var/audio/blank.flac" -# self.stringreply = next_file - - #self.send_redis("aura", "set_next_file " + type) - - next_file = self.send_and_wait_redis("aura", "get_next_file " + type, RedisChannel.GNF_REPLY) - self.stringreply = next_file - - # ------------------------------------------------------------------------------------------ # - def set_next_file(self, type, file): - #from modules.cli.redis.messenger import RedisMessenger - #redis = RedisMessenger() - - #redis.set_next_file_for(type, file) - self.send_redis("aura", "set_next_file " + type + " " + file) - self.stringreply = "Set "+file+" for fallback '"+type+"'" diff --git a/modules/cli/redis/adapter.py b/modules/cli/redis/adapter.py deleted file mode 100644 index 94acb7cc..00000000 --- a/modules/cli/redis/adapter.py +++ /dev/null @@ -1,286 +0,0 @@ -# -# Aura Engine (https://gitlab.servus.at/aura/engine) -# -# Copyright (C) 2017-2020 - The Aura Engine Team. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -import sys -import time -import json -from datetime import datetime -from threading import Event -import threading - -import redis - - -from modules.cli.redis.messenger import RedisMessenger -from modules.cli.redis.statestore import RedisStateStore -# from modules.communication.connection_tester import ConnectionTester -from modules.base.exceptions import RedisConnectionException -from modules.cli.redis.channels import RedisChannel -from modules.base.utils import TerminalColors - - -# ------------------------------------------------------------------------------------------ # -class ServerRedisAdapter(threading.Thread, RedisMessenger): - debug = False - pubsub = None - config = None - redisdb = None - channel = "" - scheduler = None - redisclient = None -# connection_tester = None - engine = None - socket = None - - # ------------------------------------------------------------------------------------------ # - def __init__(self, config): - threading.Thread.__init__(self) - RedisMessenger.__init__(self, config) - - # init - #threading.Thread.__init__ (self) - self.config = config - self.shutdown_event = Event() - - self.channel = RedisChannel.STANDARD.value - self.section = '' - self.rstore = RedisStateStore(config) - self.errnr = '00' - self.components = {'controller':'01', 'scheduling':'02', 'playd':'03', 'recorder':'04', 'helpers':'09'} - self.fromMail = '' - self.adminMails = '' - self.can_send = None - self.redisclient = ClientRedisAdapter(config) -# self.connection_tester = ConnectionTester() - - # ------------------------------------------------------------------------------------------ # - def run(self): - self.redisdb = redis.Redis(host=self.config.get("redis_host"), port=self.config.get("redis_port"), db=self.config.get("redis_db")) - self.pubsub = self.redisdb.pubsub() - self.pubsub.subscribe(self.channel) - - self.logger.debug(TerminalColors.YELLOW.value + "waiting for REDIS message on channel " + self.channel + TerminalColors.ENDC.value) - - # listener loop - for item in self.pubsub.listen(): - if item["type"] == "subscribe": - continue - - self.logger.debug(TerminalColors.YELLOW.value + "received REDIS message: " + TerminalColors.ENDC.value + str(item)) - - item["channel"] = self.decode_if_needed(item["channel"]) - item["data"] = self.decode_if_needed(item["data"]) - - try: - self.work(item) - except RedisConnectionException as rce: - self.logger.error(str(rce)) - - if not self.shutdown_event.is_set(): - self.logger.debug(TerminalColors.YELLOW.value + "waiting for REDIS message on channel " + self.channel + TerminalColors.ENDC.value) - - self.pubsub.unsubscribe() - - if not self.shutdown_event.is_set(): - self.logger.warning("unsubscribed from " + self.channel + " and finished") - - # ------------------------------------------------------------------------------------------ # - def decode_if_needed(self, val): - if isinstance(val, bytes): - return val.decode("utf-8") - return val - - # ------------------------------------------------------------------------------------------ # - def listen_for_one_message(self, channel, socket_timeout=2): - self.redisdb = redis.Redis(host=self.config.get("redis_host"), port=self.config.get("redis_port"), db=self.config.get("redis_db"), socket_timeout=socket_timeout) - self.pubsub = self.redisdb.pubsub() - self.pubsub.subscribe(channel) - - try: - self.logger.debug("I am listening on channel '"+channel+"' for "+str(socket_timeout)+" seconds") - - for item in self.pubsub.listen(): - it = self.receive_message(item) - if it is not None: - break - except redis.exceptions.TimeoutError as te: - raise te - - return item["data"] - - # ------------------------------------------------------------------------------------------ # - def receive_message(self, item): - if item["type"] == "subscribe": - self.logger.info("i am subscribed to channel " + item["channel"].decode("utf-8")) - return None - - item["channel"] = item["channel"].decode("utf-8") - if isinstance(item["data"], bytes): - item["data"] = item["data"].decode("utf-8") - - self.pubsub.unsubscribe() - return item - - # ------------------------------------------------------------------------------------------ # - def work(self, item): - if item["data"] == "fetch_new_programme": - self.execute(RedisChannel.FNP_REPLY.value, self.scheduler.fetch_new_programme) - # self.execute(RedisChannel.FNP_REPLY.value, self.scheduler.get_act_programme_as_string) - - elif item["data"] == "shutdown": - self.terminate() - - elif item["data"] == "init_player": - self.execute(RedisChannel.IP_REPLY.value, self.engine.init_player) - - # elif item["data"] == "get_act_programme": - # self.execute(RedisChannel.GAP_REPLY.value, self.scheduler.get_act_programme_as_string) - - elif item["data"] == "get_status": - def get_status_string(): - status = "No monitoring plugin available!" - if "monitor" in self.engine.plugins: - status = self.engine.plugins["monitor"].get_status() - return json.dumps(status) - - self.execute(RedisChannel.GS_REPLY.value, get_status_string) - -# elif item["data"] == "get_connection_status": -# self.execute(RedisChannel.GCS_REPLY.value, self.connection_tester.get_connection_status) - - elif item["data"] == "print_message_queue": - self.execute(RedisChannel.PMQ_REPLY.value, self.scheduler.print_message_queue) - - elif item["data"].find("set_next_file") >= 0: - playlist = item["data"].split()[1] - playlist = playlist[0:len(playlist)-8] - self.execute(RedisChannel.SNF_REPLY.value, self.scheduler.set_next_file_for, playlist) - - elif item["data"].find("get_next_file") >= 0: - playlist = item["data"].split()[1] - #playlist = playlist[0:len(playlist)-8] - self.execute(RedisChannel.GNF_REPLY.value, self.scheduler.get_next_file_for, playlist) - - elif item["data"].find("on_play") >= 0: - source = item["data"].split("on_play ")[1] - self.execute(RedisChannel.TS_REPLY.value, self.scheduler.engine.player.on_play, source) - - elif item["data"] == "recreate_db": - self.execute(RedisChannel.RDB_REPLY.value, self.scheduler.recreate_database) - - elif item["data"] == "status": - return True - - else: - raise RedisConnectionException("ServerRedisAdapter Cannot understand command: " + item["data"]) - - # ------------------------------------------------------------------------------------------ # - def execute(self, channel, f, param1=None, param2=None, param3=None): - if param1 != None: - if param2 != None: - if param3 != None: - reply = f(param1, param2, param3) - else: - reply = f(param1, param2) - else: - reply = f(param1) - else: - reply = f() - - if reply is None: - reply = "" - - # sometimes the sender is faster than the receiver. redis messages would be lost - time.sleep(0.1) - - self.logger.debug(TerminalColors.YELLOW.value + "replying REDIS message " + TerminalColors.ENDC.value + reply + TerminalColors.YELLOW.value + " on channel " + channel + TerminalColors.ENDC.value) - - # publish - self.redisclient.publish(channel, reply) - - # ------------------------------------------------------------------------------------------ # - def join_comm(self): - try: - while self.is_alive(): - self.logger.debug(str(datetime.now())+" joining") - self.join() - self.logger.warning("join out") - - except (KeyboardInterrupt, SystemExit): - # Dem Server den Shutdown event setzen - # server.shutdown_event.set() - # Der Server wartet auf Eingabe - # Daher einen Client initiieren, der eine Nachricht schickt - self.halt() - sys.exit('Terminated') - - # ------------------------------------------------------------------------------------------ # - def halt(self): - """ - Stop the server - """ - if self.shutdown_event.is_set(): - return - self.shutdown_event.set() - try: - self.socket.unbind("tcp://"+self.ip+":"+self.port) - except: - pass - - # self.socket.close() - - # ------------------------------------------------------------------------------------------ # - def send(self, message): - """ - Send a message to the client - :param message: string - """ - - # FIXME Review logic - if not self.can_send: - self.logger.debug("sending a "+str(len(message))+" long message via REDIS.") - self.socket.send(message.encode("utf-8")) - self.can_send = False - else: - self.logger.warning("cannot send message via REDIS: "+str(message)) - - - - def terminate(self): - """ - Called when thread is stopped or a signal to terminate is received. - """ - self.shutdown_event.set() - self.scheduler.terminate() - self.pubsub.close() - self.logger.info("Shutdown event received. Bye bye ...") - - -# ------------------------------------------------------------------------------------------ # -class ClientRedisAdapter(RedisMessenger): - - def __init__(self, config): - RedisMessenger.__init__(self, config) - - # ------------------------------------------------------------------------------------------ # - def publish(self, channel, message): - if type(channel) == RedisChannel: - channel = channel.value - - self.rstore.publish(channel, message) diff --git a/modules/cli/redis/channels.py b/modules/cli/redis/channels.py deleted file mode 100644 index 57e778d4..00000000 --- a/modules/cli/redis/channels.py +++ /dev/null @@ -1,39 +0,0 @@ -# -# Aura Engine (https://gitlab.servus.at/aura/engine) -# -# Copyright (C) 2017-2020 - The Aura Engine Team. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -from enum import Enum - - - -class RedisChannel(Enum): - STANDARD = "aura" - - DPE_REPLY = "delete_playlist_entry_reply" - FNP_REPLY = "fetch_new_programme_reply" - # GAP_REPLY = "get_act_programme_reply" - GS_REPLY = "get_status_reply" - GCS_REPLY = "get_connection_status_reply" - GNF_REPLY = "get_next_file_reply" - IPE_REPLY = "insert_playlist_entry_reply" - IP_REPLY = "init_player_reply" - TS_REPLY = "track_service_reply" - MPE_REPLY = "move_playlist_entry_reply" - PMQ_REPLY = "print_message_queue_reply" - RDB_REPLY = "recreate_database_reply" - SNF_REPLY = "get_next_file_reply" diff --git a/modules/cli/redis/messenger.py b/modules/cli/redis/messenger.py deleted file mode 100644 index b6821832..00000000 --- a/modules/cli/redis/messenger.py +++ /dev/null @@ -1,307 +0,0 @@ -# -# Aura Engine (https://gitlab.servus.at/aura/engine) -# -# Copyright (C) 2017-2020 - The Aura Engine Team. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - - -import logging - - -from modules.cli.redis.statestore import RedisStateStore -from modules.cli.redis.channels import RedisChannel - - -""" -Send and receive redis messages -""" - - -# ------------------------------------------------------------------------------------------ # -class RedisMessenger(): - logger = None - rstore = None - - # ------------------------------------------------------------------------------------------ # - def __init__(self, config): - super(RedisMessenger, self).__init__() - """ - Constructor - """ - self.logger = logging.getLogger("AuraEngine") - self.channel = RedisChannel.STANDARD - self.section = '' - self.rstore = RedisStateStore(config) - self.errnr = '00' - self.components = {'controller':'01', 'scheduling':'02', 'playd':'03', 'recorder':'04', 'helpers':'09'} - self.fromMail = '' - self.adminMails = '' - - # ------------------------------------------------------------------------------------------ # - def set_channel(self, channel): - """ - Einen "Kanal" setzen - zb scheduling - @type channel: string - @param channel: Kanal/Name der Komponente - """ - self.channel = channel - if channel in self.components: - self.errnr = self.components[channel] - self.rstore.set_channel(channel) - - # ------------------------------------------------------------------------------------------ # - def set_section(self, section): - """ - Einen Sektion / Gültigkeitsbereich der Meldung setzen - zb internal - @type section: string - @param section: Gültigkeitsbereich - """ - self.section = section - - -# # ------------------------------------------------------------------------------------------ # -# def set_mail_addresses(self, fromMail, adminMails): -# """ -# Einen Sektion / Gültigkeitsbereich der Meldung setzen - zb internal -# @type section: string -# @param section: Gültigkeitsbereich -# """ -# self.fromMail = fromMail -# self.adminMails = adminMails - -# # ------------------------------------------------------------------------------------------ # -# def send(self, message, code, level, job, value='', section=''): -# """ -# Eine Message senden -# @type message: string -# @param message: menschenverständliche Nachricht -# @type code: string -# @param code: Fehlercode - endet mit 00 bei Erfolg -# @type level: string -# @param level: Error-Level - info, warning, error, fatal -# @type job: string -# @param job: Name der ausgeführten Funktion -# @type value: string -# @param value: Ein Wert -# @type section: string -# @param section: Globale Sektion überschreiben -# """ -# section = self.section if section == '' else section -# self.time = str(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')) -# self.utime = time.time() -# state = {'message':message.strip().replace("'","\\'"), 'code':self.errnr + str(code),'job':job,'value':value} -# self.rstore.set_section(section) -# self.rstore.store(level, state) - -# if level == 'info' or level == 'success': -# self.logger.info(message) -# elif level == 'warning': -# self.logger.warning(message) -# elif level == 'error': -# self.logger.error(message) -# self.send_admin_mail(level, message, state) - -# elif level == 'fatal': -# self.logger.critical(message) -# self.send_admin_mail(level, message, state) - -# # ------------------------------------------------------------------------------------------ # -# def say_alive(self): -# """ -# Soll alle 20 Sekunden von den Komponenten ausgeführt werden, -# um zu melden, dass sie am Leben sind -# """ -# self.rstore.set_alive_state() - -# # ------------------------------------------------------------------------------------------ # -# def get_alive_state(self, channel): -# """ -# Live State abfragen -# @type channel: string -# @param channel: Channel/Komponente -# """ -# return self.rstore.get_alive_state(channel) - -# # ------------------------------------------------------------------------------------------ # -# def set_state(self, name, value, expires=None, channel=None): -# """ -# Kündigt einen Event an -# @type name: string -# @param name: Name des state -# @type value: string -# @param value: Wert -# @type channel: string -# @param channel: Kanal (optional) -# """ -# if not channel: -# channel = self.channel - -# self.rstore.set_state(name, value, expires, channel) - -# # ------------------------------------------------------------------------------------------ # -# def queue_add_event(self, name, eventtime, value, channel=None): -# """ -# Kündigt einen Event an -# @type name: string -# @param name: der Name des Events -# @type eventtime: string|datetime.datetime -# @param eventtime: Datum und Zeit des events -# @type value: dict -# @param value: Werte -# @type channel: string -# @param channel: Kanal (optional) -# """ -# if not channel: -# channel = self.channel - -# if type(eventtime) == type(str()): -# eventtime_str = datetime.datetime.strptime(eventtime[0:16].replace(' ','T'), "%Y-%m-%dT%H:%M").strftime("%Y-%m-%dT%H:%M") - -# elif type(eventtime) is datetime.datetime: -# eventtime_str = eventtime.strftime("%Y-%m-%dT%H:%M") - -# else: -# raise TypeError('eventtime must be a datetime.date or a string, not a %s' % type(eventtime)) - -# self.rstore.queue_add_event(eventtime_str, name, value, channel) - -# # ------------------------------------------------------------------------------------------ # -# def queue_remove_events(self, name, channel=None): -# """ -# Löscht Events -# @type name: string -# @param name: der Name des Events -# @type channel: string -# @param channel: Kanal (optional) -# """ -# if not channel: -# channel = self.channel - -# self.rstore.queue_remove_events(name, channel) - -# # ------------------------------------------------------------------------------------------ # -# def fire_event(self, name, value, channel=None): -# """ -# Feuert einen Event -# @type name: string -# @param name: der Name des Events -# @type value: dict -# @param value: Werte -# @type channel: string -# @param channel: Kanal (optional) -# """ -# if not channel: -# channel = self.channel - -# self.rstore.fire_event(name, value, channel) - -# # ------------------------------------------------------------------------------------------ # -# def get_event_queue(self, name=None, channel=None): -# """ -# Holt events eines Kanals -# @type channel: string -# @param channel: Kanal (optional) -# @rtype: list -# @return: Liste der Events -# """ -# queue = self.rstore.get_event_queue(name, channel) -# return queue - -# # ------------------------------------------------------------------------------------------ # -# def get_events(self, name=None, channel=None): -# """ -# Holt events eines Kanals -# @type channel: string -# @param channel: Kanal (optional) -# @rtype: list -# @return: Liste der Events -# """ -# events = self.rstore.get_events(name, channel) -# return events - -# # ------------------------------------------------------------------------------------------ # -# def get_event(self, name=None, channel=None): -# """ -# Holt event eines Kanals -# @type channel: string -# @param channel: Kanal (optional) -# @rtype: dict -# @return: Event -# """ -# events = self.rstore.get_events(name, channel) -# result = False -# if events: -# result = events.pop(0) -# return result - -# # ------------------------------------------------------------------------------------------ # -# def send_admin_mail(self, level, message, state): -# """ -# Sendent mail an Admin(s), -# @type message: string -# @param message: Die Message -# @type state: dict -# @param state: Der State -# @return result -# """ - -# # FIXME Make Mailer functional: Invalid constructor -# if self.fromMail and self.adminMails: -# #subject = "Possible comba problem on job " + state['job'] + " - " + level -# mailmessage = "Hi Admin,\n comba reports a possible problem\n\n" -# mailmessage = mailmessage + level + "!\n" -# mailmessage = mailmessage + message + "\n\n" -# mailmessage = mailmessage + "Additional information:\n" -# mailmessage = mailmessage + "##################################################\n" -# mailmessage = mailmessage + "Job:\t" + state['job'] + "\n" -# mailmessage = mailmessage + "Code:\t" + state['code'] + "\n" -# mailmessage = mailmessage + "Value:\t" + str(state['value']) + "\n" - -# #mailer = AuraMailer(self.adminMails, self.fromMail) -# #mailer.send_admin_mail(subject, mailmessage) -# else: -# return False - -# # ------------------------------------------------------------------------------------------ # -# def receive(self): -# """ -# Bisher wird nichts empfangen -# """ -# return "" - -# # ------------------------------------------------------------------------------------------ # -# def get_next_file_for(self, playlisttype): -# next = self.rstore.db.get('next_'+playlisttype+'file') - -# if next is None: -# next = b"" - -# return next.decode('utf-8') - - # ------------------------------------------------------------------------------------------ # - # def on_play(self, info): - # result = self.rstore.db.get('on_play') - - # if result is None: - # result = b"" - - # return result.decode('utf-8') - - # ------------------------------------------------------------------------------------------ # - # def set_next_file_for(self, playlisttype, file): - # self.rstore.db.set("next_" + playlisttype + "file", file) - - diff --git a/modules/cli/redis/statestore.py b/modules/cli/redis/statestore.py deleted file mode 100644 index de7dfeb9..00000000 --- a/modules/cli/redis/statestore.py +++ /dev/null @@ -1,326 +0,0 @@ -# -# Aura Engine (https://gitlab.servus.at/aura/engine) -# -# Copyright (C) 2017-2020 - The Aura Engine Team. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -import time -import datetime -import json -import re -import uuid - -import redis - - -class RedisStateStore(object): - - """Store and get Reports from redis""" - - def __init__(self, config, **redis_kwargs): - """The default connection parameters are: host='localhost', port=6379, db=0""" - self.db = redis.Redis(host=config.get("redis_host"), port=config.get("redis_port"), db=config.get("redis_db")) - self.channel = '*' - self.section = '*' - self.separator = '_' - self.daily = False - - # ------------------------------------------------------------------------------------------ # - def set_channel(self, channel): - """ - Kanal setzen - @type channel: string - @param channel: Kanal - """ - self.channel = channel - - # ------------------------------------------------------------------------------------------ # - def set_section(self, section): - """ - Sektion setzen - @type section: string - @param section: Sektion - """ - self.section = section - - # ------------------------------------------------------------------------------------------ # - def set_alive_state(self): - """ - Alive Funktion - alle 20 Sekunden melden, dass man noch am Leben ist - """ - self.set_state('alive', 'Hi', 21) - - # ------------------------------------------------------------------------------------------ # - def get_alive_state(self, channel): - """ - Alive Status eines Channels ermitteln - @type channel: string - @param channel: der Channel - @rtype: string/None - @return: Ein String, oder None, bei negativem Ergebnis - """ - return self.get_state('alive', channel) - - # ------------------------------------------------------------------------------------------ # - def set_state(self, name, value, expires=None, channel=None): - """ - Setzt einen Status - @type name: string - @param name: Name des state - @type value: string - @param value: Wert - @type channel: string - @param channel: Kanal (optional) - """ - if not channel: - channel = self.channel - - key = self.__create_key__(channel + 'State', name) - - if value == "": - self.db.delete(key) - else: - # publish on channel - message = json.dumps({'eventname':name, 'value': value}) - self.db.publish(channel + 'Publish', message) - # store in database - self.db.set(key, value) - if(expires): - self.db.expire(key, 21) - - # ------------------------------------------------------------------------------------------ # - def get_state(self, name, channel): - """ - Holt einen Status - @type name: string - @param name: Name des state - @type channel: string - @param channel: Kanal (optional) - """ - key = self.__create_key__(channel + 'State', name) - return self.db.get(key) - - # ------------------------------------------------------------------------------------------ # - def queue_add_event(self, eventtime, name, value, channel=None): - """ - Kündigt einen Event an - @type eventtime: string - @param eventtime: Datum und Zeit des events - @type name: string - @param name: Name des Events - @type value: dict - @param value: Werte - @type channel: string - @param channel: Kanal (optional) - """ - timeevent = datetime.datetime.strptime(eventtime[0:16],"%Y-%m-%dT%H:%M") - expire = int(time.mktime(timeevent.timetuple()) - time.time()) + 60 - self.__set_event__(name, eventtime, value, 'Evqueue', 'evqueue', expire, channel) - - # ------------------------------------------------------------------------------------------ # - def queue_remove_events(self, name=None, channel=None): - """ - Löscht Events - @type name: string - @param name: Name des Events - @type channel: string - @param channel: Kanal (optional) - """ - query = channel + 'Evqueue_' if channel else '*Evqueue_' - query = query + '*_' + name if name else query + '*_*' - - keys = self.db.keys(query) - - for delkey in keys: - self.db.delete(delkey) - - # ------------------------------------------------------------------------------------------ # - def fire_event(self, name, value, channel=None): - """ - Feuert einen Event - @type name: string - @param name: Name des Events - @type value: dict - @param value: Werte - @type channel: string - @param channel: Kanal (optional) - """ - eventtime = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M") - self.__set_event__(name, eventtime, value, 'Event', 'events', 60, channel) - - # ------------------------------------------------------------------------------------------ # - def __set_event__(self, name, eventtime, value, type, namespace, expire, channel=None): - """ - Feuert einen Event - @type eventtime: string - @param eventtime: Datum und Zeit des events - @type value: dict - @param value: Werte - @type channel: string - @param channel: Kanal (optional) - """ - if not channel: - channel = self.channel - - timeevent = datetime.datetime.strptime(eventtime[0:16],"%Y-%m-%dT%H:%M") - key = self.__create_key__(channel + type, eventtime, name) - - value['starts'] = eventtime[0:16] - value['eventchannel'] = channel - value['eventname'] = name - self.db.hset(key, namespace, value) - self.db.expire(key, expire) - - # ------------------------------------------------------------------------------------------ # - def get_event_queue(self, name=None, channel=None): - """ - Holt events eines Kanals - @type channel: string - @param channel: Kanal (optional) - @rtype: list - @return: Liste der Events - """ - query = channel + 'Evqueue_' if channel else '*Evqueue_' - query = query + '*_' + name if name else query + '*_*' - keys = self.db.keys(query) - keys.sort() - entries = self.__get_entries__(keys, 'evqueue') - return entries - - # ------------------------------------------------------------------------------------------ # - def get_events(self, name=None, channel=None): - """ - Holt events eines Kanals - @type channel: string - @param channel: Kanal (optional) - @rtype: list - @return: Liste der Events - """ - query = channel + 'Event_' if channel else '*Event_' - query = query + '*_' + name if name else query + '*_*' - keys = self.db.keys(query) - keys.sort() - entries = self.__get_entries__(keys, 'events') - return entries - - # ------------------------------------------------------------------------------------------ # - def get_next_event(self, name=None, channel=None): - """ - Holt den aktuellsten Event - @type channel: string - @param channel: Kanal (optional) - @rtype: dict/boolean - @return: ein Event oder False - """ - events = self.get_event_queue(name, channel) - if len(events) > 0: - result = events.pop(0) - else: - result = False - - return result - - # ------------------------------------------------------------------------------------------ # - def store(self, level, value): - """ - Hash speichern - @type level: string - @param level: der errorlevel - @type value: dict - @param value: Werte als dict - """ - microtime = str(time.time()) - value['microtime'] = microtime - value['level'] = level - key = self.__create_key__(self.channel, self.section, level, microtime, str(uuid.uuid1())) - self.db.hset(key, self.channel, value) - self.db.expire(key, 864000) - - # ------------------------------------------------------------------------------------------ # - def __get_keys__(self, level ='*'): - """ - Redis-Keys nach Suchkriterium ermitteln - @type level: string - @param level: einen Errorlevel filtern - @rtype: list - @return: Die Keys auf die das Suchkriterium zutrifft - """ - key = self.__create_key__(self.channel, self.section, level) - microtime = str(time.time()) - search = microtime[0:4] + '*' if self.daily else '*' - return self.db.keys(key + self.separator + '*') - - # ------------------------------------------------------------------------------------------ # - def __create_key__(self, *args): - """ - Key erschaffen - beliebig viele Argumente - @rtype: string - @return: Der key - """ - return self.separator.join(args) - - def get_entries(self, level ='*'): - """ - Liste von Hashs nach Suchkriterium erhalten - @type level: string - @param level: einen Errorlevel filtern - @rtype: list - @return: Redis Hashs - """ - def tsort(x,y): - - if float(x.split('_',4)[3]) > float(y.split('_',4)[3]): - return 1 - elif float(x.split('_',4)[3]) < float(y.split('_',4)[3]): - return -1 - else: - return 0 - - keys = self.__get_keys__(level) - - keys.sort(tsort) - entries = self.__get_entries__(keys, self.channel) - entries = sorted(entries, key=lambda k: k['microtime'], reverse=True) - return entries - - # ------------------------------------------------------------------------------------------ # - def __get_entries__(self, keys, channel): - entries = [] - for key in keys: - entry = self.db.hget(key,channel) - entry = json.dumps(entry.decode('utf-8')) - - if not (entry is None): - try: - entry = entry.decode('utf-8').replace('None','"None"') - entry = re.sub("########[^]]*########", lambda x:x.group(0).replace('\"','').replace('\'',''),entry.replace("\\\"","########").replace("\\'","++++++++").replace("'",'"').replace('u"','"').replace('"{','{').replace('}"','}')).replace("########","\"") - entry = json.loads(entry) - entry['key'] = key - entries.append(entry) - except: - pass - - return entries - - # ------------------------------------------------------------------------------------------ # - def publish(self, channel, message): - subscriber_count = self.db.execute_command('PUBSUB', 'NUMSUB', channel) - - if channel.lower().find("reply") < 0 and subscriber_count[1] == 0: - raise Exception("No subscriber! Is Aura daemon running?") - - self.db.publish(channel, message) - diff --git a/modules/core/control.py b/modules/core/control.py index c5857205..693b9650 100644 --- a/modules/core/control.py +++ b/modules/core/control.py @@ -41,9 +41,10 @@ class EngineControlInterface: config = None logger = None engine = None + event_dispatcher = None sci = None - def __init__(self, engine): + def __init__(self, engine, event_dispatcher): """ Constructor @@ -51,10 +52,12 @@ class EngineControlInterface: config (AuraConfig): Engine configuration logger (AuraLogger): The logger """ + self.engine = engine self.config = AuraConfig.config() - self.logger = logging.getLogger("AuraEngine") - self.sci = SocketControlInterface.get_instance(engine) + self.logger = logging.getLogger("AuraEngine") self.logger.info(SU.yellow(f"[ECI] Engine Control Interface starting ...")) + self.sci = SocketControlInterface.get_instance(event_dispatcher) + def terminate(self): @@ -79,10 +82,10 @@ class SocketControlInterface: config = None logger = None server = None - engine = None + event_dispatcher = None - def __init__(self, engine): + def __init__(self, event_dispatcher): """ Constructor @@ -96,28 +99,22 @@ class SocketControlInterface: SocketControlInterface.instance = self self.config = AuraConfig.config() self.logger = logging.getLogger("AuraEngine") - self.engine = engine + self.event_dispatcher = event_dispatcher host = "127.0.0.1" thread = Thread(target = self.run, args = (self.logger, host)) thread.start() @staticmethod - def get_instance(engine): + def get_instance(event_dispatcher): """ Returns the Singleton. """ if not SocketControlInterface.instance: - SocketControlInterface.instance = SocketControlInterface(engine) + SocketControlInterface.instance = SocketControlInterface(event_dispatcher) return SocketControlInterface.instance - def attach(self, engine): - """ - Attaches the engine to pass events to. - """ - self.engine = engine - def run(self, logger, host): """ @@ -164,7 +161,7 @@ class SocketControlInterface: meta_data = data["data"] meta_data["duration"] = data["track_duration"] logger.debug(SU.yellow(f"[ECI] Executing action: "+SocketControlInterface.ACTION_ON_METADATA)) - self.engine.event_dispatcher.on_metadata(data["data"]) + self.event_dispatcher.on_metadata(data["data"]) logger.info(SU.yellow(f"[ECI] Event '{SocketControlInterface.ACTION_ON_METADATA}' issued successfully")) else: logger.error(SU.red("[ECI] Unknown action: " + data["action"])) diff --git a/modules/core/engine.py b/modules/core/engine.py index 8fd5efa3..0809495d 100644 --- a/modules/core/engine.py +++ b/modules/core/engine.py @@ -25,13 +25,13 @@ from threading import Thread import meta +from modules.base.config import AuraConfig from modules.base.utils import SimpleUtil as SU from modules.base.exceptions import LQConnectionError, InvalidChannelException, LQStreamException, \ LoadSourceException from modules.core.resources import ResourceClass, ResourceUtil from modules.core.channels import ChannelType, TransitionType, LiquidsoapResponse, \ EntryPlayState, ResourceType, ChannelRouter -from modules.core.startup import StartupThread from modules.core.events import EngineEventDispatcher from modules.core.control import EngineControlInterface from modules.core.mixer import Mixer, MixerType @@ -46,52 +46,48 @@ class Engine(): """ instance = None engine_time_offset = 0.0 - logger = None eci = None channels = None channel_router = None scheduler = None event_dispatcher = None - is_liquidsoap_running = False plugins = None connector = None - def __init__(self, config): + + def __init__(self): """ Constructor - - Args: - config (AuraConfig): The configuration """ if Engine.instance: - raise Exception("Engine is already running!") - Engine.instance = self - self.config = config - self.plugins = dict() - self.logger = logging.getLogger("AuraEngine") - self.eci = EngineControlInterface(self) + raise Exception("Engine is already running!") + Engine.instance = self + self.logger = logging.getLogger("AuraEngine") + self.config = AuraConfig.config() + Engine.engine_time_offset = self.config.get("lqs_delay_offset") - - self.is_active() # TODO Check if it makes sense to move it to the boot-phase + self.plugins = dict() self.channel_router = ChannelRouter(self.config, self.logger) - Engine.engine_time_offset = self.config.get("lqs_delay_offset") - + self.start() + + def start(self): """ Starts the engine. Called when the connection to the sound-system implementation has been established. """ - self.event_dispatcher = EngineEventDispatcher(self, self.scheduler) - - # Sleep needed, because the socket is created too slowly by Liquidsoap - time.sleep(1) + self.event_dispatcher = EngineEventDispatcher(self) + self.eci = EngineControlInterface(self, self.event_dispatcher) self.player = Player(self.config, self.event_dispatcher) - - self.is_liquidsoap_running = True self.event_dispatcher.on_initialized() + + while not self.is_connected(): + self.logger.info(SU.yellow("Waiting for Liquidsoap to be running ...")) + time.sleep(2) self.logger.info(SU.green("Engine Core ------[ connected ]-------- Liquidsoap")) + self.event_dispatcher.on_boot() self.logger.info(EngineSplash.splash_screen("Engine Core", meta.__version__)) self.event_dispatcher.on_ready() @@ -103,35 +99,20 @@ class Engine(): # - def init_player(self): + def is_connected(self): """ - Initializes the LiquidSoap Player after startup of the engine. - - Returns: - (String): Message that the player is started. - """ - t = StartupThread(self) - t.start() - - return "Engine Core startup done!" - - - - def is_active(self): - """ - Checks if Liquidsoap is running + Checks if there's a valid connection to Liquidsoap. """ + has_connection = False try: self.uptime() - self.is_liquidsoap_running = True + has_connection = True except LQConnectionError as e: self.logger.info("Liquidsoap is not running so far") - self.is_liquidsoap_running = False except Exception as e: self.logger.error("Cannot check if Liquidsoap is running. Reason: " + str(e)) - self.is_liquidsoap_running = False - return self.is_liquidsoap_running + return has_connection def engine_state(self): @@ -154,7 +135,9 @@ class Engine(): """ Retrieves the uptime of Liquidsoap. """ + self.player.connector.enable_transaction() data = self.player.connector.send_lqc_command("uptime", "") + self.player.connector.disable_transaction() return data @@ -359,21 +342,10 @@ class Player: self.connector.disable_transaction() Thread(target=clean_up).start() - # Filesystem meta-changes trigger the event via Liquidsoap + # Filesystem meta-changes trigger the event via Liquidsoap, so only + # issue event for LIVE and STREAM: if not entry.channel in ChannelType.QUEUE.channels: - self.on_play(entry) - - - - def on_play(self, source): - """ - Event Handler which is called by the soundsystem implementation (i.e. Liquidsoap) - when some entry is actually playing. - - Args: - source (String): The `Entry` or URI or of the media source currently being played - """ - self.event_dispatcher.on_play(source) + self.event_dispatcher.on_play(entry) diff --git a/modules/core/events.py b/modules/core/events.py index e82be4ca..ec8d7480 100644 --- a/modules/core/events.py +++ b/modules/core/events.py @@ -20,15 +20,13 @@ import logging import datetime -from threading import Thread +from threading import Thread -from modules.base.config import AuraConfig -from modules.base.utils import SimpleUtil as SU -from modules.base.mail import AuraMailer -from modules.plugins.monitor import AuraMonitor - - -from modules.plugins.trackservice import TrackServiceHandler +from modules.base.config import AuraConfig +from modules.base.utils import SimpleUtil as SU +from modules.base.mail import AuraMailer +from modules.plugins.monitor import AuraMonitor +from modules.plugins.trackservice import TrackServiceHandler class EventBinding(): @@ -45,7 +43,11 @@ class EventBinding(): dispatcher = None instance = None + def __init__(self, dispatcher, instance): + """ + Constructor + """ self.dispatcher = dispatcher self.instance = instance @@ -80,16 +82,15 @@ class EngineEventDispatcher(): monitor = None - def __init__(self, engine, scheduler): + def __init__(self, engine): """ - Initialize EventDispatcher + Constructor """ self.subscriber_registry = dict() self.logger = logging.getLogger("AuraEngine") self.config = AuraConfig.config() self.mailer = AuraMailer(self.config) self.engine = engine - self.scheduler = scheduler binding = self.attach(AuraMonitor) binding.subscribe("on_boot") @@ -105,7 +106,7 @@ class EngineEventDispatcher(): def attach(self, clazz): """ - Creates an intance of the given Class. + Creates an instance of the given Class. """ instance = clazz(self.engine) return EventBinding(self, instance) @@ -145,16 +146,18 @@ class EngineEventDispatcher(): def on_initialized(self): """ - Called when the engine is initialized e.g. connected to Liquidsoap. + Called when the engine is initialized, just before """ self.logger.debug("on_initialized(..)") - self.scheduler.on_initialized() + from modules.scheduling.scheduler import AuraScheduler + self.scheduler = AuraScheduler(self.engine) self.call_event("on_initialized", None) def on_boot(self): """ Called when the engine is starting up. This happens after the initialization step. + Connection to Liquidsoap should be available here. """ self.logger.debug("on_boot(..)") self.call_event("on_boot", None) diff --git a/modules/core/startup.py b/modules/core/startup.py deleted file mode 100644 index 8e5d9ddd..00000000 --- a/modules/core/startup.py +++ /dev/null @@ -1,59 +0,0 @@ -# -# Aura Engine (https://gitlab.servus.at/aura/engine) -# -# Copyright (C) 2017-2020 - The Aura Engine Team. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - - -import logging -import threading - -from modules.base.exceptions import NoActiveScheduleException -from modules.base.utils import SimpleUtil as SU - - -class StartupThread(threading.Thread): - """ - StartupThread class. - - Boots the engine and starts playing the current schedule. - """ - logger = None - active_entry = None - engine = None - - - def __init__(self, engine): - """ - Initialize the thread. - """ - threading.Thread.__init__(self) - self.logger = logging.getLogger("AuraEngine") - self.engine = engine - - - def run(self): - """ - Boots the soundsystem. - """ - try: - self.engine.start() - - except NoActiveScheduleException as e: - self.logger.info("Nothing scheduled at startup time. Please check if there are follow-up schedules.") - except Exception as e: - self.logger.error(SU.red("Error while initializing the soundsystem: " + str(e)), e) - diff --git a/modules/liquidsoap/engine.liq b/modules/liquidsoap/engine.liq index 69894256..2b0ee39e 100644 --- a/modules/liquidsoap/engine.liq +++ b/modules/liquidsoap/engine.liq @@ -138,10 +138,3 @@ output_source = fallback_three # enable socket functions %include "serverfunctions.liq" -######################## -# start initialization # -######################## - -system('#{list.assoc(default="", "install_dir", ini)}/guru.py --init-player --quiet') - - diff --git a/modules/plugins/monitor.py b/modules/plugins/monitor.py index 248dd520..6cdae73c 100644 --- a/modules/plugins/monitor.py +++ b/modules/plugins/monitor.py @@ -30,7 +30,6 @@ from enum import Enum from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_BROADCAST import meta -from modules.cli.redis.adapter import ClientRedisAdapter from modules.base.config import AuraConfig from modules.base.utils import SimpleUtil as SU from modules.base.mail import AuraMailer @@ -85,7 +84,6 @@ class AuraMonitor: self.status = dict() self.status["engine"] = dict() self.status["lqs"] = dict() - self.status["redis"] = dict() self.status["api"] = dict() self.status["api"]["steering"] = dict() self.status["api"]["tank"] = dict() @@ -172,7 +170,6 @@ class AuraMonitor: try: if self.status["lqs"]["active"] \ and self.status["lqs"]["mixer"]["in_filesystem_0"] \ - and self.status["redis"]["active"] \ and self.status["audio_source"]["exists"]: self.status["engine"]["status"] = MonitorResponseCode.OK.value @@ -245,10 +242,8 @@ class AuraMonitor: Refreshes the vital status info which are required for the engine to survive. """ self.engine.player.connector.enable_transaction() - self.status["lqs"]["active"] = self.engine.is_active() + self.status["lqs"]["active"] = self.engine.is_connected() self.engine.player.connector.disable_transaction() - - self.status["redis"]["active"] = self.validate_redis_connection() self.status["audio_source"] = self.validate_directory(self.config.get("audio_source_folder")) # After first update start the Heartbeat Monitor @@ -320,19 +315,6 @@ class AuraMonitor: return True - def validate_redis_connection(self): - """ - Checks if the connection to Redis is successful. - """ - try: - cra = ClientRedisAdapter(self.config) - cra.publish("aura", "status") - except: - return False - - return True - - def validate_directory(self, dir_path): """ Checks if a given directory is existing and holds content diff --git a/modules/plugins/trackservice.py b/modules/plugins/trackservice.py index 498593c8..2107b75d 100644 --- a/modules/plugins/trackservice.py +++ b/modules/plugins/trackservice.py @@ -168,8 +168,6 @@ class TrackServiceHandler(): """ current_playlist = self.engine.scheduler.get_active_playlist() (past_timeslot, current_timeslot, next_timeslot) = self.playlog.get_timeslots() - # next_timeslot = self.engine.scheduler.get_next_timeslots(1) - # if next_timeslot: next_timeslot = next_timeslot[0] data = dict() data["engine_source"] = self.config.get("api_engine_number") @@ -241,7 +239,31 @@ class Playlog: self.engine = engine self.history = deque([None, None, None]) self.current_timeslot = {} - self.set_timeslot(None) + self.init_timeslot(None) + + + + def init_timeslot(self, next_timeslot=None): + """ + Initializes the timeslot. + """ + data = {} + self.assign_fallback_playlist(data, None) + data["schedule_id"] = -1 + data["show_id"] = -1 + data["show_name"] = "" + + if self.previous_timeslot: + data["schedule_start"] = self.previous_timeslot.get("schedule_end") + else: + data["schedule_start"] = datetime.now() + + if next_timeslot: + data["schedule_end"] = next_timeslot.schedule_end + else: + # Fake the end, because the timeslot is actually not existing + data["schedule_end"] = datetime.now() + timedelta(hours=1) + def set_timeslot(self, timeslot): @@ -280,21 +302,7 @@ class Playlog: # Defaults for a not existing timeslot else: - self.assign_fallback_playlist(data, None) - data["schedule_id"] = -1 - data["show_id"] = -1 - data["show_name"] = "" - - if self.previous_timeslot: - data["schedule_start"] = self.previous_timeslot.get("schedule_end") - else: - data["schedule_start"] = datetime.now() - - if next_timeslot: - data["schedule_end"] = next_timeslot.schedule_end - else: - # Fake the end, because the timeslot is actually not existing - data["schedule_end"] = datetime.now() + timedelta(hours=1) + self.init_timeslot(next_timeslot) # A valid following timeslot is available if next_timeslot: diff --git a/modules/scheduling/scheduler.py b/modules/scheduling/scheduler.py index 33359613..d6389556 100644 --- a/modules/scheduling/scheduler.py +++ b/modules/scheduling/scheduler.py @@ -27,7 +27,7 @@ from enum import Enum from operator import attrgetter from datetime import datetime, timedelta -from modules.cli.redis.messenger import RedisMessenger +from modules.base.config import AuraConfig from modules.base.utils import SimpleUtil as SU from modules.base.models import AuraDatabaseModel, Schedule, Playlist from modules.base.exceptions import NoActiveScheduleException, LoadSourceException @@ -69,24 +69,22 @@ class AuraScheduler(threading.Thread): active_entry(Show, Track): This is a Tuple consisting of the currently played `Show` and `Track` message_timer(List<threading.Timer>): The timer queue of sound-system commands for playlists/entries to be played """ - redismessenger = None - job_result = {} - config = None logger = None - exit_event = None engine = None + exit_event = None + is_initialized = None + is_initialized = None + last_successful_fetch = None programme = None message_timer = [] fallback = None - client = None - is_soundsytem_ready = None - is_initialized = None - def __init__(self, config, engine, func_on_init): + + def __init__(self, engine): """ Constructor @@ -95,29 +93,21 @@ class AuraScheduler(threading.Thread): engine (Engine): The engine to play the schedule on func_on_init (Function): The function to be called when the scheduler is initialized """ - self.config = config + self.config = AuraConfig.config() self.logger = logging.getLogger("AuraEngine") - self.init_database() + AuraScheduler.init_database() self.fallback = FallbackManager(self) - self.redismessenger = RedisMessenger(config) self.engine = engine self.engine.scheduler = self self.is_soundsytem_init = False # Scheduler Initialization self.is_initialized = False - self.func_on_initialized = func_on_init + self.is_engine_ready = False - # init threading + # Init scheduling thread threading.Thread.__init__(self) - - # init messenger.. FIXME probably not needed anymore - self.redismessenger.set_channel('scheduler') - self.redismessenger.set_section('execjob') - #self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal') - - # Create exit event self.exit_event = threading.Event() self.start() @@ -138,16 +128,19 @@ class AuraScheduler(threading.Thread): self.config.load_config() seconds_to_wait = int(self.config.get("fetching_frequency")) self.logger.info(SU.cyan(f"== start fetching new schedules (every {seconds_to_wait} seconds) ==")) + + # Load some stuff from the API in any case self.fetch_new_programme() - # The scheduler is ready - if not self.is_initialized: - self.is_initialized = True - if self.func_on_initialized: - self.func_on_initialized() + # Called upon first boot only + if self.is_engine_ready: + if not self.is_initialized: + + # Queue the start items + self.is_initialized = True + self.on_scheduler_ready() - # The engine is ready - if self.is_soundsytem_init: + # Queue all the other ones self.queue_programme() except Exception as e: @@ -171,18 +164,18 @@ class AuraScheduler(threading.Thread): # PUBLIC METHODS # - def on_initialized(self): + + def on_ready(self): """ - Called when the sound-sytem is connected and ready to play. + Called when the engine is ready. """ - self.is_soundsytem_init = True + self.is_engine_ready = True - def on_ready(self): + def on_scheduler_ready(self): """ - Called when the engine is ready. + Called when the scheduler is ready. """ - # self.queue_programme() self.logger.info(self.get_ascii_programme()) try: @@ -967,14 +960,15 @@ class AuraScheduler(threading.Thread): # FIXME Move to adequate module - def init_database(self): + @staticmethod + def init_database(): """ Initializes the database. Raises: sqlalchemy.exc.ProgrammingError: In case the DB model is invalid """ - if self.config.get("recreate_db") is not None: + if AuraConfig.config().get("recreate_db") is not None: AuraDatabaseModel.recreate_db(systemexit=True) # Check if tables exists, if not create them diff --git a/requirements.txt b/requirements.txt index 9f0c0534..06a4cda5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,6 @@ sqlalchemy==1.3.17 Flask==1.1.2 Flask_SQLAlchemy==2.4.3 mysqlclient==1.3.12 -redis==3.5.3 validators==0.12.1 accessify==0.3.1 http-parser==0.9.0 \ No newline at end of file diff --git a/test/connection_tester.py b/test/connection_tester.py index e96fc4b3..f0b0c094 100644 --- a/test/connection_tester.py +++ b/test/connection_tester.py @@ -41,7 +41,6 @@ class ConnectionTester(AuraConfig): status["lqs"] = self.test_lqs_conn() status["lqsr"] = False # self.test_lqsr_conn() status["tank"] = self.test_tank_conn() - status["redis"] = self.test_redis_conn() return json.dumps(status) @@ -64,15 +63,6 @@ class ConnectionTester(AuraConfig): except Exception as e: return False - # ------------------------------------------------------------------------------------------ # - def test_lqsr_conn(self): - try: - lsc = Engine(self.config) - lsc.get_recorder_status() - return True - - except Exception as e: - return False # ------------------------------------------------------------------------------------------ # def test_pv_conn(self): @@ -83,16 +73,6 @@ class ConnectionTester(AuraConfig): # test load of playlist 1 return self.test_url_connection(self.config.get("importerurl")+"1") - # ------------------------------------------------------------------------------------------ # - def test_redis_conn(self): - from modules.cli.redis.adapter import ClientRedisAdapter - try: - cra = ClientRedisAdapter() - cra.publish("aura", "status") - except: - return False - - return True def test_url_connection(self, url): try: -- GitLab