#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import logging
import socket
import time
import json


from threading              import Thread, Timer, Lock
from datetime               import datetime, timedelta
from http_parser.http       import HttpStream
from http_parser.reader     import SocketReader

from src.base.config        import AuraConfig
from src.base.utils         import SimpleUtil as SU




class EngineControlInterface:
    """
    Provides ability to control the engine in various ways.
    """
    config = None
    logger = None
    engine = None
    event_dispatcher = None
    sci = None


    def __init__(self, engine, event_dispatcher):
        """
        Constructor

        Stores the given references, loads the configuration and obtains the
        `SocketControlInterface` singleton.

        Args:
            engine:             The engine this interface belongs to (stored as-is)
            event_dispatcher:   Dispatcher handed over to the socket control interface,
                                which calls its `on_metadata(...)` method
        """
        self.engine = engine
        self.event_dispatcher = event_dispatcher
        self.config = AuraConfig.config()
        self.logger = logging.getLogger("AuraEngine")
        self.logger.info(SU.yellow("[ECI] Engine Control Interface starting ..."))
        self.sci = SocketControlInterface.get_instance(event_dispatcher)


    def terminate(self):
        """
        Terminates the instance and all related objects.
        """
        # `sci` stays `None` when the constructor did not complete
        if self.sci:
            self.sci.terminate()



class SocketControlInterface:
    """
    Network socket server to control a running Engine from Liquidsoap.

    Note this server only allows a single connection at once. This
    service is primarily utilized to store new playlogs.
    """
    PORT = 1337
    ACTION_ON_METADATA = "on_metadata"

    instance = None
    config = None
    logger = None
    server = None
    event_dispatcher = None
    # Set by `terminate()`, so the server thread can tell a shutdown from a real socket error
    terminated = False


    def __init__(self, event_dispatcher):
        """
        Constructor

        Registers the instance as the singleton and starts the server thread.

        Args:
            event_dispatcher:   Object whose `on_metadata(...)` method is called for
                                incoming `on_metadata` actions

        Raises:
            (Exception):    If a socket server instance is already registered
        """
        if SocketControlInterface.instance:
            raise Exception(SU.red("[ECI] Socket server is already running!"))

        SocketControlInterface.instance = self
        self.config = AuraConfig.config()
        self.logger = logging.getLogger("AuraEngine")
        self.event_dispatcher = event_dispatcher
        host = "127.0.0.1"
        thread = Thread(target = self.run, args = (self.logger, host))
        thread.start()


    @staticmethod
    def get_instance(event_dispatcher):
        """
        Returns the Singleton.

        Args:
            event_dispatcher:   Only used when the singleton has to be created first

        Returns:
            (SocketControlInterface):   The one and only instance
        """
        if not SocketControlInterface.instance:
            SocketControlInterface.instance = SocketControlInterface(event_dispatcher)
        return SocketControlInterface.instance


    def run(self, logger, host):
        """
        Starts the socket server

        Binds to the given host (retrying until successful), then serves one
        connection after the other until the interface is terminated.

        Args:
            logger (Logger):    The logger used for server messages
            host (String):      The address to bind to
        """
        if not self._bind(host):
            return

        logger.info(SU.yellow(f'[ECI] Listening at {host}:{SocketControlInterface.PORT}'))
        try:
            self.server.listen()
            while True:
                (conn, client) = self.server.accept()
                self._handle_connection(logger, conn, client)
        except OSError:
            # Closing the server socket in `terminate()` makes `listen()`/`accept()` fail;
            # that is the expected way out. Anything else is a real error.
            if not self.terminated:
                raise


    def _bind(self, host):
        """
        Creates the server socket and binds it, retrying every 2 seconds on failure.

        Returns:
            (Boolean):  True if the socket is bound, False if terminated in the meantime
        """
        wait_time = 2
        while not self.terminated:
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                server.bind((host, SocketControlInterface.PORT))
            except OSError:
                # Release the unusable socket, otherwise every retry leaks a descriptor
                server.close()
                self.logger.error(SU.red(f"Cannot bind to Socket. Retrying in {wait_time} seconds..."))
                time.sleep(wait_time)
                continue

            if self.terminated:
                server.close()
                return False
            self.server = server
            return True
        return False


    def _handle_connection(self, logger, conn, client):
        """
        Reads a single HTTP request from the connection, processes its JSON body and
        closes the connection. Errors are logged and never propagate to the server loop.
        """
        data = None
        try:
            reader = SocketReader(conn)
            stream = HttpStream(reader)
            data = stream.body_file().read()
            logger.debug(SU.yellow(f'[ECI] Received socket data from {str(client)}: {str(data)}'))
            self.process(logger, json.loads(data))
            conn.sendall(b'\n[ECI] processing done.\n')
        except Exception as e:
            # Pass the exception via `exc_info`: as a positional argument it would be treated
            # as a %-format argument and break the log record
            logger.error(SU.red(f'[ECI] Error while processing request: {data}'), exc_info=e)
        finally:
            conn.close()


    def process(self, logger, data):
        """
        Process incoming actions.

        Args:
            logger (Logger):    The logger used for messages
            data (dict):        The decoded request. For the `on_metadata` action the keys
                                `data` and `track_duration` are read

        Raises:
            (KeyError):     If an `on_metadata` request lacks `data` or `track_duration`
        """
        if not isinstance(data, dict) or "action" not in data:
            logger.error(SU.red(f'[ECI] Missing action in request: {data}'))
            return

        action = data["action"]
        if action != SocketControlInterface.ACTION_ON_METADATA:
            # f-string instead of concatenation, since the action may not be a string
            logger.error(SU.red(f"[ECI] Unknown action: {action}"))
            return

        meta_data = data["data"]
        meta_data["duration"] = data["track_duration"]
        logger.debug(SU.yellow("[ECI] Executing action: " + SocketControlInterface.ACTION_ON_METADATA))
        self.event_dispatcher.on_metadata(meta_data)
        logger.info(SU.yellow(f"[ECI] Event '{SocketControlInterface.ACTION_ON_METADATA}' issued successfully"))


    def terminate(self):
        """
        Unregisters the singleton and closes the server socket.
        """
        SocketControlInterface.instance = None
        self.terminated = True
        # `server` is still `None` while the bind is being retried
        if self.server:
            self.server.close()
        self.logger.info(SU.yellow("[ECI] Shutting down..."))





class EngineExecutor(Timer):
    """
    Base class for timed or threaded execution of Engine commands.

    Primarily used for automations performed by the scheduler.
    """
    # Guards `timer_store`. It is shared by all instances because the store is shared too;
    # a per-instance lock would not protect anything.
    _lock = Lock()
    logger = logging.getLogger("AuraEngine")
    timer_store = {}
    parent_timer = None
    child_timer = None
    direct_exec = None
    timer_id = None
    timer_type = None
    param = None
    diff = None
    dt = None
    # True once `Timer.__init__()` has been called (see `exec_timed()`). Timers executed
    # directly or omitted from the store never become a real thread.
    _timer_ready = False


    def __init__(self, timer_type="BASE", parent_timer=None, due_time=None, func=None, param=None):
        """
        Constructor

        Args:
            timer_type (String):            Prefix used for the `timer_id` to make it unique
            parent_timer (EngineExecutor):  Parent action which is a prerequisite for this timer
            due_time (Float):               When timer should be executed. For values <= 0 execution happens immediately in a threaded way
            func (function):                The function to be called
            param (object):                 Parameter passed to the function
        """
        # Imported locally, as done by the original code
        from src.core.engine import Engine
        now_unix = Engine.engine_time()

        # Init parent-child relation
        self.parent_timer = parent_timer
        if self.parent_timer:
            self.parent_timer.child_timer = self

        # Init meta data
        self.direct_exec = False
        self.timer_type = timer_type
        # Not every callable has a `__name__` (e.g. `functools.partial`)
        func_name = getattr(func, "__name__", repr(func))
        self.timer_id = f"{timer_type}:{func_name}:{due_time}"

        diff = due_time - now_unix if due_time else 0
        self.diff = diff
        self.dt = datetime.now() + timedelta(seconds=diff)
        self.func = func
        self.param = param

        is_stored = self.update_store()
        if not is_stored:
            self.logger.info(SU.red(f"Timer '{self.timer_id}' omitted because it's already existing but dead"))
        elif diff < 0:
            msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..."
            self.logger.error(SU.red(msg))
            self.exec_now()
        elif diff == 0:
            self.logger.info(f"Timer '{self.timer_id}' to be executed immediately")
            self.exec_now()
        else:
            self.exec_timed()
            self.start()


    def wait_for_parent(self):
        """
        Child timers are dependent on their parents. So let's wait until parents are done with their stuff.
        """
        if self.parent_timer:
            # Wait a bit to allow any parent to complete initialization, in case child & parent are instantiated at the 'same' time
            time.sleep(0.1)
            while self.parent_timer.is_alive():
                self.logger.info(f"Timer '{self.timer_id}' is waiting for parent timer '{self.parent_timer.timer_id}' to finish")
                time.sleep(0.2)


    def exec_now(self):
        """
        Immediate execution within a thread.

        The instance itself never becomes a running timer; `is_alive()` reports `False` for it.
        Blocks the caller until any parent timer has finished.

        Assigns the `timer_id` as the thread name.
        """
        self.direct_exec = True
        self.wait_for_parent()
        # NOTE(review): `param` is always passed here, while `exec_timed()` omits falsy params
        thread = Thread(name=self.timer_id, target=self.func, args=(self.param,))
        thread.start()


    def exec_timed(self):
        """
        Timed execution in a thread.

        Only initializes the underlying `Timer`; the caller has to `start()` it.

        Assigns the `timer_id` as the thread name.
        """
        def wrapper_func(param=None):
            self.wait_for_parent()
            if param:
                self.func(param,)
            else:
                self.func()

        super().__init__(self.diff, wrapper_func, (self.param,))
        self.name = self.timer_id
        self._timer_ready = True


    def update_store(self):
        """
        Adds the instance to the store and cancels any previously existing commands.

        If a timer with the given ID is already existing but also already executed,
        then it is not added to the store. In such case the method returns `False`.

        Returns:
            (Boolean):  True if the timer has been added to the store
        """
        with self._lock:
            existing_command = EngineExecutor.timer_store.get(self.timer_id)

            if existing_command:
                # Check if existing timer has been executed already -> don't update
                if not existing_command.is_alive():
                    self.logger.debug(f"Existing dead timer with ID: {self.timer_id}. Don't update.")
                    return False

                # Still waiting for execution -> update
                self.logger.debug(f"Cancelling existingTimer with ID: {self.timer_id}")
                existing_command.cancel()
                if existing_command.child_timer:
                    # NOTE(review): the child is only logged here, it is not cancelled - confirm whether that is intended
                    self.logger.debug(f"Cancelling existingTimer:childTimer with ID: {existing_command.child_timer.timer_id}")

            EngineExecutor.timer_store[self.timer_id] = self
            self.logger.debug(f"Created command timer with ID: {self.timer_id}")
            return True


    def is_alive(self):
        """
        Returns true if the command is still due to be executed.

        Timers which got executed directly, or which never got initialized as a timer
        thread (e.g. because they were omitted from the store), are reported as not alive.
        Asking the `Thread` base class about them would fail with "Thread.__init__() not called".
        """
        if self.direct_exec or not self._timer_ready:
            return False
        return super().is_alive()


    def __str__(self):
        """
        String representation of the timer.
        """
        return f"[{self.timer_id}] exec at {str(self.dt)} (alive: {self.is_alive()})"


    @staticmethod
    def remove_stale_timers():
        """
        Removes timers from the store which were due more than 3 hours ago and have no child timer still alive.
        """
        threshold = datetime.now() - timedelta(hours=3)

        # Hold the lock, so no other thread adds a timer while the store is iterated
        with EngineExecutor._lock:
            del_keys = []
            for timer in EngineExecutor.timer_store.values():
                if timer.dt < threshold:
                    if not timer.child_timer or not timer.child_timer.is_alive():
                        timer.logger.debug(f"Removing already executed timer with ID: {timer.timer_id}")
                        del_keys.append(timer.timer_id)

            for timer_id in del_keys:
                del EngineExecutor.timer_store[timer_id]


    @staticmethod
    def log_commands():
        """
        Prints a list of recent active and inactive timers.
        """
        msg = SU.blue("\n [ ENGINE COMMAND QUEUE ]\n")
        EngineExecutor.remove_stale_timers()

        # Work on a snapshot, so the store may change while the message is built
        with EngineExecutor._lock:
            timers = list(EngineExecutor.timer_store.values())

        if not timers:
            msg += "\nNone available!\n"
        else:
            for timer in timers:
                if not timer.parent_timer:
                    line = f"      =>   {str(timer)}\n"
                    if timer.is_alive():
                        line = SU.green(line)
                    msg += line
                if timer.child_timer:
                    line = f"            =>   {str(timer.child_timer)}\n"
                    if timer.child_timer.is_alive():
                        line = SU.green(line)
                    msg += line

        EngineExecutor.logger.info(msg + "\n")