#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import logging
import socket
import time
import json


26
from threading              import Thread, Timer, Lock
27
28
29
from datetime               import datetime, timedelta
from http_parser.http       import HttpStream
from http_parser.reader     import SocketReader
30

31
32
from src.base.config        import AuraConfig
from src.base.utils         import SimpleUtil as SU
33
34
35
36
37
38
39
40
41




class EngineControlInterface:
    """
    Provides ability to control the engine in various ways.

    At present this class only owns the `SocketControlInterface` singleton
    and forwards `terminate()` to it.
    """
    config = None
    logger = None
    engine = None
    event_dispatcher = None
    sci = None


    def __init__(self, engine, event_dispatcher):
        """
        Constructor

        Args:
            engine:             The engine instance; stored as `self.engine`
            event_dispatcher:   Dispatcher handed on to the socket control interface
                                (presumably the engine's event dispatcher - confirm with caller)
        """
        self.engine = engine
        # Keep a reference, so the declared attribute is not left at `None`
        self.event_dispatcher = event_dispatcher
        self.config = AuraConfig.config()
        self.logger = logging.getLogger("AuraEngine")
        self.logger.info(SU.yellow("[ECI] Engine Control Interface starting ..."))
        self.sci = SocketControlInterface.get_instance(event_dispatcher)


    def terminate(self):
        """
        Terminates the instance and all related objects.
        """
        if self.sci:
            self.sci.terminate()



class SocketControlInterface:
    """
    Network socket server to control a running Engine from Liquidsoap.

    Note this server only allows a single connection at once. This
    service is primarily utilized to store new playlogs.
    """
    PORT = 1337
    ACTION_ON_METADATA = "on_metadata"

    instance = None
    config = None
    logger = None
    server = None
    event_dispatcher = None
    # Set by `terminate()`; tells the server thread to stop retrying/accepting
    _terminated = False


    def __init__(self, event_dispatcher):
        """
        Constructor

        Registers itself as the singleton and starts the server thread
        which listens on `127.0.0.1:PORT`.

        Args:
            event_dispatcher:   Object providing `on_metadata(data)`, called for
                                every incoming `on_metadata` action

        Raises:
            (Exception):    If a socket server instance is already registered
        """
        if SocketControlInterface.instance:
            raise Exception(SU.red("[ECI] Socket server is already running!"))

        SocketControlInterface.instance = self
        self.config = AuraConfig.config()
        self.logger = logging.getLogger("AuraEngine")
        self.event_dispatcher = event_dispatcher
        host = "127.0.0.1"
        thread = Thread(target=self.run, args=(self.logger, host))
        thread.start()


    @staticmethod
    def get_instance(event_dispatcher):
        """
        Returns the Singleton.

        The instance is created on first use; `event_dispatcher` is only
        used in that case.
        """
        if not SocketControlInterface.instance:
            SocketControlInterface.instance = SocketControlInterface(event_dispatcher)
        return SocketControlInterface.instance


    def run(self, logger, host):
        """
        Starts the socket server

        Retries binding every two seconds until it succeeds, then serves
        one connection after the other until `terminate()` is called.

        Args:
            logger (Logger):    Logger used for the server messages
            host (String):      Address to bind to
        """
        wait_time = 2
        while True:
            if self._terminated:
                return
            self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                self.server.bind((host, SocketControlInterface.PORT))
                break
            except OSError:
                # Release the failed socket, otherwise every retry leaks a file descriptor
                self.server.close()
                self.logger.error(SU.red(f"Cannot bind to Socket. Retrying in {wait_time} seconds..."))
                time.sleep(wait_time)

        logger.info(SU.yellow(f'[ECI] Listening at {host}:{SocketControlInterface.PORT}'))
        try:
            self.server.listen()
        except OSError as e:
            # Typically the socket has been closed by `terminate()` in the meantime
            if not self._terminated:
                logger.error(SU.red(f'[ECI] Cannot listen on socket: {e}'))
            return

        while not self._terminated:
            try:
                (conn, client) = self.server.accept()
            except OSError as e:
                # A closed server socket ends the loop instead of crashing the thread
                if not self._terminated:
                    logger.error(SU.red(f'[ECI] Socket server stopped unexpectedly: {e}'))
                break
            self._handle_connection(logger, conn, client)


    def _handle_connection(self, logger, conn, client):
        """
        Reads one HTTP request from the connection, processes its JSON body
        and closes the connection, no matter if processing succeeded.
        """
        data = None
        with conn:
            try:
                r = SocketReader(conn)
                p = HttpStream(r)
                data = p.body_file().read()
                logger.debug(SU.yellow(f'[ECI] Received socket data from {str(client)}: {str(data)}'))
                self.process(logger, json.loads(data))
                conn.sendall(b'\n[ECI] processing done.\n')
            except Exception as e:
                # Boundary of the server loop: a single bad request must not stop the server.
                # The cause is part of the message; passing it as a positional logging
                # argument would be treated as a %-format argument and fail to render.
                logger.error(SU.red(f'[ECI] Error while processing request: {data} - {e}'))


    def process(self, logger, data):
        """
        Process incoming actions.

        Args:
            logger (Logger):    Logger used for the processing messages
            data (dict):        Decoded JSON request. For the `on_metadata` action
                                the keys `data` and `track_duration` are read.
        """
        if "action" in data:
            if data["action"] == SocketControlInterface.ACTION_ON_METADATA:
                meta_data = data["data"]
                meta_data["duration"] = data["track_duration"]
                logger.debug(SU.yellow("[ECI] Executing action: " + SocketControlInterface.ACTION_ON_METADATA))
                self.event_dispatcher.on_metadata(meta_data)
                logger.info(SU.yellow(f"[ECI] Event '{SocketControlInterface.ACTION_ON_METADATA}' issued successfully"))
            else:
                logger.error(SU.red("[ECI] Unknown action: " + data["action"]))
        else:
            logger.error(SU.red(f'[ECI] Missing action in request: {data}'))


    def terminate(self):
        """
        Unregisters the singleton and closes the server socket.
        """
        SocketControlInterface.instance = None
        self._terminated = True
        server = self.server
        if server is not None:
            try:
                # Best effort: `close()` alone may not wake up a thread blocked in
                # `accept()`; shutting down a listening socket fails on some platforms.
                server.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass
            server.close()
        self.logger.info(SU.yellow("[ECI] Shutting down..."))





class EngineExecutor(Timer):
    """
    Base class for timed or threaded execution of Engine commands.

    Primarily used for automations performed by the scheduler.
    """
    # Registry of all known timers, keyed by `timer_id`; shared by all instances
    timer_store: dict = {}
    logger = logging.getLogger("AuraEngine")

    EVENT_ON_READY = "on_ready"

    # One lock for all instances, since it guards the shared `timer_store`
    _lock = Lock()
    # True as soon as `exec_timed()` has initialized the underlying `Timer`
    _scheduled: bool = False
    direct_exec: bool = None
    parent_timer: Timer = None
    child_timer: Timer = None
    timer_id: str = None
    timer_type: str = None
    func = None
    param = None
    diff = None
    dt = None


    def __init__(self, timer_type:str="BASE", parent_timer:Timer=None, due_time=None, func=None, param=None):
        """
        Constructor

        Registers the timer in the store and either executes it right away
        (no due time, or due time in the past) or schedules it.

        Args:
            timer_type (String):            Prefix used for the `timer_id` to make it unique
            parent_timer (EngineExecutor):  Parent action which is a prerequisite for this timer
            due_time (Float):               When timer should be executed. For values <= 0 execution happens immediately in a threaded way
            func (function):                The function to be called
            param (object):                 Parameter passed to the function
        """
        # Imported locally; presumably to avoid a circular import - confirm
        from src.engine import Engine
        now_unix = Engine.engine_time()

        # Init parent-child relation
        self.parent_timer = parent_timer
        if self.parent_timer:
            self.parent_timer.child_timer = self

        # Init meta data
        self.direct_exec = False
        self.timer_type = timer_type
        # Callables such as `functools.partial` have no `__name__`
        func_name = getattr(func, "__name__", repr(func))
        self.timer_id = f"{timer_type}:{func_name}:{due_time}"

        diff = 0
        if due_time:
            diff = due_time - now_unix

        self.diff = diff
        self.dt = datetime.now() + timedelta(seconds=diff)
        self.func = func
        self.param = param

        is_stored = self.update_store()
        if not is_stored:
            self.logger.info(SU.red(f"Timer '{self.timer_id}' omitted because it's already existing but dead"))
        else:
            if diff < 0:
                msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..."
                self.logger.warning(SU.yellow(msg))
                self.exec_now()
            elif diff == 0:
                self.logger.debug(f"Timer '{self.timer_id}' to be executed immediately")
                self.exec_now()
            else:
                self.logger.debug(f"Timer '{self.timer_id}' to be executed in default manner")
                self.exec_timed()


    def wait_for_parent(self):
        """
        @private

        Child timers are dependent on their parents. So let's wait until parents are done with their stuff => finished execution.
        Checks the parent state to be finished every 0.2 seconds.
        """
        if self.parent_timer:
            while self.parent_timer.is_alive():
                self.logger.info(f"Timer '{self.timer_id}' is waiting for parent timer '{self.parent_timer.timer_id}' to finish")
                time.sleep(0.2)


    def exec_now(self):
        """
        @private

        Immediate execution within a thread. The instance is flagged as `direct_exec`,
        hence `is_alive()` reports `False` for it from now on.

        Waiting for the parent happens in the calling thread, before the worker
        thread is started. The function is always called with `param` as its
        single argument, even if `param` is `None`.

        It also assigns the `timer_id` as the thread name.
        """
        self.direct_exec = True
        self.wait_for_parent()
        thread = Thread(name=self.timer_id, target=self.func, args=(self.param,))
        time.sleep(0.2)
        thread.start()


    def exec_timed(self):
        """
        @private

        Timed execution in a thread. This method introduces a slight delay to ensure
        the thread is properly initialized before starting it.

        The function is called with `param` if it is truthy, otherwise without arguments.

        It also assigns the `timer_id` as the thread name.
        """
        def wrapper_func(param=None):
            self.wait_for_parent()
            if param:
                self.func(param,)
            else:
                self.func()

        super().__init__(self.diff, wrapper_func, (self.param,))
        self._scheduled = True
        self.name = self.timer_id
        time.sleep(0.2)
        self.start()


    def update_store(self):
        """
        @private

        Adds the instance to the store and cancels any previously existing commands.

        If a timer with the given ID is already existing but also already executed,
        then it is not added to the store. In such case the method returns `False`.

        Returns:
            (Boolean):  True if the timer has been added to the store. False if the
                        timer is already existing but dead.
        """
        with self._lock:
            existing_command = EngineExecutor.timer_store.get(self.timer_id)

            if existing_command is not None:
                # Check if existing timer has been executed already -> don't update
                if not existing_command.is_alive():
                    self.logger.debug(f"Existing dead timer with ID: {self.timer_id}. Don't update.")
                    return False

                # Still waiting for execution -> update
                self.logger.debug(f"Cancelling existingTimer with ID: {self.timer_id}")
                existing_command.cancel()
                if existing_command.child_timer:
                    # NOTE(review): only logged, the child timer itself is not cancelled here - confirm intent
                    self.logger.debug(f"Cancelling existingTimer:childTimer with ID: {existing_command.child_timer.timer_id}")

            EngineExecutor.timer_store[self.timer_id] = self
            self.logger.debug(f"Created command timer with ID: {self.timer_id}")
            return True


    def is_alive(self):
        """
        @private

        Returns true if the command is still due to be executed.

        Directly executed timers and timers which have never been scheduled
        (e.g. omitted ones) are reported as not alive.
        """
        if self.direct_exec or not self._scheduled:
            return False
        return super().is_alive()


    def __str__(self):
        """
        String representation of the timer.
        """
        return f"[{self.timer_id}] exec at {str(self.dt)} (alive: {self.is_alive()})"


    @staticmethod
    def remove_stale_timers():
        """
        Removes timers from store which have been executed and are older than 3 hours.

        A timer is kept as long as it has a child timer which is still alive.
        """
        threshold = datetime.now() - timedelta(hours=3)
        del_keys = []

        # Iterate a snapshot, since other threads may add timers in the meantime
        for timer in list(EngineExecutor.timer_store.values()):
            if timer.dt < threshold:
                if not (timer.child_timer and timer.child_timer.is_alive()):
                    timer.logger.debug(f"Removing already executed timer with ID: {timer.timer_id}")
                    del_keys.append(timer.timer_id)

        for timer_id in del_keys:
            EngineExecutor.timer_store.pop(timer_id, None)


    @staticmethod
    def log_commands():
        """
        Prints a list of recent active and inactive timers to the logger.

        Stale timers are removed first. Timers which are still alive are
        printed in green; child timers are listed below their parent.
        """
        def render(timer, prefix):
            line = f"{prefix}=>   {str(timer)}\n"
            return SU.green(line) if timer.is_alive() else line

        msg = SU.blue("\n [ ENGINE COMMAND QUEUE ]\n")
        EngineExecutor.remove_stale_timers()
        timers = list(EngineExecutor.timer_store.values())

        if not timers:
            msg += "\nNone available!\n"
        else:
            for timer in timers:
                if not timer.parent_timer:
                    msg += render(timer, "      ")
                if timer.child_timer:
                    msg += render(timer.child_timer, "            ")

        EngineExecutor.logger.info(msg + "\n")