Skip to content
Snippets Groups Projects
Commit 54791655 authored by David Trattnig's avatar David Trattnig
Browse files

Test: Show-aware insert and merge

parent 35581337
No related branches found
No related tags found
1 merge request!3Liquidsoap 2 migration
......@@ -26,13 +26,42 @@ url = "https://securestream.o94.at/live.mp3"
in_stream = input.http(id="in_stream", start=true, url)
in_stream = insert_metadata(id="in_stream", in_stream)
def on_metadata_notification(metadata) =
print("METADATA: \n#{metadata}")
print("Metadata Notification: \n#{metadata}")
end
# Save the callback
last_metadata = in_stream.last_metadata
imcb = in_stream.insert_metadata
# Check for show-specific metadata
def has_show_meta(meta) =
key_show_id = "show_id"
list.assoc.mem(key_show_id, meta) ? true : false
end
# Check if the current show metadata is same as the prev one
def is_same_show(last_meta, current_meta) =
key_show_id = "show_id"
if has_show_meta(last_meta) then
if not has_show_meta(current_meta) then
# No current show meta: handle as same show
true
else
# A new show has started
false
end
else
# Last show has no show meta
if not has_show_meta(current_meta) then
# And the current one either: handle as same show
true
else
# A new show has started
false
end
end
end
# Ability to insert and merge with previous metadata
def insert_merged_meta(meta) =
lm = (last_metadata() ?? [])
......@@ -53,9 +82,38 @@ def insert_merged_meta(meta) =
imcb(!merged)
end
# Handles either insert or merge & insert of metadata
def process_meta_insert(meta) =
lm = (last_metadata() ?? [])
if is_same_show(lm, meta) then
print("SAME SHOW: merge & insert")
insert_merged_meta(meta)
else
print("NEW SHOW: insert -> #{meta}")
imcb(meta)
end
end
in_stream = source.on_metadata(id="in_stream", in_stream, on_metadata_notification)
output.alsa(id="lineout", device="default", mksafe(in_stream))
# Later:
meta = [("foo", "bar")]
thread.run(every=3., { insert_merged_meta(meta) })
# Test Case 1: Normal insert and merge
# Provide new metadata every 3 seconds
def case1() =
print("Running Test Case 1 ...")
meta = [("foo", "bar")]
process_meta_insert(meta)
end
thread.run(delay=2., every=3., { case1() })
# Test Case 1: Insert metadata with new show
# Provide new metadata after 23 seconds
def case2() =
print("Running Test Case 2 ...")
meta = [("show_id", "333"), ("fooBar", "2000")]
process_meta_insert(meta)
end
thread.run(delay=23., { case2() })
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment