control.py 12.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import logging
import socket
import time
import json


26
from threading              import Thread, Timer, Lock
27
28
29
from datetime               import datetime, timedelta
from http_parser.http       import HttpStream
from http_parser.reader     import SocketReader
30

31
32
from src.base.config        import AuraConfig
from src.base.utils         import SimpleUtil as SU
33
34
35
36
37
38
39
40
41




class EngineControlInterface:
    """
    Provides ability to control the engine in various ways.
    """
    config = None
42
    logger = None
43
    engine = None
44
    event_dispatcher = None
45
46
    sci = None

47
    def __init__(self, engine, event_dispatcher):
48
49
50
51
52
53
        """
        Constructor

        Args:
            config (AuraConfig):    Engine configuration
            logger (AuraLogger):    The logger
54
        """
55
        self.engine = engine
56
        self.config = AuraConfig.config()
57
        self.logger = logging.getLogger("AuraEngine")
58
        self.logger.info(SU.yellow(f"[ECI] Engine Control Interface starting ..."))
59
60
        self.sci = SocketControlInterface.get_instance(event_dispatcher)

61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76


    def terminate(self):
        """
        Terminates the instance and all related objects.
        """
        if self.sci: self.sci.terminate()



class SocketControlInterface:
    """
    Network socket server to control a running Engine from Liquidsoap.

    Note this server only allows a single connection at once. This
    service is primarly utilized to store new playlogs.
77
    """
78
79
80
81
82
    PORT = 1337
    ACTION_ON_METADATA = "on_metadata"

    instance = None
    config = None
83
    logger = None
84
    server = None
85
    event_dispatcher = None
86
87


88
    def __init__(self, event_dispatcher):
89
90
91
92
93
94
95
96
97
        """
        Constructor

        Args:
            config (AuraConfig):    Engine configuration
            logger (AuraLogger):    The logger
        """
        if SocketControlInterface.instance:
            raise Exception(SU.red("[ECI] Socket server is already running!"))
98

99
100
101
        SocketControlInterface.instance = self
        self.config = AuraConfig.config()
        self.logger = logging.getLogger("AuraEngine")
102
        self.event_dispatcher = event_dispatcher
103
104
        host = "127.0.0.1"
        thread = Thread(target = self.run, args = (self.logger, host))
105
        thread.start()
106
107
108


    @staticmethod
109
    def get_instance(event_dispatcher):
110
111
112
113
        """
        Returns the Singleton.
        """
        if not SocketControlInterface.instance:
114
            SocketControlInterface.instance = SocketControlInterface(event_dispatcher)
115
116
117
118
119
120
121
        return SocketControlInterface.instance



    def run(self, logger, host):
        """
        Starts the socket server
122
        """
David Trattnig's avatar
David Trattnig committed
123
        while True:
124
125
126
127
128
129
130
131
132
133
            try:
                self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.server.bind((host, SocketControlInterface.PORT))
                break
            except OSError as e:
                wait_time = 2
                self.logger.error(SU.red(f"Cannot bind to Socket. Retrying in {wait_time} seconds..."))
                time.sleep(wait_time)

        logger.info(SU.yellow(f'[ECI] Listening at {host}:{SocketControlInterface.PORT}'))
134
        self.server.listen()
135

David Trattnig's avatar
David Trattnig committed
136
        while True:
137
            (conn, client) = self.server.accept()
David Trattnig's avatar
David Trattnig committed
138
            while True:
139
140
                r = SocketReader(conn)
                p = HttpStream(r)
141
                data = p.body_file().read()
142
                logger.debug(SU.yellow(f'[ECI] Received socket data from {str(client)}: {str(data)}'))
143

144
145
146
147
148
149
                try:
                    self.process(logger, json.loads(data))
                    conn.sendall(b'\n[ECI] processing done.\n')
                except Exception as e:
                    logger.error(SU.red(f'[ECI] Error while processing request: {data}'), e)

150
                conn.close()
151
152
153
154
155
156
157
                break


    def process(self, logger, data):
        """
        Process incoming actions.
        """
158
        if "action" in data:
159
            if data["action"] == SocketControlInterface.ACTION_ON_METADATA:
160
161
                meta_data = data["data"]
                meta_data["duration"] = data["track_duration"]
162
                logger.debug(SU.yellow(f"[ECI] Executing action: "+SocketControlInterface.ACTION_ON_METADATA))
163
                self.event_dispatcher.on_metadata(data["data"])
David Trattnig's avatar
David Trattnig committed
164
                logger.info(SU.yellow(f"[ECI] Event '{SocketControlInterface.ACTION_ON_METADATA}' issued successfully"))
165
166
167
168
            else:
                logger.error(SU.red("[ECI] Unknown action: " + data["action"]))
        else:
            logger.error(SU.red(f'[ECI] Missing action in request: {data}'))
169

170
171
172
173
174


    def terminate(self):
        SocketControlInterface.instance = None
        self.server.close()
175
176
177
178
179
180
181
182
183
184
        self.logger.info(SU.yellow("[ECI] Shutting down..."))





class EngineExecutor(Timer):
    """
    Base class for timed or threaded execution of Engine commands.

David Trattnig's avatar
David Trattnig committed
185
    Primarily used for automations performed by the scheduler.
186
    """
187
    _lock = None
188
189
    logger = logging.getLogger("AuraEngine")
    initialized = None
190
191
    timer_store = {}
    parent_timer = None
192
193
194
195
196
197
198
199
200
    child_timer = None
    direct_exec = None
    timer_id = None
    timer_type = None
    param = None
    diff = None
    dt = None


201
    def __init__(self, timer_type="BASE", parent_timer=None, due_time=None, func=None, param=None):
202
203
204
205
206
        """
        Constructor

        Args:
            timer_type (String):            Prefix used for the `timer_id` to make it unique
207
            parent_timer (EngineExeuctor):  Parent action which is a prerequisite for this timer
208
209
210
            due_time (Float):               When timer should be executed. For values <= 0 execution happens immediately in a threaded way
            func (function):                The function to be called
            param (object):                 Parameter passt to the function
211
        """
212
        self.initialized = False
213
        self._lock = Lock()
David Trattnig's avatar
David Trattnig committed
214
        from src.engine import Engine
215
        now_unix = Engine.engine_time()
216

217
        # Init parent-child relation
218
219
220
        self.parent_timer = parent_timer
        if self.parent_timer:
            self.parent_timer.child_timer = self
221

222
        # Init meta data
223
        self.direct_exec = False
224
        self.timer_type = timer_type
225
226
227
228
        self.timer_id = f"{timer_type}:{func.__name__}:{due_time}"

        if not due_time:
            diff = 0
229
        else:
230
231
232
233
234
235
            diff = due_time - now_unix

        self.diff = diff
        self.dt = datetime.now() + timedelta(seconds=diff)
        self.func = func
        self.param = param
236
237
238

        is_stored = self.update_store()
        if not is_stored:
239
            self.logger.info(SU.red(f"Timer '{self.timer_id}' omitted because it's already existing but dead"))
240
        else:
241
242
243
244
            if diff < 0:
                msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..."
                self.logger.error(SU.red(msg))
                self.exec_now()
245
            elif diff == 0:
246
247
                self.logger.info(f"Timer '{self.timer_id}' to be executed immediately")
                self.exec_now()
248
            else:
249
250
                self.exec_timed()
                self.start()
251
252


253
254
255
256
    def on_ready(self, func):
        """
        Calls the passed function `func` when the timer is ready.
        """
257
258
259
260
261
262
        while self.initialized == False:
            timer.sleep(0.001)
            self.logger.info(SU.orange("Waiting until the EngineExecutor is done with initialization..."))

        if not self.direct_exec: #TODO Evaluate if we should join for direct exec too
            self.join()
263
264
265
        func()


266
267
268
269
    def wait_for_parent(self):
        """
        Child timers are dependend on their parents. So let's wait until parents are done with their stuff.
        """
270
        if self.parent_timer:
David Trattnig's avatar
David Trattnig committed
271
            while self.parent_timer.is_alive():
272
                self.logger.info(f"Timer '{self.timer_id}' is waiting for parent timer '{self.parent_timer.timer_id}' to finish")
David Trattnig's avatar
David Trattnig committed
273
                time.sleep(0.2)
274

275
276
277
278

    def exec_now(self):
        """
        Immediate execution within a thread. It's not stored in the timer store.
David Trattnig's avatar
David Trattnig committed
279
280

        Assigns the `timer_id` as the thread name.
281
282
        """
        self.direct_exec = True
283
        self.wait_for_parent()
David Trattnig's avatar
David Trattnig committed
284
        thread = Thread(name=self.timer_id, target=self.func, args=(self.param,))
285
        thread.start()
286
287
        self.initialized = True

288
289
290
291

    def exec_timed(self):
        """
        Timed execution in a thread.
David Trattnig's avatar
David Trattnig committed
292
293

        Assigns the `timer_id` as the thread name.
294
295
        """
        def wrapper_func(param=None):
296
            self.wait_for_parent()
297
298
299
            if param: self.func(param,)
            else: self.func()
        super().__init__(self.diff, wrapper_func, (self.param,))
David Trattnig's avatar
David Trattnig committed
300
        self._name = self.timer_id
301
        self.initialized = True
302
303
304
305
306


    def update_store(self):
        """
        Adds the instance to the store and cancels any previously existing commands.
307
308
309
310
311
312

        If a timer with the given ID is already existing but also already executed,
        then it is not added to the store. In such case the method returns `False`.

        Returns:
            (Boolean):  True if the timer has been added to the store
313
        """
314
315
316
317
        with self._lock:
            existing_command = None
            if self.timer_id in EngineExecutor.timer_store:
                existing_command = EngineExecutor.timer_store[self.timer_id]
318

319
            if existing_command:
320

321
322
323
324
                # Check if existing timer has been executed already -> don't update
                if not existing_command.is_alive():
                    self.logger.debug(f"Existing dead timer with ID: {self.timer_id}. Don't update.")
                    return False
325

326
                # Still waiting for execution -> update
327
                else:
328
                    self.logger.debug(f"Cancelling existingTimer with ID: {self.timer_id}")
329
                    existing_command.cancel()
330
                    if existing_command.child_timer:
331
                        self.logger.debug(f"Cancelling existingTimer:childTimer with ID: {existing_command.child_timer.timer_id}")
332

333
334
335
            EngineExecutor.timer_store[self.timer_id] = self
            self.logger.debug(f"Created command timer with ID: {self.timer_id}")
            return True
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353


    def is_alive(self):
        """
        Returns true if the command is still due to be executed.
        """
        if self.direct_exec == True:
            return False
        return super().is_alive()


    def __str__(self):
        """
        String represenation of the timer.
        """
        return f"[{self.timer_id}] exec at {str(self.dt)} (alive: {self.is_alive()})"


David Trattnig's avatar
David Trattnig committed
354
355
356
    @staticmethod
    def remove_stale_timers():
        """
357
        Removes timers from store which have been executed and are older than 3 hours.
David Trattnig's avatar
David Trattnig committed
358
359
360
361
362
        """
        timers = EngineExecutor.timer_store.values()
        del_keys = []

        for timer in timers:
363
            if timer.dt < datetime.now() - timedelta(hours=3):
David Trattnig's avatar
David Trattnig committed
364
365
366
367
368
                if not timer.child_timer or (timer.child_timer and not timer.child_timer.is_alive()):
                    timer.logger.debug(f"Removing already executed timer with ID: {timer.timer_id}")
                    del_keys.append(timer.timer_id)

        for timer_id in del_keys:
369
            del EngineExecutor.timer_store[timer_id]
David Trattnig's avatar
David Trattnig committed
370
371


372
373
374
    @staticmethod
    def log_commands():
        """
375
        Prints a list of recent active and inactive timers.
376
        """
377
        msg = SU.blue("\n [ ENGINE COMMAND QUEUE ]\n")
David Trattnig's avatar
David Trattnig committed
378
        EngineExecutor.remove_stale_timers()
379
        timers = EngineExecutor.timer_store.values()
380
381

        if not timers:
382
            msg += "\nNone available!\n"
383
384
        else:
            for timer in timers:
385
                if not timer.parent_timer:
386
387
388
389
                    line = f"      =>   {str(timer)}\n"#
                    if timer.is_alive():
                        line = SU.green(line)
                    msg += line
390
                if timer.child_timer:
391
392
393
394
                    line = f"            =>   {str(timer.child_timer)}\n"
                    if timer.child_timer.is_alive():
                        line = SU.green(line)
                    msg += line
395
396

        EngineExecutor.logger.info(msg + "\n")