Skip to content
Snippets Groups Projects
Commit 08c8105f authored by David Trattnig's avatar David Trattnig
Browse files

refact: remove obsolete code

parent 913bb796
No related branches found
No related tags found
No related merge requests found
...@@ -21,176 +21,15 @@ ...@@ -21,176 +21,15 @@
Remote-control the Engine with these services. Remote-control the Engine with these services.
""" """
import json
import logging import logging
import socket
import time import time
from datetime import datetime, timedelta from datetime import datetime, timedelta
from threading import Lock, Thread, Timer from threading import Lock, Thread, Timer
from http_parser.http import HttpStream
from http_parser.reader import SocketReader
from aura_engine.base.api import LiquidsoapUtil as LU
from aura_engine.base.config import AuraConfig
from aura_engine.base.utils import SimpleUtil as SU from aura_engine.base.utils import SimpleUtil as SU
class EngineControlInterface:
"""
Provides ability to control the engine in various ways.
"""
config = None
logger = None
engine = None
event_dispatcher = None
sci = None
def __init__(self, engine, event_dispatcher):
"""
Initialize the ECI.
"""
self.engine = engine
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
if self.config.get("enable_sci", "false") == "true":
self.logger.info(SU.yellow("[ECI] Socket Control Interface starting ..."))
self.sci = SocketControlInterface.get_instance(event_dispatcher)
else:
self.logger.debug(SU.yellow("[ECI] Socket Control Interface disabled"))
def terminate(self):
"""
Terminate the instance and all related objects.
"""
if self.sci:
self.sci.terminate()
self.logger.info(SU.yellow("[ECI] terminated."))
class SocketControlInterface:
"""
Network socket server to control a running Engine from Liquidsoap.
Note this server only allows a single connection at once. This
service is primarily utilized to store new playlogs.
"""
DEFAULT_CONTROL_HOST = "0.0.0.0:1337"
ACTION_ON_METADATA = "on_metadata"
instance = None
config = None
logger = None
server = None
event_dispatcher = None
def __init__(self, event_dispatcher):
"""
Constructor.
Args:
config (AuraConfig): Engine configuration
logger (AuraLogger): The logger
"""
if SocketControlInterface.instance:
raise Exception(SU.red("[ECI] Socket server is already running!"))
SocketControlInterface.instance = self
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.event_dispatcher = event_dispatcher
default_host = SocketControlInterface.DEFAULT_CONTROL_HOST
url_parts = self.config.get("api_engine_control_host", default_host).split(":")
host = url_parts[0]
port = int(url_parts[1])
thread = Thread(target=self.run, args=(self.logger, host, port))
thread.start()
@staticmethod
def get_instance(event_dispatcher):
"""
Return the Singleton.
"""
if not SocketControlInterface.instance:
SocketControlInterface.instance = SocketControlInterface(event_dispatcher)
return SocketControlInterface.instance
def run(self, logger, host, port):
"""
Start the socket server.
"""
while True:
try:
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind((host, port))
break
except OSError:
wait_time = 2
msg = f"Cannot bind to Socket. Retry in {wait_time} seconds..."
self.logger.error(SU.red(msg))
time.sleep(wait_time)
logger.info(SU.yellow(f"[ECI] Listening at {host}:{port}"))
self.server.listen()
while True:
(conn, client) = self.server.accept()
while True:
r = SocketReader(conn)
p = HttpStream(r)
data = p.body_file().read()
msg = f"[ECI] Received socket data from {str(client)}: {str(data)}"
logger.debug(SU.yellow(msg))
try:
data = data.decode("utf-8")
data = LU.json_to_dict(data)
self.process(logger, json.loads(data))
conn.sendall(b"\n[ECI] processing done.\n")
except Exception as e:
logger.error(SU.red(f"[ECI] Error while processing request: {data}"), e)
conn.close()
break
def process(self, logger, data):
"""
Process incoming actions.
"""
if "action" in data:
if data["action"] == SocketControlInterface.ACTION_ON_METADATA:
def get_field(field):
for item in data["meta"]:
if item[0] == field:
return item[1]
return None
meta_data = {}
meta_data["duration"] = data["track_duration"]
meta_data["filename"] = get_field("filename")
meta_data["on_air"] = get_field("on_air")
msg = f"[ECI] Exec action: {SocketControlInterface.ACTION_ON_METADATA}"
logger.debug(SU.yellow(msg))
self.event_dispatcher.on_metadata(meta_data)
msg = f"[ECI] Event '{SocketControlInterface.ACTION_ON_METADATA}' issued"
logger.info(SU.yellow(msg))
else:
logger.error(SU.red("[ECI] Unknown action: " + data["action"]))
else:
logger.error(SU.red(f"[ECI] Missing action in request: {data}"))
def terminate(self):
"""
Call when a shutdown signal is received.
"""
SocketControlInterface.instance = None
self.server.close()
self.logger.info(SU.yellow("[ECI] Shutting down..."))
class EngineExecutor(Timer): class EngineExecutor(Timer):
""" """
Base class for timed or threaded execution of Engine commands. Base class for timed or threaded execution of Engine commands.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment