events.py 10.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import logging
21
import datetime
22

23
from threading                  import Thread
David Trattnig's avatar
David Trattnig committed
24

25
from src.base.config            import AuraConfig
David Trattnig's avatar
David Trattnig committed
26
from src.plugins.mailer         import AuraMailer
27
28
from src.plugins.monitor        import AuraMonitor
from src.plugins.trackservice   import TrackServiceHandler
29
from src.scheduling.fallback    import FallbackManager
30
31
32
33
34
35


class EventBinding:
    """
    Couples an event dispatcher with a single event handler object.

    Subscriptions can be chained, since `subscribe(..)` returns the binding itself:

        ```
        binding = dispatcher.attach(AuraMonitor)
        binding.subscribe("on_boot").subscribe("on_play")
        ```
    """
    # The dispatcher the handler gets registered at
    dispatcher = None
    # The handler object wrapped by this binding
    instance = None


    def __init__(self, dispatcher, instance):
        """
        Constructor

        Args:
            dispatcher:     Object providing `subscribe(instance, event_type)`
            instance:       The handler object to be registered for events
        """
        self.instance = instance
        self.dispatcher = dispatcher


    def subscribe(self, event_type):
        """
        Registers the bound instance at the dispatcher for the event named `event_type`.

        Args:
            event_type (String):    Name of the event to subscribe to

        Returns:
            (EventBinding):     This binding, allowing further chained subscriptions
        """
        handler = self.instance
        self.dispatcher.subscribe(handler, event_type)
        return self


    def get_instance(self):
        """
        Returns the handler object wrapped by this binding.
        """
        bound = self.instance
        return bound



class EngineEventDispatcher():
    """
    Executes handlers for engine events.

    Handler objects are registered per event type in `subscriber_registry`. Each `on_*`
    method of this class first triggers the built-in handling (scheduler, fallback manager)
    where applicable and then forwards the event to all subscribers via `call_event(..)`.

    `on_initialized` and `on_boot` are executed synchronously, all other events are
    executed in a newly started thread.
    """
    logger = None
    config = None

    # Maps an event type (String) to the list of handler objects subscribed to it
    subscriber_registry = None
    engine = None
    scheduler = None
    # Created in `on_initialized()`; declared here for consistency with `scheduler`
    fallback_manager = None
    monitor = None


    def __init__(self, engine):
        """
        Constructor

        Creates the built-in event handlers (mailer, monitor, track service) and
        subscribes them to their events.

        Args:
            engine:     The engine object, passed on to every attached handler
        """
        self.subscriber_registry = dict()
        self.logger = logging.getLogger("AuraEngine")
        self.config = AuraConfig.config()
        self.engine = engine

        binding = self.attach(AuraMailer)
        binding.subscribe("on_critical")
        binding.subscribe("on_sick")
        binding.subscribe("on_resurrect")
        binding.subscribe("on_fallback_active")

        binding = self.attach(AuraMonitor)
        binding.subscribe("on_boot")
        binding.subscribe("on_sick")
        binding.subscribe("on_resurrect")

        binding = self.attach(TrackServiceHandler)
        binding.subscribe("on_timeslot_start")
        binding.subscribe("on_timeslot_end")
        binding.subscribe("on_play")
        binding.subscribe("on_metadata")
        binding.subscribe("on_queue")


    #
    #   Methods
    #


    def attach(self, clazz):
        """
        Creates an instance of the given Class, passing the engine to its constructor.

        Args:
            clazz (Class):  The handler class to instantiate

        Returns:
            (EventBinding):     Binding between this dispatcher and the new instance
        """
        instance = clazz(self.engine)
        return EventBinding(self, instance)


    def subscribe(self, instance, event_type):
        """
        Subscribes to some event type. Preferably use it via `EventBinding.subscribe(..)`.

        Args:
            instance:               The handler object; expected to provide a method named like `event_type`
            event_type (String):    Name of the event, e.g. "on_play"
        """
        self.subscriber_registry.setdefault(event_type, []).append(instance)


    def call_event(self, event_type, *args):
        """
        Calls all subscribers for the given event type.

        The method named `event_type` is looked up on every subscribed handler and called
        with `args`. Handlers not providing such a callable are skipped with a warning,
        so the remaining subscribers still receive the event.

        Args:
            event_type (String):    Name of the event and of the handler method to call
            *args:                  Arguments passed on to the handler method
        """
        listeners = self.subscriber_registry.get(event_type)
        if not listeners:
            return
        for listener in listeners:
            # Default of `None` avoids an `AttributeError` for handlers lacking the method
            method = getattr(listener, event_type, None)
            if not callable(method):
                self.logger.warning("Subscriber %r provides no handler for event '%s'", listener, event_type)
                continue
            method(*args)


    def _run_async(self, func, *args):
        """
        Starts a new thread executing `func(*args)`, so the caller is not blocked.
        """
        thread = Thread(target = func, args = args)
        thread.start()


    #
    #   Events
    #


    def on_initialized(self):
        """
        Called when the engine is initialized, just before the boot step (see `on_boot()`).
        Creates the fallback manager and the scheduler.

        Important: Subsequent events are called synchronously, hence blocking.
        """
        self.logger.debug("on_initialized(..)")
        # Imported locally rather than at module level
        # NOTE(review): presumably to avoid a circular import -- confirm
        from src.scheduling.scheduler import AuraScheduler
        self.fallback_manager = FallbackManager(self.engine)
        self.scheduler = AuraScheduler(self.engine, self.fallback_manager)
        self.call_event("on_initialized", None)


    def on_boot(self):
        """
        Called when the engine is starting up. This happens after the initialization step.
        Connection to Liquidsoap should be available here.

        Important: Subsequent events are called synchronously, hence blocking.
        """
        self.logger.debug("on_boot(..)")
        self.call_event("on_boot")


    def on_ready(self):
        """
        Called when the engine has finished booting and is ready to play.
        Subscribers are called with `None` as their single argument.
        """
        def func(param):
            self.logger.debug("on_ready(..)")
            self.scheduler.on_ready()
            self.call_event("on_ready", param)

        self._run_async(func, None)


    def on_timeslot_start(self, timeslot):
        """
        Called when a timeslot starts.

        Args:
            timeslot:   The timeslot which starts
        """
        def func(timeslot):
            self.logger.debug("on_timeslot_start(..)")
            self.fallback_manager.on_timeslot_start(timeslot)
            self.call_event("on_timeslot_start", timeslot)

        self._run_async(func, timeslot)


    def on_timeslot_end(self, timeslot):
        """
        Called when a timeslot ends.

        Args:
            timeslot:   The timeslot which ends
        """
        def func(timeslot):
            self.logger.debug("on_timeslot_end(..)")
            self.fallback_manager.on_timeslot_end(timeslot)
            self.call_event("on_timeslot_end", timeslot)

        self._run_async(func, timeslot)


    def on_play(self, entry):
        """
        Event Handler which is called by the engine when some play command to Liquidsoap is issued.
        This does not indicate that Liquidsoap started playing actually, only that the command has
        been issued. To get the metadata update issued by Liquidsoap use `on_metadata` instead.

        Args:
            entry (PlaylistEntry):  The entry for which the play command was issued
        """
        def func(entry):
            self.logger.debug("on_play(..)")
            # Assign timestamp indicating start play time. Use the actual playtime when possible.
            entry.entry_start_actual = datetime.datetime.now()
            self.scheduler.on_play(entry)
            self.call_event("on_play", entry)

        self._run_async(func, entry)


    def on_metadata(self, data):
        """
        Event called by the soundsystem implementation (i.e. Liquidsoap) when some entry is actually playing.
        This does not include live or stream sources, since they don't have metadata and are triggered from
        engine core (see `on_play(..)`).

        Args:
            data (dict):    A collection of metadata related to the current track
        """
        def func(data):
            self.logger.debug("on_metadata(..)")
            self.fallback_manager.on_metadata(data)
            self.call_event("on_metadata", data)

        self._run_async(func, data)


    def on_stop(self, entry):
        """
        The entry on the assigned channel has been stopped playing.

        Args:
            entry:  The entry which stopped playing
        """
        def func(entry):
            self.logger.debug("on_stop(..)")
            self.call_event("on_stop", entry)

        self._run_async(func, entry)


    # def on_fallback_updated(self, playlist_uri):
    #     """
    #     Called when the scheduled fallback playlist has been updated.
    #     This event does not indicate that the fallback is actually playing.
    #     """
    #     def func(self, playlist_uri):
    #         self.logger.debug("on_fallback_updated(..)")
    #         self.call_event("on_fallback_updated", playlist_uri)

    #     thread = Thread(target = func, args = (self, playlist_uri))
    #     thread.start()


    # def on_fallback_cleaned(self, cleaned_channel):
    #     """
    #     Called when the scheduled fallback queue has been cleaned up.
    #     This event does not indicate that some fallback is actually playing.
    #     """
    #     def func(self, cleaned_channel):
    #         self.logger.debug("on_fallback_cleaned(..)")
    #         self.call_event("on_fallback_cleaned", cleaned_channel)

    #     thread = Thread(target = func, args = (self, cleaned_channel))
    #     thread.start()


    def on_fallback_active(self, timeslot, fallback_type):
        """
        Called when a fallback is activated for the given timeslot,
        since no default playlist is available.

        Args:
            timeslot:       The timeslot the fallback is activated for
            fallback_type:  The type of fallback which got activated
        """
        def func(timeslot, fallback_type):
            self.logger.debug("on_fallback_active(..)")
            self.call_event("on_fallback_active", timeslot, fallback_type)

        self._run_async(func, timeslot, fallback_type)


    def on_queue(self, entries):
        """
        One or more entries have been queued and are currently pre-loaded.

        Args:
            entries:    The queued entries
        """
        def func(entries):
            self.logger.debug("on_queue(..)")
            self.call_event("on_queue", entries)

        self._run_async(func, entries)


    def on_sick(self, data):
        """
        Called when the engine is in some unhealthy state.

        Args:
            data:   Details on the unhealthy state, passed on to the subscribers as is
        """
        def func(data):
            self.logger.debug("on_sick(..)")
            self.call_event("on_sick", data)

        self._run_async(func, data)


    def on_resurrect(self, data):
        """
        Called when the engine turned healthy again after being sick.

        Args:
            data:   Details on the recovery, passed on to the subscribers as is
        """
        def func(data):
            self.logger.debug("on_resurrect(..)")
            self.call_event("on_resurrect", data)

        self._run_async(func, data)


    def on_critical(self, subject, message, data=None):
        """
        Called when some critical event occurs.

        Subscribers receive one single argument, the tuple `(subject, message, data)`.

        Args:
            subject:    Subject of the critical event
            message:    Message describing the critical event
            data:       Optional additional data, defaults to `None`
        """
        def func(subject, message, data):
            self.logger.debug("on_critical(..)")
            # Handlers are called with one tuple, not with three separate arguments
            self.call_event("on_critical", (subject, message, data))

        self._run_async(func, subject, message, data)