From 1113af30a8ef2e2953e841cdb9cc6e778667f691 Mon Sep 17 00:00:00 2001 From: David Trattnig <david.trattnig@o94.at> Date: Sun, 25 Oct 2020 16:19:45 +0100 Subject: [PATCH] Ability to control engine via socket. #43 --- modules/core/control.py | 176 ++++++++++++++++++++++++++++++++++++++++ modules/core/engine.py | 10 +-- requirements.txt | 3 +- 3 files changed, 183 insertions(+), 6 deletions(-) create mode 100644 modules/core/control.py diff --git a/modules/core/control.py b/modules/core/control.py new file mode 100644 index 00000000..df265cfe --- /dev/null +++ b/modules/core/control.py @@ -0,0 +1,176 @@ +# +# Aura Engine (https://gitlab.servus.at/aura/engine) +# +# Copyright (C) 2017-2020 - The Aura Engine Team. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import logging +import socket +import time +import json + + +from threading import Thread +from http_parser.http import HttpStream +from http_parser.reader import SocketReader + +from modules.base.config import AuraConfig +from modules.base.utils import SimpleUtil as SU + + + + +class EngineControlInterface: + """ + Provides ability to control the engine in various ways. + """ + config = None + logger = None + engine = None + sci = None + + def __init__(self, engine): + """ + Constructor + + Args: + config (AuraConfig): Engine configuration + logger (AuraLogger): The logger + """ + self.config = AuraConfig.config() + self.logger = logging.getLogger("AuraEngine") + self.sci = SocketControlInterface.get_instance(engine) + self.logger.info(SU.yellow(f"[ECI] Engine Control Interface starting ...")) + + + def terminate(self): + """ + Terminates the instance and all related objects. + """ + if self.sci: self.sci.terminate() + + + +class SocketControlInterface: + """ + Network socket server to control a running Engine from Liquidsoap. + + Note this server only allows a single connection at once. This + service is primarly utilized to store new playlogs. + """ + PORT = 1337 + ACTION_ON_METADATA = "on_metadata" + + instance = None + config = None + logger = None + server = None + engine = None + + + def __init__(self, engine): + """ + Constructor + + Args: + config (AuraConfig): Engine configuration + logger (AuraLogger): The logger + """ + if SocketControlInterface.instance: + raise Exception(SU.red("[ECI] Socket server is already running!")) + + SocketControlInterface.instance = self + self.config = AuraConfig.config() + self.logger = logging.getLogger("AuraEngine") + self.engine = engine + host = "127.0.0.1" + thread = Thread(target = self.run, args = (self.logger, host)) + thread.start() + + + @staticmethod + def get_instance(engine): + """ + Returns the Singleton. + """ + if not SocketControlInterface.instance: + SocketControlInterface.instance = SocketControlInterface(engine) + return SocketControlInterface.instance + + + def attach(self, engine): + """ + Attaches the engine to pass events to. + """ + self.engine = engine + + + def run(self, logger, host): + """ + Starts the socket server + """ + while(True): + try: + self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server.bind((host, SocketControlInterface.PORT)) + break + except OSError as e: + wait_time = 2 + self.logger.error(SU.red(f"Cannot bind to Socket. Retrying in {wait_time} seconds...")) + time.sleep(wait_time) + + logger.info(SU.yellow(f'[ECI] Listening at {host}:{SocketControlInterface.PORT}')) + self.server.listen() + + while(True): + (conn, client) = self.server.accept() + + while(True): + r = SocketReader(conn) + p = HttpStream(r) + data = p.body_file().read() + logger.debug(SU.yellow(f'[ECI] Received socket data from {str(client)}: {str(data)}')) + + try: + self.process(logger, json.loads(data)) + conn.sendall(b'\n[ECI] processing done.\n') + except Exception as e: + logger.error(SU.red(f'[ECI] Error while processing request: {data}'), e) + + conn.close() + break + + + def process(self, logger, data): + """ + Process incoming actions. + """ + if "action" in data: + if data["action"] == SocketControlInterface.ACTION_ON_METADATA: + logger.debug(SU.yellow(f"[ECI] Executing action: "+SocketControlInterface.ACTION_ON_METADATA)) + self.engine.event_dispatcher.on_metadata(data["data"]) + logger.info(SU.yellow(f"[ECI] Successfully issued event '{SocketControlInterface.ACTION_ON_METADATA}'")) + else: + logger.error(SU.red("[ECI] Unknown action: " + data["action"])) + else: + logger.error(SU.red(f'[ECI] Missing action in request: {data}')) + + + + def terminate(self): + SocketControlInterface.instance = None + self.server.close() + self.logger.info(SU.yellow("[ECI] Shutting down...")) \ No newline at end of file diff --git a/modules/core/engine.py b/modules/core/engine.py index 3a45ca2c..37267c26 100644 --- a/modules/core/engine.py +++ b/modules/core/engine.py @@ -33,7 +33,7 @@ from modules.core.channels import ChannelType, TransitionType, LiquidsoapRe EntryPlayState, ResourceType, ChannelRouter from modules.core.startup import StartupThread from modules.core.events import EngineEventDispatcher -from modules.core.control import SocketControlInterface +from modules.core.control import EngineControlInterface from modules.core.mixer import Mixer, MixerType from modules.core.liquidsoap.connector import PlayerConnector @@ -47,7 +47,7 @@ class Engine(): engine_time_offset = 0.0 logger = None - sci = None + eci = None channels = None channel_router = None scheduler = None @@ -71,8 +71,8 @@ class Engine(): self.config = config self.plugins = dict() self.logger = logging.getLogger("AuraEngine") - # self.sci = SocketControlInterface.get_instance(self.config, self.logger) - # self.sci.attach(self) + self.eci = EngineControlInterface(self) + self.is_active() # TODO Check if it makes sense to move it to the boot-phase self.channel_router = ChannelRouter(self.config, self.logger) @@ -182,7 +182,7 @@ class Engine(): """ Terminates the engine and all related processes. """ - if self.sci: self.sci.terminate() + if self.eci: self.eci.terminate() diff --git a/requirements.txt b/requirements.txt index 9d605fd0..9f0c0534 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,5 @@ Flask_SQLAlchemy==2.4.3 mysqlclient==1.3.12 redis==3.5.3 validators==0.12.1 -accessify==0.3.1 \ No newline at end of file +accessify==0.3.1 +http-parser==0.9.0 \ No newline at end of file -- GitLab