Skip to content
Snippets Groups Projects
control.py 9.92 KiB
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import logging
import socket
import time
import json


from threading                  import Thread, Timer
from datetime                   import datetime, timedelta
from http_parser.http           import HttpStream
from http_parser.reader         import SocketReader

from modules.base.config        import AuraConfig
from modules.base.utils         import SimpleUtil as SU




class EngineControlInterface:
    """
    Provides ability to control the engine in various ways.
    """
    config = None
    logger = None   
    engine = None
    sci = None

    def __init__(self, engine):
        """
        Constructor

        Args:
            config (AuraConfig):    Engine configuration
            logger (AuraLogger):    The logger
        """    
        self.config = AuraConfig.config()
        self.logger = logging.getLogger("AuraEngine")  
        self.sci = SocketControlInterface.get_instance(engine)
        self.logger.info(SU.yellow(f"[ECI] Engine Control Interface starting ..."))


    def terminate(self):
        """
        Terminates the instance and all related objects.
        """
        if self.sci: self.sci.terminate()



class SocketControlInterface:
    """
    Network socket server to control a running Engine from Liquidsoap.

    Note this server only allows a single connection at once. This
    service is primarly utilized to store new playlogs.
    """    
    PORT = 1337
    ACTION_ON_METADATA = "on_metadata"

    instance = None
    config = None
    logger = None    
    server = None
    engine = None


    def __init__(self, engine):
        """
        Constructor

        Args:
            config (AuraConfig):    Engine configuration
            logger (AuraLogger):    The logger
        """
        if SocketControlInterface.instance:
            raise Exception(SU.red("[ECI] Socket server is already running!"))
        
        SocketControlInterface.instance = self
        self.config = AuraConfig.config()
        self.logger = logging.getLogger("AuraEngine")
        self.engine = engine              
        host = "127.0.0.1"
        thread = Thread(target = self.run, args = (self.logger, host))
        thread.start() 


    @staticmethod
    def get_instance(engine):
        """
        Returns the Singleton.
        """
        if not SocketControlInterface.instance:
            SocketControlInterface.instance = SocketControlInterface(engine)
        return SocketControlInterface.instance


    def attach(self, engine):
        """
        Attaches the engine to pass events to.
        """
        self.engine = engine


    def run(self, logger, host):
        """
        Starts the socket server
        """             
        while(True):       
            try:
                self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.server.bind((host, SocketControlInterface.PORT))
                break
            except OSError as e:
                wait_time = 2
                self.logger.error(SU.red(f"Cannot bind to Socket. Retrying in {wait_time} seconds..."))
                time.sleep(wait_time)

        logger.info(SU.yellow(f'[ECI] Listening at {host}:{SocketControlInterface.PORT}'))
        self.server.listen()    

        while(True):
            (conn, client) = self.server.accept()

            while(True):
                r = SocketReader(conn)
                p = HttpStream(r)
                data = p.body_file().read()                
                logger.debug(SU.yellow(f'[ECI] Received socket data from {str(client)}: {str(data)}'))
                
                try:
                    self.process(logger, json.loads(data))
                    conn.sendall(b'\n[ECI] processing done.\n')
                except Exception as e:
                    logger.error(SU.red(f'[ECI] Error while processing request: {data}'), e)

                conn.close()                    
                break


    def process(self, logger, data):
        """
        Process incoming actions.
        """
        if "action" in data:            
            if data["action"] == SocketControlInterface.ACTION_ON_METADATA:
                meta_data = data["data"]
                meta_data["duration"] = data["track_duration"]
                logger.debug(SU.yellow(f"[ECI] Executing action: "+SocketControlInterface.ACTION_ON_METADATA))
                self.engine.event_dispatcher.on_metadata(data["data"])
                logger.info(SU.yellow(f"[ECI] Event '{SocketControlInterface.ACTION_ON_METADATA}' issued successfully"))
            else:
                logger.error(SU.red("[ECI] Unknown action: " + data["action"]))
        else:
            logger.error(SU.red(f'[ECI] Missing action in request: {data}'))
    


    def terminate(self):
        SocketControlInterface.instance = None
        self.server.close()
        self.logger.info(SU.yellow("[ECI] Shutting down..."))





class EngineExecutor(Timer):
    """
    Base class for timed or threaded execution of Engine commands.

    Primarly used for automation by the scheduler.
    """
    logger = logging.getLogger("AuraEngine")    
    timer_store = {}    
    child_timer = None
    direct_exec = None
    timer_id = None
    timer_type = None
    param = None
    diff = None
    dt = None


    def __init__(self, timer_type="BASE", child_timer=None, due_time=None, func=None, param=None):
        """
        Constructor

        Args:
            timer_type (String):            Prefix used for the `timer_id` to make it unique
            child_timer (EngineExeuctor):   Child action which is bound to this timer
            due_time (Float):               When timer should be executed. For values <= 0 execution happens immediately in a threaded way
            func (function):                The function to be called
            param (object):                 Parameter passt to the function
        """        
        from modules.core.engine import Engine
        now_unix = Engine.engine_time()        
        self.child_timer = child_timer
        self.direct_exec = False
        self.timer_type = timer_type       
        self.timer_id = f"{timer_type}:{func.__name__}:{due_time}"

        if not due_time:
            diff = 0
        else:                
            diff = due_time - now_unix

        self.diff = diff
        self.dt = datetime.now() + timedelta(seconds=diff)
        self.func = func
        self.param = param

        if diff < 0:
            msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..."
            self.logger.error(SU.red(msg))
            self.exec_now()
        elif diff == 0:   
            self.logger.info(f"Timer '{self.timer_id}' to be executed immediately")
            self.exec_now()                    
        else:
            self.exec_timed()            
            self.start()

        self.update_store()



    def exec_now(self):
        """
        Immediate execution within a thread. It's not stored in the timer store.
        """
        self.direct_exec = True
        thread = Thread(target = self.func, args = (self.param,))
        thread.start()



    def exec_timed(self):
        """
        Timed execution in a thread.
        """
        def wrapper_func(param=None):

            # Remove from store
            self.logger.info(SU.green(f"Removing old timer with ID: {self.timer_id}"))
            del EngineExecutor.timer_store[self.timer_id]          

            # Call actual function
            if param: self.func(param,) 
            else: self.func()

        Timer.__init__(self, self.diff, wrapper_func, (self.param,))


    def update_store(self):
        """
        Adds the instance to the store and cancels any previously existing commands.
        """
        existing_command = None
        if self.timer_id in EngineExecutor.timer_store:
            existing_command = EngineExecutor.timer_store[self.timer_id]    
        if existing_command:
            self.logger.info(SU.green(f"Cancelling previous timer with ID: {self.timer_id}"))
            existing_command.cancel()
            if existing_command.child_timer:
                self.logger.info(SU.green(f"Cancelling child timer with ID: {existing_command.child_timer.timer_id}"))

        EngineExecutor.timer_store[self.timer_id] = self
        self.logger.info(SU.green(f"Created command timer with ID: {self.timer_id}"))


    def is_alive(self):
        """
        Returns true if the command is still due to be executed.
        """
        if self.direct_exec == True:
            return False
        return super().is_alive()


    def __str__(self):
        """
        String represenation of the timer.
        """
        return f"[{self.timer_id}] exec at {str(self.dt)} (alive: {self.is_alive()})"


    @staticmethod
    def log_commands():
        """
        Prints a list of active timers to the log.
        """
        timers = EngineExecutor.timer_store.values()                
        msg = "\n [ ENGINE COMMAND QUEUE ]\n"

        if not timers:
            msg += "None available!\n"
        else:
            for timer in timers:
                msg += f"      =>   {str(timer)}\n"
                if timer.child_timer:
                    msg += f"            =>   {str(timer.child_timer)}\n"

        EngineExecutor.logger.info(msg + "\n")