#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


import logging
import socket
import time
import json

from threading import Thread, Timer
from datetime import datetime, timedelta

from http_parser.http import HttpStream
from http_parser.reader import SocketReader

from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil as SU


class EngineControlInterface:
    """
    Provides ability to control the engine in various ways.
    """
    config = None
    logger = None
    engine = None
    sci = None

    def __init__(self, engine):
        """
        Constructor

        Args:
            engine (object): The engine handed on to the socket control interface
        """
        self.config = AuraConfig.config()
        self.logger = logging.getLogger("AuraEngine")
        self.sci = SocketControlInterface.get_instance(engine)
        self.logger.info(SU.yellow("[ECI] Engine Control Interface starting ..."))

    def terminate(self):
        """
        Terminates the instance and all related objects.
        """
        if self.sci:
            self.sci.terminate()


class SocketControlInterface:
    """
    Network socket server to control a running Engine from Liquidsoap.

    Note this server only allows a single connection at once. This
    service is primarily utilized to store new playlogs.
    """
    PORT = 1337
    ACTION_ON_METADATA = "on_metadata"

    instance = None
    config = None
    logger = None
    server = None
    engine = None
    # Set by `terminate()` so the server thread can tell a shutdown from a failure
    _terminated = False

    def __init__(self, engine):
        """
        Constructor. Registers the singleton and starts the server thread.

        Args:
            engine (object): The engine whose `event_dispatcher` receives incoming events

        Raises:
            Exception: If a socket server instance is already registered
        """
        if SocketControlInterface.instance:
            raise Exception(SU.red("[ECI] Socket server is already running!"))

        SocketControlInterface.instance = self
        self.config = AuraConfig.config()
        self.logger = logging.getLogger("AuraEngine")
        self.engine = engine
        self._terminated = False

        host = "127.0.0.1"
        thread = Thread(target=self.run, args=(self.logger, host))
        thread.start()

    @staticmethod
    def get_instance(engine):
        """
        Returns the Singleton, creating it on first use.

        Args:
            engine (object): The engine passed to the constructor when no instance exists yet
        """
        if not SocketControlInterface.instance:
            SocketControlInterface.instance = SocketControlInterface(engine)
        return SocketControlInterface.instance

    def attach(self, engine):
        """
        Attaches the engine to pass events to.
        """
        self.engine = engine

    def run(self, logger, host):
        """
        Starts the socket server: binds (retrying until successful), listens and
        then serves one connection after the other.

        Args:
            logger (Logger): The logger used for server messages
            host (String): The address to bind to
        """
        wait_time = 2
        while True:
            if self._terminated:
                return
            candidate = None
            try:
                candidate = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                candidate.bind((host, SocketControlInterface.PORT))
                self.server = candidate
                break
            except OSError:
                # Release the socket of the failed attempt, a fresh one is created on retry
                if candidate is not None:
                    candidate.close()
                self.logger.error(SU.red(f"Cannot bind to Socket. Retrying in {wait_time} seconds..."))
                time.sleep(wait_time)

        logger.info(SU.yellow(f'[ECI] Listening at {host}:{SocketControlInterface.PORT}'))

        try:
            self.server.listen()
            while True:
                (conn, client) = self.server.accept()
                self._serve_connection(logger, conn, client)
        except OSError:
            # A socket closed by `terminate()` is a regular shutdown, anything else is not
            if not self._terminated:
                raise

    def _serve_connection(self, logger, conn, client):
        """
        Reads and processes requests from one connection until the first failure,
        then closes the connection.

        Args:
            logger (Logger): The logger used for request messages
            conn (socket): The accepted client connection
            client (tuple): The client address, used for logging only
        """
        try:
            while True:
                data = None
                try:
                    # Reading is part of the guarded section: a broken or closed
                    # connection must not take down the server thread
                    reader = SocketReader(conn)
                    stream = HttpStream(reader)
                    data = stream.body_file().read()
                    logger.debug(SU.yellow(f'[ECI] Received socket data from {str(client)}: {str(data)}'))
                    self.process(logger, json.loads(data))
                    conn.sendall(b'\n[ECI] processing done.\n')
                except Exception:
                    # No positional log arguments here: the message has no placeholders
                    logger.error(SU.red(f'[ECI] Error while processing request: {data}'), exc_info=True)
                    break
        finally:
            conn.close()

    def process(self, logger, data):
        """
        Process incoming actions.

        Args:
            logger (Logger): The logger used for processing messages
            data (dict): The decoded request, expected to hold an `action` key
        """
        if not isinstance(data, dict) or "action" not in data:
            logger.error(SU.red(f'[ECI] Missing action in request: {data}'))
            return

        action = data["action"]
        if action != SocketControlInterface.ACTION_ON_METADATA:
            logger.error(SU.red(f"[ECI] Unknown action: {action}"))
            return

        # The track duration travels next to the metadata and is merged into it
        meta_data = data["data"]
        meta_data["duration"] = data["track_duration"]
        logger.debug(SU.yellow("[ECI] Executing action: " + SocketControlInterface.ACTION_ON_METADATA))
        self.engine.event_dispatcher.on_metadata(meta_data)
        logger.info(SU.yellow(f"[ECI] Event '{SocketControlInterface.ACTION_ON_METADATA}' issued successfully"))

    def terminate(self):
        """
        Unregisters the singleton and closes the server socket, if one was bound.
        """
        SocketControlInterface.instance = None
        self._terminated = True
        # The server thread may still be retrying to bind, in which case there is no socket yet
        if self.server is not None:
            self.server.close()
        self.logger.info(SU.yellow("[ECI] Shutting down..."))


class EngineExecutor(Timer):
    """
    Base class for timed or threaded execution of Engine commands.

    Primarily used for automation by the scheduler.
    """
    logger = logging.getLogger("AuraEngine")
    timer_store = {}
    child_timer = None
    direct_exec = None
    timer_id = None
    timer_type = None
    param = None
    diff = None
    dt = None

    def __init__(self, timer_type="BASE", child_timer=None, due_time=None, func=None, param=None):
        """
        Constructor

        Args:
            timer_type (String):            Prefix used for the `timer_id` to make it unique
            child_timer (EngineExecutor):   Child action which is bound to this timer
            due_time (Float):               When timer should be executed. For values <= 0 execution happens immediately in a threaded way
            func (function):                The function to be called
            param (object):                 Parameter passed to the function
        """
        # Imported locally rather than at module level, as in the original code
        from modules.core.engine import Engine
        now_unix = Engine.engine_time()

        self.child_timer = child_timer
        self.direct_exec = False
        self.timer_type = timer_type
        self.timer_id = f"{timer_type}:{func.__name__}:{due_time}"

        if not due_time:
            diff = 0
        else:
            diff = due_time - now_unix

        self.diff = diff
        self.dt = datetime.now() + timedelta(seconds=diff)
        self.func = func
        self.param = param

        if diff < 0:
            msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..."
            self.logger.error(SU.red(msg))
            self.exec_now()
        elif diff == 0:
            self.logger.info(f"Timer '{self.timer_id}' to be executed immediately")
            self.exec_now()
        else:
            self.exec_timed()

        # Register first, start afterwards: a short timer must find its own
        # store entry when it fires
        self.update_store()
        if not self.direct_exec:
            self.start()

    def exec_now(self):
        """
        Immediate execution within a plain worker thread, bypassing the
        Timer machinery of the base class.
        """
        self.direct_exec = True
        thread = Thread(target=self.func, args=(self.param,))
        thread.start()

    def exec_timed(self):
        """
        Timed execution in a thread. Prepares the underlying Timer; starting
        it is up to the caller.
        """
        def wrapper_func(param=None):
            # Remove from store, but only the entry belonging to this timer:
            # a newer timer may have taken over the same ID in the meantime
            self.logger.info(SU.green(f"Removing old timer with ID: {self.timer_id}"))
            if EngineExecutor.timer_store.get(self.timer_id) is self:
                EngineExecutor.timer_store.pop(self.timer_id, None)

            # Call actual function; falsy parameters result in a call without arguments
            if param:
                self.func(param,)
            else:
                self.func()

        Timer.__init__(self, self.diff, wrapper_func, (self.param,))

    @staticmethod
    def _cancel_if_pending(timer):
        """
        Cancels the given timer unless it was executed directly.

        Directly executed timers never ran `Timer.__init__`, hence they lack the
        state `Timer.cancel()` relies on and have nothing left to cancel anyway.
        """
        if not getattr(timer, "direct_exec", False):
            timer.cancel()

    def update_store(self):
        """
        Adds the instance to the store and cancels any previously existing commands.
        """
        existing_command = EngineExecutor.timer_store.get(self.timer_id)

        if existing_command is not None:
            self.logger.info(SU.green(f"Cancelling previous timer with ID: {self.timer_id}"))
            EngineExecutor._cancel_if_pending(existing_command)

            # Keep the child alive if it has been handed over to this new timer
            old_child = existing_command.child_timer
            if old_child is not None and old_child is not self.child_timer:
                self.logger.info(SU.green(f"Cancelling child timer with ID: {old_child.timer_id}"))
                EngineExecutor._cancel_if_pending(old_child)

        EngineExecutor.timer_store[self.timer_id] = self
        self.logger.info(SU.green(f"Created command timer with ID: {self.timer_id}"))

    def is_alive(self):
        """
        Returns true if the command is still due to be executed.
        """
        # Directly executed commands never initialized the underlying thread
        if self.direct_exec:
            return False
        return super().is_alive()

    def __str__(self):
        """
        String representation of the timer.
        """
        return f"[{self.timer_id}] exec at {str(self.dt)} (alive: {self.is_alive()})"

    @staticmethod
    def log_commands():
        """
        Prints a list of active timers to the log.
        """
        # Work on a snapshot, timer threads remove their entries concurrently
        timers = list(EngineExecutor.timer_store.values())
        parts = ["\n [ ENGINE COMMAND QUEUE ]\n"]

        if not timers:
            parts.append("None available!\n")
        else:
            for timer in timers:
                parts.append(f" => {str(timer)}\n")
                if timer.child_timer:
                    parts.append(f" => {str(timer.child_timer)}\n")

        EngineExecutor.logger.info("".join(parts) + "\n")