control.py 5.44 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import logging
import socket
import time
import json


from threading import Thread
from http_parser.http import HttpStream
from http_parser.reader import SocketReader

from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil as SU




class EngineControlInterface:
    """
    Provides ability to control the engine in various ways.

    Currently this wraps the `SocketControlInterface` singleton, which is
    obtained when the instance is created and shut down in `terminate()`.
    """
    config = None
    logger = None
    engine = None
    sci = None

    def __init__(self, engine):
        """
        Constructor

        Args:
            engine:    The engine instance; it is stored and handed over to
                       the socket control interface
        """
        self.engine = engine
        self.config = AuraConfig.config()
        self.logger = logging.getLogger("AuraEngine")
        self.sci = SocketControlInterface.get_instance(engine)
        self.logger.info(SU.yellow("[ECI] Engine Control Interface starting ..."))


    def terminate(self):
        """
        Terminates the instance and all related objects.

        Safe to call more than once: the reference to the socket control
        interface is dropped after it has been terminated.
        """
        if self.sci:
            self.sci.terminate()
            self.sci = None



class SocketControlInterface:
    """
    Network socket server to control a running Engine from Liquidsoap.

    Note this server only allows a single connection at once. This
    service is primarily utilized to store new playlogs.
    """
    PORT = 1337
    ACTION_ON_METADATA = "on_metadata"

    instance = None
    config = None
    logger = None
    server = None
    engine = None

    # Set by `terminate()`; tells the server thread to stop binding/accepting.
    _terminated = False


    def __init__(self, engine):
        """
        Constructor. Registers the instance as the singleton and starts the
        server thread listening on localhost.

        Args:
            engine:    The engine instance; its `event_dispatcher` receives
                       the events issued by incoming requests

        Raises:
            Exception: If a socket server instance is already registered
        """
        if SocketControlInterface.instance:
            raise Exception(SU.red("[ECI] Socket server is already running!"))

        SocketControlInterface.instance = self
        self.config = AuraConfig.config()
        self.logger = logging.getLogger("AuraEngine")
        self.engine = engine
        host = "127.0.0.1"
        thread = Thread(target = self.run, args = (self.logger, host))
        thread.start()


    @staticmethod
    def get_instance(engine):
        """
        Returns the Singleton.

        The instance is created on first use. If one already exists it is
        returned as is and the passed `engine` is not applied; use `attach()`
        to change the engine of an existing instance.
        """
        if not SocketControlInterface.instance:
            # The constructor registers itself as `SocketControlInterface.instance`
            SocketControlInterface(engine)
        return SocketControlInterface.instance


    def attach(self, engine):
        """
        Attaches the engine to pass events to.
        """
        self.engine = engine


    def run(self, logger, host):
        """
        Starts the socket server and serves requests, one connection at a
        time, until the server is terminated.

        Args:
            logger:         The logger used for server messages
            host (String):  The address to bind the server socket to
        """
        server = self._bind(host)
        if server is None:
            # Terminated before the socket could be bound
            return
        self.server = server

        logger.info(SU.yellow(f'[ECI] Listening at {host}:{SocketControlInterface.PORT}'))
        self.server.listen()

        while not self._terminated:
            try:
                (conn, client) = self.server.accept()
            except OSError:
                if self._terminated:
                    # The socket got closed by `terminate()`: regular shutdown
                    break
                raise
            self._handle(logger, conn, client)


    def _bind(self, host):
        """
        Creates the server socket and binds it to `host` and `PORT`. Retries
        every two seconds until binding succeeds or the server is terminated.

        Returns:
            The bound socket, or `None` if terminated before binding succeeded
        """
        wait_time = 2
        while not self._terminated:
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                server.bind((host, SocketControlInterface.PORT))
                return server
            except OSError:
                # Release the failed socket, a fresh one is created on retry
                server.close()
                self.logger.error(SU.red(f"Cannot bind to Socket. Retrying in {wait_time} seconds..."))
                time.sleep(wait_time)
        return None


    def _handle(self, logger, conn, client):
        """
        Reads one HTTP request from the connection, processes its JSON body
        and closes the connection. Errors are logged and never propagate, so
        a broken request cannot stop the server thread.
        """
        data = None
        try:
            reader = SocketReader(conn)
            stream = HttpStream(reader)
            data = stream.body_file().read()
            logger.debug(SU.yellow(f'[ECI] Received socket data from {str(client)}: {str(data)}'))

            self.process(logger, json.loads(data))
            conn.sendall(b'\n[ECI] processing done.\n')
        except Exception:
            # Boundary of the request handling: log including the traceback
            logger.error(SU.red(f'[ECI] Error while processing request: {data}'), exc_info=True)
        finally:
            conn.close()


    def process(self, logger, data):
        """
        Process incoming actions.

        Args:
            logger:         The logger used for processing messages
            data (dict):    The decoded request. Requires an "action" key; the
                            "on_metadata" action additionally reads the keys
                            "data" and "track_duration"

        Raises:
            KeyError:   If a key required by the action is missing
        """
        if not isinstance(data, dict) or "action" not in data:
            logger.error(SU.red(f'[ECI] Missing action in request: {data}'))
            return

        action = data["action"]
        if action != SocketControlInterface.ACTION_ON_METADATA:
            # Formatted instead of concatenated, the action may not be a string
            logger.error(SU.red(f"[ECI] Unknown action: {action}"))
            return

        meta_data = data["data"]
        meta_data["duration"] = data["track_duration"]
        logger.debug(SU.yellow("[ECI] Executing action: " + SocketControlInterface.ACTION_ON_METADATA))
        self.engine.event_dispatcher.on_metadata(meta_data)
        logger.info(SU.yellow(f"[ECI] Successfully issued event '{SocketControlInterface.ACTION_ON_METADATA}'"))


    def terminate(self):
        """
        Stops the socket server and releases the Singleton.
        """
        self._terminated = True
        SocketControlInterface.instance = None
        if self.server:
            try:
                # Best effort to wake up a thread blocked in `accept()`;
                # some platforms refuse this for listening sockets.
                self.server.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass
            self.server.close()
        self.logger.info(SU.yellow("[ECI] Shutting down..."))