Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
engine
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Container registry
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
AURA
engine
Commits
3545ca25
Commit
3545ca25
authored
1 year ago
by
David Trattnig
Browse files
Options
Downloads
Patches
Plain Diff
refactor: scheduler for new domain classes
parent
90360007
Branches
Branches containing commit
Tags
Tags containing commit
1 merge request
!35
ORM-less scheduling
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
src/aura_engine/scheduling/scheduler.py
+21
-88
21 additions, 88 deletions
src/aura_engine/scheduling/scheduler.py
with
21 additions
and
88 deletions
src/aura_engine/scheduling/scheduler.py
+
21
−
88
View file @
3545ca25
...
...
@@ -108,8 +108,6 @@ class TimeslotCommand(EngineExecutor):
Indicate the start of the timeslot by sending a `on_timeslot_end` event.
Also resetting the used channel.
TODO refactor
"""
self
.
logger
.
info
(
SU
.
cyan
(
f
"
=== on_timeslot_end(
'
{
timeslot
}
'
) ===
"
))
self
.
engine
.
event_dispatcher
.
on_timeslot_end
(
timeslot
)
...
...
@@ -168,8 +166,6 @@ class PlayCommand(EngineExecutor):
Args:
items ([PlaylistItem]): The set of playlist items to be pre-loaded.
TODO refactor
"""
items_str
=
ResourceUtil
.
get_items_string
(
items
)
try
:
...
...
@@ -194,8 +190,6 @@ class PlayCommand(EngineExecutor):
Args:
items ([PlaylistItem]): The set of playlist items to be played.
TODO refactor
"""
items_str
=
ResourceUtil
.
get_items_string
(
items
)
self
.
logger
.
info
(
SU
.
cyan
(
f
"
=== play(
'
{
items_str
}
'
) ===
"
))
...
...
@@ -275,15 +269,13 @@ class AuraScheduler(threading.Thread):
Called when thread is started via `start()`. It does the following:
1.
`self.fetch_new_programme()`
periodically from the API depending on the
1.
Refresh timetable
periodically from the API depending on the
`fetching_frequency` defined in the engine configuration.
2. Loads the latest programme from the database and sets the instance state
`self.timetable` with current timeslots.
3. Queues all timeslots of the programme, if the soundssystem is ready to accept
commands.
2. Merge the timetable with the latest, local version.
3. Queue all timeslots of the timetable
On every cycle the configuration file is reloaded, to allow modifications while running
the
engine.
On every cycle the configuration file is reloaded, to allow modifications while running
the
engine.
"""
while
not
self
.
exit_event
.
is_set
():
try
:
...
...
@@ -338,7 +330,6 @@ class AuraScheduler(threading.Thread):
Args:
item (PlaylistItem):
"""
# Nothing to do atm
...
...
@@ -357,9 +348,7 @@ class AuraScheduler(threading.Thread):
"""
return
self
.
timetable
def
play_active_item
(
self
,
):
def
play_active_item
(
self
):
"""
Play currently active playlist item, as per timetable.
...
...
@@ -430,22 +419,20 @@ class AuraScheduler(threading.Thread):
Playlists of every timeslot are queued by creating timed commands to the sound-system to
enable the individual tracks of playlists.
TODO refactor
"""
# Get a clean set of the timeslots within the scheduling window
timeslots
=
self
.
timetable
.
get_next_timeslots
()
timeslots
=
self
.
filter_scheduling_window
(
timeslots
)
timeslots
=
self
.
timetable
.
get_next_timeslots
(
window_aware
=
True
)
# Queue the timeslots, their playlists and items
if
timeslots
:
next_timeslot
:
Timeslot
for
next_timeslot
in
timeslots
:
# Create command timer to indicate the start of the timeslot
AuraScheduler
.
TimeslotCommandClass
(
self
.
engine
,
next_timeslot
)
playlist
_type
,
playlist
=
self
.
timetable
.
get_current_playlist
(
next_timeslot
)
playlist
=
next_timeslot
.
get_current_playlist
()
if
playlist
:
self
.
queue_playlist_items
(
next_timeslot
,
playlist
.
items
,
False
,
True
)
self
.
queue_playlist_items
(
next_timeslot
,
playlist
.
items
)
self
.
logger
.
info
(
SU
.
green
(
"
Finished queuing programme.
"
))
...
...
@@ -454,21 +441,19 @@ class AuraScheduler(threading.Thread):
Queue all items after the one currently playing upon startup.
Don
'
t use this method in any other scenario, as it doesn
'
t respect the scheduling window.
TODO refactor
"""
current_timeslot
=
self
.
timetable
.
get_current_timeslot
()
current_timeslot
:
Timeslot
=
self
.
timetable
.
get_current_timeslot
()
# Queue the (rest of the) currently playing timeslot upon startup
if
current_timeslot
:
playlist_type
,
current_playlist
=
self
.
timetable
.
get_current_playlist
(
current_timeslot
)
current_playlist
=
current_timeslot
.
get_current_playlist
()
if
current_playlist
:
active_item
=
self
.
timetable
.
get_current_item
()
active_item
:
PlaylistItem
=
self
.
timetable
.
get_current_item
()
if
active_item
:
# Queue open items for current playlist
rest_of_playlist
=
active_item
.
get_next
_items
(
True
)
self
.
queue_playlist_items
(
current_timeslot
,
rest_of_playlist
,
False
,
True
)
rest_of_playlist
=
active_item
.
get_
all_
next
(
True
)
self
.
queue_playlist_items
(
current_timeslot
,
rest_of_playlist
)
def
queue_playlist_items
(
self
,
timeslot
:
Timeslot
,
items
:
[
PlaylistItem
]):
"""
...
...
@@ -497,15 +482,14 @@ class AuraScheduler(threading.Thread):
Args:
timeslot (Timeslot): The timeslot this items belong to.
items ([PlaylistItem]): The playlist items to be scheduled for playout.
TODO refactor
"""
item_groups
=
[]
item_groups
.
append
([])
previous_item
=
None
index
=
0
# Group/aggregate all filesystem items, allowing them to be queued at once
item
:
PlaylistItem
previous_item
:
PlaylistItem
=
None
for
item
in
items
:
if
previous_item
is
None
or
(
previous_item
.
get_content_type
()
==
item
.
get_content_type
()
...
...
@@ -517,73 +501,22 @@ class AuraScheduler(threading.Thread):
item_groups
.
append
([])
item_groups
[
index
].
append
(
item
)
previous_item
=
item
self
.
logger
.
info
(
"
Built
%s
item
group
(
s)
"
%
len
(
item
_
groups
))
self
.
logger
.
info
(
f
"
Built
{
len
(
item
_
groups
)
}
item
group
(
s)
"
)
# Timeslot function calls
if
len
(
items
)
>
0
and
len
(
item_groups
)
>
0
:
for
items
in
item_groups
:
if
not
isinstance
(
items
,
list
):
raise
ValueError
(
"
Invalid Item Group:
%s
"
%
str
(
items
))
raise
ValueError
(
f
"
Invalid Item Group:
{
str
(
items
)
}
"
)
# Create command timers for each item group
AuraScheduler
.
PlayCommandClass
(
self
.
engine
,
items
)
else
:
self
.
logger
.
warn
(
SU
.
red
(
"
Nothing to schedule ...
"
))
def
filter_scheduling_window
(
self
,
timeslots
:
[
Timeslot
])
->
[
Timeslot
]:
"""
Filter only timeslots within the scheduling window.
Ignore timeslots which are before the start of scheduling window
(start of timeslot - `scheduling_window_start`) or after the end of the scheduling window
(end of timeslot -`scheduling_window_end`).
Before the scheduling window:
- Timeslots can still be deleted in Steering and the playout will respect this
During the scheduling window:
- Timeslots and it
'
s playlists are queued as timed commands
After the scheduling window:
- Such timeslots are ignored, because it doesn
'
t make sense anymore to schedule them
before the next timeslot starts
Args:
timeslots ([Timeslots]): The timeslots to be filtered.
Returns:
([Timeslots]): The reduced list of timeslots.
TODO refactor
"""
if
not
timeslots
:
return
timeslots
now_unix
=
Engine
.
engine_time
()
len_before
=
len
(
timeslots
)
window_start
=
self
.
config
.
get
(
"
scheduling_window_start
"
)
window_end
=
self
.
config
.
get
(
"
scheduling_window_end
"
)
timeslots
=
list
(
filter
(
lambda
t
:
(
t
.
start_unix
-
window_start
)
<
now_unix
and
now_unix
<
(
t
.
end_unix
-
window_end
),
timeslots
,
)
)
len_after
=
len
(
timeslots
)
self
.
logger
.
info
(
"
For now, skipped %s future timeslot(s) which are out of the scheduling window
"
"
(T¹-%ss to T²-%ss)
"
,
(
len_before
-
len_after
),
window_start
,
window_end
,
)
return
timeslots
self
.
logger
.
warn
(
SU
.
red
(
f
"
Nothing to schedule for timeslot
{
timeslot
}
"
))
def
terminate
(
self
):
"""
Call
this method
when thread is stopped or
a signal to terminate
is received.
Call when thread is stopped or
termination signal
is received.
"""
self
.
logger
.
info
(
SU
.
yellow
(
"
[Scheduler] Shutting down...
"
))
self
.
timetable
.
terminate
()
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment