#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import logging
import datetime

from threading                  import Thread

from src.base.config            import AuraConfig
from src.base.utils             import SimpleUtil as SU
from src.plugins.mailer         import AuraMailer
from src.plugins.monitor        import AuraMonitor
from src.plugins.trackservice   import TrackServiceHandler


class EventBinding():
    """
    A binding between the event dispatcher and some event handler.

    It allows you to subscribe to events in a chained way:

        ```
        binding = dispatcher.attach(AuraMonitor)
        binding.subscribe("on_boot").subscribe("on_play")
        ```
    """
    dispatcher = None
    instance = None


    def __init__(self, dispatcher, instance):
        """
        Constructor

        Args:
            dispatcher:     The object whose `subscribe(instance, event_type)` is called on subscription
            instance:       The event handler object held by this binding
        """
        self.dispatcher = dispatcher
        self.instance = instance


    def subscribe(self, event_type):
        """
        Subscribes the instance to some event identified by the `event_type` string.

        Args:
            event_type (String):    The name of the event, e.g. "on_boot"

        Returns:
            (EventBinding):     This binding, which allows chaining of `subscribe(..)` calls
        """
        self.dispatcher.subscribe(self.instance, event_type)
        return self


    def get_instance(self):
        """
        Returns the object within that binding.
        """
        return self.instance



class EngineEventDispatcher():
    """
    Executes handlers for engine events.

    Handler objects are created with `attach(..)` and registered per event type
    with `subscribe(..)`. Each `on_*` method of this class forwards the event to the
    method of the same name on every subscribed handler. Some events are forwarded
    synchronously, others on a newly started thread; see the individual methods.
    """
    logger = None
    config = None

    subscriber_registry = None
    engine = None
    scheduler = None
    monitor = None


    def __init__(self, engine):
        """
        Constructor

        Args:
            engine:     The engine object; it is passed to every attached handler
                        and to the scheduler created in `on_initialized()`
        """
        self.subscriber_registry = dict()
        self.logger = logging.getLogger("AuraEngine")
        self.config = AuraConfig.config()
        self.engine = engine

        binding = self.attach(AuraMailer)
        binding.subscribe("on_critical")
        binding.subscribe("on_sick")
        binding.subscribe("on_resurrect")

        binding = self.attach(AuraMonitor)
        binding.subscribe("on_boot")
        binding.subscribe("on_sick")
        binding.subscribe("on_resurrect")

        binding = self.attach(TrackServiceHandler)
        binding.subscribe("on_timeslot")
        binding.subscribe("on_play")
        binding.subscribe("on_metadata")
        binding.subscribe("on_queue")


    def attach(self, clazz):
        """
        Creates an instance of the given Class.

        Args:
            clazz (Class):  The handler class; it is instantiated with the engine as its only argument

        Returns:
            (EventBinding):     A binding between this dispatcher and the new instance
        """
        instance = clazz(self.engine)
        return EventBinding(self, instance)


    def subscribe(self, instance, event_type):
        """
        Subscribes to some event type. Preferably use it via `EventBinding.subscribe(..)`.

        Args:
            instance:               The handler object providing a method named like `event_type`
            event_type (String):    The name of the event, e.g. "on_play"
        """
        self.subscriber_registry.setdefault(event_type, []).append(instance)


    def call_event(self, event_type, args):
        """
        Calls all subscribers for the given event type.

        Args:
            event_type (String):    The name of the event; also the name of the handler method to call
            args:                   The single payload passed to each handler. Pass `None` to call
                                    the handlers without any argument.
        """
        listeners = self.subscriber_registry.get(event_type)
        if not listeners:
            return

        for listener in listeners:
            # A listener lacking the handler must not keep the remaining listeners from being called
            method = getattr(listener, event_type, None)
            if not callable(method):
                self.logger.warning("Listener %s provides no handler for event '%s'", listener, event_type)
                continue

            # Only `None` means "no payload": empty lists or dicts are valid payloads too
            if args is not None:
                method(args)
            else:
                method()


    def _call_event_async(self, event_type, args, before=None):
        """
        Logs and dispatches the given event on a newly started thread.

        Args:
            event_type (String):    The name of the event
            args:                   The payload for the handlers, see `call_event(..)`
            before (Function):      Optional callable executed on the new thread, after
                                    logging and before the subscribers are called

        Returns:
            (Thread):   The started thread
        """
        def run():
            self.logger.debug("%s(..)", event_type)
            if before is not None:
                before()
            self.call_event(event_type, args)

        thread = Thread(target = run)
        thread.start()
        return thread


    #
    # Events
    #


    def on_initialized(self):
        """
        Called when the engine is initialized, just before the boot step.

        Creates the scheduler and then calls the subscribers synchronously.
        """
        self.logger.debug("on_initialized(..)")
        # NOTE(review): imported here rather than at module level, presumably to avoid
        # a circular import -- confirm before moving it
        from src.scheduling.scheduler import AuraScheduler
        self.scheduler = AuraScheduler(self.engine)
        self.call_event("on_initialized", None)


    def on_boot(self):
        """
        Called when the engine is starting up. This happens after the initialization step.
        Connection to Liquidsoap should be available here.

        Subscribers are called synchronously.
        """
        self.logger.debug("on_boot(..)")
        self.call_event("on_boot", None)


    def on_ready(self):
        """
        Called when the engine is booted and ready to play.

        This event is passed to the scheduler only, hence `on_initialized()`
        has to be called beforehand.
        """
        self.logger.debug("on_ready(..)")
        self.scheduler.on_ready()


    def on_timeslot(self, timeslot):
        """
        Event Handler which is called by the scheduler when the current timeslot is refreshed.
        Subscribers are called on a separate thread.

        Args:
            timeslot:   The refreshed timeslot; passed on to the subscribers as is
        """
        self._call_event_async("on_timeslot", timeslot)


    def on_play(self, entry):
        """
        Event Handler which is called by the engine when some entry is actually playing.
        Subscribers are called on a separate thread.

        Args:
            entry (PlaylistEntry):  The entry which started playing; its `entry_start_actual`
                                    attribute is set to the current local time before the
                                    subscribers are called
        """
        def assign_play_time():
            # Assign timestamp for play time
            entry.entry_start_actual = datetime.datetime.now()

        self._call_event_async("on_play", entry, before = assign_play_time)


    def on_metadata(self, data):
        """
        Event Handler which is called by the soundsystem implementation (i.e. Liquidsoap)
        when some entry is actually playing. Subscribers are called on a separate thread.

        Args:
            data (dict):    A collection of metadata related to the current track
        """
        self._call_event_async("on_metadata", data)


    def on_stop(self, entry):
        """
        The entry on the assigned channel has been stopped playing.
        Subscribers are called on a separate thread.

        Args:
            entry:  The stopped entry; passed on to the subscribers as is
        """
        self._call_event_async("on_stop", entry)


    def on_fallback_updated(self, playlist_uri):
        """
        Called when the scheduled fallback playlist has been updated.
        Subscribers are called synchronously.

        Args:
            playlist_uri:   Passed on to the subscribers as is
        """
        self.logger.debug("on_fallback_updated(..)")
        self.call_event("on_fallback_updated", playlist_uri)


    def on_fallback_cleaned(self, cleaned_channel):
        """
        Called when the scheduled fallback queue has been cleaned up.
        Subscribers are called synchronously.

        Args:
            cleaned_channel:    Passed on to the subscribers as is
        """
        self.logger.debug("on_fallback_cleaned(..)")
        self.call_event("on_fallback_cleaned", cleaned_channel)


    def on_idle(self):
        """
        Called when no entry is playing.

        Logs an error and calls the subscribers on a separate thread.
        """
        def report_silence():
            self.logger.error(SU.red("Currently there's nothing playing!"))

        self._call_event_async("on_idle", None, before = report_silence)


    def on_timeslot_change(self, timeslot):
        """
        Called when the playlist or entries of the current timeslot have changed.
        Subscribers are called on a separate thread.

        Args:
            timeslot:   The changed timeslot; passed on to the subscribers as is
        """
        self._call_event_async("on_timeslot_change", timeslot)


    def on_queue(self, entries):
        """
        One or more entries have been queued and are currently pre-loaded.
        Subscribers are called on a separate thread.

        Args:
            entries:    The queued entries; passed on to the subscribers as is
        """
        self._call_event_async("on_queue", entries)


    def on_sick(self, data):
        """
        Called when the engine is in some unhealthy state.
        Subscribers are called on a separate thread.

        Args:
            data:   Passed on to the subscribers as is
        """
        self._call_event_async("on_sick", data)


    def on_resurrect(self, data):
        """
        Called when the engine turned healthy again after being sick.
        Subscribers are called on a separate thread.

        Args:
            data:   Passed on to the subscribers as is
        """
        self._call_event_async("on_resurrect", data)


    def on_critical(self, subject, message, data=None):
        """
        Called when some critical event occurs.
        Subscribers are called on a separate thread.

        Each subscriber receives one single argument, the tuple `(subject, message, data)`.

        Args:
            subject:    First element of the tuple passed to the subscribers
            message:    Second element of the tuple passed to the subscribers
            data:       Optional third element of the tuple passed to the subscribers
        """
        self._call_event_async("on_critical", (subject, message, data))