control.py 13 KB
Newer Older
1
2
3
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
4
# Copyright (C) 2017-now() - The Aura Engine Team.
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import logging
import socket
import time
import json


26
from threading              import Thread, Timer, Lock
27
28
29
from datetime               import datetime, timedelta
from http_parser.http       import HttpStream
from http_parser.reader     import SocketReader
30

31
32
from src.base.config        import AuraConfig
from src.base.utils         import SimpleUtil as SU
33
34
35
36
37
38
39
40
41




class EngineControlInterface:
    """
    Provides ability to control the engine in various ways.
    """
    config = None
42
    logger = None
43
    engine = None
44
    event_dispatcher = None
45
46
    sci = None

47
    def __init__(self, engine, event_dispatcher):
48
49
50
51
52
53
        """
        Constructor

        Args:
            config (AuraConfig):    Engine configuration
            logger (AuraLogger):    The logger
54
        """
55
        self.engine = engine
56
        self.config = AuraConfig.config()
57
        self.logger = logging.getLogger("AuraEngine")
58
        self.logger.info(SU.yellow(f"[ECI] Engine Control Interface starting ..."))
59
60
        self.sci = SocketControlInterface.get_instance(event_dispatcher)

61
62
63
64
65
66
67
68
69
70
71
72
73
74
75


    def terminate(self):
        """
        Terminates the instance and all related objects.
        """
        if self.sci: self.sci.terminate()



class SocketControlInterface:
    """
    Network socket server to control a running Engine from Liquidsoap.

    Note this server only allows a single connection at once. This
Lars Kruse's avatar
Lars Kruse committed
76
    service is primarily utilized to store new playlogs.
77
    """
78
79
80
81
82
    PORT = 1337
    ACTION_ON_METADATA = "on_metadata"

    instance = None
    config = None
83
    logger = None
84
    server = None
85
    event_dispatcher = None
86
87


88
    def __init__(self, event_dispatcher):
89
90
91
92
93
94
95
96
97
        """
        Constructor

        Args:
            config (AuraConfig):    Engine configuration
            logger (AuraLogger):    The logger
        """
        if SocketControlInterface.instance:
            raise Exception(SU.red("[ECI] Socket server is already running!"))
98

99
100
101
        SocketControlInterface.instance = self
        self.config = AuraConfig.config()
        self.logger = logging.getLogger("AuraEngine")
102
        self.event_dispatcher = event_dispatcher
103
104
        host = "127.0.0.1"
        thread = Thread(target = self.run, args = (self.logger, host))
105
        thread.start()
106
107
108


    @staticmethod
109
    def get_instance(event_dispatcher):
110
111
112
113
        """
        Returns the Singleton.
        """
        if not SocketControlInterface.instance:
114
            SocketControlInterface.instance = SocketControlInterface(event_dispatcher)
115
116
117
118
119
120
121
        return SocketControlInterface.instance



    def run(self, logger, host):
        """
        Starts the socket server
122
        """
David Trattnig's avatar
David Trattnig committed
123
        while True:
124
125
126
127
128
129
130
131
132
133
            try:
                self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.server.bind((host, SocketControlInterface.PORT))
                break
            except OSError as e:
                wait_time = 2
                self.logger.error(SU.red(f"Cannot bind to Socket. Retrying in {wait_time} seconds..."))
                time.sleep(wait_time)

        logger.info(SU.yellow(f'[ECI] Listening at {host}:{SocketControlInterface.PORT}'))
134
        self.server.listen()
135

David Trattnig's avatar
David Trattnig committed
136
        while True:
137
            (conn, client) = self.server.accept()
David Trattnig's avatar
David Trattnig committed
138
            while True:
139
140
                r = SocketReader(conn)
                p = HttpStream(r)
141
                data = p.body_file().read()
142
                logger.debug(SU.yellow(f'[ECI] Received socket data from {str(client)}: {str(data)}'))
143

144
145
146
147
148
149
                try:
                    self.process(logger, json.loads(data))
                    conn.sendall(b'\n[ECI] processing done.\n')
                except Exception as e:
                    logger.error(SU.red(f'[ECI] Error while processing request: {data}'), e)

150
                conn.close()
151
152
153
154
155
156
157
                break


    def process(self, logger, data):
        """
        Process incoming actions.
        """
158
        if "action" in data:
159
            if data["action"] == SocketControlInterface.ACTION_ON_METADATA:
160
161
                meta_data = data["data"]
                meta_data["duration"] = data["track_duration"]
162
                logger.debug(SU.yellow(f"[ECI] Executing action: "+SocketControlInterface.ACTION_ON_METADATA))
163
                self.event_dispatcher.on_metadata(data["data"])
David Trattnig's avatar
David Trattnig committed
164
                logger.info(SU.yellow(f"[ECI] Event '{SocketControlInterface.ACTION_ON_METADATA}' issued successfully"))
165
166
167
168
            else:
                logger.error(SU.red("[ECI] Unknown action: " + data["action"]))
        else:
            logger.error(SU.red(f'[ECI] Missing action in request: {data}'))
169

170
171
172
173
174


    def terminate(self):
        SocketControlInterface.instance = None
        self.server.close()
175
176
177
178
179
180
181
182
183
184
        self.logger.info(SU.yellow("[ECI] Shutting down..."))





class EngineExecutor(Timer):
    """
    Base class for timed or threaded execution of Engine commands.

David Trattnig's avatar
David Trattnig committed
185
    Primarily used for automations performed by the scheduler.
186
    """
David Trattnig's avatar
David Trattnig committed
187
    timer_store: dict = {}
188
    logger = logging.getLogger("AuraEngine")
David Trattnig's avatar
David Trattnig committed
189
190
191

    _lock = None
    direct_exec: bool = None
192
    is_aborted: bool = None
David Trattnig's avatar
David Trattnig committed
193
194
195
196
    parent_timer: Timer = None
    child_timer: Timer = None
    timer_id: str = None
    timer_type: str = None
197
    func: func = None
198
199
200
201
202
    param = None
    diff = None
    dt = None


David Trattnig's avatar
David Trattnig committed
203
    def __init__(self, timer_type:str="BASE", parent_timer:Timer=None, due_time=None, func=None, param=None):
204
205
206
207
208
        """
        Constructor

        Args:
            timer_type (String):            Prefix used for the `timer_id` to make it unique
209
            parent_timer (EngineExeuctor):  Parent action which is a prerequisite for this timer
210
211
212
            due_time (Float):               When timer should be executed. For values <= 0 execution happens immediately in a threaded way
            func (function):                The function to be called
            param (object):                 Parameter passt to the function
213
214
        """
        self._lock = Lock()
David Trattnig's avatar
David Trattnig committed
215
        from src.engine import Engine
216
        now_unix = Engine.engine_time()
217

218
        # Init parent-child relation
219
220
221
        self.parent_timer = parent_timer
        if self.parent_timer:
            self.parent_timer.child_timer = self
222

223
        # Init meta data
224
        self.direct_exec = False
225
        self.is_aborted = False
226
        self.timer_type = timer_type
227
228
        self.timer_id = f"{timer_type}:{func.__name__}:{due_time}"

David Trattnig's avatar
David Trattnig committed
229
230
        diff = 0
        if due_time:
231
232
233
234
235
236
            diff = due_time - now_unix

        self.diff = diff
        self.dt = datetime.now() + timedelta(seconds=diff)
        self.func = func
        self.param = param
237
238
239

        is_stored = self.update_store()
        if not is_stored:
240
241
            self.logger.info(SU.yellow(f"Timer '{self.timer_id}' omitted because it's already existing but dead"))
            self.is_aborted = True
242
        else:
243
244
            if diff < 0:
                msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..."
245
                self.logger.warning(SU.yellow(msg))
246
                self.exec_now()
247
            elif diff == 0:
David Trattnig's avatar
David Trattnig committed
248
                self.logger.debug(f"Timer '{self.timer_id}' to be executed immediately")
249
                self.exec_now()
250
            else:
David Trattnig's avatar
David Trattnig committed
251
                self.logger.debug(f"Timer '{self.timer_id}' to be executed in default manner")
252
                self.exec_timed()
253
254


255
256
    def wait_for_parent(self):
        """
David Trattnig's avatar
David Trattnig committed
257
258
        @private

Lars Kruse's avatar
Lars Kruse committed
259
        Child timers are dependent on their parents. So let's wait until parents are done with their stuff => finished execution.
David Trattnig's avatar
David Trattnig committed
260
        Checks the parent state to be finished every 0.2 seconds.
261
        """
262
        if self.parent_timer:
David Trattnig's avatar
David Trattnig committed
263
            while self.parent_timer.is_alive():
264
                self.logger.info(f"Timer '{self.timer_id}' is waiting for parent timer '{self.parent_timer.timer_id}' to finish")
David Trattnig's avatar
David Trattnig committed
265
                time.sleep(0.2)
266

267
268
269

    def exec_now(self):
        """
David Trattnig's avatar
David Trattnig committed
270
271
        @private

272
        Immediate execution within a thread. It's not stored in the timer store.
David Trattnig's avatar
David Trattnig committed
273

David Trattnig's avatar
David Trattnig committed
274
        It also assigns the `timer_id` as the thread name.
275
276
        """
        self.direct_exec = True
277
        self.wait_for_parent()
David Trattnig's avatar
David Trattnig committed
278
        thread = Thread(name=self.timer_id, target=self.func, args=(self.param,))
279
        thread.start()
280

281
282
283

    def exec_timed(self):
        """
David Trattnig's avatar
David Trattnig committed
284
        @private
David Trattnig's avatar
David Trattnig committed
285

David Trattnig's avatar
David Trattnig committed
286
287
288
289
        Timed execution in a thread. This method instroduces a slight delay to ensure
        the thread is properly initialized before starting it.

        It also assigns the `timer_id` as the thread name.
290
291
        """
        def wrapper_func(param=None):
292
            self.wait_for_parent()
293
294
295
            if param: self.func(param,)
            else: self.func()
        super().__init__(self.diff, wrapper_func, (self.param,))
David Trattnig's avatar
David Trattnig committed
296
        self._name = self.timer_id
David Trattnig's avatar
David Trattnig committed
297
        self.start()
298
299
300
301


    def update_store(self):
        """
David Trattnig's avatar
David Trattnig committed
302
303
        @private

304
        Adds the instance to the store and cancels any previously existing commands.
305
306
307
308
309

        If a timer with the given ID is already existing but also already executed,
        then it is not added to the store. In such case the method returns `False`.

        Returns:
David Trattnig's avatar
David Trattnig committed
310
311
            (Boolean):  True if the timer has been added to the store. False if the
                        timer is already existing but dead.
312
        """
313
314
315
316
        with self._lock:
            existing_command = None
            if self.timer_id in EngineExecutor.timer_store:
                existing_command = EngineExecutor.timer_store[self.timer_id]
317

318
            if existing_command:
319

320
321
322
323
                # Check if existing timer has been executed already -> don't update
                if not existing_command.is_alive():
                    self.logger.debug(f"Existing dead timer with ID: {self.timer_id}. Don't update.")
                    return False
324

325
                # Still waiting for execution -> update
326
                else:
327
                    self.logger.debug(f"Cancelling existingTimer with ID: {self.timer_id}")
328
                    existing_command.cancel()
329
                    if existing_command.child_timer:
330
                        self.logger.debug(f"Cancelling existingTimer:childTimer with ID: {existing_command.child_timer.timer_id}")
331

332
333
334
            EngineExecutor.timer_store[self.timer_id] = self
            self.logger.debug(f"Created command timer with ID: {self.timer_id}")
            return True
335
336
337
338


    def is_alive(self):
        """
David Trattnig's avatar
David Trattnig committed
339
340
        @private

341
342
343
344
        Returns true if the command is still due to be executed.
        """
        if self.direct_exec == True:
            return False
345
346
        if self.is_aborted == True:
            return False
347
348
349
350
351
        return super().is_alive()


    def __str__(self):
        """
David Trattnig's avatar
David Trattnig committed
352
        String representation of the timer.
353
354
355
356
        """
        return f"[{self.timer_id}] exec at {str(self.dt)} (alive: {self.is_alive()})"


David Trattnig's avatar
David Trattnig committed
357
358
359
    @staticmethod
    def remove_stale_timers():
        """
360
        Removes timers from store which have been executed and are older than 3 hours.
David Trattnig's avatar
David Trattnig committed
361
362
363
364
365
        """
        timers = EngineExecutor.timer_store.values()
        del_keys = []

        for timer in timers:
366
            if timer.dt < datetime.now() - timedelta(hours=3):
David Trattnig's avatar
David Trattnig committed
367
368
369
370
371
                if not timer.child_timer or (timer.child_timer and not timer.child_timer.is_alive()):
                    timer.logger.debug(f"Removing already executed timer with ID: {timer.timer_id}")
                    del_keys.append(timer.timer_id)

        for timer_id in del_keys:
372
            del EngineExecutor.timer_store[timer_id]
David Trattnig's avatar
David Trattnig committed
373
374


375
376
377
378
379
380
381
382
    @staticmethod
    def command_history():
        """
        Returns a list of recent active and inactive timers to the logger.
        """
        return EngineExecutor.timer_store.values()


383
384
385
    @staticmethod
    def log_commands():
        """
David Trattnig's avatar
David Trattnig committed
386
        Prints a list of recent active and inactive timers to the logger.
387
        """
388
        msg = SU.blue("\n [ ENGINE COMMAND QUEUE ]\n")
David Trattnig's avatar
David Trattnig committed
389
        EngineExecutor.remove_stale_timers()
390
        timers = EngineExecutor.timer_store.values()
391
392

        if not timers:
393
            msg += "\nNone available!\n"
394
395
        else:
            for timer in timers:
396
                if not timer.parent_timer:
David Trattnig's avatar
David Trattnig committed
397
                    line = f"      =>   {str(timer)}\n"
398
399
400
                    if timer.is_alive():
                        line = SU.green(line)
                    msg += line
401
                if timer.child_timer:
402
403
404
405
                    line = f"            =>   {str(timer.child_timer)}\n"
                    if timer.child_timer.is_alive():
                        line = SU.green(line)
                    msg += line
406
407

        EngineExecutor.logger.info(msg + "\n")