Commit 1113af30 authored by David Trattnig's avatar David Trattnig
Browse files

Ability to control engine via socket. #43

parent 6d4d8418
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import socket
import time
import json
from threading import Thread
from http_parser.http import HttpStream
from http_parser.reader import SocketReader
from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil as SU
class EngineControlInterface:
"""
Provides ability to control the engine in various ways.
"""
config = None
logger = None
engine = None
sci = None
def __init__(self, engine):
"""
Constructor
Args:
config (AuraConfig): Engine configuration
logger (AuraLogger): The logger
"""
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.sci = SocketControlInterface.get_instance(engine)
self.logger.info(SU.yellow(f"[ECI] Engine Control Interface starting ..."))
def terminate(self):
"""
Terminates the instance and all related objects.
"""
if self.sci: self.sci.terminate()
class SocketControlInterface:
"""
Network socket server to control a running Engine from Liquidsoap.
Note this server only allows a single connection at once. This
service is primarly utilized to store new playlogs.
"""
PORT = 1337
ACTION_ON_METADATA = "on_metadata"
instance = None
config = None
logger = None
server = None
engine = None
def __init__(self, engine):
"""
Constructor
Args:
config (AuraConfig): Engine configuration
logger (AuraLogger): The logger
"""
if SocketControlInterface.instance:
raise Exception(SU.red("[ECI] Socket server is already running!"))
SocketControlInterface.instance = self
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.engine = engine
host = "127.0.0.1"
thread = Thread(target = self.run, args = (self.logger, host))
thread.start()
@staticmethod
def get_instance(engine):
"""
Returns the Singleton.
"""
if not SocketControlInterface.instance:
SocketControlInterface.instance = SocketControlInterface(engine)
return SocketControlInterface.instance
def attach(self, engine):
"""
Attaches the engine to pass events to.
"""
self.engine = engine
def run(self, logger, host):
"""
Starts the socket server
"""
while(True):
try:
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind((host, SocketControlInterface.PORT))
break
except OSError as e:
wait_time = 2
self.logger.error(SU.red(f"Cannot bind to Socket. Retrying in {wait_time} seconds..."))
time.sleep(wait_time)
logger.info(SU.yellow(f'[ECI] Listening at {host}:{SocketControlInterface.PORT}'))
self.server.listen()
while(True):
(conn, client) = self.server.accept()
while(True):
r = SocketReader(conn)
p = HttpStream(r)
data = p.body_file().read()
logger.debug(SU.yellow(f'[ECI] Received socket data from {str(client)}: {str(data)}'))
try:
self.process(logger, json.loads(data))
conn.sendall(b'\n[ECI] processing done.\n')
except Exception as e:
logger.error(SU.red(f'[ECI] Error while processing request: {data}'), e)
conn.close()
break
def process(self, logger, data):
"""
Process incoming actions.
"""
if "action" in data:
if data["action"] == SocketControlInterface.ACTION_ON_METADATA:
logger.debug(SU.yellow(f"[ECI] Executing action: "+SocketControlInterface.ACTION_ON_METADATA))
self.engine.event_dispatcher.on_metadata(data["data"])
logger.info(SU.yellow(f"[ECI] Successfully issued event '{SocketControlInterface.ACTION_ON_METADATA}'"))
else:
logger.error(SU.red("[ECI] Unknown action: " + data["action"]))
else:
logger.error(SU.red(f'[ECI] Missing action in request: {data}'))
def terminate(self):
SocketControlInterface.instance = None
self.server.close()
self.logger.info(SU.yellow("[ECI] Shutting down..."))
\ No newline at end of file
......@@ -33,7 +33,7 @@ from modules.core.channels import ChannelType, TransitionType, LiquidsoapRe
EntryPlayState, ResourceType, ChannelRouter
from modules.core.startup import StartupThread
from modules.core.events import EngineEventDispatcher
from modules.core.control import SocketControlInterface
from modules.core.control import EngineControlInterface
from modules.core.mixer import Mixer, MixerType
from modules.core.liquidsoap.connector import PlayerConnector
......@@ -47,7 +47,7 @@ class Engine():
engine_time_offset = 0.0
logger = None
sci = None
eci = None
channels = None
channel_router = None
scheduler = None
......@@ -71,8 +71,8 @@ class Engine():
self.config = config
self.plugins = dict()
self.logger = logging.getLogger("AuraEngine")
# self.sci = SocketControlInterface.get_instance(self.config, self.logger)
# self.sci.attach(self)
self.eci = EngineControlInterface(self)
self.is_active() # TODO Check if it makes sense to move it to the boot-phase
self.channel_router = ChannelRouter(self.config, self.logger)
......@@ -182,7 +182,7 @@ class Engine():
"""
Terminates the engine and all related processes.
"""
if self.sci: self.sci.terminate()
if self.eci: self.eci.terminate()
......
......@@ -5,4 +5,5 @@ Flask_SQLAlchemy==2.4.3
mysqlclient==1.3.12
redis==3.5.3
validators==0.12.1
accessify==0.3.1
\ No newline at end of file
accessify==0.3.1
http-parser==0.9.0
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment