scheduler.py 19.4 KB
Newer Older
David Trattnig's avatar
David Trattnig committed
1

2
#
David Trattnig's avatar
David Trattnig committed
3
# Aura Engine (https://gitlab.servus.at/aura/engine)
4
#
David Trattnig's avatar
David Trattnig committed
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Copyright (C) 2017-2020 - The Aura Engine Team.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

20

David Trattnig's avatar
David Trattnig committed
21
22
import logging
import threading
23
24
import time

25
26
27

from src.base.config            import AuraConfig
from src.base.utils             import SimpleUtil as SU
28
from src.scheduling.models      import AuraDatabaseModel
29
from src.base.exceptions        import NoActiveTimeslotException, LoadSourceException
David Trattnig's avatar
David Trattnig committed
30
31
32
33
from src.control                import EngineExecutor
from src.engine                 import Engine
from src.channels               import ChannelType, TransitionType, EntryPlayState
from src.resources              import ResourceClass, ResourceUtil
34

David Trattnig's avatar
David Trattnig committed
35
from src.scheduling.utils       import TimeslotRenderer
36
from src.scheduling.programme   import ProgrammeService
David Trattnig's avatar
David Trattnig committed
37
38
39



David Trattnig's avatar
David Trattnig committed
40
class AuraScheduler(threading.Thread):
    """
    Aura Scheduler Class

    - Retrieves data from Steering and Tank
    - Executes engine actions in an automated fashion

    """
    # Class-level defaults. Most of them are rebound per instance in `__init__`.
    config = None
    logger = None
    engine = None
    exit_event = None
    timeslot_renderer = None
    programme = None
    # NOTE(review): mutable class attribute - shared between instances unless rebound.
    message_timer = []
    fallback = None
    is_initialized = None
58

59

60

61
    def __init__(self, engine, fallback_manager):
        """
        Wires the scheduler to its collaborators, initializes the database
        and immediately starts the scheduling thread.

        Args:
            engine (Engine):        The engine to play the schedule on. Its `scheduler`
                                    attribute is pointed back to this instance
            fallback_manager:       Stored as `self.fallback`; used to queue and resolve
                                    fallback playlists
        """
        self.config = AuraConfig.config()
        self.logger = logging.getLogger("AuraEngine")
        self.programme = ProgrammeService()
        self.timeslot_renderer = TimeslotRenderer(self)
        self.fallback = fallback_manager

        # Two-way link between engine and scheduler
        self.engine = engine
        self.engine.scheduler = self

        # State flags - `is_initialized` is switched on by `on_ready()`
        self.is_soundsytem_init = False
        self.is_initialized = False
        self.is_engine_ready = False

        # Scheduler Initialization
        AuraDatabaseModel.init_database()

        # Init scheduling thread; `run()` starts executing as soon as `start()` is called
        super().__init__()
        self.exit_event = threading.Event()
        self.start()

89

90

91
    def run(self):
        """
        Called when thread is started via `start()`. Until `exit_event` is set, it repeats the following cycle:

            1. Reloads the configuration file, to allow modifications while running the engine.
            2. Refreshes the programme via `self.programme.refresh()`.
            3. Queues the programme via `self.queue_programme()`, but only after `on_ready()` has
               marked the scheduler as initialized.
            4. Logs the pending commands and waits `fetching_frequency` seconds (as defined in the
               engine configuration) or until termination is requested.

        Errors raised within a cycle are logged and do not stop the loop.
        """
        # Fallback interval in seconds. Only relevant if the very first cycle fails before
        # `fetching_frequency` could be read; otherwise the last successfully read value is kept.
        # Without it, such a failure would raise a `NameError` at the `wait()` call below.
        seconds_to_wait = 60

        while not self.exit_event.is_set():
            try:
                self.config.load_config()
                seconds_to_wait = int(self.config.get("fetching_frequency"))
                self.logger.info(SU.cyan(f"== start fetching new timeslots (every {seconds_to_wait} seconds) =="))

                # Load some stuff from the API in any case
                self.programme.refresh()

                # Queue only when the engine is ready to play
                if self.is_initialized:
                    self.queue_programme()

            except Exception as e:
                self.logger.critical(SU.red(f"Unhandled error while fetching & scheduling new programme! ({str(e)})"))
                # Keep on working anyway

            EngineExecutor.log_commands()
            # Returns early when `terminate()` sets the event
            self.exit_event.wait(seconds_to_wait)
120

121

122

123
    #
124
    #   EVENT HANDLERS
125
    #
126

127
128

    def on_ready(self):
        """
        Event Handler which is called when the engine has finished booting and is ready to play.

        Marks the scheduler as initialized, logs the rendered timeslots and then tries to
        play the currently scheduled entry and to queue the entries following it.
        """
        self.is_initialized = True
        ascii_timeslots = self.timeslot_renderer.get_ascii_timeslots()
        self.logger.info(ascii_timeslots)

        try:
            self.play_active_entry()
            self.queue_startup_entries()
        except NoActiveTimeslotException:
            # Nothing is scheduled for right now. That's not good, but keep on working...
            return
141
142
143
144


    def on_play(self, entry):
        """
        Event Handler which is called by the engine when some entry is actually playing.

        Entries playing on one of the fallback-queue channels are ignored, because they
        handle their stuff by themselves. Any other entry is registered as the active
        entry of the current timeslot, if there is one.

        Args:
            entry (PlaylistEntry):    The entry which started playing
        """
        is_fallback_entry = entry.channel in ChannelType.FALLBACK_QUEUE.channels
        if not is_fallback_entry:
            timeslot = self.programme.get_current_timeslot()
            if timeslot:
                timeslot.set_active_entry(entry)

159
160


161
162
163
164
165
    #
    #   METHODS
    #


David Trattnig's avatar
David Trattnig committed
166
167
168
169
170
    def get_programme(self):
        """
        Returns the current programme.
        """
        return self.programme
171

David Trattnig's avatar
David Trattnig committed
172

173
174
    def play_active_entry(self):
        """
        Plays the entry scheduled for the very current moment and forwards to the scheduled position in time.
        Usually called when the Engine boots.

        If there is a current timeslot, a `TimeslotCommand` is created for it and its fallback
        playlist is queued, before the current entry is looked up.

        Raises:
            (NoActiveTimeslotException):    If there's no entry to be played at the current moment
        """
        sleep_offset = 10

        def entry_is(resource_class):
            # True if the content type of the active entry belongs to the given resource class
            return active_entry.get_content_type() in resource_class.types

        current_timeslot = self.programme.get_current_timeslot()
        if current_timeslot:
            # Create command timer to indicate the start of the timeslot
            TimeslotCommand(self.engine, current_timeslot)
            # Schedule any available fallback playlist
            self.fallback.queue_fallback_playlist(current_timeslot)

        active_entry = self.programme.get_current_entry()
        if not active_entry:
            raise NoActiveTimeslotException

        if entry_is(ResourceClass.FILE):
            # A file-system source needs to be fast-forwarded to the current marker as per timeslot.
            # This is the number of seconds which have passed since the scheduled start of the entry.
            elapsed = Engine.engine_time() - active_entry.start_unix

            # If the seek exceeds the length of the current track,
            # there's no need to do anything - the scheduler takes care of the rest
            if (elapsed + sleep_offset) > active_entry.duration:
                self.logger.info("The FFWD [>>] range exceeds the length of the entry. Drink some tea and wait for the sound of the next entry.")
                return

            # Preload and play active entry
            PlayCommand(self.engine, [active_entry])

            if elapsed > 0:
                def seek_later(position):
                    # Without plenty of timeout (10s) the seek doesn't work. The time
                    # spent sleeping is added to the position to compensate for it.
                    position += sleep_offset
                    time.sleep(sleep_offset)
                    self.logger.info("Going to fast-forward %s seconds" % position)
                    response = self.engine.player.queue_seek(active_entry.channel, position)
                    self.logger.info("Sound-system seek response: " + response)

                # Seek in a separate thread, to avoid blocking the caller while sleeping
                seeker = threading.Thread(target=seek_later, args=(elapsed,))
                seeker.start()

        elif entry_is(ResourceClass.STREAM) or entry_is(ResourceClass.LIVE):
            # Streams and live sources have no position to seek to: preload and play active entry
            PlayCommand(self.engine, [active_entry])

        else:
            self.logger.critical("Unknown Entry Type: %s" % active_entry)
230

231
232


David Trattnig's avatar
David Trattnig committed
233
    def get_active_playlist(self):
David Trattnig's avatar
David Trattnig committed
234
        """
David Trattnig's avatar
David Trattnig committed
235
        Retrieves the currently playing playlist.
David Trattnig's avatar
David Trattnig committed
236

David Trattnig's avatar
David Trattnig committed
237
238
        Returns:
            (FallbackType, Playlist): The resolved playlist
239
        """
David Trattnig's avatar
David Trattnig committed
240
241
242
243
        timeslot = self.programme.get_current_timeslot()
        if timeslot:
            return self.fallback.resolve_playlist(timeslot)
        return (None, None)
244
245
246



247
248
    def queue_programme(self):
        """
        Queues the current programme (playlists as per timeslot) by creating
        timed commands to the sound-system to enable the individual tracks of playlists.

        Only the upcoming timeslots which are within the scheduling window are taken into account.
        """
        # Get a clean set of the timeslots within the scheduling window
        upcoming = self.programme.get_next_timeslots()
        in_window = self.filter_scheduling_window(upcoming)

        # Queue the timeslots, their playlists and entries
        for timeslot in in_window or ():
            # Create command timer to indicate the start of the timeslot
            TimeslotCommand(self.engine, timeslot)
            # Schedule any available fallback playlist
            self.fallback.queue_fallback_playlist(timeslot)

            playlist = self.programme.get_current_playlist(timeslot)
            if playlist:
                self.queue_playlist_entries(timeslot, playlist.entries, False, True)

        self.logger.info(SU.green("Finished queuing programme."))
270

David Trattnig's avatar
David Trattnig committed
271
272
273
274
275
276
277


    def queue_startup_entries(self):
        """
        Queues all entries after the one currently playing upon startup. Don't use
        this method in any other scenario, as it doesn't respect the scheduling window.
        """
David Trattnig's avatar
David Trattnig committed
278
        current_timeslot = self.programme.get_current_timeslot()
279

280
281
        # Queue the (rest of the) currently playing timeslot upon startup
        if current_timeslot:
282
            current_playlist = self.programme.get_current_playlist(current_timeslot)
David Trattnig's avatar
David Trattnig committed
283
284

            if current_playlist:
David Trattnig's avatar
David Trattnig committed
285
                active_entry = self.programme.get_current_entry()
David Trattnig's avatar
David Trattnig committed
286
287
288
                if active_entry:
                    # Queue open entries for current playlist
                    rest_of_playlist = active_entry.get_next_entries(True)
289
                    self.queue_playlist_entries(current_timeslot, rest_of_playlist, False, True)
290
291


David Trattnig's avatar
David Trattnig committed
292

293
    def queue_playlist_entries(self, timeslot, entries, fade_in, fade_out):
        """
        Creates sound-system player commands for all playlist items to be executed at the scheduled time.

        Since each scheduled playlist can consist of multiple entry types such as *file*, *live*,
        and *stream*, the play-out of the timeslot is actually a bit more complex. Before any playlist
        entries of the timeslot can be turned into sound, they need to be aggregated, queued and pre-loaded.

        1. First, all entries are aggregated when they hold filesystem entries.
            Given you have a playlist with 10 entries, the first 4 are consisting of files, the next two
            of a stream and a live source. The last 4 are files again. These entries are now
            aggregated into 4 groups: one for the files, one for the stream, one for the live entry
            and another one for files. For each group a timer for executing the next step is created.

        2. Now, the playlist entries are going to be "pre-loaded". This means that filesystem
            entries are queued and pre-loaded and entries which are based on audio streams are buffered.
            This is required to allow a seamless play-out, when it's time to do so (in the next stage).
            Due to their nature, playlist entries which hold live audio sources are not affected by
            this stage at all.

        Only the first step happens in this method; the second one is delegated to the
        `PlayCommand` created for each group.

        Args:
            timeslot (Timeslot):        The timeslot these entries belong to (currently not used by this method)
            entries ([PlaylistEntry]):  The playlist entries to be scheduled for playout
            fade_in (Boolean):          Fade-in at the beginning of the set of entries (currently not used by this method)
            fade_out (Boolean):         Fade-out at the end of the set of entries (currently not used by this method)

        Returns:
            None
        """
        entry_groups = []
        previous_entry = None

        # Group/aggregate all filesystem entries, allowing them to be queued at once.
        # An entry joins the current group only if it continues a run of file entries
        # of the same content type; everything else opens a group of its own.
        for entry in entries:
            content_type = entry.get_content_type()
            continues_file_run = (
                previous_entry is not None
                and previous_entry.get_content_type() == content_type
                and content_type in ResourceClass.FILE.types
            )
            if continues_file_run:
                entry_groups[-1].append(entry)
            else:
                entry_groups.append([entry])
            previous_entry = entry
        self.logger.info("Built %s entry group(s)" % len(entry_groups))

        if not entry_groups:
            self.logger.warning(SU.red("Nothing to schedule ..."))
            return

        # Create command timers for each entry group
        for entry_group in entry_groups:
            PlayCommand(self.engine, entry_group)
355
356
357



358
359
    def filter_scheduling_window(self, timeslots):
        """
360
361
        Ignore timeslots which are before the start of scheduling window (start of timeslot - `scheduling_window_start`)
        or after the end of the scheduling window (end of timeslot -`scheduling_window_end`).
362

363
364
365
366
367
        Before the scheduling window:
            - Timeslots can still be deleted in Steering and the playout will respect this

        During the scheduling window:
            - Timeslots and it's playlists are queued as timed commands
368

369
370
371
        After the scheduling window:
            - Such timeslots are ignored, because it doesn't make sense anymore to schedule them before the next
              timeslot starts
372

373
374
375
376
377
378
379
380
        """
        if not timeslots:
            return timeslots

        now_unix = Engine.engine_time()
        len_before = len(timeslots)
        window_start = self.config.get("scheduling_window_start")
        window_end = self.config.get("scheduling_window_end")
381
        timeslots = list(filter(lambda t: (t.start_unix - window_start) < now_unix and now_unix < (t.end_unix - window_end), timeslots))
382
        len_after = len(timeslots)
383
        self.logger.info("For now, skipped %s future timeslot(s) which are out of the scheduling window (T¹-%ss to T²-%ss)" % ((len_before - len_after), window_start, window_end))
384
385
386
387

        return timeslots


388

389
    def terminate(self):
390
        """
391
        Called when thread is stopped or a signal to terminate is received.
392
        """
393
        self.logger.info("Shutting down scheduler ...")
394
395
        self.programme.terminate()
        self.exit_event.set()
396

397
398


David Trattnig's avatar
David Trattnig committed
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419


#
#   EngineExecutor Commands
#


class TimeslotCommand(EngineExecutor):
    """
    Command for triggering start and end of timeslot events.
    """
    engine = None
    config = None

    def __init__(self, engine, timeslot):
        """
        Constructor

        Creates the timed command for the start of the timeslot and registers a child
        command, which ends the timeslot `fade_out_time` seconds (as per configuration)
        before its scheduled end.

        Args:
            engine (Engine):        The engine
            timeslot (Timeslot):    The timeslot which is starting at this time
        """
        self.config = AuraConfig()
        self.engine = engine

        fade_out_time = float(self.config.get("fade_out_time"))
        # Absolute point in time (unix timestamp) at which the fade-out has to start
        start_fade_out = timeslot.end_unix - fade_out_time
        # The log message talks about a duration, hence the distance from now is
        # reported rather than the absolute timestamp
        seconds_until_fade_out = start_fade_out - Engine.engine_time()
        # NOTE(review): `self.logger` is not assigned in this class and is used before
        # `super().__init__()` - presumably a class-level attribute of `EngineExecutor`; confirm.
        self.logger.info(f"Fading out timeslot in {seconds_until_fade_out} seconds at {timeslot.timeslot_end} | Timeslot: {timeslot}")

        # Initialize the "fade in" EngineExecutor and instantiate a connected child EngineExecutor for "fade out" when the parent is ready
        super().__init__("TIMESLOT", None, timeslot.start_unix, self.do_start_timeslot, timeslot)
        self.on_ready(lambda: EngineExecutor("TIMESLOT", self, start_fade_out, self.do_end_timeslot, timeslot))


    def do_start_timeslot(self, timeslot):
        """
        Initiates the start of the timeslot.

        Args:
            timeslot (Timeslot):    The timeslot which is starting
        """
        self.logger.info(SU.cyan(f"=== on_timeslot_start('{timeslot}') ==="))
        self.engine.event_dispatcher.on_timeslot_start(timeslot)


    def do_end_timeslot(self, timeslot):
        """
        Initiates the end of the timeslot and fades out its most recent entry, if there is one.

        Args:
            timeslot (Timeslot):    The timeslot which is ending
        """
        self.logger.info(SU.cyan(f"=== on_timeslot_end('{timeslot}') ==="))
        self.engine.event_dispatcher.on_timeslot_end(timeslot)

        recent_entry = timeslot.get_recent_entry()
        if recent_entry:
            self.engine.player.stop(recent_entry, TransitionType.FADE)
        else:
            self.logger.warning(SU.red(f"Interestingly timeslot {timeslot} has no entry to be faded out?"))
452

David Trattnig's avatar
David Trattnig committed
453
454
455
456


class PlayCommand(EngineExecutor):
    """
    Command for triggering timed preloading and playing as a child command.
    """
    engine = None
    config = None

    def __init__(self, engine, entries):
        """
        Constructor

        Creates the timed "preload" command, which is due `preload_offset` (as per configuration)
        before the start of the first entry, and registers a child command which starts
        playing at the start time of the first entry.

        Args:
            engine (Engine):                The engine
            entries ([PlaylistEntry]):      One or more playlist entries to be started
        """
        self.config = AuraConfig()
        self.engine = engine

        start_preload = entries[0].start_unix - self.config.get("preload_offset")
        start_play = entries[0].start_unix

        # Initialize the "preload" EngineExecutor and attach a child "play" EngineExecutor to the "on_ready" event handler
        super().__init__("PRELOAD", None, start_preload, self.do_preload, entries)
        self.on_ready(lambda: EngineExecutor("PLAY", self, start_play, self.do_play, entries))


    def do_preload(self, entries):
        """
        Preload the entries.

        Filesystem entries are preloaded as a group on the queue channel; for any other
        content type only the first entry is preloaded. A failing preload is logged,
        not raised.

        Args:
            entries ([PlaylistEntry]):      The entries to be preloaded
        """
        try:
            if entries[0].get_content_type() in ResourceClass.FILE.types:
                self.logger.info(SU.cyan("=== preload_group('%s') ===" % ResourceUtil.get_entries_string(entries)))
                self.engine.player.preload_group(entries, ChannelType.QUEUE)
            else:
                self.logger.info(SU.cyan("=== preload('%s') ===" % ResourceUtil.get_entries_string(entries)))
                self.engine.player.preload(entries[0])
        except LoadSourceException as e:
            # The exception is part of the message itself: handing it over as an additional
            # positional argument makes the logging module fail while formatting the record
            self.logger.critical(SU.red("Could not preload entries %s (%s)" % (ResourceUtil.get_entries_string(entries), e)))

        if entries[-1].status != EntryPlayState.READY:
            self.logger.critical(SU.red("Entries didn't reach 'ready' state during preloading (Entries: %s)" % ResourceUtil.get_entries_string(entries)))


    def do_play(self, entries):
        """
        Play the entries.

        If the last entry is not in 'ready' state yet, this method blocks and polls
        its status every two seconds before starting the first entry with a fade.

        Args:
            entries ([PlaylistEntry]):      The entries to be played
        """
        self.logger.info(SU.cyan("=== play('%s') ===" % ResourceUtil.get_entries_string(entries)))
        if entries[-1].status != EntryPlayState.READY:
            self.logger.critical(SU.red("PLAY: The entry/entries are not yet ready to be played (Entries: %s)" % ResourceUtil.get_entries_string(entries)))
            # NOTE(review): there is no upper bound for this wait - if the entries never
            # reach 'ready' state (e.g. after a failed preload), it does not return.
            while entries[-1].status != EntryPlayState.READY:
                self.logger.info("PLAY: Wait a little until preloading is done ...")
                time.sleep(2)

        self.engine.player.play(entries[0], TransitionType.FADE)
        self.logger.info(self.engine.scheduler.timeslot_renderer.get_ascii_timeslots())