-
David Trattnig authoredDavid Trattnig authored
control.py 9.92 KiB
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import socket
import time
import json
from threading import Thread, Timer
from datetime import datetime, timedelta
from http_parser.http import HttpStream
from http_parser.reader import SocketReader
from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil as SU
class EngineControlInterface:
"""
Provides ability to control the engine in various ways.
"""
config = None
logger = None
engine = None
sci = None
def __init__(self, engine):
"""
Constructor
Args:
config (AuraConfig): Engine configuration
logger (AuraLogger): The logger
"""
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.sci = SocketControlInterface.get_instance(engine)
self.logger.info(SU.yellow(f"[ECI] Engine Control Interface starting ..."))
def terminate(self):
"""
Terminates the instance and all related objects.
"""
if self.sci: self.sci.terminate()
class SocketControlInterface:
"""
Network socket server to control a running Engine from Liquidsoap.
Note this server only allows a single connection at once. This
service is primarly utilized to store new playlogs.
"""
PORT = 1337
ACTION_ON_METADATA = "on_metadata"
instance = None
config = None
logger = None
server = None
engine = None
def __init__(self, engine):
"""
Constructor
Args:
config (AuraConfig): Engine configuration
logger (AuraLogger): The logger
"""
if SocketControlInterface.instance:
raise Exception(SU.red("[ECI] Socket server is already running!"))
SocketControlInterface.instance = self
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.engine = engine
host = "127.0.0.1"
thread = Thread(target = self.run, args = (self.logger, host))
thread.start()
@staticmethod
def get_instance(engine):
"""
Returns the Singleton.
"""
if not SocketControlInterface.instance:
SocketControlInterface.instance = SocketControlInterface(engine)
return SocketControlInterface.instance
def attach(self, engine):
"""
Attaches the engine to pass events to.
"""
self.engine = engine
def run(self, logger, host):
"""
Starts the socket server
"""
while(True):
try:
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind((host, SocketControlInterface.PORT))
break
except OSError as e:
wait_time = 2
self.logger.error(SU.red(f"Cannot bind to Socket. Retrying in {wait_time} seconds..."))
time.sleep(wait_time)
logger.info(SU.yellow(f'[ECI] Listening at {host}:{SocketControlInterface.PORT}'))
self.server.listen()
while(True):
(conn, client) = self.server.accept()
while(True):
r = SocketReader(conn)
p = HttpStream(r)
data = p.body_file().read()
logger.debug(SU.yellow(f'[ECI] Received socket data from {str(client)}: {str(data)}'))
try:
self.process(logger, json.loads(data))
conn.sendall(b'\n[ECI] processing done.\n')
except Exception as e:
logger.error(SU.red(f'[ECI] Error while processing request: {data}'), e)
conn.close()
break
def process(self, logger, data):
"""
Process incoming actions.
"""
if "action" in data:
if data["action"] == SocketControlInterface.ACTION_ON_METADATA:
meta_data = data["data"]
meta_data["duration"] = data["track_duration"]
logger.debug(SU.yellow(f"[ECI] Executing action: "+SocketControlInterface.ACTION_ON_METADATA))
self.engine.event_dispatcher.on_metadata(data["data"])
logger.info(SU.yellow(f"[ECI] Event '{SocketControlInterface.ACTION_ON_METADATA}' issued successfully"))
else:
logger.error(SU.red("[ECI] Unknown action: " + data["action"]))
else:
logger.error(SU.red(f'[ECI] Missing action in request: {data}'))
def terminate(self):
SocketControlInterface.instance = None
self.server.close()
self.logger.info(SU.yellow("[ECI] Shutting down..."))
class EngineExecutor(Timer):
"""
Base class for timed or threaded execution of Engine commands.
Primarly used for automation by the scheduler.
"""
logger = logging.getLogger("AuraEngine")
timer_store = {}
child_timer = None
direct_exec = None
timer_id = None
timer_type = None
param = None
diff = None
dt = None
def __init__(self, timer_type="BASE", child_timer=None, due_time=None, func=None, param=None):
"""
Constructor
Args:
timer_type (String): Prefix used for the `timer_id` to make it unique
child_timer (EngineExeuctor): Child action which is bound to this timer
due_time (Float): When timer should be executed. For values <= 0 execution happens immediately in a threaded way
func (function): The function to be called
param (object): Parameter passt to the function
"""
from modules.core.engine import Engine
now_unix = Engine.engine_time()
self.child_timer = child_timer
self.direct_exec = False
self.timer_type = timer_type
self.timer_id = f"{timer_type}:{func.__name__}:{due_time}"
if not due_time:
diff = 0
else:
diff = due_time - now_unix
self.diff = diff
self.dt = datetime.now() + timedelta(seconds=diff)
self.func = func
self.param = param
if diff < 0:
msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..."
self.logger.error(SU.red(msg))
self.exec_now()
elif diff == 0:
self.logger.info(f"Timer '{self.timer_id}' to be executed immediately")
self.exec_now()
else:
self.exec_timed()
self.start()
self.update_store()
def exec_now(self):
"""
Immediate execution within a thread. It's not stored in the timer store.
"""
self.direct_exec = True
thread = Thread(target = self.func, args = (self.param,))
thread.start()
def exec_timed(self):
"""
Timed execution in a thread.
"""
def wrapper_func(param=None):
# Remove from store
self.logger.info(SU.green(f"Removing old timer with ID: {self.timer_id}"))
del EngineExecutor.timer_store[self.timer_id]
# Call actual function
if param: self.func(param,)
else: self.func()
Timer.__init__(self, self.diff, wrapper_func, (self.param,))
def update_store(self):
"""
Adds the instance to the store and cancels any previously existing commands.
"""
existing_command = None
if self.timer_id in EngineExecutor.timer_store:
existing_command = EngineExecutor.timer_store[self.timer_id]
if existing_command:
self.logger.info(SU.green(f"Cancelling previous timer with ID: {self.timer_id}"))
existing_command.cancel()
if existing_command.child_timer:
self.logger.info(SU.green(f"Cancelling child timer with ID: {existing_command.child_timer.timer_id}"))
EngineExecutor.timer_store[self.timer_id] = self
self.logger.info(SU.green(f"Created command timer with ID: {self.timer_id}"))
def is_alive(self):
"""
Returns true if the command is still due to be executed.
"""
if self.direct_exec == True:
return False
return super().is_alive()
def __str__(self):
"""
String represenation of the timer.
"""
return f"[{self.timer_id}] exec at {str(self.dt)} (alive: {self.is_alive()})"
@staticmethod
def log_commands():
"""
Prints a list of active timers to the log.
"""
timers = EngineExecutor.timer_store.values()
msg = "\n [ ENGINE COMMAND QUEUE ]\n"
if not timers:
msg += "None available!\n"
else:
for timer in timers:
msg += f" => {str(timer)}\n"
if timer.child_timer:
msg += f" => {str(timer.child_timer)}\n"
EngineExecutor.logger.info(msg + "\n")