Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
aura-engine
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Lars Kruse
aura-engine
Commits
654ad051
Commit
654ad051
authored
5 years ago
by
David Trattnig
Browse files
Options
Downloads
Patches
Plain Diff
Fixed scheduling logic to allow first playout.
parent
310073dc
No related branches found
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
modules/scheduling/scheduler.py
+266
-171
266 additions, 171 deletions
modules/scheduling/scheduler.py
with
266 additions
and
171 deletions
modules/scheduling/scheduler.py
+
266
−
171
View file @
654ad051
#
#
e
ngine
#
Aura E
ngine
#
# Playout Daemon for autoradio project
#
...
...
@@ -28,10 +28,7 @@ __license__ = "GNU General Public License (GPL) Version 3"
__version_info__
=
(
0
,
0
,
1
)
__author__
=
'
Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
'
"""
Aura Scheduler
Is holding the eventqueue
"""
import
time
import
json
import
datetime
...
...
@@ -69,38 +66,51 @@ def alchemyencoder(obj):
class
AuraScheduler
(
ExceptionLogger
,
threading
.
Thread
):
"""
Aura Scheduler Class
Gets data from pv and importer, stores and fires events
- Gets data from Steering and Tanks
- Stores and fires events for LiquidSoap
Attributes:
config (AuraConfig): Holds the Engine Configuration
logger: The logger
exit_event(threading.Event): Used to exit the thread if requested
liquidsoapcommunicator: Stores the connection to LiquidSoap
last_successful_fetch (datetime): Stores the last time a fetch from Steering/Tank was successful
programme: The current radio programme to be played as defined in the local engine database
active_entry(Show, Track): This is a Tuple consisting of the currently played `Show` and `Track`
message_timer(Array<threading.Timer>): The message queue of tracks to be played
"""
redismessenger
=
None
message_timer
=
[]
job_result
=
{}
# stores the conn to liquidsoap
config
=
None
logger
=
None
exit_event
=
None
liquidsoapcommunicator
=
None
# stores the last time when a fetch from pv/tank gone right
last_successful_fetch
=
None
schedule_entries
=
None
active_entry
=
None
exit_event
=
None
programme
=
None
active_entry
=
None
message_timer
=
[]
#schedule_entries = None
client
=
None
logger
=
None
config
=
None
def
__init__
(
self
,
config
):
"""
Constructor
@type config: ConfigReader
@param config: read engine.ini
Args:
config (AuraConfig): Reads the engine configuration
"""
self
.
config
=
config
# init database ?
self
.
logger
=
logging
.
getLogger
(
"
AuraEngine
"
)
self
.
init_error_messages
()
self
.
init_database
()
self
.
redismessenger
=
RedisMessenger
(
config
)
self
.
logger
=
logging
.
getLogger
(
"
AuraEngine
"
)
# init threading
threading
.
Thread
.
__init__
(
self
)
...
...
@@ -108,88 +118,226 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
# init messenger.. probably not needed anymore
self
.
redismessenger
.
set_channel
(
'
scheduler
'
)
self
.
redismessenger
.
set_section
(
'
execjob
'
)
# load error messages
error_file
=
self
.
config
.
get
(
"
install_dir
"
)
+
"
/errormessages/scheduler_error.js
"
f
=
open
(
error_file
)
self
.
error_data
=
json
.
load
(
f
)
f
.
close
()
#self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal')
#
c
reate exit event
#
C
reate exit event
self
.
exit_event
=
threading
.
Event
()
#
s
tart load
ing
new programm every hour
#
S
tart
thread to
load new programm
e info
every hour
self
.
start
()
# ------------------------------------------------------------------------------------------ #
def
init_database
(
self
):
if
self
.
config
.
get
(
"
recreate_db
"
)
is
not
None
:
AuraDatabaseModel
.
recreate_db
(
systemexit
=
True
)
# check if tables do exist. if not create them
try
:
Playlist
.
select_all
()
except
sqlalchemy
.
exc
.
ProgrammingError
as
e
:
errcode
=
e
.
orig
.
args
[
0
]
if
errcode
==
1146
:
# error for no such table
x
=
AuraDatabaseModel
()
x
.
recreate_db
()
else
:
raise
# ------------------------------------------------------------------------------------------ #
def
run
(
self
):
"""
Called when thread is started via `start()`. It calls `self.fetch_new_program()`
periodically depending on the `fetching_frequency` define engine configuration.
"""
while
not
self
.
exit_event
.
is_set
():
# set seconds to wait
seconds_to_wait
=
int
(
self
.
config
.
get
(
"
fetching_frequency
"
))
# calc next time
next_time
=
datetime
.
datetime
.
now
()
+
datetime
.
timedelta
(
seconds
=
seconds_to_wait
)
# write to logger
self
.
logger
.
info
(
"
Fetch new programmes every
"
+
str
(
seconds_to_wait
)
+
"
s started. Going to start next time
"
+
str
(
next_time
))
self
.
logger
.
info
(
"
Fetch new programmes every %ss. Next fetching in %ss.
"
%
(
str
(
seconds_to_wait
),
str
(
next_time
)))
# empty database
# self.logger.info("emptying database")
# ScheduleEntry.truncate()
# fetch new programme
self
.
fetch_new_programme
()
# and wait
self
.
exit_event
.
wait
(
seconds_to_wait
)
# ------------------------------------------------------------------------------------------ #
def
stop
(
self
):
self
.
exit_event
.
set
()
# ------------------------------------------------------------------------------------------ #
#
# PUBLIC METHODS
#
def
get_active_entry
(
self
):
now_unix
=
time
.
mktime
(
datetime
.
datetime
.
now
().
timetuple
())
lastentry
=
None
"""
Retrieves the current `Show` and `Track` tuple being played.
Externally called via `LiquidSoapCommunicator`.
# load programme if necessary
if
self
.
programme
is
None
:
self
.
logger
.
debug
(
"
want to get active channel, but have to load programme first
"
)
self
.
load_programme_from_db
()
Returns:
(Show, Entry): The show and track to be played next.
"""
# now_unix = time.mktime(datetime.datetime.now().timetuple())
# lastentry = None
# # Load programme if necessary
# if self.programme is None:
# self.logger.info("Next track requested: Need to load programme from database first.")
# self.load_programme_from_db()
# # Get the entry currently being played
# for show in self.programme:
# for entry in show.playlist:
# # check if lastentry is set and if act entry is in the future
# if lastentry is not None and entry.start_unix > now_unix:
# # return entry if so
# return (show,entry) # actsource = entry.source
# lastentry = entry
# return None, None
# FIXME active_entry logic
if
not
self
.
active_entry
:
self
.
logger
.
warning
(
"
No active entry set! Is currently nothing or a fallback playing?
"
)
return
(
None
,
None
)
else
:
return
self
.
active_entry
# get active source
for
show
in
self
.
programme
:
for
entry
in
show
.
playlist
:
# check if lastentry is set and if act entry is in the future
if
lastentry
is
not
None
and
entry
.
entry_start_unix
>
now_unix
:
# return entry if so
return
(
show
,
entry
)
# actsource = entry.source
lastentry
=
entry
def
get_act_programme_as_string
(
self
):
"""
Fetches the latest programme and returns it as `String`.
Also used by `ServerRedisAdapter`.
Return:
(String): Programme
Raises:
(Exception): In case the programme cannot be converted to String
"""
programme_as_string
=
""
if
self
.
programme
is
None
or
len
(
self
.
programme
)
==
0
:
self
.
fetch_new_program
()
try
:
programme_as_string
=
json
.
dumps
([
p
.
_asdict
()
for
p
in
self
.
programme
],
default
=
alchemyencoder
)
# FIXME Change to more specific exception
except
Exception
as
e
:
self
.
logger
.
error
(
"
Cannot transform programme into JSON String. Reason:
"
+
str
(
e
))
traceback
.
print_exc
()
return
programme_as_string
def
print_message_queue
(
self
):
"""
Prints the current message queue i.e. tracks in the queue to be played.
"""
message_queue
=
""
messages
=
sorted
(
self
.
message_timer
,
key
=
attrgetter
(
'
diff
'
))
if
not
messages
:
self
.
logger
.
warning
(
"
There
'
s nothing in the Message Queue!
"
)
else
:
for
msg
in
messages
:
message_queue
+=
str
(
msg
)
+
"
\n
"
self
.
logger
.
info
(
"
Message Queue:
"
+
message_queue
)
return
None
,
None
# ------------------------------------------------------------------------------------------ #
def
load_programme_from_db
(
self
,
silent
=
False
):
def
set_next_file_for
(
self
,
playlistname
):
self
.
logger
.
critical
(
"
HAVE TO <SET> NEXT FILE FOR:
"
+
playlistname
)
self
.
logger
.
critical
(
str
(
self
.
get_active_entry
()))
if
playlistname
==
"
station
"
:
file
=
"
/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3
"
elif
playlistname
==
"
timeslot
"
:
file
=
"
/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3
"
elif
playlistname
==
"
show
"
:
file
=
"
/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3
"
else
:
file
=
""
self
.
logger
.
critical
(
"
Should set next fallback file for
"
+
playlistname
+
"
, but this playlist is unknown!
"
)
self
.
logger
.
info
(
"
Set next fallback file for
"
+
playlistname
+
"
:
"
+
file
)
self
.
redismessenger
.
set_next_file_for
(
playlistname
,
file
)
return
file
def
get_next_file_for
(
self
,
fallbackname
):
"""
Evaluates the next fallback file to be played for a given fallback-type.
Valid fallback-types are:
* timeslot
* show
* station
Returns:
(String): Absolute path to the file to be played as a fallback.
"""
self
.
logger
.
critical
(
"
HAVE TO <GET> NEXT FILE FOR:
"
+
fallbackname
)
(
show
,
entry
)
=
self
.
get_active_entry
()
self
.
logger
.
critical
(
str
(
show
)
+
"
"
+
str
(
entry
))
if
fallbackname
==
"
timeslot
"
:
file
=
"
/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3
"
elif
fallbackname
==
"
show
"
:
file
=
"
/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3
"
elif
fallbackname
==
"
station
"
:
file
=
"
/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3
"
else
:
file
=
""
self
.
logger
.
critical
(
"
Should set next fallback file for
"
+
fallbackname
+
"
, but this playlist is unknown!
"
)
#set_next_file_thread = SetNextFile(fallbackname, show)
#set_next_file_thread.start()
self
.
logger
.
info
(
"
Got next fallback file for
'"
+
fallbackname
+
"'
:
"
+
file
)
# self.redismessenger.set_next_file_for(playlistname, file)
return
file
#
# PRIVATE METHODS
#
def
fetch_new_programme
(
self
):
"""
Fetch the latest programme from `AuraCalendarService`.
In case no programme is successfully returned, it is tried
to retrieve the programme from Engine
'
s database.
"""
self
.
logger
.
info
(
"
Trying to fetch new program...
"
)
acs
=
AuraCalendarService
(
self
.
config
)
queue
=
acs
.
get_queue
()
acs
.
start
()
# start fetching thread
response
=
queue
.
get
()
# wait for the end
# Reset last successful fetch state
lsf
=
self
.
last_successful_fetch
self
.
last_successful_fetch
=
None
if
response
is
None
:
self
.
logger
.
warning
(
"
Trying to load programme from Engine Database, because AuraCalendarService returned an empty response.
"
)
elif
type
(
response
)
is
list
:
self
.
programme
=
response
if
self
.
programme
is
not
None
and
len
(
self
.
programme
)
>
0
:
self
.
last_successful_fetch
=
datetime
.
datetime
.
now
()
if
len
(
self
.
programme
)
==
0
:
self
.
logger
.
critical
(
"
Programme fetched from Steering/Tank has no entries!
"
)
# return self.get_act_programme_as_string()
elif
response
.
startswith
(
"
fetching_aborted
"
):
# TODO Check why the 16th entry is logged only
self
.
logger
.
warning
(
"
Trying to load programme from database, because fetching was being aborted from AuraCalendarService! Reason:
"
+
response
[
16
:])
else
:
self
.
logger
.
warning
(
"
Trying to load programme from database, because i got an unknown response from AuraCalendarService:
"
+
response
)
# if somehow the programme could not be fetched => try to load it from database
#if self.last_successful_fetch is None:
self
.
last_successful_fetch
=
lsf
self
.
load_programme_from_db
()
def
load_programme_from_db
(
self
):
"""
Loads the programme from Engine
'
s database and enables
them via `self.enable_entries(..)`. After that, the
current message queue is printed to the console.
"""
self
.
programme
=
Schedule
.
select_act_programme
()
if
self
.
programme
is
None
or
len
(
self
.
programme
)
==
0
:
...
...
@@ -212,8 +360,8 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
planned_entries
.
append
(
p
)
self
.
enable_entries
(
planned_entries
)
self
.
logger
.
warning
(
self
.
print_message_queue
())
self
.
print_message_queue
()
# ------------------------------------------------------------------------------------------ #
def
enable_entries
(
self
,
playlist
):
...
...
@@ -225,11 +373,16 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
# old entry for fading out
old_entry
=
None
for
entry
in
playlist
:
# FIXME Correct timing behaviour
time_marker
=
playlist
[
0
].
start_unix
for
entry
in
playlist
[
0
].
entries
:
track_len
=
(
entry
.
duration
/
1000000
/
60
)
time_marker
+=
track_len
# since we get also programmes from the past, filter these out
if
entry
.
entry_start_unix
>
now_unix
:
if
time_marker
>
now_unix
:
# when do we have to start?
diff
=
entry
.
entry_start_unix
-
now_unix
diff
=
time_marker
-
now_unix
diff
=
diff
/
100
# testing purpose
...
...
@@ -239,6 +392,7 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
# store the old entry for fading out
old_entry
=
entry
# ------------------------------------------------------------------------------------------ #
def
enable_timer
(
self
,
diff
,
entry
,
old_entry
):
# create the activation threads and run them after <diff> seconds
self
.
logger
.
critical
(
"
ENABLING SWITCHTIMER FOR
"
+
str
(
entry
))
...
...
@@ -263,6 +417,7 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
new_entry
.
fadeintimer
=
self
.
create_timer
(
diff
,
self
.
liquidsoapcommunicator
.
fade_in
,
[
new_entry
],
fadein
=
True
)
self
.
logger
.
critical
(
"
ENABLING FADEINTIMER FOR
"
+
str
(
new_entry
))
# ------------------------------------------------------------------------------------------ #
def
add_or_update_timer
(
self
,
diff
,
func
,
parameters
):
timer
=
None
...
...
@@ -270,6 +425,8 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
planned_timer
=
self
.
is_something_planned_at_time
(
entry
.
schedule_start
)
# if something is planned on entry.entry_start
#FIXME
#if 1==0:
if
planned_timer
:
planned_entry
=
planned_timer
.
entry
...
...
@@ -324,110 +481,48 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
return
t
return
False
# ------------------------------------------------------------------------------------------ #
def
get_act_programme_as_string
(
self
):
programme_as_string
=
""
if
self
.
programme
is
None
or
len
(
self
.
programme
)
==
0
:
self
.
fetch_new_programme
()
try
:
programme_as_string
=
json
.
dumps
([
p
.
_asdict
()
for
p
in
self
.
programme
],
default
=
alchemyencoder
)
except
Exception
as
e
:
self
.
logger
.
error
(
"
Cannot transform programme into JSON String. Reason:
"
+
str
(
e
))
traceback
.
print_exc
()
return
programme_as_string
# ------------------------------------------------------------------------------------------ #
def
print_message_queue
(
self
):
message_queue
=
""
for
t
in
sorted
(
self
.
message_timer
,
key
=
attrgetter
(
'
diff
'
)):
message_queue
+=
str
(
t
)
+
"
\n
"
return
message_queue
# ------------------------------------------------------------------------------------------ #
def
fetch_new_programme
(
self
):
self
.
logger
.
info
(
"
trying to fetch new programme
"
)
acs
=
AuraCalendarService
(
self
.
config
)
queue
=
acs
.
get_queue
()
# start fetching thread
acs
.
start
()
# wait for the end
response
=
queue
.
get
()
# reset
lsf
=
self
.
last_successful_fetch
self
.
last_successful_fetch
=
None
if
response
is
None
:
self
.
logger
.
warning
(
"
Trying to load programme from database, because i got an EMPTY (None) response from AuraCalendarService.
"
)
elif
type
(
response
)
is
list
:
self
.
programme
=
response
def
init_error_messages
(
self
):
"""
Load error messages
"""
error_file
=
self
.
config
.
get
(
"
install_dir
"
)
+
"
/errormessages/scheduler_error.js
"
f
=
open
(
error_file
)
self
.
error_data
=
json
.
load
(
f
)
f
.
close
()
if
self
.
programme
is
not
None
and
len
(
self
.
programme
)
>
0
:
self
.
last_successful_fetch
=
datetime
.
datetime
.
now
()
if
len
(
self
.
programme
)
==
0
:
self
.
logger
.
critical
(
"
Programme fetched from pv/tank has no entries!
"
)
# return self.get_act_programme_as_string()
elif
response
.
startswith
(
"
fetching_aborted
"
):
self
.
logger
.
warning
(
"
Trying to load programme from database, because fetching was being aborted from AuraCalendarService! Reason:
"
+
response
[
16
:])
else
:
self
.
logger
.
warning
(
"
Trying to load programme from database, because i got an unknown response from AuraCalendarService:
"
+
response
)
# if somehow the programme could not be fetched => try to load it from database
if
self
.
last_successful_fetch
is
None
:
self
.
last_successful_fetch
=
lsf
self
.
load_programme_from_db
()
def
init_database
(
self
):
"""
Initializes the database.
# ------------------------------------------------------------------------------------------ #
def
set_next_file_for
(
self
,
playlistname
):
self
.
logger
.
critical
(
"
HAVE TO SET NEXT FILE FOR:
"
+
playlistname
)
self
.
logger
.
critical
(
str
(
self
.
get_active_entry
()))
Raises:
sqlalchemy.exc.ProgrammingError: In case the DB model is invalid
"""
if
self
.
config
.
get
(
"
recreate_db
"
)
is
not
None
:
AuraDatabaseModel
.
recreate_db
(
systemexit
=
True
)
if
playlistname
==
"
station
"
:
file
=
"
/var/audio/fallback/eins.zwo.bombe.flac
"
elif
playlistname
==
"
timeslot
"
:
file
=
"
/var/audio/fallback/ratm.killing.flac
"
elif
playlistname
==
"
show
"
:
file
=
"
/var/audio/fallback/weezer.hash.pipe.flac
"
else
:
file
=
""
self
.
logger
.
critical
(
"
Should set next fallback file for
"
+
playlistname
+
"
, but this playlist is unknown!
"
)
# Check if tables exists, if not create them
try
:
Playlist
.
select_all
()
except
sqlalchemy
.
exc
.
ProgrammingError
as
e
:
errcode
=
e
.
orig
.
args
[
0
]
self
.
logger
.
info
(
"
Set next fallback file for
"
+
playlistname
+
"
:
"
+
file
)
self
.
redismessenger
.
set_next_file_for
(
playlistname
,
file
)
return
file
if
errcode
==
1146
:
# Error for no such table
x
=
AuraDatabaseModel
()
x
.
recreate_db
()
else
:
raise
# ------------------------------------------------------------------------------------------ #
def
get_next_file_for
(
self
,
fallbackname
):
self
.
logger
.
critical
(
"
HAVE TO SET NEXT FILE FOR:
"
+
fallbackname
)
(
show
,
entry
)
=
self
.
get_active_entry
()
self
.
logger
.
critical
(
str
(
show
)
+
"
"
+
str
(
entry
))
if
fallbackname
==
"
station
"
:
file
=
"
/home/david/Code/aura/engine/testing/content/1.flac
"
elif
fallbackname
==
"
timeslot
"
:
file
=
"
/home/david/Code/aura/engine/testing/content/2.flac
"
elif
fallbackname
==
"
show
"
:
file
=
"
/home/david/Code/aura/engine/testing/content/3.flac
"
else
:
file
=
""
self
.
logger
.
critical
(
"
Should set next fallback file for
"
+
fallbackname
+
"
, but this playlist is unknown!
"
)
#set_next_file_thread = SetNextFile(fallbackname, show)
#set_next_file_thread.start()
def
stop
(
self
):
"""
Called when thread is stopped.
"""
self
.
exit_event
.
set
()
# self.logger.info("Set next fallback file for " + playlistname + ": " + file)
# self.redismessenger.set_next_file_for(playlistname, file)
return
file
# ------------------------------------------------------------------------------------------ #
class
SetNextFile
(
threading
.
Thread
):
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment