control.py 10.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import logging
import socket
import time
import json


26
27
28
29
from threading              import Thread, Timer
from datetime               import datetime, timedelta
from http_parser.http       import HttpStream
from http_parser.reader     import SocketReader
30

31
32
from src.base.config        import AuraConfig
from src.base.utils         import SimpleUtil as SU
33
34
35
36
37
38
39
40
41
42
43




class EngineControlInterface:
    """
    Provides ability to control the engine in various ways.
    """
    config = None
    logger = None   
    engine = None
44
    event_dispatcher = None
45
46
    sci = None

47
    def __init__(self, engine, event_dispatcher):
48
49
50
51
52
53
54
        """
        Constructor

        Args:
            config (AuraConfig):    Engine configuration
            logger (AuraLogger):    The logger
        """    
55
        self.engine = engine
56
        self.config = AuraConfig.config()
57
        self.logger = logging.getLogger("AuraEngine")                  
58
        self.logger.info(SU.yellow(f"[ECI] Engine Control Interface starting ..."))
59
60
        self.sci = SocketControlInterface.get_instance(event_dispatcher)

61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84


    def terminate(self):
        """
        Terminates the instance and all related objects.
        """
        if self.sci: self.sci.terminate()



class SocketControlInterface:
    """
    Network socket server to control a running Engine from Liquidsoap.

    Note this server only allows a single connection at once. This
    service is primarly utilized to store new playlogs.
    """    
    PORT = 1337
    ACTION_ON_METADATA = "on_metadata"

    instance = None
    config = None
    logger = None    
    server = None
85
    event_dispatcher = None
86
87


88
    def __init__(self, event_dispatcher):
89
90
91
92
93
94
95
96
97
98
99
100
101
        """
        Constructor

        Args:
            config (AuraConfig):    Engine configuration
            logger (AuraLogger):    The logger
        """
        if SocketControlInterface.instance:
            raise Exception(SU.red("[ECI] Socket server is already running!"))
        
        SocketControlInterface.instance = self
        self.config = AuraConfig.config()
        self.logger = logging.getLogger("AuraEngine")
102
        self.event_dispatcher = event_dispatcher              
103
104
105
106
107
108
        host = "127.0.0.1"
        thread = Thread(target = self.run, args = (self.logger, host))
        thread.start() 


    @staticmethod
109
    def get_instance(event_dispatcher):
110
111
112
113
        """
        Returns the Singleton.
        """
        if not SocketControlInterface.instance:
114
            SocketControlInterface.instance = SocketControlInterface(event_dispatcher)
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
        return SocketControlInterface.instance



    def run(self, logger, host):
        """
        Starts the socket server
        """             
        while(True):       
            try:
                self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.server.bind((host, SocketControlInterface.PORT))
                break
            except OSError as e:
                wait_time = 2
                self.logger.error(SU.red(f"Cannot bind to Socket. Retrying in {wait_time} seconds..."))
                time.sleep(wait_time)

        logger.info(SU.yellow(f'[ECI] Listening at {host}:{SocketControlInterface.PORT}'))
        self.server.listen()    

        while(True):
            (conn, client) = self.server.accept()

            while(True):
                r = SocketReader(conn)
                p = HttpStream(r)
                data = p.body_file().read()                
                logger.debug(SU.yellow(f'[ECI] Received socket data from {str(client)}: {str(data)}'))
                
                try:
                    self.process(logger, json.loads(data))
                    conn.sendall(b'\n[ECI] processing done.\n')
                except Exception as e:
                    logger.error(SU.red(f'[ECI] Error while processing request: {data}'), e)

                conn.close()                    
                break


    def process(self, logger, data):
        """
        Process incoming actions.
        """
        if "action" in data:            
            if data["action"] == SocketControlInterface.ACTION_ON_METADATA:
161
162
                meta_data = data["data"]
                meta_data["duration"] = data["track_duration"]
163
                logger.debug(SU.yellow(f"[ECI] Executing action: "+SocketControlInterface.ACTION_ON_METADATA))
164
                self.event_dispatcher.on_metadata(data["data"])
David Trattnig's avatar
David Trattnig committed
165
                logger.info(SU.yellow(f"[ECI] Event '{SocketControlInterface.ACTION_ON_METADATA}' issued successfully"))
166
167
168
169
170
171
172
173
174
175
            else:
                logger.error(SU.red("[ECI] Unknown action: " + data["action"]))
        else:
            logger.error(SU.red(f'[ECI] Missing action in request: {data}'))
    


    def terminate(self):
        SocketControlInterface.instance = None
        self.server.close()
176
177
178
179
180
181
182
183
184
185
        self.logger.info(SU.yellow("[ECI] Shutting down..."))





class EngineExecutor(Timer):
    """
    Base class for timed or threaded execution of Engine commands.

David Trattnig's avatar
David Trattnig committed
186
    Primarily used for automations performed by the scheduler.
187
188
    """
    logger = logging.getLogger("AuraEngine")    
189
190
    timer_store = {}
    parent_timer = None
191
192
193
194
195
196
197
198
199
    child_timer = None
    direct_exec = None
    timer_id = None
    timer_type = None
    param = None
    diff = None
    dt = None


200
    def __init__(self, timer_type="BASE", parent_timer=None, due_time=None, func=None, param=None):
201
202
203
204
205
        """
        Constructor

        Args:
            timer_type (String):            Prefix used for the `timer_id` to make it unique
206
            parent_timer (EngineExeuctor):  Parent action which is a prerequisite for this timer
207
208
209
210
            due_time (Float):               When timer should be executed. For values <= 0 execution happens immediately in a threaded way
            func (function):                The function to be called
            param (object):                 Parameter passt to the function
        """        
211
        from src.core.engine import Engine
212
        now_unix = Engine.engine_time()        
213
214
215
        self.parent_timer = parent_timer
        if self.parent_timer:
            self.parent_timer.child_timer = self
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
        self.direct_exec = False
        self.timer_type = timer_type       
        self.timer_id = f"{timer_type}:{func.__name__}:{due_time}"

        if not due_time:
            diff = 0
        else:                
            diff = due_time - now_unix

        self.diff = diff
        self.dt = datetime.now() + timedelta(seconds=diff)
        self.func = func
        self.param = param

        if diff < 0:
            msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..."
            self.logger.error(SU.red(msg))
            self.exec_now()
        elif diff == 0:   
            self.logger.info(f"Timer '{self.timer_id}' to be executed immediately")
236
237
            self.exec_now()
        else:            
238
239
240
241
242
243
            self.exec_timed()            
            self.start()

        self.update_store()


244
245
246
247
248
249
250
    def wait_for_parent(self):
        """
        Child timers are dependend on their parents. So let's wait until parents are done with their stuff.
        """
        if self.parent_timer and self.parent_timer.is_alive():
            while(self.parent_timer.is_alive()):
                self.logger.info(f"Timer '{self.timer_id}' is waiting for parent timer '{self.parent_timer.timer_id}' to finish")
David Trattnig's avatar
David Trattnig committed
251
                time.sleep(0.1)
252

253
254
255
256
257
258

    def exec_now(self):
        """
        Immediate execution within a thread. It's not stored in the timer store.
        """
        self.direct_exec = True
259
        self.wait_for_parent()
260
261
262
263
264
265
266
267
268
        thread = Thread(target = self.func, args = (self.param,))
        thread.start()


    def exec_timed(self):
        """
        Timed execution in a thread.
        """
        def wrapper_func(param=None):
269
            self.wait_for_parent()
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326

            # Remove from store
            self.logger.info(SU.green(f"Removing old timer with ID: {self.timer_id}"))
            del EngineExecutor.timer_store[self.timer_id]          

            # Call actual function
            if param: self.func(param,) 
            else: self.func()

        Timer.__init__(self, self.diff, wrapper_func, (self.param,))


    def update_store(self):
        """
        Adds the instance to the store and cancels any previously existing commands.
        """
        existing_command = None
        if self.timer_id in EngineExecutor.timer_store:
            existing_command = EngineExecutor.timer_store[self.timer_id]    
        if existing_command:
            self.logger.info(SU.green(f"Cancelling previous timer with ID: {self.timer_id}"))
            existing_command.cancel()
            if existing_command.child_timer:
                self.logger.info(SU.green(f"Cancelling child timer with ID: {existing_command.child_timer.timer_id}"))

        EngineExecutor.timer_store[self.timer_id] = self
        self.logger.info(SU.green(f"Created command timer with ID: {self.timer_id}"))


    def is_alive(self):
        """
        Returns true if the command is still due to be executed.
        """
        if self.direct_exec == True:
            return False
        return super().is_alive()


    def __str__(self):
        """
        String represenation of the timer.
        """
        return f"[{self.timer_id}] exec at {str(self.dt)} (alive: {self.is_alive()})"


    @staticmethod
    def log_commands():
        """
        Prints a list of active timers to the log.
        """
        timers = EngineExecutor.timer_store.values()                
        msg = "\n [ ENGINE COMMAND QUEUE ]\n"

        if not timers:
            msg += "None available!\n"
        else:
            for timer in timers:
327
328
                if not timer.parent_timer:
                    msg += f"      =>   {str(timer)}\n"
329
330
331
332
333
334
335
                if timer.child_timer:
                    msg += f"            =>   {str(timer.child_timer)}\n"

        EngineExecutor.logger.info(msg + "\n")