control.py 11.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import logging
import socket
import time
import json


26
27
28
29
from threading              import Thread, Timer
from datetime               import datetime, timedelta
from http_parser.http       import HttpStream
from http_parser.reader     import SocketReader
30

31
32
from src.base.config        import AuraConfig
from src.base.utils         import SimpleUtil as SU
33
34
35
36
37
38
39
40
41
42
43




class EngineControlInterface:
    """
    Provides ability to control the engine in various ways.
    """
    config = None
    logger = None   
    engine = None
44
    event_dispatcher = None
45
46
    sci = None

47
    def __init__(self, engine, event_dispatcher):
48
49
50
51
52
53
54
        """
        Constructor

        Args:
            config (AuraConfig):    Engine configuration
            logger (AuraLogger):    The logger
        """    
55
        self.engine = engine
56
        self.config = AuraConfig.config()
57
        self.logger = logging.getLogger("AuraEngine")                  
58
        self.logger.info(SU.yellow(f"[ECI] Engine Control Interface starting ..."))
59
60
        self.sci = SocketControlInterface.get_instance(event_dispatcher)

61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84


    def terminate(self):
        """
        Terminates the instance and all related objects.
        """
        if self.sci: self.sci.terminate()



class SocketControlInterface:
    """
    Network socket server to control a running Engine from Liquidsoap.

    Note this server only allows a single connection at once. This
    service is primarly utilized to store new playlogs.
    """    
    PORT = 1337
    ACTION_ON_METADATA = "on_metadata"

    instance = None
    config = None
    logger = None    
    server = None
85
    event_dispatcher = None
86
87


88
    def __init__(self, event_dispatcher):
89
90
91
92
93
94
95
96
97
98
99
100
101
        """
        Constructor

        Args:
            config (AuraConfig):    Engine configuration
            logger (AuraLogger):    The logger
        """
        if SocketControlInterface.instance:
            raise Exception(SU.red("[ECI] Socket server is already running!"))
        
        SocketControlInterface.instance = self
        self.config = AuraConfig.config()
        self.logger = logging.getLogger("AuraEngine")
102
        self.event_dispatcher = event_dispatcher              
103
104
105
106
107
108
        host = "127.0.0.1"
        thread = Thread(target = self.run, args = (self.logger, host))
        thread.start() 


    @staticmethod
109
    def get_instance(event_dispatcher):
110
111
112
113
        """
        Returns the Singleton.
        """
        if not SocketControlInterface.instance:
114
            SocketControlInterface.instance = SocketControlInterface(event_dispatcher)
115
116
117
118
119
120
121
122
        return SocketControlInterface.instance



    def run(self, logger, host):
        """
        Starts the socket server
        """             
David Trattnig's avatar
David Trattnig committed
123
        while True:
124
125
126
127
128
129
130
131
132
133
134
135
            try:
                self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.server.bind((host, SocketControlInterface.PORT))
                break
            except OSError as e:
                wait_time = 2
                self.logger.error(SU.red(f"Cannot bind to Socket. Retrying in {wait_time} seconds..."))
                time.sleep(wait_time)

        logger.info(SU.yellow(f'[ECI] Listening at {host}:{SocketControlInterface.PORT}'))
        self.server.listen()    

David Trattnig's avatar
David Trattnig committed
136
        while True:
137
            (conn, client) = self.server.accept()
David Trattnig's avatar
David Trattnig committed
138
            while True:
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
                r = SocketReader(conn)
                p = HttpStream(r)
                data = p.body_file().read()                
                logger.debug(SU.yellow(f'[ECI] Received socket data from {str(client)}: {str(data)}'))
                
                try:
                    self.process(logger, json.loads(data))
                    conn.sendall(b'\n[ECI] processing done.\n')
                except Exception as e:
                    logger.error(SU.red(f'[ECI] Error while processing request: {data}'), e)

                conn.close()                    
                break


    def process(self, logger, data):
        """
        Process incoming actions.
        """
        if "action" in data:            
            if data["action"] == SocketControlInterface.ACTION_ON_METADATA:
160
161
                meta_data = data["data"]
                meta_data["duration"] = data["track_duration"]
162
                logger.debug(SU.yellow(f"[ECI] Executing action: "+SocketControlInterface.ACTION_ON_METADATA))
163
                self.event_dispatcher.on_metadata(data["data"])
David Trattnig's avatar
David Trattnig committed
164
                logger.info(SU.yellow(f"[ECI] Event '{SocketControlInterface.ACTION_ON_METADATA}' issued successfully"))
165
166
167
168
169
170
171
172
173
174
            else:
                logger.error(SU.red("[ECI] Unknown action: " + data["action"]))
        else:
            logger.error(SU.red(f'[ECI] Missing action in request: {data}'))
    


    def terminate(self):
        SocketControlInterface.instance = None
        self.server.close()
175
176
177
178
179
180
181
182
183
184
        self.logger.info(SU.yellow("[ECI] Shutting down..."))





class EngineExecutor(Timer):
    """
    Base class for timed or threaded execution of Engine commands.

David Trattnig's avatar
David Trattnig committed
185
    Primarily used for automations performed by the scheduler.
186
187
    """
    logger = logging.getLogger("AuraEngine")    
188
189
    timer_store = {}
    parent_timer = None
190
191
192
193
194
195
196
197
198
    child_timer = None
    direct_exec = None
    timer_id = None
    timer_type = None
    param = None
    diff = None
    dt = None


199
    def __init__(self, timer_type="BASE", parent_timer=None, due_time=None, func=None, param=None):
200
201
202
203
204
        """
        Constructor

        Args:
            timer_type (String):            Prefix used for the `timer_id` to make it unique
205
            parent_timer (EngineExeuctor):  Parent action which is a prerequisite for this timer
206
207
208
209
            due_time (Float):               When timer should be executed. For values <= 0 execution happens immediately in a threaded way
            func (function):                The function to be called
            param (object):                 Parameter passt to the function
        """        
210
        from src.core.engine import Engine
211
        now_unix = Engine.engine_time()        
212
213
214
        self.parent_timer = parent_timer
        if self.parent_timer:
            self.parent_timer.child_timer = self
215
216
217
218
219
220
221
222
223
224
225
226
227
        self.direct_exec = False
        self.timer_type = timer_type       
        self.timer_id = f"{timer_type}:{func.__name__}:{due_time}"

        if not due_time:
            diff = 0
        else:                
            diff = due_time - now_unix

        self.diff = diff
        self.dt = datetime.now() + timedelta(seconds=diff)
        self.func = func
        self.param = param
David Trattnig's avatar
David Trattnig committed
228
        self.update_store()
229
230
231
232
233
234
235

        if diff < 0:
            msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..."
            self.logger.error(SU.red(msg))
            self.exec_now()
        elif diff == 0:   
            self.logger.info(f"Timer '{self.timer_id}' to be executed immediately")
236
237
            self.exec_now()
        else:            
238
            self.exec_timed()            
David Trattnig's avatar
David Trattnig committed
239
            self.start()      
240
241


242
243
244
245
246
    def wait_for_parent(self):
        """
        Child timers are dependend on their parents. So let's wait until parents are done with their stuff.
        """
        if self.parent_timer and self.parent_timer.is_alive():
David Trattnig's avatar
David Trattnig committed
247
            while self.parent_timer.is_alive():
248
                self.logger.info(f"Timer '{self.timer_id}' is waiting for parent timer '{self.parent_timer.timer_id}' to finish")
David Trattnig's avatar
David Trattnig committed
249
                time.sleep(0.2)
250

251
252
253
254
255
256

    def exec_now(self):
        """
        Immediate execution within a thread. It's not stored in the timer store.
        """
        self.direct_exec = True
257
        self.wait_for_parent()
258
259
        thread = Thread(target = self.func, args = (self.param,))
        thread.start()
David Trattnig's avatar
David Trattnig committed
260
       
261
262
263
264
265
266

    def exec_timed(self):
        """
        Timed execution in a thread.
        """
        def wrapper_func(param=None):
David Trattnig's avatar
David Trattnig committed
267
            self.wait_for_parent()            
268
            if param: self.func(param,) 
David Trattnig's avatar
David Trattnig committed
269
            else: self.func()            
270
271
272
273
274
275
276
277
278
279
280
        Timer.__init__(self, self.diff, wrapper_func, (self.param,))


    def update_store(self):
        """
        Adds the instance to the store and cancels any previously existing commands.
        """
        existing_command = None
        if self.timer_id in EngineExecutor.timer_store:
            existing_command = EngineExecutor.timer_store[self.timer_id]    
        if existing_command:
David Trattnig's avatar
David Trattnig committed
281
            self.logger.debug(f"Cancelling previous timer with ID: {self.timer_id}")
282
283
            existing_command.cancel()
            if existing_command.child_timer:
David Trattnig's avatar
David Trattnig committed
284
                self.logger.debug(f"Cancelling child timer with ID: {existing_command.child_timer.timer_id}")
285
286

        EngineExecutor.timer_store[self.timer_id] = self
David Trattnig's avatar
David Trattnig committed
287
        self.logger.debug(f"Created command timer with ID: {self.timer_id}")
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305


    def is_alive(self):
        """
        Returns true if the command is still due to be executed.
        """
        if self.direct_exec == True:
            return False
        return super().is_alive()


    def __str__(self):
        """
        String represenation of the timer.
        """
        return f"[{self.timer_id}] exec at {str(self.dt)} (alive: {self.is_alive()})"


David Trattnig's avatar
David Trattnig committed
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
    @staticmethod
    def remove_stale_timers():
        """
        Removes timers from store which have been executed and are older than one hour.
        """
        timers = EngineExecutor.timer_store.values()
        del_keys = []

        for timer in timers:
            if timer.dt < datetime.now() - timedelta(seconds=3600):        
                if not timer.child_timer or (timer.child_timer and not timer.child_timer.is_alive()):
                    timer.logger.debug(f"Removing already executed timer with ID: {timer.timer_id}")
                    del_keys.append(timer.timer_id)

        for timer_id in del_keys:
            del EngineExecutor.timer_store[timer_id]                    


324
325
326
    @staticmethod
    def log_commands():
        """
David Trattnig's avatar
David Trattnig committed
327
        Prints a list of active timers and inactive timer not older than one hour.
328
329
330
        """
        timers = EngineExecutor.timer_store.values()                
        msg = "\n [ ENGINE COMMAND QUEUE ]\n"
David Trattnig's avatar
David Trattnig committed
331
        EngineExecutor.remove_stale_timers()
332
333
334
335
336

        if not timers:
            msg += "None available!\n"
        else:
            for timer in timers:
337
338
                if not timer.parent_timer:
                    msg += f"      =>   {str(timer)}\n"
339
340
341
342
                if timer.child_timer:
                    msg += f"            =>   {str(timer.child_timer)}\n"

        EngineExecutor.logger.info(msg + "\n")