From 08c8105f4e8e93dcab30810089d2ece29404934c Mon Sep 17 00:00:00 2001 From: David Trattnig <david@subsquare.at> Date: Thu, 11 Aug 2022 18:43:13 +0200 Subject: [PATCH] refact: remove obsolete code --- src/aura_engine/control.py | 163 +------------------------------------ 1 file changed, 1 insertion(+), 162 deletions(-) diff --git a/src/aura_engine/control.py b/src/aura_engine/control.py index ab5384c7..f52b8644 100644 --- a/src/aura_engine/control.py +++ b/src/aura_engine/control.py @@ -21,176 +21,15 @@ Remote-control the Engine with these services. """ -import json + import logging -import socket import time from datetime import datetime, timedelta from threading import Lock, Thread, Timer -from http_parser.http import HttpStream -from http_parser.reader import SocketReader - -from aura_engine.base.api import LiquidsoapUtil as LU -from aura_engine.base.config import AuraConfig from aura_engine.base.utils import SimpleUtil as SU -class EngineControlInterface: - """ - Provides ability to control the engine in various ways. - """ - - config = None - logger = None - engine = None - event_dispatcher = None - sci = None - - def __init__(self, engine, event_dispatcher): - """ - Initialize the ECI. - """ - self.engine = engine - self.config = AuraConfig.config() - self.logger = logging.getLogger("AuraEngine") - if self.config.get("enable_sci", "false") == "true": - self.logger.info(SU.yellow("[ECI] Socket Control Interface starting ...")) - self.sci = SocketControlInterface.get_instance(event_dispatcher) - else: - self.logger.debug(SU.yellow("[ECI] Socket Control Interface disabled")) - - def terminate(self): - """ - Terminate the instance and all related objects. - """ - if self.sci: - self.sci.terminate() - self.logger.info(SU.yellow("[ECI] terminated.")) - - -class SocketControlInterface: - """ - Network socket server to control a running Engine from Liquidsoap. - - Note this server only allows a single connection at once. This - service is primarily utilized to store new playlogs. - """ - - DEFAULT_CONTROL_HOST = "0.0.0.0:1337" - ACTION_ON_METADATA = "on_metadata" - - instance = None - config = None - logger = None - server = None - event_dispatcher = None - - def __init__(self, event_dispatcher): - """ - Constructor. - - Args: - config (AuraConfig): Engine configuration - logger (AuraLogger): The logger - """ - if SocketControlInterface.instance: - raise Exception(SU.red("[ECI] Socket server is already running!")) - - SocketControlInterface.instance = self - self.config = AuraConfig.config() - self.logger = logging.getLogger("AuraEngine") - self.event_dispatcher = event_dispatcher - default_host = SocketControlInterface.DEFAULT_CONTROL_HOST - url_parts = self.config.get("api_engine_control_host", default_host).split(":") - host = url_parts[0] - port = int(url_parts[1]) - thread = Thread(target=self.run, args=(self.logger, host, port)) - thread.start() - - @staticmethod - def get_instance(event_dispatcher): - """ - Return the Singleton. - """ - if not SocketControlInterface.instance: - SocketControlInterface.instance = SocketControlInterface(event_dispatcher) - return SocketControlInterface.instance - - def run(self, logger, host, port): - """ - Start the socket server. - """ - while True: - try: - self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.server.bind((host, port)) - break - except OSError: - wait_time = 2 - msg = f"Cannot bind to Socket. Retry in {wait_time} seconds..." - self.logger.error(SU.red(msg)) - time.sleep(wait_time) - - logger.info(SU.yellow(f"[ECI] Listening at {host}:{port}")) - self.server.listen() - - while True: - (conn, client) = self.server.accept() - while True: - r = SocketReader(conn) - p = HttpStream(r) - data = p.body_file().read() - msg = f"[ECI] Received socket data from {str(client)}: {str(data)}" - logger.debug(SU.yellow(msg)) - - try: - data = data.decode("utf-8") - data = LU.json_to_dict(data) - self.process(logger, json.loads(data)) - conn.sendall(b"\n[ECI] processing done.\n") - except Exception as e: - logger.error(SU.red(f"[ECI] Error while processing request: {data}"), e) - - conn.close() - break - - def process(self, logger, data): - """ - Process incoming actions. - """ - if "action" in data: - if data["action"] == SocketControlInterface.ACTION_ON_METADATA: - - def get_field(field): - for item in data["meta"]: - if item[0] == field: - return item[1] - return None - - meta_data = {} - meta_data["duration"] = data["track_duration"] - meta_data["filename"] = get_field("filename") - meta_data["on_air"] = get_field("on_air") - msg = f"[ECI] Exec action: {SocketControlInterface.ACTION_ON_METADATA}" - logger.debug(SU.yellow(msg)) - self.event_dispatcher.on_metadata(meta_data) - msg = f"[ECI] Event '{SocketControlInterface.ACTION_ON_METADATA}' issued" - logger.info(SU.yellow(msg)) - else: - logger.error(SU.red("[ECI] Unknown action: " + data["action"])) - else: - logger.error(SU.red(f"[ECI] Missing action in request: {data}")) - - def terminate(self): - """ - Call when a shutdown signal is received. - """ - SocketControlInterface.instance = None - self.server.close() - self.logger.info(SU.yellow("[ECI] Shutting down...")) - - class EngineExecutor(Timer): """ Base class for timed or threaded execution of Engine commands. -- GitLab