#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import logging
import datetime

from threading                  import Thread

from src.base.config            import AuraConfig
from src.plugins.mailer         import AuraMailer
from src.plugins.monitor        import AuraMonitor
from src.plugins.trackservice   import TrackServiceHandler
from src.scheduling.fallback    import FallbackManager


class EventBinding():
    """
    A binding between the event dispatcher and some event handler.

    It allows you to subscribe to events in a chained way:

        ```
        binding = dispatcher.attach(AuraMonitor)
        binding.subscribe("on_boot").subscribe("on_play")
        ```
    """
    # The dispatcher which subscriptions are delegated to
    dispatcher = None
    # The event handler object wrapped by this binding
    instance = None


    def __init__(self, dispatcher, instance):
        """
        Constructor

        Args:
            dispatcher:     Object providing `subscribe(instance, event_type)`
            instance:       The event handler object to be bound
        """
        self.dispatcher = dispatcher
        self.instance = instance


    def subscribe(self, event_type):
        """
        Subscribes the instance to some event identified by the `event_type` string.

        Args:
            event_type (String):    Name of the event, e.g. "on_boot"

        Returns:
            (EventBinding):     This binding, which allows chaining of `subscribe(..)` calls
        """
        self.dispatcher.subscribe(self.instance, event_type)
        return self


    def get_instance(self):
        """
        Returns the object within that binding.
        """
        return self.instance



class EngineEventDispatcher():
    """
    Executes handlers for engine events.

    Handler objects are created via `attach(..)` and registered per event type
    via `subscribe(..)`. The `on_*` methods are the entry points for events:
    where applicable they first inform the scheduler or fallback manager held
    by this dispatcher and then forward the event to all subscribers using
    `call_event(..)`.
    """
    logger = None
    config = None

    # Maps an event type (String) to the list of subscribed handler objects
    subscriber_registry = None
    engine = None
    # Scheduler and fallback manager are created in `on_initialized()`
    scheduler = None
    fallback_manager = None
    monitor = None


    def __init__(self, engine):
        """
        Constructor

        Args:
            engine:     The engine object, which is passed on to every attached handler
        """
        self.subscriber_registry = dict()
        self.logger = logging.getLogger("AuraEngine")
        self.config = AuraConfig.config()
        self.engine = engine

        binding = self.attach(AuraMailer)
        binding.subscribe("on_critical")
        binding.subscribe("on_sick")
        binding.subscribe("on_resurrect")
        binding.subscribe("on_fallback_active")

        binding = self.attach(AuraMonitor)
        binding.subscribe("on_boot")
        binding.subscribe("on_sick")
        binding.subscribe("on_resurrect")

        binding = self.attach(TrackServiceHandler)
        binding.subscribe("on_timeslot_start")
        binding.subscribe("on_play")
        binding.subscribe("on_metadata")
        binding.subscribe("on_queue")


    #
    #   Methods
    #


    def attach(self, clazz):
        """
        Creates an instance of the given Class.

        Args:
            clazz (Class):  Handler class; it is instantiated with the engine as its only argument

        Returns:
            (EventBinding):     Binding used to subscribe the new instance to events
        """
        instance = clazz(self.engine)
        return EventBinding(self, instance)


    def subscribe(self, instance, event_type):
        """
        Subscribes to some event type. Preferably use it via `EventBinding.subscribe(..)`.

        Args:
            instance:               The handler object to be called for the event
            event_type (String):    Name of the event; also the name of the method called on `instance`
        """
        self.subscriber_registry.setdefault(event_type, []).append(instance)


    def call_event(self, event_type, *args):
        """
        Calls all subscribers for the given event type.

        Subscribers are called in the order of their subscription, within the
        thread of the caller. A subscriber not providing a method named like the
        event is skipped with a warning, so the remaining subscribers are still called.

        Args:
            event_type (String):    Name of the event and of the subscriber method to call
            *args:                  Arguments passed on to the subscriber method
        """
        listeners = self.subscriber_registry.get(event_type)
        if not listeners:
            return
        for listener in listeners:
            # Default of `None` avoids an AttributeError for a missing handler method
            method = getattr(listener, event_type, None)
            if not method:
                self.logger.warning("Subscriber '%s' provides no handler for event '%s'",
                                    type(listener).__name__, event_type)
                continue
            method(*args)


    def _run_in_thread(self, func, *args):
        """
        Executes `func(self, *args)` in a newly created and immediately started thread.

        Args:
            func (callable):    Function receiving this dispatcher, followed by `args`
            *args:              Arguments passed on to `func`
        """
        thread = Thread(target=func, args=(self, *args))
        thread.start()


    #
    #   Events
    #


    def on_initialized(self):
        """
        Called when the engine is initialized, before it is booting (compare `on_boot()`).
        Creates the fallback manager and the scheduler before informing the subscribers.

        Important: Subsequent events are called synchronously, hence blocking.
        """
        self.logger.debug("on_initialized(..)")
        # NOTE(review): imported locally, presumably to avoid a circular import - confirm
        from src.scheduling.scheduler import AuraScheduler
        self.fallback_manager = FallbackManager(self.engine)
        self.scheduler = AuraScheduler(self.engine, self.fallback_manager)
        self.call_event("on_initialized", None)


    def on_boot(self):
        """
        Called when the engine is starting up. This happens after the initialization step.
        Connection to Liquidsoap should be available here.

        Important: Subsequent events are called synchronously, hence blocking.
        """
        self.logger.debug("on_boot(..)")
        self.call_event("on_boot")


    def on_ready(self):
        """
        Called when the engine has finished booting and is ready to play.

        The scheduler is informed first, then the subscribers are called with `None`
        as their only argument. Handlers are executed in a separate thread.
        """
        def func(self, param):
            self.logger.debug("on_ready(..)")
            self.scheduler.on_ready()
            self.call_event("on_ready", param)

        self._run_in_thread(func, None)


    def on_timeslot_start(self, timeslot):
        """
        Called when a timeslot starts.

        The fallback manager is informed first, then the subscribers.
        Handlers are executed in a separate thread.

        Args:
            timeslot:   The timeslot which is starting
        """
        def func(self, timeslot):
            self.logger.debug("on_timeslot_start(..)")
            self.fallback_manager.on_timeslot_start(timeslot)
            self.call_event("on_timeslot_start", timeslot)

        self._run_in_thread(func, timeslot)


    def on_timeslot_end(self, timeslot):
        """
        Called when a timeslot ends.

        The fallback manager is informed first, then the subscribers.
        Handlers are executed in a separate thread.

        Args:
            timeslot:   The timeslot which is ending
        """
        def func(self, timeslot):
            self.logger.debug("on_timeslot_end(..)")
            self.fallback_manager.on_timeslot_end(timeslot)
            self.call_event("on_timeslot_end", timeslot)

        self._run_in_thread(func, timeslot)


    def on_play(self, entry):
        """
        Event Handler which is called by the engine when some entry is actually playing.
        Handlers are executed in a separate thread.

        Args:
            entry (PlaylistEntry):    The `PlaylistEntry` object
        """
        def func(self, entry):
            self.logger.debug("on_play(..)")
            # Assign timestamp for play time
            entry.entry_start_actual = datetime.datetime.now()
            self.call_event("on_play", entry)

        self._run_in_thread(func, entry)


    def on_metadata(self, data):
        """
        Event called by the soundsystem implementation (i.e. Liquidsoap) when some entry is actually playing.
        This does not include live or stream sources, since they do not have metadata and are triggered from
        engine core (see `on_play(..)`).

        The fallback manager is informed first, then the subscribers.
        Handlers are executed in a separate thread.

        Args:
            data (dict):    A collection of metadata related to the current track
        """
        def func(self, data):
            self.logger.debug("on_metadata(..)")
            self.fallback_manager.on_metadata(data)
            self.call_event("on_metadata", data)

        self._run_in_thread(func, data)


    def on_stop(self, entry):
        """
        The entry on the assigned channel has been stopped playing.
        Handlers are executed in a separate thread.

        Args:
            entry:  The entry which stopped playing
        """
        def func(self, entry):
            self.logger.debug("on_stop(..)")
            self.call_event("on_stop", entry)

        self._run_in_thread(func, entry)


    def on_fallback_updated(self, playlist_uri):
        """
        Called when the scheduled fallback playlist has been updated.
        This event does not indicate that the fallback is actually playing.
        Handlers are executed in a separate thread.

        Args:
            playlist_uri:   Passed on to the subscribers as is
        """
        def func(self, playlist_uri):
            self.logger.debug("on_fallback_updated(..)")
            self.call_event("on_fallback_updated", playlist_uri)

        self._run_in_thread(func, playlist_uri)


    def on_fallback_cleaned(self, cleaned_channel):
        """
        Called when the scheduled fallback queue has been cleaned up.
        This event does not indicate that some fallback is actually playing.
        Handlers are executed in a separate thread.

        Args:
            cleaned_channel:    Passed on to the subscribers as is
        """
        def func(self, cleaned_channel):
            self.logger.debug("on_fallback_cleaned(..)")
            self.call_event("on_fallback_cleaned", cleaned_channel)

        self._run_in_thread(func, cleaned_channel)


    def on_fallback_active(self, timeslot, fallback_type):
        """
        Called when a fallback is activated for the given timeslot,
        since no default playlist is available.
        Handlers are executed in a separate thread.

        Args:
            timeslot:       The timeslot the fallback is activated for
            fallback_type:  Passed on to the subscribers as second argument
        """
        def func(self, timeslot, fallback_type):
            self.logger.debug("on_fallback_active(..)")
            self.call_event("on_fallback_active", timeslot, fallback_type)

        self._run_in_thread(func, timeslot, fallback_type)


    def on_queue(self, entries):
        """
        One or more entries have been queued and are currently pre-loaded.
        Handlers are executed in a separate thread.

        Args:
            entries:    The queued entries
        """
        def func(self, entries):
            self.logger.debug("on_queue(..)")
            self.call_event("on_queue", entries)

        self._run_in_thread(func, entries)


    def on_sick(self, data):
        """
        Called when the engine is in some unhealthy state.
        Handlers are executed in a separate thread.

        Args:
            data:   Passed on to the subscribers as is
        """
        def func(self, data):
            self.logger.debug("on_sick(..)")
            self.call_event("on_sick", data)

        self._run_in_thread(func, data)


    def on_resurrect(self, data):
        """
        Called when the engine turned healthy again after being sick.
        Handlers are executed in a separate thread.

        Args:
            data:   Passed on to the subscribers as is
        """
        def func(self, data):
            self.logger.debug("on_resurrect(..)")
            self.call_event("on_resurrect", data)

        self._run_in_thread(func, data)


    def on_critical(self, subject, message, data=None):
        """
        Called when some critical event occurs.
        Handlers are executed in a separate thread.

        Args:
            subject:    First element of the tuple handed to the subscribers
            message:    Second element of the tuple handed to the subscribers
            data:       Optional third element of the tuple, defaults to `None`
        """
        def func(self, subject, message, data):
            self.logger.debug("on_critical(..)")
            # NOTE(review): subscribers receive ONE tuple argument here, unlike
            # `on_fallback_active(..)` which passes its arguments individually -
            # confirm against the subscriber's `on_critical` signature.
            self.call_event("on_critical", (subject, message, data))

        self._run_in_thread(func, subject, message, data)