scheduler.py 41.1 KB
Newer Older
David Trattnig's avatar
David Trattnig committed
1

2
#
David Trattnig's avatar
David Trattnig committed
3
# Aura Engine (https://gitlab.servus.at/aura/engine)
4
#
David Trattnig's avatar
David Trattnig committed
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Copyright (C) 2017-2020 - The Aura Engine Team.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

20

David Trattnig's avatar
David Trattnig committed
21
22
import logging
import threading
23
import time
24
import sqlalchemy
25

26
27
28
29
30
31
32
33
34
35
36
37
38
39
from enum                       import Enum
from operator                   import attrgetter
from datetime                   import datetime, timedelta

from src.base.config            import AuraConfig
from src.base.utils             import SimpleUtil as SU
from src.base.models            import AuraDatabaseModel, Schedule, Playlist
from src.base.exceptions        import NoActiveScheduleException, LoadSourceException
from src.core.control           import EngineExecutor
from src.core.engine            import Engine
from src.core.channels          import ChannelType, TransitionType, EntryPlayState
from src.core.resources         import ResourceClass, ResourceUtil
from src.scheduling.calendar    import AuraCalendarService
from src.scheduling.fallback    import FallbackManager
David Trattnig's avatar
David Trattnig committed
40

41

42

David Trattnig's avatar
David Trattnig committed
43
44
45
46
47
48
49
50
51
52
53

class EntryQueueState(Enum):
    """
    Types of playlist entry behaviours.

    The scheduler compares these values against the `queue_state` of a
    playlist entry when it renders the programme for the log.
    """
    # No restriction flagged (presumably: the entry is played as planned -- TODO confirm)
    OKAY = "ok"
    # Rendered in the programme log as "This entry is going to be cut."
    CUT = "cut"
    # Rendered in the programme log as not being played because it is out of schedule
    OUT_OF_SCHEDULE = "oos"



David Trattnig's avatar
David Trattnig committed
54
class AuraScheduler(threading.Thread):
    """
    Aura Scheduler Class

    - Retrieves data from Steering and Tank
    - Executes engine actions in an automated fashion

    Attributes:
        config (AuraConfig):                    Holds the Engine Configuration
        logger:                                 The logger
        exit_event(threading.Event):            Used to exit the thread if requested
        engine:                                 Virtual mixer
        is_initialized (Boolean):               Set by `run()` right before `on_scheduler_ready()` is triggered for the first time
        last_successful_fetch (datetime):       Stores the last time a fetch from Steering/Tank was successful

        programme:                              The current radio programme to be played as defined in the local engine database
        active_entry(Show, Track):              This is a Tuple consisting of the currently played `Show` and `Track`
        message_timer(List<threading.Timer>):   The timer queue of sound-system commands for playlists/entries to be played
        fallback (FallbackManager):             The fallback manager created in the constructor
    """
    config = None
    logger = None
    engine = None
    exit_event = None
    is_initialized = None
    last_successful_fetch = None
    programme = None
    # NOTE: This list lives on the class and is therefore shared by all instances.
    # `clean_timer_queue()` deliberately updates it in-place instead of rebinding it.
    message_timer = []
    fallback = None
83

84

85

86
87

    def __init__(self, engine):
        """
        Constructor

        Initializes the thread, wires the scheduler to the given engine and
        starts the scheduling thread right away.

        Args:
            engine (Engine):            The engine to play the schedule on
        """
        # The base class constructor has to run before anything else is done to the
        # thread: `self` is handed over to the fallback manager and the engine below.
        super().__init__()
        self.exit_event = threading.Event()

        self.config = AuraConfig.config()
        self.logger = logging.getLogger("AuraEngine")

        AuraScheduler.init_database()
        self.fallback = FallbackManager(self)
        self.engine = engine
        self.engine.scheduler = self
        self.is_soundsytem_init = False

        # Scheduler Initialization
        self.is_initialized = False
        self.is_engine_ready = False

        # Start the scheduling thread, which executes `run()`
        self.start()

114

115

116
    def run(self):
        """
        Called when thread is started via `start()`. It does the following:

            1. `self.fetch_new_programme()` periodically from the API depending on the `fetching_frequency` defined in the engine configuration.
            2. Triggers `self.on_scheduler_ready()` once, as soon as the engine has reported to be ready.
            3. Queues all schedules of the programme, if the engine is ready to accept commands.

        On every cycle the configuration file is reloaded, to allow modifications while running the engine.
        Errors raised while fetching or queuing are logged and do not stop the loop.
        """
        # Only relevant if the configuration cannot be read during the very first
        # cycle; afterwards the last successfully read `fetching_frequency` is kept.
        seconds_to_wait = 60

        while not self.exit_event.is_set():
            try:
                self.config.load_config()
                seconds_to_wait = int(self.config.get("fetching_frequency"))
                self.logger.info(SU.cyan(f"== start fetching new schedules (every {seconds_to_wait} seconds) =="))

                # Load some stuff from the API in any case
                self.fetch_new_programme()

                # Called upon first boot only
                if self.is_engine_ready:
                    if not self.is_initialized:
                        # Queue the start items
                        self.is_initialized = True
                        self.on_scheduler_ready()

                    # Queue all the other ones
                    self.queue_programme()

            except Exception as e:
                # Keep on working anyway. The traceback is attached via `exc_info`; passing
                # the exception as a positional argument would break the log record formatting.
                self.logger.critical(SU.red(f"Unhandled error while fetching & scheduling new programme! ({str(e)})"), exc_info=True)

            self.clean_timer_queue()
            self.print_timer_queue()

            # FIXME better location for call
            if self.engine.event_dispatcher:
                current_timeslot = self.get_active_schedule()
                self.engine.event_dispatcher.on_timeslot(current_timeslot)

            EngineExecutor.log_commands()
            # Sleeps until the next cycle, but wakes up immediately when an exit is requested
            self.exit_event.wait(seconds_to_wait)
160

161

162

163
164
165
166
#
#   PUBLIC METHODS
#

167
168

    def on_ready(self):
        """
        Called when the engine is ready.

        Raises the `is_engine_ready` flag, which `run()` checks before it
        queues anything.
        """
        self.is_engine_ready = True
173

174

175
    def on_scheduler_ready(self):
        """
        Called when the scheduler is ready.

        Logs the current programme, starts the entry scheduled for the current
        moment and queues the remaining entries of the active timeslot.
        """
        self.logger.info(self.get_ascii_programme())

        try:
            self.play_active_entry()
            self.queue_startup_entries()
        except NoActiveScheduleException:
            # That's not good, but keep on working - just leave a trace in the log
            self.logger.warning("Nothing to play at startup, since there is no active entry for the current moment")

188
189
190
191


    def play_active_entry(self):
        """
        Plays the entry scheduled for the very current moment and forwards to the scheduled position in time.
        Usually called when the Engine boots.

        Raises:
            (NoActiveScheduleException):    If `get_active_entry()` yields no entry for the current moment
        """
        sleep_offset = 10
        active_schedule = self.get_active_schedule()

        # Schedule any available fallback playlist
        if active_schedule:
            self.fallback.queue_fallback_playlist(active_schedule)

            # Queue the fade-out of the schedule
            if not active_schedule.fadeouttimer:
                self.queue_end_of_schedule(active_schedule, True)

        active_entry = self.get_active_entry()
        if not active_entry:
            raise NoActiveScheduleException

        content_type = active_entry.get_content_type()

        # In case of a file-system source, we need to fast-foward to the current marker as per schedule
        if content_type in ResourceClass.FILE.types:

            # Calculate the seconds we have to fast-forward
            now_unix = Engine.engine_time()
            seconds_to_seek = now_unix - active_entry.start_unix

            # If the seek exceeds the length of the current track,
            # there's no need to do anything - the scheduler takes care of the rest
            if (seconds_to_seek + sleep_offset) > active_entry.duration:
                self.logger.info("The FFWD [>>] range exceeds the length of the entry. Drink some tea and wait for the sound of the next entry.")
            else:
                # Pre-roll and play active entry
                self.engine.player.preload(active_entry)
                self.engine.player.play(active_entry, TransitionType.FADE)

                # Fast-forward to the scheduled position
                if seconds_to_seek > 0:
                    # Without plenty of timeout (10s) the seek doesn't work
                    def async_cue_seek(seek_seconds):
                        # The time slept below is added to the seek position
                        # (presumably because the playback position advances meanwhile)
                        seek_seconds += sleep_offset
                        time.sleep(sleep_offset)
                        self.logger.info("Going to fast-forward %s seconds" % seek_seconds)
                        response = self.engine.player.queue_seek(active_entry.channel, seek_seconds)
                        # Let the logger do the formatting, the response is not guaranteed to be a string
                        self.logger.info("Sound-system seek response: %s", response)

                    # Seek in a separate thread, so the caller isn't blocked by the sleep
                    thread = threading.Thread(target=async_cue_seek, args=(seconds_to_seek,))
                    thread.start()

        elif content_type in ResourceClass.STREAM.types \
            or content_type in ResourceClass.LIVE.types:

            # Pre-roll and play active entry
            self.engine.player.preload(active_entry)
            self.engine.player.play(active_entry, TransitionType.FADE)

        else:
            self.logger.critical("Unknown Entry Type: %s" % active_entry)
David Trattnig's avatar
David Trattnig committed
250
        
251
252
253



254
    def get_active_entry(self):
        """
        Retrieves the current `PlaylistEntry` which should be played as per programme.

        Returns:
            (PlaylistEntry): The track which is (or should) currently being played. `None` if there
                             is no active timeslot, no playlist assigned to it or no entry scheduled for now.
        """
        now_unix = Engine.engine_time()

        # Load programme if necessary
        if not self.programme:
            self.load_programme_from_db()

        # Check for current schedule
        current_schedule = self.get_active_schedule()
        if not current_schedule:
            self.logger.warning(SU.red("There's no active timeslot"))
            return None

        # Check for scheduled playlist
        current_playlist = current_schedule.get_playlist()
        if not current_playlist:
            msg = "There's no playlist assigned to the current timeslot. Most likely a fallback will make things okay again."
            self.logger.warning(SU.red(msg))
            return None

        # Find the entry covering the current moment. The end is exclusive (same as in
        # `get_active_schedule()`), so at the exact boundary between two entries the
        # starting one wins over the one which has just finished.
        for entry in current_playlist.entries:
            if entry.start_unix <= now_unix < entry.end_unix:
                return entry

        # Nothing playing ... fallback will kick-in
        msg = "There's no entry scheduled for playlist '%s' at %s" % (str(current_playlist), SU.fmt_time(now_unix))
        self.logger.warning(SU.red(msg))
        return None



David Trattnig's avatar
David Trattnig committed
297
    def get_active_schedule(self):
        """
        Retrieves the schedule currently to be played.

        Returns:
            (Schedule): The first schedule of the programme whose time span (start inclusive,
                        end exclusive) contains the current engine time, `None` if there is none
        """
        now_unix = Engine.engine_time()

        # An unset or empty programme simply yields no candidates
        for candidate in self.programme or []:
            if candidate.start_unix <= now_unix < candidate.end_unix:
                return candidate

        return None
315

316

317

318
    def get_next_timeslots(self, max_count=0):
        """
        Retrieves the timeslots to be played after the current one.

        Args:
            max_count (Integer): Maximum of timeslots to return, if `0` all existing ones are returned
        Returns:
            ([Timeslot]): The next timeslots in the order of the programme. An empty list,
                          if no programme is loaded (yet).
        """
        # The programme is `None` until it got loaded - nothing is upcoming in that case
        if not self.programme:
            return []

        now_unix = Engine.engine_time()
        upcoming = [schedule for schedule in self.programme if schedule.start_unix > now_unix]

        if max_count == 0:
            return upcoming
        # A negative limit yields no timeslots at all
        return upcoming[:max(max_count, 0)]


340
341
342
343
344
345
346
347
    def get_active_playlist(self):
        """
        Retrieves the playlist of the currently active schedule.

        Returns:
            (Playlist): The result of `get_playlist()` of the active schedule,
                        `None` if there is no active schedule
        """
        active_schedule = self.get_active_schedule()
        if not active_schedule:
            return None
        return active_schedule.get_playlist()
351

352

353

354
    def print_timer_queue(self):
        """
        Logs the timers of the queue, ordered by their `diff` attribute, one per line.
        A warning is logged instead, if the queue is empty.
        """
        ordered = sorted(self.message_timer, key=attrgetter('diff'))
        if not ordered:
            self.logger.warning(SU.red("There's nothing in the Timer Queue!"))
            return

        listing = "".join(str(timer) + "\n" for timer in ordered)
        self.logger.info("Timer queue: \n" + listing)



    def clean_timer_queue(self):
        """
        Drops all timers from the queue which are not alive anymore and logs how
        many got removed.
        """
        still_alive = [timer for timer in self.message_timer if timer.is_alive()]
        removed = len(self.message_timer) - len(still_alive)

        # Slice assignment keeps the very same list object, only its content is replaced
        self.message_timer[:] = still_alive
        self.logger.debug("Removed %s finished timer objects from queue" % removed)
378

379

380

381

382
    def get_ascii_programme(self):
        """
        Creates a printable version of the current programme (playlists and entries as per schedule)

        The output consists of two boxes: "SCHEDULED NOW" for the active timeslot and
        "SCHEDULED NEXT" for the timeslots returned by `get_next_timeslots()`.

        Returns:
            (String):   An ASCII representation of the programme
        """
        parts = []
        add = parts.append
        active_schedule = self.get_active_schedule()

        add("\n\n   SCHEDULED NOW:")
        add("\n┌──────────────────────────────────────────────────────────────────────────────────────────────────────")
        if not active_schedule:
            add("\n│   Nothing.           ")
        else:
            # The playlist originally planned for the timeslot, if any
            planned_playlist = active_schedule.playlist[0] if active_schedule.playlist else None

            # The playlist which is actually going to be played (might be a fallback)
            (fallback_type, resolved_playlist) = self.fallback.resolve_playlist(active_schedule)

            add("\n│   Playing timeslot %s         " % active_schedule)
            if planned_playlist and resolved_playlist \
                and resolved_playlist.playlist_id != planned_playlist.playlist_id:

                add("\n│       └── Playlist %s         " % planned_playlist)
                add("\n│       ")
                add(SU.red("↑↑↑ That's the originally planned playlist.") + ("Instead playing the fallback playlist below ↓↓↓"))

            if not resolved_playlist:
                add("\n│       └── %s" % (SU.red("No active playlist. There should be at least some fallback playlist running...")))
            else:
                if not planned_playlist:
                    add("\n│                         ")
                    add(SU.red("No playlist assigned to timeslot. Instead playing the `%s` playlist below ↓↓↓" % SU.cyan(str(fallback_type))))

                add("\n│       └── Playlist %s         " % resolved_playlist)

                active_entry = self.get_active_entry()

                # Finished entries: everything in front of the active entry is struck through
                for entry in resolved_playlist.entries:
                    if active_entry == entry:
                        break
                    add(self.build_entry_string("\n│         └── ", entry, True))

                # Entry currently being played
                if active_entry:
                    playing = SU.green("PLAYING > "+str(active_entry))
                    add("\n│         └── Entry %s | %s         " % (str(active_entry.entry_num+1), playing))

                    # Open entries for current playlist
                    rest_of_playlist = active_entry.get_next_entries(False)
                    open_entries = self.preprocess_entries(rest_of_playlist, False)
                    add(self.build_playlist_string(open_entries))

        add("\n└──────────────────────────────────────────────────────────────────────────────────────────────────────")

        add("\n   SCHEDULED NEXT:")
        add("\n┌──────────────────────────────────────────────────────────────────────────────────────────────────────")

        next_schedules = self.get_next_timeslots()
        if not next_schedules:
            add("\n│   Nothing.         ")
        else:
            for schedule in next_schedules:
                (fallback_type, resolved_playlist) = self.fallback.resolve_playlist(schedule)
                # Timeslots without any resolvable playlist are not listed
                if not resolved_playlist:
                    continue

                add("\n│   Queued timeslot %s         " % schedule)
                add("\n│      └── Playlist %s         (Type: %s)" % (resolved_playlist, SU.cyan(str(fallback_type))))
                if resolved_playlist.end_unix > schedule.end_unix:
                    overrun = SU.red("↑↑↑ Playlist #%s ends after timeslot #%s!" % (resolved_playlist.playlist_id, schedule.schedule_id))
                    add("\n│          %s!              " % (overrun))

                queued_entries = self.preprocess_entries(resolved_playlist.entries, False)
                add(self.build_playlist_string(queued_entries))

        add("\n└──────────────────────────────────────────────────────────────────────────────────────────────────────\n\n")
        return "".join(parts)
461

David Trattnig's avatar
David Trattnig committed
462

David Trattnig's avatar
David Trattnig committed
463

464
465
466
467
468
469
    def build_playlist_string(self, entries):
        """
        Renders the given entries, one line each, by means of `build_entry_string()`.

        As soon as the first entry flagged as out of schedule is met, a notice is
        inserted and this entry as well as all following ones are struck through.

        Returns:
            (String):   The stringified list of entries, empty if there are no entries
        """
        rendered = []
        struck = False

        for item in entries:
            if item.queue_state == EntryQueueState.OUT_OF_SCHEDULE and not struck:
                notice = SU.red("↓↓↓ These entries won't be played because they are out of schedule.")
                rendered.append("\n│             %s" % notice)
                struck = True

            rendered.append(self.build_entry_string("\n│         └── ", item, struck))

        return "".join(rendered)
480

481

David Trattnig's avatar
David Trattnig committed
482

483
484
485
486
487
488
    def build_entry_string(self, prefix, entry, strike):
        """
        Renders a single entry as `<prefix>Entry <number> | <entry>`.

        Args:
            prefix (String):    Text put in front of the entry line
            entry:              The entry to render; its `entry_num` is displayed one-based
            strike (Boolean):   If `True` the entry text is passed through `SU.strike()`

        Returns:
            (String):   The entry line, preceded by a notice if the entry is flagged as cut
        """
        notice = ""
        if entry.queue_state == EntryQueueState.CUT:
            notice = "\n│             %s" % SU.red("↓↓↓ This entry is going to be cut.")

        label = SU.strike(entry) if strike else str(entry)
        return notice + "%sEntry %s | %s" % (prefix, str(entry.entry_num+1), label)
498
499


500

501
502
503
504
#
#   PRIVATE METHODS
#

David Trattnig's avatar
David Trattnig committed
505

506

David Trattnig's avatar
David Trattnig committed
507
508
    def filter_scheduling_window(self, schedules):
        """
        Keeps only the schedules which are within the scheduling window and logs
        how many got skipped.

        A schedule is within the window, if the current engine time is later than its
        start minus `scheduling_window_start` and earlier than its start minus
        `scheduling_window_end` (both config options).

        Args:
            schedules ([Schedule]):     The schedules to be filtered

        Returns:
            ([Schedule]):   The schedules within the scheduling window
        """
        now_unix = Engine.engine_time()
        total = len(schedules)
        window_start = self.config.get("scheduling_window_start")
        window_end = self.config.get("scheduling_window_end")

        in_window = [
            slot for slot in schedules
            if (slot.start_unix - window_end) > now_unix and (slot.start_unix - window_start) < now_unix
        ]
        skipped = total - len(in_window)
        self.logger.info("For now, skipped %s future schedule(s) which are out of the scheduling window (-%ss <-> -%ss)" % (skipped, window_start, window_end))

        return in_window
522
523


David Trattnig's avatar
David Trattnig committed
524
525

    def is_schedule_in_window(self, schedule):
        """
        Checks if the schedule is within the scheduling window, i.e. if the current
        engine time lies between the start of the schedule minus `scheduling_window_start`
        and the start of the schedule minus `scheduling_window_end` (both exclusive).

        Returns:
            (Boolean):  `True` if the schedule is within the window
        """
        now_unix = Engine.engine_time()
        window_start = self.config.get("scheduling_window_start")
        window_end = self.config.get("scheduling_window_end")

        return (schedule.start_unix - window_start) < now_unix < (schedule.start_unix - window_end)
538
539
540



541
542
543
    def queue_programme(self):
        """
        Queues the upcoming timeslots which are within the scheduling window: for each
        of them the fallback playlist, the entries of its playlist (if one is assigned)
        and - unless already set - the end of the schedule are queued.
        """
        # Get a clean set of the timeslots within the scheduling window
        upcoming = self.filter_scheduling_window(self.get_next_timeslots())

        # Queue the schedules, their playlists and entries
        for timeslot in upcoming or ():
            # Schedule any available fallback playlist
            self.fallback.queue_fallback_playlist(timeslot)

            if timeslot.playlist:
                playlist_entries = timeslot.get_playlist().entries
                self.queue_playlist_entries(timeslot, playlist_entries, False, True)

            # Queue the fade-out of the schedule
            if not timeslot.fadeouttimer:
                self.queue_end_of_schedule(timeslot, True)

        self.logger.info(SU.green("Finished queuing programme."))
565

David Trattnig's avatar
David Trattnig committed
566
567
568
569
570
571
572
573


    def queue_startup_entries(self):
        """
        Queues all entries after the one currently playing upon startup. Don't use
        this method in any other scenario, as it doesn't respect the scheduling window.

        Nothing happens if there is no active schedule, no playlist assigned to it or
        no entry scheduled for the current moment.
        """
        current_schedule = self.get_active_schedule()
        if not current_schedule:
            return

        if not current_schedule.get_playlist():
            return

        active_entry = self.get_active_entry()
        if not active_entry:
            return

        # Queue the (rest of the) currently playing schedule upon startup
        rest_of_playlist = active_entry.get_next_entries(True)
        self.queue_playlist_entries(current_schedule, rest_of_playlist, False, True)

        # Store them for later reference. The list has to stay flat (entry objects only),
        # hence the remaining entries are merged into it instead of being nested.
        current_schedule.queued_entries = [active_entry]
        if rest_of_playlist:
            current_schedule.queued_entries.extend(rest_of_playlist)
598
599


David Trattnig's avatar
David Trattnig committed
600
601

    def queue_playlist_entries(self, schedule, entries, fade_in, fade_out):
        """
        Creates sound-system player commands for all playlist items to be executed at the scheduled time.

        Since each scheduled playlist can consist of multiple entry types such as *file*, *live*,
        and *stream*, the play-out of the schedule is actually a bit more complex. Before any playlist
        entries of the schedule can be turned into sound, they need to be grouped, queued and pre-loaded.

        1. First, all entries are aggregated when they hold filesystem entries.
            Given you have a playlist with 10 entries, the first 4 are consisting of files, the next two
            of a a stream and a live source. The last 4 are files again. These entries are now
            aggregated into 4 groups: one for the files, one for the stream, one for the live entry
            and another one for files. For each group a timer for executing the next step is created.

        2. Now, the playlist entries are going to be "pre-loaded". This means that filesystem
            entries are queued and pre-loaded and entries which are based on audio streams are buffered.
            This is required to allow a seamless play-out, when its time to do so (in the next stage).
            Due to their nature, playlist entries which hold live audio sources are not affected by
            this stage at all.

        Args:
            schedule (Schedule):        The schedule this entries belong to
            entries ([PlaylistEntry]):  The playlist entries to be scheduled for playout
            fade_in (Boolean):          Fade-in at the beginning of the set of entries
            fade_out (Boolean):         Fade-out at the end of the set of entries
        """
        # Mark entries which start after the end of their schedule or are cut
        clean_entries = self.preprocess_entries(entries, True)
        if not clean_entries:
            self.logger.warning(SU.red("Nothing to schedule ..."))
            return

        # Group/aggregate all filesystem entries, allowing them to be queued at once.
        # Only consecutive entries of the same filesystem type share a group, any
        # other entry gets a group of its own.
        entry_groups = [[]]
        previous_entry = None
        for entry in clean_entries:
            starts_new_group = previous_entry is not None and not (
                previous_entry.get_content_type() == entry.get_content_type()
                and entry.get_content_type() in ResourceClass.FILE.types
            )
            if starts_new_group:
                entry_groups.append([])
            entry_groups[-1].append(entry)
            previous_entry = entry
        self.logger.info("Built %s entry group(s)" % len(entry_groups))

        # Create timers for each entry group
        for group in entry_groups:
            self.set_entries_timer(group, fade_in, fade_out)

        # Store them for later reference
        schedule.queued_entries = clean_entries
668
669
670



671
    def set_entries_timer(self, entries, fade_in, fade_out):
        """
        Plans the play-out of one group of entries by creating the required timers.

        If a timer is already planned for the start time of the group, it is either kept
        (the queued entries did not change) or stopped and replaced by a new one.

        Args:
            entries ([]):       List of multiple filesystem entries, or a single entry of other types
            fade_in (Boolean):  If `True` the play-out starts with a fade, otherwise instantly
            fade_out (Boolean): Not used by this method
        """
        first_entry = entries[0]
        existing_timer = self.is_something_planned_at_time(first_entry.start_unix)
        seconds_until_start = first_entry.start_unix - Engine.engine_time()

        def do_play(entries):
            # Callback handed over to the timer, executed when the group is due
            self.logger.info(SU.cyan("=== play('%s') ===" % ResourceUtil.get_entries_string(entries)))
            transition_type = TransitionType.FADE if fade_in else TransitionType.INSTANT

            # Entries which are not ready are reported, but played anyway
            if entries[-1].status != EntryPlayState.READY:
                self.logger.critical(SU.red("PLAY: For some reason the entry/entries are not yet ready to be played (Entries: %s)" % ResourceUtil.get_entries_string(entries)))

            self.engine.player.play(entries[0], transition_type)
            self.logger.info(self.get_ascii_programme())

        if existing_timer:
            if not self.have_entries_changed(existing_timer, entries):
                # Nothing has changed => keep the existing timer
                self.logger.debug("Playlist Entry %s is already scheduled - no new timer created." % ResourceUtil.get_entries_string(entries))
                return
            # The queued entries differ => stop the outdated timer before creating a new one
            self.stop_timer(existing_timer)

        # Create the new timers and remember them on the first entry of the group
        switch_timer, load_timer = self.create_timer(seconds_until_start, do_play, entries, switcher=True)
        first_entry.switchtimer = switch_timer
        first_entry.loadtimer = load_timer
David Trattnig's avatar
David Trattnig committed
710
711
712



David Trattnig's avatar
David Trattnig committed
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
    def have_entries_changed(self, timer, new_entries):
        """
        Checks if the new entries and playlists are matching the existing queued ones,
        or if they should be updated.

        The entries are seen as changed if the number of entries differs, if both first
        entries hold a playlist and their playlist IDs differ, or if the `source` of any
        pair of entries at the same position differs.

        Args:
            timer (CallFunctionTimer):      The timer holding queued entries
            new_entries ([PlaylistEntry]):  The possibly updated entries

        Returns:
            (Boolean):  `True` if it has changed
        """
        old_entries = timer.entries

        # Compare the lengths first: this also protects the index access below
        # from raising an `IndexError` when one of the lists is empty.
        if len(old_entries) != len(new_entries):
            return True
        if not old_entries:
            # Both lists are empty, so there is nothing which could have changed
            return False

        # The playlist IDs are only compared if both sides actually hold a playlist
        old_playlist = old_entries[0].playlist
        new_playlist = new_entries[0].playlist
        if old_playlist and new_playlist and old_playlist.playlist_id != new_playlist.playlist_id:
            return True

        return any(old.source != new.source for old, new in zip(old_entries, new_entries))


740
741
742
743
744
745
    def preprocess_entries(self, entries, cut_oos):
        """
        Assigns a queue state to every entry and collects the entries to be queued.

        An entry starting at or after the end of its schedule is marked as `OUT_OF_SCHEDULE`,
        an entry ending after the end of its schedule is marked as `CUT` and any other
        entry is marked as `OKAY`.

        Args:
            entries ([PlaylistEntry]):  The playlist entries to be scheduled for playout
            cut_oos (Boolean):          If `True` entries which are 'out of schedule' are not returned

        Returns:
            ([PlaylistEntry]):  The list of processed playlist entries
        """
        processed = []

        for item in entries:
            schedule = item.playlist.schedule
            out_of_schedule = False

            if item.entry_start >= schedule.schedule_end:
                out_of_schedule = True
                self.logger.warning(SU.red("Filtered entry (%s) after end-of schedule (%s) ... SKIPPED" % (item, schedule)))
                item.queue_state = EntryQueueState.OUT_OF_SCHEDULE
            elif item.end_unix > schedule.end_unix:
                item.queue_state = EntryQueueState.CUT
            else:
                item.queue_state = EntryQueueState.OKAY

            # Only drop out-of-schedule entries when the caller asked for it
            if not (cut_oos and out_of_schedule):
                processed.append(item)

        return processed



David Trattnig's avatar
David Trattnig committed
771
    def queue_end_of_schedule(self, schedule, fade_out):
        """
        Plans the stop of the given schedule by creating a timer relative to the schedule end.
        When fading out, the timer is due the configured `fade_out_time` (rounded to whole
        seconds) before the end. A fade-out timer already attached to the schedule is replaced.

        Args:
            schedule (Schedule):    The schedule to be stopped
            fade_out (Boolean):     If the schedule should be faded-out
        """
        now_unix = Engine.engine_time()

        def do_stop(schedule):
            # Callback handed over to the timer; a schedule without queued entries is left alone
            if not schedule.has_queued_entries():
                return
            final_entry = schedule.queued_entries[-1]
            self.logger.info(SU.cyan("=== stop('%s') ===" % str(final_entry.playlist.schedule)))
            transition_type = TransitionType.FADE if fade_out else TransitionType.INSTANT
            self.engine.player.stop(final_entry, transition_type)

        # Lead time in seconds by which the stop is moved ahead of the schedule end
        if fade_out == True:
            fade_out_time = int(round(float(self.config.get("fade_out_time")))) #TODO Use float
        else:
            fade_out_time = 0

        # Get rid of a previously created fade-out timer
        previous_timer = schedule.fadeouttimer
        if previous_timer:
            previous_timer.cancel()
            self.message_timer.remove(previous_timer)

        # Create the new timer and remember it on the schedule
        start_fade_out = schedule.end_unix - now_unix - fade_out_time
        schedule.fadeouttimer = self.create_timer(start_fade_out, do_stop, schedule, fadeout=True)

        self.logger.info("Fading out schedule in %s seconds at %s | Schedule: %s" % (str(start_fade_out), str(schedule.schedule_end), schedule))
808
809
810



811
812
    def fetch_new_programme(self):
        """
        Fetch the latest programme from `AuraCalendarService` and, independent of the outcome,
        reload the programme from the database into `self.programme`.

        `self.last_successful_fetch` is set to the current time when the service delivered a
        non-empty list. In every other case its previous value is kept.
        """
        aborted_prefix = "fetching_aborted"

        # Fetch programme from API endpoints
        self.logger.debug("Trying to fetch new programe from API endpoints...")
        acs = AuraCalendarService(self.config)
        queue = acs.get_queue()
        acs.start() # start fetching thread
        response = queue.get() # wait for the end
        self.logger.debug("... Programme fetch via API done!")

        if response is None:
            msg = SU.red("Trying to load programme from Engine Database, because AuraCalendarService returned an empty response.")
            self.logger.warning(msg)
        elif isinstance(response, list):
            self.programme = response
            if response:
                # Only a non-empty programme counts as successful fetch. The timestamp must
                # not be overwritten with the previous value afterwards.
                self.last_successful_fetch = datetime.now()
                self.logger.info(SU.green("Finished fetching current programme from API"))
            else:
                self.logger.critical("Programme fetched from Steering/Tank has no entries!")
        elif isinstance(response, str) and response.startswith(aborted_prefix):
            msg = SU.red("Trying to load programme from database only, because fetching was being aborted from AuraCalendarService! Reason: ")
            # Everything after the prefix is the reason for aborting
            self.logger.warning(msg + response[len(aborted_prefix):])
        else:
            # `str()` avoids a `TypeError` in case the response is neither a list nor a string
            msg = SU.red("Trying to load programme from database only, because of an unknown response from AuraCalendarService: " + str(response))
            self.logger.warning(msg)

        # Always load latest programme from the database
        self.load_programme_from_db()

        # Guard against a programme which could not be loaded at all
        schedules = self.programme or []
        self.logger.info(SU.green("Finished loading current programme from database (%s schedules)" % str(len(schedules))))
        for schedule in schedules:
            self.logger.debug("\tSchedule %s with Playlist %s" % (str(schedule), str(schedule.playlist)))
852

853
854
855
856
857
858
859
860
861



    def load_programme_from_db(self):
        """
        Replaces `self.programme` with the result of `Schedule.select_programme()`.
        If that result is empty or missing, a critical message is logged.
        """
        programme = Schedule.select_programme()
        self.programme = programme

        if programme:
            return

        self.logger.critical(SU.red("Could not load programme from database. We are in big trouble my friend!"))

868

David Trattnig's avatar
David Trattnig committed
869

870
871
872
873
874
    def is_something_planned_at_time(self, given_time):
        """
        Looks up the first "fade-in" or "switcher" timer in `self.message_timer` whose
        first entry starts at the given time.

        Args:
            given_time:  The time compared with `start_unix` of the first entry of each timer

        Returns:
            The matching timer, or `False` if there is none
        """
        matching_timers = (
            timer for timer in self.message_timer
            if (timer.fadein or timer.switcher) and timer.entries[0].start_unix == given_time
        )
        return next(matching_timers, False)


881

David Trattnig's avatar
David Trattnig committed
882
    def create_timer(self, diff, func, param, fadein=False, fadeout=False, switcher=False):
883
884
885
886
887
888
        """
        Creates a new timer for timed execution of mixer commands.

        Args:
            diff (Integer):     The difference in seconds from now, when the call should happen
            func (Function):    The function to call
David Trattnig's avatar
David Trattnig committed
889
            param ([]):         A schedule or list of entries
890

David Trattnig's avatar
David Trattnig committed
891
892
893
        Returns:
            (CallFunctionTimer, CallFunctionTimer):     In case of a "switch" command, the switch and pre-roll timer is returned
            (CallFunctionTimer):                        In all other cases only the timer for the command is returned
894
        """
895
        if not fadein and not fadeout and not switcher or fadein and fadeout or fadein and switcher or fadeout and switcher:
896
            raise ValueError("You have to call me with either fadein=true, fadeout=true or switcher=True")
David Trattnig's avatar
David Trattnig committed
897
898
        if not isinstance(param, list) and not isinstance(param, Schedule):
            raise ValueError("No list of entries nor schedule passed!")
899

David Trattnig's avatar
David Trattnig committed
900
        t = CallFunctionTimer(diff=diff, func=func, param=param, fadein=fadein, fadeout=fadeout, switcher=switcher)
901
902
        self.message_timer.append(t)
        t.start()
David Trattnig's avatar
David Trattnig committed
903
904

        if switcher:
905
            # Pre-roll function to be called by timer
906
            def do_preload(entries):
David Trattnig's avatar
David Trattnig committed
907
                try:
David Trattnig's avatar
David Trattnig committed
908
                    if entries[0].get_content_type() in ResourceClass.FILE.types:
909
910
                        self.logger.info(SU.cyan("=== preload_group('%s') ===" % ResourceUtil.get_entries_string(entries)))
                        self.engine.player.preload_group(entries, ChannelType.QUEUE)
David Trattnig's avatar
David Trattnig committed
911
                    else:
912
913
                        self.logger.info(SU.cyan("=== preload('%s') ===" % ResourceUtil.get_entries_string(entries)))
                        self.engine.player.preload(entries[0])
David Trattnig's avatar
David Trattnig committed
914
                except LoadSourceException as e: