Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • aura/engine
  • hermannschwaerzler/engine
  • sumpfralle/aura-engine
3 results
Show changes
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from aura_engine.base.config import AuraConfig
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.plugins.monitor import AuraMonitor
#
# Mock
#
class MockedMixer:
"""
Mocked version of mixer.
"""
def get_inputs(self) -> dict:
state = {
"ready": True,
"selected": True,
"single": True,
"volume": 0,
"remaining": 0.0,
}
return {
"in_queue_0": state,
"in_queue_1": state,
"in_stream_0": state,
"in_stream_1": state,
"aura_engine_line_in_0": state,
"aura_engine_line_in_1": state,
}
def get_outputs(self) -> list:
return ["aura_engine_line_out_0", "out_http_0"]
class MockedPlayer:
"""
Mocked version of engine.
"""
mixer = MockedMixer()
class MockedEventDispather:
"""
Mocked version of event dispatcher.
"""
def on_sick(self, data):
pass
class MockedEngine:
"""
Mocked version of engine.
"""
player = MockedPlayer()
event_dispatcher = MockedEventDispather()
def init_version(self):
versions = {"control": 1, "core": 1, "liquidsoap": 1}
AuraConfig.instance.init_version(versions)
def update_playout_state(self):
return True
class TestPluginsMonitor(unittest.TestCase):
"""
Testing the monitor plugin.
"""
monitor: AuraMonitor
config: AuraConfig
# Setup and teardown
def setUp(self):
self.config = AuraConfig.instance.config
self.monitor = AuraMonitor(MockedEngine())
def tearDown(self):
pass
#
# Test Cases
#
def test_has_valid_status_vitality_only(self):
print(self._testMethodName)
# will fail due to missing info
status = self.monitor.has_valid_status(update_vitality_only=True)
self.assertFalse(status)
def test_has_valid_status(self):
print(self._testMethodName)
# mock valid status by swizzling some methonds
def validate_url_connection(url: str) -> bool:
return True
def validate_directory(dir_path: str) -> dict:
return {"path": dir_path, "exists": True, "has_content": True}
def get_url_response(url: str) -> dict:
return {"auth": "string", "importer": "string", "store": "string"}
def get_ip() -> str:
return "0.0.0.0"
self.monitor.validate_url_connection = validate_url_connection
self.monitor.validate_directory = validate_directory
self.monitor.get_url_response = get_url_response
self.monitor.get_ip = get_ip
status = self.monitor.has_valid_status(update_vitality_only=False)
self.assertTrue(status)
def test_validate_url_connection(self):
print(self._testMethodName)
self.assertTrue(self.monitor.validate_url_connection("https://debian.org/"))
self.assertFalse(self.monitor.validate_url_connection("https://debian.123/"))
def test_validate_directory(self):
print(self._testMethodName)
path = "./tests"
status = self.monitor.validate_directory(path)
self.assertEqual(status["path"], path)
self.assertEqual(status["exists"], True)
self.assertEqual(status["has_content"], True)
path = "./tests_"
status = self.monitor.validate_directory(path)
self.assertEqual(status["path"], path)
self.assertEqual(status["exists"], False)
self.assertEqual(status["has_content"], False)
def test_post_health(self):
print(self._testMethodName)
self.config.monitoring.heartbeat.frequency = 0 # disable timer
self.monitor.post_health(data=None, is_healthy=True)
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import os
import unittest
from datetime import datetime, timedelta
from unittest import mock
from aura_engine.base.api import SimpleCachedRestApi
from aura_engine.base.config import AuraConfig
from aura_engine.scheduling.api import ApiFetcher, ApiResult
from aura_engine.scheduling.domain import PlaylistType, Timeslot
class TestSchedulingApiFetcher(unittest.TestCase):
"""
Testing the API fetcher.
"""
config = None
api_fetcher = None
mocked_steering_json = None
mocked_tank_json = None
#
# Mock
#
# Mock `requests.get` in `SimpleRestApi`
def mocked_requests_get(*args, **kwargs):
class MockResponse:
def __init__(self, json_data, status_code):
self.json_data = json_data
self.status_code = status_code
def json(self):
return self.json_data
print(f"Calling mocked 'requests.get' with '{args[0]}'")
if "/api/v1/program/playout" in args[0]:
return MockResponse(TestSchedulingApiFetcher.mocked_steering_json, 200)
elif "/api/v1/playlists/1" in args[0]:
return MockResponse(TestSchedulingApiFetcher.mocked_tank_json1, 200)
elif "/api/v1/playlists/2" in args[0]:
return MockResponse(TestSchedulingApiFetcher.mocked_tank_json2, 200)
elif "/api/v1/playlists/3" in args[0]:
return MockResponse(TestSchedulingApiFetcher.mocked_tank_json3, 200)
return MockResponse(None, 404)
# Mock `requests.get` in `SimpleRestApi`
def mocked_requests_no_data(*args, **kwargs):
class MockResponse:
def __init__(self, json_data, status_code):
self.json_data = json_data
self.status_code = status_code
def json(self):
return self.json_data
print(f"Calling mocked 'requests.get' with '{args[0]}'")
if "/api/v1/playout" in args[0]:
return MockResponse(None, 200)
elif "/api/v1/playlists/1" in args[0]:
return MockResponse(None, 200)
return MockResponse(None, 404)
# Mock `requests.get` in `SimpleRestApi`
def mocked_requests_get_exception(*args, **kwargs):
class MockResponse:
def __init__(self, json_data, status_code):
self.json_data = json_data
self.status_code = status_code
def json(self):
invalid_timeslots = [{"id": 18, "start": "2023-05-16T13:00:00"}]
return invalid_timeslots
print(f"Calling mocked 'requests.get' with '{args[0]}'")
return MockResponse(None, 200)
#
# Setup and teardown
#
def setUp(self):
self.config = AuraConfig.instance
self.api_fetcher = ApiFetcher()
self.api_fetcher.m3u_processor.playlist_folder = "./tests/m3u/"
with open("./tests/json/steering-api-v1-program-playout.json", "r") as file:
TestSchedulingApiFetcher.mocked_steering_json = json.load(file)
with open("./tests/json/tank-api-v1-playlists-1.json", "r") as file:
TestSchedulingApiFetcher.mocked_tank_json1 = json.load(file)
with open("./tests/json/tank-api-v1-playlists-2.json", "r") as file:
TestSchedulingApiFetcher.mocked_tank_json2 = json.load(file)
with open("./tests/json/tank-api-v1-playlists-3.json", "r") as file:
TestSchedulingApiFetcher.mocked_tank_json3 = json.load(file)
# Update dates that they are in the future
# Format e.g. "start":"2023-05-16T10:06:00"
now = datetime.now()
hour: int = -1
for timeslot in TestSchedulingApiFetcher.mocked_steering_json:
start = (now + timedelta(hours=hour)).strftime("%Y-%m-%dT%H:%M:%S")
end = (now + timedelta(hours=hour + 1)).strftime("%Y-%m-%dT%H:%M:%S")
timeslot["start"] = start
timeslot["end"] = end
hour += 1
def tearDown(self):
# Clean up cache when finished
self.api_fetcher.api.prune_cache_dir()
#
# Test Cases
#
@mock.patch("aura_engine.base.api.requests.get", side_effect=mocked_requests_get)
def test_fetch_data(self, mock_get):
print(self._testMethodName)
# Clear cache to keep test cases isolated
self.api_fetcher.api.prune_cache_dir()
# Start fetching thread
self.api_fetcher.start()
response: ApiResult = self.api_fetcher.fetch()
print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
print("[test_fetch_data] response message: " + str(response.message))
if response.exception:
print("[test_fetch_data] response exception: \n" + str(response.exception))
# Test Timetable
self.assertEqual(ApiResult, type(response))
self.assertEqual(0, response.code)
# FIXME:
# self.assertEqual(5, len(response.timeslots))
# Test Timeslot 1
ts1: Timeslot = response.timeslots[0]
self.assertEqual(1, ts1.show.id)
self.assertEqual("Show 1", ts1.show.name)
self.assertEqual("Episode 1", ts1.episode.title)
self.assertEqual(1, ts1.playlists.timeslot.get_id())
self.assertEqual("test", ts1.playlists.timeslot.get_description())
self.assertEqual(False, ts1.playlists.timeslot.do_shuffle())
self.assertEqual(False, ts1.is_virtual())
# Test Timeslot 1 - timeslot playlist
pl = ts1.playlists.timeslot
self.assertEqual(PlaylistType.TIMESLOT, pl.type)
pl_items = pl.get_items()
self.assertEqual(1, pl.get_id())
self.assertEqual(1, len(pl_items))
self.assertEqual(False, pl.do_shuffle())
self.assertEqual("test", pl.get_description())
self.assertEqual(199.04, pl.get_duration())
self.assertEqual(None, pl_items[0].prev)
self.assertEqual(None, pl_items[0].next)
self.assertEqual(100, pl_items[0].volume)
self.assertEqual("file://musikprogramm/2", pl_items[0].source)
self.assertEqual(199.04, pl_items[0].duration)
self.assertEqual("Test Artist", pl_items[0].metadata.artist)
self.assertEqual("Test Album", pl_items[0].metadata.album)
self.assertEqual("Test Track Title", pl_items[0].metadata.title)
# Test Timeslot 1 - schedule playlist
pl = ts1.playlists.schedule
self.assertEqual(PlaylistType.SCHEDULE, pl.type)
pl_items = pl.get_items()
self.assertEqual(1, pl.get_id())
self.assertEqual(1, len(pl_items))
self.assertEqual(False, pl.do_shuffle())
self.assertEqual("test", pl.get_description())
self.assertEqual(199.04, pl.get_duration())
self.assertEqual(None, pl_items[0].prev)
self.assertEqual(None, pl_items[0].next)
self.assertEqual(100, pl_items[0].volume)
self.assertEqual("file://musikprogramm/2", pl_items[0].source)
self.assertEqual(199.04, pl_items[0].duration)
self.assertEqual("Test Artist", pl_items[0].metadata.artist)
self.assertEqual("Test Album", pl_items[0].metadata.album)
self.assertEqual("Test Track Title", pl_items[0].metadata.title)
# Test Timeslot 1 - show playlist
pl = ts1.playlists.show
self.assertEqual(PlaylistType.SHOW, pl.type)
pl_items = pl.get_items()
self.assertEqual(1, pl.get_id())
self.assertEqual(1, len(pl_items))
self.assertEqual(False, pl.do_shuffle())
self.assertEqual("test", pl.get_description())
self.assertEqual(199.04, pl.get_duration())
self.assertEqual(None, pl_items[0].prev)
self.assertEqual(None, pl_items[0].next)
self.assertEqual(100, pl_items[0].volume)
self.assertEqual("file://musikprogramm/2", pl_items[0].source)
self.assertEqual(199.04, pl_items[0].duration)
self.assertEqual("Test Artist", pl_items[0].metadata.artist)
self.assertEqual("Test Album", pl_items[0].metadata.album)
self.assertEqual("Test Track Title", pl_items[0].metadata.title)
# Test Timeslot 2
ts2: Timeslot = response.timeslots[1]
self.assertEqual(2, ts2.show.id)
self.assertEqual("Show 2", ts2.show.name)
self.assertEqual("Episode 6", ts2.episode.title)
self.assertEqual(111, ts2.playlists.timeslot.get_id())
self.assertEqual("playlist 111", ts2.playlists.timeslot.get_description())
self.assertEqual(True, ts2.playlists.timeslot.do_shuffle())
self.assertEqual(1, ts2.playlists.schedule.id)
self.assertEqual("test", ts2.playlists.schedule.get_description())
self.assertEqual(False, ts2.playlists.schedule.do_shuffle())
self.assertEqual(1, ts2.playlists.show.id)
self.assertEqual("test", ts2.playlists.show.get_description())
self.assertEqual(False, ts2.playlists.show.do_shuffle())
self.assertEqual(False, ts2.is_virtual())
# Test Timeslot 2 - timeslot playlist
pl = ts2.playlists.timeslot
self.assertEqual(PlaylistType.TIMESLOT, pl.type)
pl_items = pl.get_items()
self.assertEqual(111, pl.get_id())
self.assertEqual(True, pl.do_shuffle())
self.assertEqual("playlist 111", pl.get_description())
self.assertEqual(1386, pl.get_duration())
# Playlist Item 1
self.assertEqual(100, pl_items[0].volume)
self.assertEqual("file://musikprogramm/303", pl_items[0].source)
self.assertEqual(199, pl_items[0].duration)
# Faulty metadata is set to an empty string
self.assertEqual("", pl_items[0].metadata.artist)
self.assertEqual("", pl_items[0].metadata.album)
self.assertEqual("", pl_items[0].metadata.title)
# Playlist Item 2
self.assertEqual(100, pl_items[1].volume)
self.assertEqual("file://musikprogramm/808", pl_items[1].source)
self.assertEqual(299, pl_items[1].duration)
self.assertEqual("Peaches", pl_items[1].metadata.artist)
self.assertEqual("The Teaches of Peaches", pl_items[1].metadata.album)
self.assertEqual("Rock Show", pl_items[1].metadata.title)
# Playlist Item 3-6 (spreaded M3U playlist containing 4 items)
self.assertEqual(6, len(pl_items))
self.assertEqual("file:///media/music-lib/Unser Album/Unser Lied.flac", pl_items[2].source)
self.assertEqual("Alle", pl_items[2].metadata.artist)
self.assertEqual("Unser Lied", pl_items[2].metadata.title)
self.assertEqual("file://music-lib/Unser Album/Dein Lied.ogg", pl_items[3].source)
self.assertEqual("Alle", pl_items[3].metadata.artist)
self.assertEqual("Dein Lied", pl_items[3].metadata.title)
self.assertEqual("file://../music-lib/Alle/Unser Album/Liedlos.m4a", pl_items[4].source)
self.assertEqual("Alle", pl_items[4].metadata.artist)
self.assertEqual("Liedlos", pl_items[4].metadata.title)
self.assertEqual(
"file://http://www.example.org/music/Alle-Unser_Album-Euer_Lied.mp3",
pl_items[5].source,
)
self.assertEqual("Alle", pl_items[5].metadata.artist)
self.assertEqual("Euer Lied", pl_items[5].metadata.title)
# Test linked list
self.assertIsNone(pl_items[0].prev)
self.assertIsNotNone(pl_items[0].next)
self.assertEqual(pl_items[1], pl.items[0].next)
self.assertEqual(pl_items[0], pl.items[1].prev)
self.assertIsNotNone(pl_items[1].next)
self.assertIsNotNone(pl_items[2].next)
self.assertIsNotNone(pl_items[3].next)
self.assertIsNotNone(pl_items[4].next)
self.assertIsNone(pl_items[5].next)
# FIXME: reimplement tests once virtual timeslots are enabled again
# @mock.patch("aura_engine.base.api.requests.get", side_effect=mocked_requests_get)
# def test_fetch_virtual_timeslot(self, mock_get):
# print(self._testMethodName)
# # Clear cache to keep test cases isolated
# self.api_fetcher.api.prune_cache_dir()
# # Start fetching thread
# self.api_fetcher.start()
# response: ApiResult = self.api_fetcher.fetch()
# print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
# print("[test_fetch_data] response message: " + str(response.message))
# if response.exception:
# print("[test_fetch_data] response exception: \n" + str(response.exception))
# # Test Timetable
# self.assertEqual(ApiResult, type(response))
# self.assertEqual(0, response.code)
# self.assertEqual(5, len(response.timeslots))
# # Test Timeslot 4
# ts: Timeslot = response.timeslots[3]
# self.assertEqual(4, ts.show.id)
# self.assertEqual("Show 4 - Fallback show with virtual timeslot", ts.show.name)
# self.assertIsNone(ts.episode)
# self.assertEqual(True, ts.is_virtual())
# Test Timeslot 1 - there are no playlists assigned
# self.assertIsNone(ts.playlists.timeslot)
# self.assertIsNone(ts.playlists.schedule)
# self.assertIsNone(ts.playlists.show)
# FIXME: reimplement test once virtual timeslots are enabled again
# @mock.patch("aura_engine.base.api.requests.get", side_effect=mocked_requests_get)
# def test_fetch_with_expanded_duration(self, mock_get):
# print(self._testMethodName)
# # Clear cache to keep test cases isolated
# self.api_fetcher.api.prune_cache_dir()
# # Start fetching thread
# self.api_fetcher.start()
# response: ApiResult = self.api_fetcher.fetch()
# print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
# print("[test_fetch_data] response message: " + str(response.message))
# if response.exception:
# print("[test_fetch_data] response exception: \n" + str(response.exception))
# # Test Timetable
# self.assertEqual(ApiResult, type(response))
# self.assertEqual(0, response.code)
# self.assertEqual(5, len(response.timeslots))
# # Test Timeslot 3
# ts1: Timeslot = response.timeslots[2]
# self.assertEqual(3, ts1.show.id)
# self.assertEqual("Show 3", ts1.show.name)
# self.assertEqual("Episode 39", ts1.episode.title)
# self.assertEqual(False, ts1.playlists.timeslot.do_shuffle())
# # Test Timeslot 3 - timeslot playlist
# pl = ts1.playlists.timeslot
# self.assertEqual(PlaylistType.TIMESLOT, pl.type)
# pl_items = pl.get_items()
# self.assertEqual(1, pl.get_id())
# self.assertEqual(2, len(pl_items))
# self.assertEqual(False, pl.do_shuffle())
# self.assertEqual("test", pl.get_description())
# # Test playlist item 1 with existing duration
# self.assertIsNotNone(pl.get_items()[0])
# self.assertEqual(180.0, pl.get_items()[0].duration) # 3mins
# # Test for expanded duration of playlist item 2
# self.assertIsNotNone(pl.get_items()[1])
# self.assertEqual(3420.0, pl.get_items()[1].duration) # 57min
# # Timeslot and playlist have the same length
# self.assertEqual(3600, pl.get_timeslot().get_duration()) # 1hour
# self.assertEqual(pl.get_timeslot().get_duration(), pl.get_duration())
@mock.patch("aura_engine.base.api.requests.get", side_effect=mocked_requests_no_data)
def test_fetch_no_data(self, mock_get):
print(self._testMethodName)
# Clear cache to keep test cases isolated
self.api_fetcher.api.prune_cache_dir()
# Fetch data
self.api_fetcher.start()
response: ApiResult = self.api_fetcher.fetch()
print("[test_fetch_data] response message: " + str(response.message))
print(str(response.exception))
# Test Timetable
self.assertEqual(ApiResult, type(response))
self.assertEqual(0, response.code)
self.assertEqual(0, len(response.timeslots))
self.assertEqual("Nothing fetched", response.message)
@mock.patch("aura_engine.base.api.requests.get", side_effect=mocked_requests_get_exception)
def test_fetch_exception(self, mock_get):
print(self._testMethodName)
# Clear cache to keep test cases isolated
self.api_fetcher.api.prune_cache_dir()
# Fetch data
self.api_fetcher.start()
response: ApiResult = self.api_fetcher.fetch()
print("[test_fetch_data] response message: " + str(response.message))
# Test Timetable
self.assertEqual(ApiResult, type(response))
self.assertEqual(1, response.code)
self.assertEqual(0, len(response.timeslots))
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://code.aura.radio/)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import unittest
from datetime import datetime
from aura_engine.base.config import AuraConfig
from aura_steering_api.models.playout_program_entry import PlayoutProgramEntry
from aura_steering_api.types import UNSET
class TestApiSteering(unittest.TestCase):
"""
Testing the Configuration.
"""
config: AuraConfig
mocked_steering_json = None
def setUp(self):
self.config = AuraConfig.instance.config
with open("./tests/json/steering-api-v1-program-playout.json", "r") as file:
self.mocked_steering_json = json.load(file)
def test_api_steering_timeslot_model(self):
print(self._testMethodName)
print("Steering JSON: " + str(self.mocked_steering_json))
timetable = self.mocked_steering_json
self.assertIsNotNone(timetable)
self.assertEqual(5, len(timetable))
t1 = PlayoutProgramEntry.from_dict(timetable[0])
start = datetime.strptime("2024-07-25T18:00:10", "%Y-%m-%dT%H:%M:%S")
end = datetime.strptime("2024-07-25T19:00:00", "%Y-%m-%dT%H:%M:%S")
# self.assertEqual(f"{start}...{end}", t1.id)
self.assertEqual(start, t1.start)
self.assertEqual(end, t1.end)
self.assertEqual("Show 1", t1.show.name)
self.assertEqual(9, t1.schedule.id)
self.assertEqual(18, t1.timeslot.repetition_of_id)
self.assertEqual(1, t1.playlist_id)
self.assertEqual(1, t1.timeslot.playlist_id)
# self.assertEqual(None, t1.schedule.default_playlist_id)
self.assertEqual(1, t1.show.default_playlist_id)
self.assertEqual(1, t1.show.id)
def test_api_steering_current_timeslot_model(self):
"""
Test deserializing current steering timeslot model.
"""
with open("./tests/json/steering-api-v1-program-playout.json", "r") as file:
playout_json = json.load(file)
for obj in playout_json:
_t = PlayoutProgramEntry.from_dict(obj)
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://code.aura.radio/)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import os
import unittest
import validators
from aura_engine.base.config import AuraConfig
from aura_tank_api.models.playlist import Playlist
from aura_tank_api.types import UNSET
class TestApiTank(unittest.TestCase):
"""
Testing the Configuration.
"""
config: AuraConfig
mocked_tank_json = None
def setUp(self):
self.config = AuraConfig.instance.config
with open("./tests/json/tank-api-v1-playlists-1.json", "r") as file:
self.mocked_tank_json = json.load(file)
def test_api_tank_model(self):
print(self._testMethodName)
print("Tank JSON: " + str(self.mocked_tank_json))
playlist = Playlist()
self.assertEqual(UNSET, playlist.id)
playlist = playlist.from_dict(self.mocked_tank_json)
self.assertIsNotNone(playlist.to_dict())
self.assertEqual(1, playlist.id)
self.assertEqual("musikprogramm", playlist.show_name)
self.assertEqual("linear", playlist.playout_mode)
self.assertEqual("test", playlist.description)
self.assertEqual("2023-02-28T15:25:38.684803+01:00", playlist.created)
self.assertEqual("2023-02-28T15:25:38.684803+01:00", playlist.updated)
entries = playlist.entries
self.assertIsNotNone(1, playlist.entries)
self.assertEqual(1, len(entries))
self.assertEqual("file://musikprogramm/2", entries[0].uri)
self.assertEqual(199.04, entries[0].duration)
file = playlist.entries[0].file
self.assertEqual(2, file.id)
self.assertEqual(
"sha256:b4e1922bad633ff0e11f55611f04cb3807d15d70bb09969d2b324373af47b574",
file.source.hash_,
)
self.assertEqual("musikprogramm", file.show_name)
self.assertEqual("upload://some-audio-file.flac", file.source.uri)
self.assertEqual("Test Artist", file.metadata.artist)
self.assertEqual("Test Track Title", file.metadata.title)
self.assertEqual("Test Album", file.metadata.album)
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from aura_engine.base.config import AuraConfig
from aura_engine.base.logger import AuraLogger
from aura_engine.core.channels import GenericChannel
from aura_engine.resources import ResourceType
from aura_engine.scheduling.domain import (
Episode,
Playlist,
PlaylistItem,
PlaylistType,
Show,
Timeslot,
)
class TestDomain(unittest.TestCase):
"""
Testing the Domain Models.
"""
aura_logger = None
def setUp(self):
self.config = AuraConfig.instance.config
self.aura_logger = AuraLogger(self.config)
def test_timeslot_init(self):
print(self._testMethodName)
show = Show(id=555, name="3angulation")
episode = Episode(id=3, title="SuperSonicShow 23/1", memo="")
ts = Timeslot(
id=333, repetition_id=888, start=1675463003, end=1675466003, show=show, episode=episode
)
ts.set_playlists(Playlist(1, "some description"), Playlist(2, ""), Playlist(3, ""))
def test_timeslot_to_string(self):
print(self._testMethodName)
show = Show(id=555, name="3angulation")
ts = Timeslot(id=333, repetition_id=888, start=1, end=2, show=show, episode=None)
self.assertEqual("ID#333: 01:00:01 - 01:00:02 [Show#555: 3angulation]", str(ts))
def test_playlist_init(self):
print(self._testMethodName)
pl = Playlist(1, "some description")
pl.id = 333
pl.timeslot = Timeslot(1, None, 123, 456, None, None)
pl.type = PlaylistType.TIMESLOT
def test_assign_playlists_to_timeslot(self):
print(self._testMethodName)
ts = Timeslot(
id=333, repetition_id=888, start=1675463003, end=1675466009, show=None, episode=None
)
pl_timeslot = Playlist(1, "some description")
pl_schedule = Playlist(2, "some description")
pl_show = Playlist(3, "some description")
ts.set_playlists(pl_timeslot, None, None)
ts.set_playlists(None, pl_schedule, None)
ts.set_playlists(None, None, pl_show)
ts.set_playlists(pl_timeslot, pl_schedule, pl_show)
self.assertIsNotNone(ts.playlists.timeslot)
self.assertIsNotNone(ts.playlists.schedule)
self.assertIsNotNone(ts.playlists.show)
self.assertEqual(1675463003, ts.get_start())
self.assertEqual(1675466009, ts.get_end())
self.assertEqual(1675466009 - 1675463003, ts.get_duration())
self.assertEqual(PlaylistType.TIMESLOT, ts.playlists.timeslot.get_type())
self.assertEqual(PlaylistType.SCHEDULE, ts.playlists.schedule.get_type())
self.assertEqual(PlaylistType.SHOW, ts.playlists.show.get_type())
def test_get_current_playlist_from_timeslot(self):
print(self._testMethodName)
ts = Timeslot(
id=333, repetition_id=888, start=1675463003, end=1675466009, show=None, episode=None
)
pl_timeslot = Playlist(1, "playlist A")
pl_schedule = Playlist(2, "playlist B")
pl_show = Playlist(3, "playlist C")
ts.set_playlists(pl_timeslot, pl_schedule, pl_show)
self.assertIsNotNone(ts.playlists.timeslot)
self.assertIsNotNone(ts.playlists.schedule)
self.assertIsNotNone(ts.playlists.show)
self.assertEqual(pl_timeslot, ts.get_current_playlist())
ts.set_playlists(pl_timeslot, None, None)
self.assertIsNotNone(ts.playlists.timeslot)
self.assertIsNone(ts.playlists.schedule)
self.assertIsNone(ts.playlists.show)
self.assertEqual(pl_timeslot, ts.get_current_playlist())
ts.set_playlists(None, pl_schedule, pl_show)
self.assertIsNone(ts.playlists.timeslot)
self.assertIsNotNone(ts.playlists.schedule)
self.assertIsNotNone(ts.playlists.show)
self.assertEqual(pl_schedule, ts.get_current_playlist())
ts.set_playlists(None, None, pl_show)
self.assertIsNone(ts.playlists.timeslot)
self.assertIsNone(ts.playlists.schedule)
self.assertIsNotNone(ts.playlists.show)
self.assertEqual(pl_show, ts.get_current_playlist())
def test_get_current_playlist_from_virtual_timeslot(self):
print(self._testMethodName)
ts = Timeslot(
id=333,
virtual=True,
repetition_id=888,
start=1675463003,
end=1675466009,
show=None,
episode=None,
)
# Virtual timeslots do use playlists
self.assertTrue(ts.is_virtual())
self.assertIsNone(ts.get_current_playlist())
pl_timeslot = Playlist(1, "playlist A")
pl_schedule = Playlist(2, "playlist B")
pl_show = Playlist(3, "playlist C")
ts.set_playlists(pl_timeslot, pl_schedule, pl_show)
self.assertIsNotNone(ts.playlists.timeslot)
self.assertIsNotNone(ts.playlists.schedule)
self.assertIsNotNone(ts.playlists.show)
# Even though there are playlists assigned, they are not returned
self.assertIsNone(ts.get_current_playlist())
def test_playlist_duration_zero(self):
print(self._testMethodName)
pl = Playlist(1, "some description")
self.assertEqual(0, pl.get_duration())
def test_playlist_add_items(self):
print(self._testMethodName)
pl = Playlist(1, "some description")
e1 = PlaylistItem("file1.flac", 2, 100, PlaylistItem.Metadata("a1", "b1", "c1"))
e2 = PlaylistItem("file2.flac", 3, 100, PlaylistItem.Metadata("a2", "b2", "c2"))
e3 = PlaylistItem("file3.flac", 5, 100, PlaylistItem.Metadata("a3", "b3", "c3"))
pl.add(e1)
pl.add(e2)
pl.add(e3)
self.assertEqual(3, len(pl.get_items()))
def test_playlist_remove_item(self):
print(self._testMethodName)
pl = Playlist(1, "some description")
e1 = PlaylistItem("file1.flac", 2, 100, PlaylistItem.Metadata("a1", "b1", "c1"))
e2 = PlaylistItem("file2.flac", 3, 100, PlaylistItem.Metadata("a2", "b2", "c2"))
e3 = PlaylistItem("file3.flac", 5, 100, PlaylistItem.Metadata("a3", "b3", "c3"))
pl.add(e1)
pl.add(e2)
pl.add(e3)
self.assertEqual(3, len(pl.get_items()))
e2.remove()
self.assertEqual(2, len(pl.get_items()))
self.assertEqual(e1, e3.prev)
self.assertEqual(e3, e1.next)
self.assertIsNone(e2.playlist)
e3.remove()
self.assertEqual(1, len(pl.get_items()))
self.assertIsNone(e1.next)
self.assertIsNone(e1.prev)
self.assertIsNone(e3.next)
self.assertIsNone(e3.prev)
def test_playlist_duration(self):
print(self._testMethodName)
pl = Playlist(1, "some description")
e1 = PlaylistItem("file1.flac", 2, 100, PlaylistItem.Metadata("a1", "b1", "c1"))
e2 = PlaylistItem("file2.flac", 3, 100, PlaylistItem.Metadata("a2", "b2", "c2"))
e3 = PlaylistItem("file3.flac", 5, 100, PlaylistItem.Metadata("a3", "b3", "c3"))
pl.add(e1)
pl.add(e2)
pl.add(e3)
self.assertEqual(10, pl.get_duration())
def test_playlist_item_positions(self):
print(self._testMethodName)
pl = Playlist(1, "some description")
e1 = PlaylistItem("file1.flac", 2, 100, PlaylistItem.Metadata("a1", "b1", "c1"))
e2 = PlaylistItem("file2.flac", 3, 100, PlaylistItem.Metadata("a2", "b2", "c2"))
e3 = PlaylistItem("file3.flac", 5, 100, PlaylistItem.Metadata("a3", "b3", "c3"))
pl.add(e1)
pl.add(e2)
pl.add(e3)
self.assertEqual(1, e1.get_position())
self.assertEqual(2, e2.get_position())
self.assertEqual(3, e3.get_position())
def test_playlist_all_prev_and_next(self):
print(self._testMethodName)
ts = Timeslot(id=333, repetition_id=888, start=100, end=120, show=None, episode=None)
pl = Playlist(1, "some description")
e1 = PlaylistItem("file1.flac", 2, 100, PlaylistItem.Metadata("a1", "b1", "c1"))
e2 = PlaylistItem("file2.flac", 3, 100, PlaylistItem.Metadata("a2", "b2", "c2"))
e3 = PlaylistItem("file3.flac", 5, 100, PlaylistItem.Metadata("a3", "b3", "c3"))
e4 = PlaylistItem("file4.flac", 7, 100, PlaylistItem.Metadata("a4", "b4", "c4"))
e5 = PlaylistItem("file5.flac", 9, 100, PlaylistItem.Metadata("a5", "b5", "c5"))
e6 = PlaylistItem("file6.flac", 11, 100, PlaylistItem.Metadata("a6", "b6", "c6"))
ts.set_playlists(pl, None, None)
pl.add(e1)
pl.add(e2)
pl.add(e3)
pl.add(e4)
pl.add(e5)
pl.add(e6)
prev = e3.get_all_prev()
self.assertEqual(2, len(prev))
next = e3.get_all_next(True)
self.assertEqual(2, len(next))
next = e3.get_all_next(False)
self.assertEqual(3, len(next))
self.assertEqual(e4.source, next[0].source)
self.assertEqual(e5.source, next[1].source)
self.assertEqual(e6.source, next[2].source)
def test_playlist_item_start_end(self):
print(self._testMethodName)
ts = Timeslot(id=333, repetition_id=888, start=100, end=200, show=None, episode=None)
pl = Playlist(1, "some description")
e1 = PlaylistItem("file1.flac", 2, 100, PlaylistItem.Metadata("a1", "b1", "c1"))
e2 = PlaylistItem("file2.flac", 3, 100, PlaylistItem.Metadata("a2", "b2", "c2"))
e3 = PlaylistItem("file3.flac", 5, 100, PlaylistItem.Metadata("a3", "b3", "c3"))
ts.set_playlists(pl, None, None)
pl.add(e1)
pl.add(e2)
pl.add(e3)
self.assertEqual(100, e1.get_start())
self.assertEqual(102, e1.get_end())
self.assertEqual(102, e2.get_start())
self.assertEqual(105, e2.get_end())
self.assertEqual(105, e3.get_start())
self.assertEqual(110, e3.get_end())
def test_playlist_item_content_type(self):
print(self._testMethodName)
e1 = PlaylistItem("file://file1.flac", 2, 100, PlaylistItem.Metadata("a1", "b1", "c1"))
e2 = PlaylistItem(
"https://stream.your.radio", 3, 100, PlaylistItem.Metadata("a2", "b2", "c2")
)
e3 = PlaylistItem("line://", 5, 100, PlaylistItem.Metadata("a3", "b3", "c3"))
e4 = PlaylistItem("m3u://", 5, 100, PlaylistItem.Metadata("a3", "b3", "c3"))
e5 = PlaylistItem("pool://", 5, 100, PlaylistItem.Metadata("a3", "b3", "c3"))
e6 = PlaylistItem("unknown", 5, 100, PlaylistItem.Metadata("a3", "b3", "c3"))
e7 = PlaylistItem(None, 5, 100, PlaylistItem.Metadata("a3", "b3", "c3"))
self.assertEqual(ResourceType.FILE, e1.get_content_type())
self.assertEqual(ResourceType.STREAM_HTTP, e2.get_content_type())
self.assertEqual(ResourceType.LINE, e3.get_content_type())
self.assertEqual(ResourceType.M3U, e4.get_content_type())
self.assertEqual(ResourceType.POOL, e5.get_content_type())
self.assertEqual(None, e6.get_content_type())
self.assertEqual(None, e7.get_content_type())
def test_playlist_item_to_string(self):
print(self._testMethodName)
ts = Timeslot(id=333, repetition_id=888, start=100, end=200, show=None, episode=None)
pl = Playlist(1, "some description")
item = PlaylistItem(
"https://stream.your.radio", 3, 100, PlaylistItem.Metadata("a2", "b2", "c2")
)
ts.set_playlists(pl, None, None)
pl.add(item)
self.assertEqual(
"PlaylistItem [01:01:40 - 01:01:43 | 3.00sec | Source: ...https://stream.your.radio]",
str(item),
)
def test_playlist_item_play_state(self):
print(self._testMethodName)
item = PlaylistItem("file1.flac", 2, 100, PlaylistItem.Metadata("a1", "b1", "c1"))
self.assertFalse(item.play.is_loading())
self.assertFalse(item.play.is_ready())
self.assertFalse(item.play.is_playing())
self.assertFalse(item.play.is_done())
self.assertIsNone(item.play.channel)
item.play.set_loading(GenericChannel(1, "main", None))
self.assertTrue(item.play.is_loading())
self.assertFalse(item.play.is_ready())
self.assertFalse(item.play.is_playing())
self.assertFalse(item.play.is_done())
self.assertIsNotNone(item.play.channel)
item.play.set_ready()
self.assertFalse(item.play.is_loading())
self.assertTrue(item.play.is_ready())
self.assertFalse(item.play.is_playing())
self.assertFalse(item.play.is_done())
self.assertIsNone(item.play.play_start)
item.play.set_playing()
self.assertFalse(item.play.is_loading())
self.assertFalse(item.play.is_ready())
self.assertTrue(item.play.is_playing())
self.assertFalse(item.play.is_done())
self.assertIsNotNone(item.play.play_start)
item.play.play_start = 1710445690.0
self.assertEqual(1710445690.0, item.play.get_play_start())
item.play.set_done()
self.assertFalse(item.play.is_loading())
self.assertFalse(item.play.is_ready())
self.assertFalse(item.play.is_playing())
self.assertTrue(item.play.is_done())
def test_update_playlist(self):
print(self._testMethodName)
ts = Timeslot(id=333, repetition_id=888, start=100, end=120, show=None, episode=None)
# Create existing playlists
pl_timeslot = Playlist(1, "some description")
e1 = PlaylistItem("file1.flac", 2, 100, PlaylistItem.Metadata("a1", "b1", "c1"))
e2 = PlaylistItem("file2.flac", 3, 100, PlaylistItem.Metadata("a2", "b2", "c2"))
e3 = PlaylistItem("file3.flac", 5, 100, PlaylistItem.Metadata("a3", "b3", "c3"))
e4 = PlaylistItem("file4.flac", 7, 100, PlaylistItem.Metadata("a4", "b4", "c4"))
e5 = PlaylistItem("file5.flac", 9, 100, PlaylistItem.Metadata("a5", "b5", "c5"))
e6 = PlaylistItem("file6.flac", 11, 100, PlaylistItem.Metadata("a6", "b6", "c6"))
pl_timeslot.add(e1)
pl_timeslot.add(e2)
pl_timeslot.add(e3)
pl_timeslot.add(e4)
pl_timeslot.add(e5)
pl_timeslot.add(e6)
pl_show = Playlist(1, "some description")
e1 = PlaylistItem("file1_show.flac", 2, 100, PlaylistItem.Metadata("a1s", "b1s", "c1s"))
e2 = PlaylistItem("file2_show.flac", 3, 100, PlaylistItem.Metadata("a2s", "b2s", "c2s"))
e3 = PlaylistItem("file3_show.flac", 5, 100, PlaylistItem.Metadata("a3s", "b3s", "c3s"))
pl_show.add(e1)
pl_show.add(e2)
pl_show.add(e3)
ts.set_playlists(pl_timeslot, None, pl_show)
# Create new remote playlist
pl_timeslot_new = Playlist(1, "some description")
e1 = PlaylistItem(
"file://file1_new.flac", 2, 100, PlaylistItem.Metadata("a1_new", "b1", "c1")
)
e2 = PlaylistItem(
"https://stream.your.radio", 3, 100, PlaylistItem.Metadata("a2_new", "b2", "c2")
)
e3 = PlaylistItem("line://", 5, 100, PlaylistItem.Metadata("a3_new", "b3", "c3"))
e4 = PlaylistItem("m3u://", 5, 100, PlaylistItem.Metadata("a4_new", "b3", "c3"))
pl_timeslot_new.add(e1)
pl_timeslot_new.add(e2)
pl_timeslot_new.add(e3)
pl_timeslot_new.add(e4)
# Test initial state
self.assertEqual(6, len(pl_timeslot.items))
self.assertEqual("a1", pl_timeslot.items[0].metadata.artist)
self.assertEqual("file3.flac", pl_timeslot.items[2].source)
# Test playlist update
pl_timeslot.update_playlist(pl_timeslot_new)
self.assertEqual(4, len(pl_timeslot.items))
self.assertEqual("a1_new", pl_timeslot.items[0].metadata.artist)
self.assertEqual("line://", pl_timeslot.items[2].source)
def test_update_timeslot(self):
print(self._testMethodName)
show1 = Show(1, "show1")
show2 = Show(2, "show2")
episode1 = Episode(11, "title1", "my memo for this episode")
episode2 = Episode(22, "title2", None)
ts1 = Timeslot(id=333, repetition_id=888, start=100, end=120, show=show1, episode=episode1)
ts2 = Timeslot(id=444, repetition_id=999, start=100, end=120, show=show2, episode=episode2)
# Create existing playlists
pl_timeslot = Playlist(1, "some description")
e1 = PlaylistItem("file1.flac", 2, 100, PlaylistItem.Metadata("a1", "b1", "c1"))
e2 = PlaylistItem("file2.flac", 3, 100, PlaylistItem.Metadata("a2", "b2", "c2"))
e3 = PlaylistItem("file3.flac", 5, 100, PlaylistItem.Metadata("a3", "b3", "c3"))
e4 = PlaylistItem("file4.flac", 7, 100, PlaylistItem.Metadata("a4", "b4", "c4"))
e5 = PlaylistItem("file5.flac", 9, 100, PlaylistItem.Metadata("a5", "b5", "c5"))
e6 = PlaylistItem("file6.flac", 11, 100, PlaylistItem.Metadata("a6", "b6", "c6"))
pl_timeslot.add(e1)
pl_timeslot.add(e2)
pl_timeslot.add(e3)
pl_timeslot.add(e4)
pl_timeslot.add(e5)
pl_timeslot.add(e6)
pl_show = Playlist(2, "some description")
e1 = PlaylistItem("file1_show.flac", 2, 100, PlaylistItem.Metadata("a1s", "b1s", "c1s"))
e2 = PlaylistItem("file2_show.flac", 3, 100, PlaylistItem.Metadata("a2s", "b2s", "c2s"))
e3 = PlaylistItem("file3_show.flac", 5, 100, PlaylistItem.Metadata("a3s", "b3s", "c3s"))
pl_show.add(e1)
pl_show.add(e2)
pl_show.add(e3)
pl_timeslot2 = Playlist(1, "some description")
e1 = PlaylistItem("file1_fresh.flac", 2, 100, PlaylistItem.Metadata("a1", "b1", "c1"))
e2 = PlaylistItem("file2_fresh.flac", 3, 100, PlaylistItem.Metadata("a2", "b2", "c2"))
e3 = PlaylistItem("file3_fresh.flac", 5, 100, PlaylistItem.Metadata("a3", "b3", "c3"))
e4 = PlaylistItem("file4_fresh.flac", 7, 100, PlaylistItem.Metadata("a4", "b4", "c4"))
e5 = PlaylistItem("file5_fresh.flac", 9, 100, PlaylistItem.Metadata("a5", "b5", "c5"))
e6 = PlaylistItem("file6_fresh.flac", 11, 100, PlaylistItem.Metadata("a6", "b6", "c6"))
e7 = PlaylistItem("file7_fresh.flac", 11, 100, PlaylistItem.Metadata("a7", "b7", "c7"))
pl_timeslot2.add(e1)
pl_timeslot2.add(e2)
pl_timeslot2.add(e3)
pl_timeslot2.add(e4)
pl_timeslot2.add(e5)
pl_timeslot2.add(e6)
pl_timeslot2.add(e7)
pl_schedule = Playlist(3, "some fresh description")
e1 = PlaylistItem(
"fresh_schedule_file1.flac",
2,
100,
PlaylistItem.Metadata("a1fresh", "b1fresh", "c1fresh"),
)
e2 = PlaylistItem(
"fresh_schedule_file2.flac",
3,
100,
PlaylistItem.Metadata("a2fresh", "b2fresh", "c2fresh"),
)
e3 = PlaylistItem(
"fresh_schedule_file3.flac",
5,
100,
PlaylistItem.Metadata("a3fresh", "b3fresh", "c3fresh"),
)
e4 = PlaylistItem(
"fresh_schedule_file4.flac",
15,
100,
PlaylistItem.Metadata("a4fresh", "b4fresh", "c4fresh"),
)
pl_schedule.add(e1)
pl_schedule.add(e2)
pl_schedule.add(e3)
pl_schedule.add(e4)
ts1.set_playlists(pl_timeslot, None, pl_show)
ts2.set_playlists(pl_timeslot2, pl_schedule, None)
# Test initial state
self.assertEqual(333, ts1.get_id())
self.assertEqual(888, ts1.get_repetition_id())
self.assertEqual(1, ts1.get_show().id)
self.assertEqual("title1", ts1.get_episode().title)
self.assertEqual(6, len(ts1.playlists.get(PlaylistType.TIMESLOT).items))
self.assertIsNone(ts1.playlists.get(PlaylistType.SCHEDULE))
self.assertEqual(3, len(ts1.playlists.get(PlaylistType.SHOW).items))
self.assertEqual(6, len(ts1.get_current_playlist().items))
# Do update
ts1.update(ts2)
# Test timeslot update
self.assertEqual(444, ts1.get_id())
self.assertEqual(999, ts1.get_repetition_id())
self.assertEqual(2, ts1.get_show().id)
self.assertEqual("title2", ts1.get_episode().title)
self.assertEqual(7, len(ts1.playlists.get(PlaylistType.TIMESLOT).items))
self.assertEqual(4, len(ts1.playlists.get(PlaylistType.SCHEDULE).items))
self.assertIsNone(ts1.playlists.get(PlaylistType.SHOW))
self.assertEqual(7, len(ts1.get_current_playlist().items))
# Do update without a timeslot playlist
ts2.playlists.set(PlaylistType.TIMESLOT, None)
ts1.update(ts2)
# Test timeslot update
self.assertEqual(444, ts1.get_id())
self.assertEqual(999, ts1.get_repetition_id())
self.assertEqual(2, ts1.get_show().id)
self.assertEqual("title2", ts1.get_episode().title)
self.assertIsNone(ts1.playlists.get(PlaylistType.TIMESLOT))
self.assertEqual(4, len(ts1.playlists.get(PlaylistType.SCHEDULE).items))
self.assertIsNone(ts1.playlists.get(PlaylistType.SHOW))
self.assertEqual(4, len(ts1.get_current_playlist().items))
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import inspect
import time
import unittest
from aura_engine.base.config import AuraConfig
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.control import EngineExecutor
from aura_engine.core.channels import GenericChannel
from aura_engine.engine import Engine
from aura_engine.scheduling.domain import Playlist, PlaylistItem, Timeslot
from aura_engine.scheduling.scheduler import AuraScheduler
from aura_engine.scheduling.utils import TimetableRenderer
class CommandRegistry:
commands: {} = {}
def register(f, cmd: EngineExecutor):
if not CommandRegistry.commands.get(f):
CommandRegistry.commands[f] = []
CommandRegistry.commands.get(f).append(cmd)
def get(f):
return CommandRegistry.commands.get(f)
def reset(f):
c: EngineExecutor
if CommandRegistry.commands.get(f):
for c in CommandRegistry.commands.get(f):
c.cancel()
CommandRegistry.commands[f] = None
def prune():
for key in CommandRegistry.commands.keys():
CommandRegistry.reset(key)
#
# Mocks
#
class MockedPlayer:
"""
Mocked version of engine.
"""
def roll(channel, seconds_to_roll: int):
print(f"called pre-rolling for {channel} seconds on player channel {seconds_to_roll}")
pass
class MockedEngine:
"""
Mocked version of engine.
"""
scheduler: AuraScheduler
player: MockedPlayer
def __init__(self):
"""
Init.
"""
self.scheduler = None
self.player = MockedPlayer
class MockedTimeslotCommand:
"""
Command for triggering start and end of timeslot events.
"""
engine: Engine = None
config: AuraConfig = None
timeslot: Timeslot
def __init__(self, engine: Engine, timeslot: Timeslot):
"""
Initialize the timeslot command.
Args:
engine (Engine): The engine
timeslot (Timeslot): The timeslot which is starting at this time
"""
self.config = AuraConfig.instance.config
self.engine = engine
caller = inspect.stack()[2].function
CommandRegistry.register(caller, self)
print(f"Created instance of MockedTimeslotCommand from {caller}")
self.timeslot = timeslot
self.do_start_timeslot(timeslot)
self.do_end_timeslot(timeslot)
def do_start_timeslot(self, timeslot: Timeslot):
"""
Indicate the start of the timeslot by sending a `on_timeslot_start` event.
"""
caller = inspect.stack()[2].function
print(f"Called do_play of MockedTimeslotCommand from {caller}")
def do_end_timeslot(self, timeslot: Timeslot):
"""
Indicate the start of the timeslot by sending a `on_timeslot_end` event.
Also resetting the used channel.
"""
caller = inspect.stack()[2].function
print(f"Called do_play of MockedTimeslotCommand from {caller}")
def cancel(self):
"""
Cancel the command.
"""
pass
class MockedPlayCommand:
"""
Mocked command for triggering timed preloading and playing.
"""
engine: Engine = None
config: AuraConfig = None
items: list[PlaylistItem]
def __init__(self, engine: Engine, items: list[PlaylistItem]):
"""
Initialize the play command.
Args:
engine (Engine): The engine
items ([PlaylistItem]): One or more playlist items to be started
"""
self.config = AuraConfig.instance.config
self.engine = engine
caller = inspect.stack()[2].function
CommandRegistry.register(caller, self)
print(f"Created instance of MockedPlayCommand from {caller}")
self.items = items
self.do_preload(items)
self.do_play(items)
def do_preload(self, items: list[PlaylistItem]):
"""
Preload the items.
Args:
items ([PlaylistItem]): The set of playlist items to be pre-loaded.
"""
caller = inspect.stack()[2].function
print(f"Called do_preload of MockedPlayCommand from {caller}")
items[0].play_channel = GenericChannel(1, "main channel", None)
def do_play(self, items: list[PlaylistItem]):
"""
Play the items.
Args:
items ([PlaylistItem]): The set of playlist items to be played.
"""
caller = inspect.stack()[2].function
print(f"Called do_play of MockedPlayCommand from {caller}")
def cancel(self):
"""
Cancel the command.
"""
pass
class TestSchedulingScheduler(unittest.TestCase):
"""
Testing the scheduling utils.
"""
engine: Engine
config: AuraConfig
# Setup and teardown
def setUp(self):
self.config = AuraConfig.instance.config
self.engine = Engine.get_instance()
# Init mocked command classes
AuraScheduler.TimeslotCommandClass = MockedTimeslotCommand
AuraScheduler.PlayCommandClass = MockedPlayCommand
def tearDown(self):
pass
#
# Utilities
#
def build_timetable(
self, scheduler: AuraScheduler, current_start, current_end, st="file://"
) -> list[Timeslot]:
# Delete the timetable file to isolate test case
f = scheduler.timetable.timetable_file
scheduler.timetable.timetable_file = f.replace("timetable.json", "timetable-test.json")
#
# Prune command registry
CommandRegistry.prune()
# Build some timetable
now = SU.timestamp()
ts1 = Timeslot(
id=1,
repetition_id=None,
start=now - 500,
end=now + current_start,
show=None,
episode=None,
)
ts2 = Timeslot(
id=2,
repetition_id=None,
start=now + current_start,
end=now + current_end,
show=None,
episode=None,
)
ts3 = Timeslot(
id=3,
repetition_id=None,
start=now + current_end,
end=now + 540,
show=None,
episode=None,
)
ts4 = Timeslot(
id=4, repetition_id=None, start=now + 540, end=now + 720, show=None, episode=None
)
# Playlist for timeslot 2
pl2 = Playlist("9999", "Playlist XYZ", False)
alpha = PlaylistItem(st + "alpha.flac", 100, 100, None)
beta = PlaylistItem(st + "beta.flac", current_end - current_start - 200, 100, None)
gamma = PlaylistItem(st + "gamma.flac", 50, 100, None)
delta = PlaylistItem(st + "delta.flac", 50, 100, None)
pl2.add(alpha)
pl2.add(beta)
pl2.add(gamma)
pl2.add(delta)
# Playlist for timeslot 3
pl3 = Playlist("7777", "Colours", False)
magenta = PlaylistItem(st + "magenta.flac", 100, 100, None)
black = PlaylistItem(st + "black.flac", current_end - current_start - 200, 100, None)
taupe = PlaylistItem(st + "taupe.flac", 100, 100, None)
pl3.add(magenta)
pl3.add(black)
pl3.add(taupe)
ts2.set_playlists(None, pl2, None)
ts3.set_playlists(pl3, None, None)
return [ts1, ts2, ts3, ts4]
#
# Test Cases
#
def test_start_and_terminate_scheduler(self):
print(self._testMethodName)
# Boot the scheduler
scheduler = AuraScheduler(self.engine)
scheduler.boot()
time.sleep(2)
self.assertTrue(scheduler.is_alive())
# Shut the scheduler down
scheduler.terminate()
time.sleep(10)
self.assertFalse(scheduler.is_alive())
def test_get_timetable(self):
print(self._testMethodName)
# Construct the scheduler
scheduler = AuraScheduler(MockedEngine())
# Build some timetable
scheduler.timetable.timetable = self.build_timetable(scheduler, -400, +400)
tt = scheduler.get_timetable()
self.assertIsNotNone(tt)
self.assertEqual(4, len(tt.timetable))
def test_play_active_item_with_roll(self):
print(self._testMethodName)
# Construct the scheduler
scheduler = AuraScheduler(MockedEngine())
# Build some timetable
scheduler.timetable.timetable = self.build_timetable(scheduler, -400, +400)
# Display timetable for debugging
tr = TimetableRenderer(scheduler)
report = tr.get_ascii_timeslots()
print(report)
# Before we proceed, ensure there is really an active item
active_item = scheduler.timetable.get_current_item()
self.assertIsNotNone(active_item)
scheduler.play_active_item()
cmds = CommandRegistry.get(self._testMethodName)
# Expecting at least one timeslot and a play command
self.assertEqual(2, len(cmds))
self.assertTrue(isinstance(cmds[0], MockedTimeslotCommand))
self.assertTrue(isinstance(cmds[1], MockedPlayCommand))
timeslot_cmd: MockedTimeslotCommand = cmds[0]
play_cmd: MockedPlayCommand = cmds[1]
self.assertEqual(2, timeslot_cmd.timeslot.get_id())
self.assertEqual(1, len(play_cmd.items))
channel: GenericChannel = play_cmd.items[0].play_channel
self.assertEqual("main channel", channel.name)
def test_play_active_item_no_roll(self):
print(self._testMethodName)
# Construct the scheduler
scheduler = AuraScheduler(MockedEngine())
# Build some timetable
scheduler.timetable.timetable = self.build_timetable(scheduler, -400, +5)
scheduler.play_active_item()
cmds = CommandRegistry.get(self._testMethodName)
# Expecting at least one timeslot command
self.assertEqual(1, len(cmds))
self.assertTrue(isinstance(cmds[0], MockedTimeslotCommand))
def test_play_active_invalid_source(self):
print(self._testMethodName)
# Construct the scheduler
scheduler = AuraScheduler(MockedEngine())
# Build some timetable
scheduler.timetable.timetable = self.build_timetable(scheduler, -400, +5, "3d_video://")
scheduler.play_active_item()
cmds = CommandRegistry.get(self._testMethodName)
# Expecting at least one timeslot command
self.assertEqual(1, len(cmds))
self.assertTrue(isinstance(cmds[0], MockedTimeslotCommand))
def test_queue_program_startup_items(self):
print(self._testMethodName)
# Construct the scheduler
scheduler = AuraScheduler(MockedEngine())
# Build some timetable
scheduler.timetable.timetable = self.build_timetable(scheduler, -400, +400)
scheduler.queue_startup_items()
cmds = CommandRegistry.get("queue_startup_items")
# Expecting only one play command, is the timeslot command was created
# when scheduling the active item already
self.assertEqual(1, len(cmds))
self.assertTrue(isinstance(cmds[0], MockedPlayCommand))
play_cmd: MockedPlayCommand = cmds[0]
self.assertEqual(2, len(play_cmd.items))
channel: GenericChannel = play_cmd.items[0].play_channel
self.assertEqual("main channel", channel.name)
def test_queue_program_nothing_next_in_window(self):
print(self._testMethodName)
# Construct the scheduler
scheduler = AuraScheduler(MockedEngine())
# Build some timetable
scheduler.timetable.timetable = self.build_timetable(scheduler, -400, +400)
scheduler.queue_program()
cmds = CommandRegistry.get(self._testMethodName)
# Expecting no command as everything is out of the scheduling window
self.assertIsNone(cmds)
def test_queue_program_valid_next_in_window(self):
print(self._testMethodName)
# Construct the scheduler
scheduler = AuraScheduler(MockedEngine())
# Build some timetable
scheduler.timetable.timetable = self.build_timetable(scheduler, -400, +30)
scheduler.queue_program()
cmds = CommandRegistry.get(self._testMethodName)
cmds += CommandRegistry.get("queue_program")
# Expecting one timeslot and one play command for the next timeslot
self.assertEqual(2, len(cmds))
self.assertTrue(isinstance(cmds[0], MockedTimeslotCommand))
self.assertTrue(isinstance(cmds[1], MockedPlayCommand))
timeslot_cmd: MockedTimeslotCommand = cmds[0]
play_cmd: MockedPlayCommand = cmds[1]
self.assertEqual(3, timeslot_cmd.timeslot.get_id())
self.assertEqual(3, len(play_cmd.items))
channel: GenericChannel = play_cmd.items[0].play_channel
self.assertEqual("main channel", channel.name)
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from time import sleep
from aura_engine.base.config import AuraConfig
from aura_engine.base.logger import AuraLogger
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.core.channels import PlayoutStatusResponse
from aura_engine.engine import Engine, Player
from aura_engine.events import EngineEventDispatcher
from aura_engine.scheduling.domain import Playlist, PlaylistItem, Show, Timeslot
from aura_engine.scheduling.scheduler import AuraScheduler, PlayCommand, TimeslotCommand
from tests.core_client_mock import PlayoutClientMock
#
# Mocks
#
class MockedEngine:
"""
Mocked version of engine.
"""
scheduler: AuraScheduler
player: Player
def __init__(self):
"""
Init.
"""
self.scheduler = None
self.player = Player()
class TestSchedulingSchedulerCoreMock(unittest.TestCase):
"""
Testing the scheduling utils.
"""
engine: Engine
client: PlayoutClientMock
scheduler: AuraScheduler
# Setup and teardown
def setUp(self):
config = AuraConfig.instance.config
AuraLogger(config)
engine = Engine.get_instance()
dispatcher = EngineEventDispatcher(engine=engine)
client = PlayoutClientMock(event_dispatcher=dispatcher)
scheduler = AuraScheduler(engine=engine)
engine.event_dispatcher = dispatcher
engine.player = Player(client, dispatcher)
self.engine = engine
self.client = client
self.scheduler = scheduler
def tearDown(self):
pass
#
# Utilities
#
def build_timetable(
self, current_start: int, current_end: int, next_duration: int = 5
) -> list[Timeslot]:
# Delete the timetable file to isolate test case
f = self.scheduler.timetable.timetable_file
self.scheduler.timetable.timetable_file = f.replace(
"timetable.json", "timetable-test.json"
)
#
# Build some timetable
now = SU.timestamp()
ts1 = Timeslot(
id=1,
repetition_id=None,
start=now,
end=now + current_start,
show=Show(id="1", name="Warmup Show"),
episode=None,
)
ts2 = Timeslot(
id=2,
repetition_id=None,
start=now + current_start,
end=now + current_end,
show=Show(id="2", name="Main Show"),
episode=None,
)
ts3 = Timeslot(
id=3,
repetition_id=None,
start=now + current_end,
end=now + current_end + next_duration,
show=Show(id="3", name="After Show"),
episode=None,
)
# Playlist for timeslot 2
item_duration = float(current_end - current_start) / 2.0
i2a = PlaylistItem("http://stream.local/", item_duration, 100, None)
i2b = PlaylistItem("http://stream2.local/", item_duration, 100, None)
pl2 = Playlist("2222", "Streams,", False)
pl2.add(i2a)
pl2.add(i2b)
# Playlist for timeslot 3
it3 = PlaylistItem("file://2", next_duration, 100, None)
pl3 = Playlist("3333", "Files", False)
pl3.add(it3)
ts2.set_playlists(pl2, None, None)
ts3.set_playlists(pl3, None, None)
return [ts2, ts3]
def run_scheduling_test(self, current_start: int, current_end: int, next_duration: int):
timeslots = self.build_timetable(current_start, current_end, next_duration)
next_timeslot: Timeslot
for next_timeslot in timeslots:
# Create command timer to indicate the start of the timeslot
AuraScheduler.TimeslotCommandClass(self.engine, next_timeslot)
playlist = next_timeslot.get_current_playlist()
if playlist:
self.scheduler.queue_playlist_items(next_timeslot, playlist.items)
# Finish test when scheduler is done
sleep(current_start + current_end + next_duration)
#
# Test Cases
#
def test_queue_playlist_items_immediately(self):
"""
Tests a scenario where the input stream source can't be reached.
Expected behavior: try to connect until duration is over; then invalidate item and stop related logging.
(Background thread should terminate when last timeslot has passed.)
"""
print(self._testMethodName)
# Configure client and scheduler
self.client.stream_status = PlayoutStatusResponse.STREAM_STATUS_POLLING
self.scheduler.config.scheduler.preload_offset = 10
# Set timetable
current_start = 5
current_end = 65
next_duration = 5
# Run test
self.run_scheduling_test(current_start, current_end, next_duration)
def test_queue_playlist_items_timer(self):
"""
Like testcase above but spawns a timer which should finish when next item starts.
"""
print(self._testMethodName)
# Configure client and scheduler
self.client.stream_status = PlayoutStatusResponse.STREAM_STATUS_POLLING
self.scheduler.config.scheduler.preload_offset = 5
# Set timetable
current_start = 10
current_end = 60
next_duration = 5
# Run test
self.run_scheduling_test(current_start, current_end, next_duration)
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pathlib
import unittest
from aura_engine.base.config import AuraConfig
from aura_engine.base.lang import DotDict
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.scheduling.domain import (
Episode,
Playlist,
PlaylistItem,
PlaylistType,
Show,
Timeslot,
)
from aura_engine.scheduling.timetable import TimetableMerger, TimetableService
class TestSchedulingTimetable(unittest.TestCase):
"""
Testing the timetable service.
"""
config = None
timetable = None
api_fetcher = None
mocked_steering_json = None
mocked_tank_json = None
#
# Mock
#
class MockedApiFetcher:
def __init__(self, code, message, timeslots, exception):
self.code = code
self.message = message
self.timeslots = timeslots
self.exception = exception
def start(self):
print("Called mocked 'start()'")
def fetch(self):
print("Called mocked 'fetch()'")
return DotDict(
{
"code": self.code,
"message": self.message,
"timeslots": self.timeslots,
"exception": self.exception,
}
)
def terminate(self):
pass
#
# Setup and teardown
#
def setUp(self):
self.config = AuraConfig.instance.config
cache_location = self.config.general.cache_dir
self.timetable = TimetableService(cache_location)
f = self.timetable.timetable_file
self.timetable.timetable_file = f.replace("timetable.json", "timetable-test.json")
def tearDown(self):
pass
#
# Test Cases
#
def test_timetable_init(self):
print(self._testMethodName)
self.assertEqual("./.cache/timetable", self.timetable.cache_location)
self.assertEqual("./.cache/timetable/timetable-test.json", self.timetable.timetable_file)
def test_get_current_timeslot(self):
print(self._testMethodName)
now = SU.timestamp()
ts1 = Timeslot(
id=1, repetition_id=None, start=now - 180, end=now - 60, show=None, episode=None
)
ts2 = Timeslot(
id=1, repetition_id=None, start=now - 60, end=now + 60, show=None, episode=None
)
ts3 = Timeslot(
id=1, repetition_id=None, start=now + 60, end=now + 120, show=None, episode=None
)
ts4 = Timeslot(
id=1, repetition_id=None, start=now + 120, end=now + 180, show=None, episode=None
)
self.timetable.timetable = [ts1, ts2, ts3, ts4]
self.assertEqual(ts2, self.timetable.get_current_timeslot())
def test_get_no_current_timeslot(self):
print(self._testMethodName)
now = SU.timestamp()
ts1 = Timeslot(
id=1, repetition_id=None, start=now - 180, end=now - 60, show=None, episode=None
)
ts3 = Timeslot(
id=1, repetition_id=None, start=now + 60, end=now + 120, show=None, episode=None
)
ts4 = Timeslot(
id=1, repetition_id=None, start=now + 120, end=now + 180, show=None, episode=None
)
self.timetable.timetable = [ts1, ts3, ts4]
self.assertIsNone(self.timetable.get_current_timeslot())
def test_get_next_timeslots(self):
print(self._testMethodName)
now = SU.timestamp()
ts1 = Timeslot(
id=1, repetition_id=None, start=now - 180, end=now - 60, show=None, episode=None
)
ts2 = Timeslot(
id=1, repetition_id=None, start=now - 60, end=now + 60, show=None, episode=None
)
ts3 = Timeslot(
id=1, repetition_id=None, start=now + 60, end=now + 120, show=None, episode=None
)
ts4 = Timeslot(
id=1, repetition_id=None, start=now + 120, end=now + 180, show=None, episode=None
)
self.timetable.timetable = [ts1, ts2, ts3, ts4]
next = self.timetable.get_next_timeslots()
self.assertEqual(2, len(next))
next = self.timetable.get_next_timeslots(1)
self.assertEqual(1, len(next))
def test_get_next_timeslots_in_window(self):
print(self._testMethodName)
self.config.scheduler.scheduling_window_start = 80
self.config.scheduler.scheduling_window_end = 10
now = SU.timestamp()
ts1 = Timeslot(
id=1, repetition_id=None, start=now - 180, end=now - 60, show=None, episode=None
)
ts2 = Timeslot(
id=1, repetition_id=None, start=now - 60, end=now + 60, show=None, episode=None
)
ts3 = Timeslot(
id=1, repetition_id=None, start=now + 60, end=now + 120, show=None, episode=None
)
ts4 = Timeslot(
id=1, repetition_id=None, start=now + 120, end=now + 180, show=None, episode=None
)
self.timetable.timetable = [ts1, ts2, ts3, ts4]
next = self.timetable.get_next_timeslots(window_aware=True)
self.assertEqual(1, len(next))
next = self.timetable.get_next_timeslots(max_count=1, window_aware=True)
self.assertEqual(1, len(next))
def test_get_no_next_timeslots(self):
print(self._testMethodName)
self.timetable.timetable = []
next = self.timetable.get_next_timeslots()
self.assertEqual(0, len(next))
def test_get_current_item_from_timeslot_playlist(self):
print(self._testMethodName)
now = SU.timestamp()
ts1 = Timeslot(
id=1, repetition_id=None, start=now - 500, end=now - 400, show=None, episode=None
)
ts2 = Timeslot(
id=1, repetition_id=None, start=now - 400, end=now + 400, show=None, episode=None
)
ts3 = Timeslot(
id=1, repetition_id=None, start=now + 400, end=now + 540, show=None, episode=None
)
ts4 = Timeslot(
id=1, repetition_id=None, start=now + 540, end=now + 720, show=None, episode=None
)
pl = Playlist("999", "Playlist X", False)
alpha = PlaylistItem("alpha.flac", 100, 100, None)
beta = PlaylistItem("beta.flac", 600, 100, None)
gamma = PlaylistItem("gamma.flac", 100, 100, None)
pl.add(alpha)
pl.add(beta)
pl.add(gamma)
ts2.set_playlists(pl, None, None)
self.timetable.timetable = [ts1, ts2, ts3, ts4]
current = self.timetable.get_current_item()
print(f"Current item is '{current.source}'")
self.assertEqual(beta, current)
self.assertEqual(PlaylistType.TIMESLOT, current.playlist.get_type())
def test_get_current_item_from_schedule_playlist(self):
print(self._testMethodName)
now = SU.timestamp()
ts1 = Timeslot(
id=1, repetition_id=None, start=now - 500, end=now - 400, show=None, episode=None
)
ts2 = Timeslot(
id=1, repetition_id=None, start=now - 400, end=now + 400, show=None, episode=None
)
ts3 = Timeslot(
id=1, repetition_id=None, start=now + 400, end=now + 540, show=None, episode=None
)
ts4 = Timeslot(
id=1, repetition_id=None, start=now + 540, end=now + 720, show=None, episode=None
)
pl = Playlist("999", "Playlist X", False)
alpha = PlaylistItem("alpha.flac", 100, 100, None)
beta = PlaylistItem("beta.flac", 600, 100, None)
gamma = PlaylistItem("gamma.flac", 100, 100, None)
pl.add(alpha)
pl.add(beta)
pl.add(gamma)
ts2.set_playlists(None, pl, None)
self.timetable.timetable = [ts1, ts2, ts3, ts4]
current = self.timetable.get_current_item()
print(f"Current item is '{current.source}'")
self.assertEqual(beta, current)
self.assertEqual(PlaylistType.SCHEDULE, current.playlist.get_type())
def test_is_timeslot_in_window(self):
print(self._testMethodName)
# Expecting a configured scheduling window of T1(-60) - T2(-60) seconds.
now = SU.timestamp()
ts = Timeslot(
id=333, repetition_id=None, start=now + 30, end=now + 180, show=None, episode=None
)
self.assertTrue(self.timetable.is_timeslot_in_window(ts))
ts = Timeslot(
id=333, repetition_id=None, start=now + 120, end=now + 180, show=None, episode=None
)
self.assertFalse(self.timetable.is_timeslot_in_window(ts))
ts = Timeslot(
id=333, repetition_id=None, start=now - 360, end=now - 180, show=None, episode=None
)
self.assertFalse(self.timetable.is_timeslot_in_window(ts))
def test_timetable_persist_and_load(self):
print(self._testMethodName)
now = SU.timestamp()
ts1 = Timeslot(
id=1, repetition_id=None, start=now - 180, end=now - 120, show=None, episode=None
)
ts2 = Timeslot(
id=2, repetition_id=None, start=now - 120, end=now + 120, show=None, episode=None
)
ts3 = Timeslot(
id=3, repetition_id=None, start=now + 120, end=now + 180, show=None, episode=None
)
ts4 = Timeslot(
id=4, repetition_id=None, start=now + 180, end=now + 360, show=None, episode=None
)
pl = Playlist("999", "Playlist X", False)
alpha = PlaylistItem("alpha.flac", 20, 100, None)
beta = PlaylistItem("beta.flac", 100, 100, None)
gamma = PlaylistItem("gamma.flac", 20, 100, None)
pl.add(alpha)
pl.add(beta)
pl.add(gamma)
ts2.set_playlists(None, pl, None)
self.timetable.timetable = [ts1, ts2, ts3, ts4]
# Store to file
self.timetable.persist_timetable()
self.assertTrue(pathlib.Path("./.cache/timetable/timetable-test.json").is_file())
self.assertIsNotNone(self.timetable.timetable)
self.assertEqual(1, self.timetable.timetable[0].get_id())
self.assertEqual(2, self.timetable.timetable[1].get_id())
self.assertEqual(3, self.timetable.timetable[2].get_id())
self.assertEqual(4, self.timetable.timetable[3].get_id())
ts2: Timeslot = self.timetable.timetable[1]
pl_item: PlaylistItem = ts2.get_current_playlist().items[2]
self.assertEqual("gamma.flac", pl_item.source)
def test_timetable_merge_and_persist(self):
print(self._testMethodName)
now = SU.timestamp()
ts1 = Timeslot(
id=1, repetition_id=None, start=now - 180, end=now - 120, show=None, episode=None
)
ts2 = Timeslot(
id=2, repetition_id=None, start=now - 120, end=now + 120, show=None, episode=None
)
ts3 = Timeslot(
id=3, repetition_id=None, start=now + 120, end=now + 180, show=None, episode=None
)
ts4 = Timeslot(
id=4, repetition_id=None, start=now + 180, end=now + 360, show=None, episode=None
)
ts5 = Timeslot(
id=5, repetition_id=None, start=now + 360, end=now + 420, show=None, episode=None
)
ts6 = Timeslot(
id=6, repetition_id=None, start=now + 420, end=now + 540, show=None, episode=None
)
pl = Playlist("999", "Playlist X", False)
alpha = PlaylistItem("alpha.flac", 20, 100, None)
beta = PlaylistItem("beta.flac", 100, 100, None)
gamma = PlaylistItem("gamma.flac", 20, 100, None)
pl.add(alpha)
pl.add(beta)
pl.add(gamma)
ts2.set_playlists(pl, None, None)
# New timeslot introduced remotely, one deleted remotely
local_timeslots = [ts1, ts2, ts3, ts4, ts5]
remote_timeslots = [ts1, ts2, ts3, ts4, ts6]
self.timetable.timetable = local_timeslots
self.timetable.merge_timetable(remote_timeslots)
final_timeslots = self.timetable.timetable
self.assertEqual(4, len(final_timeslots))
# Dump to json file
self.timetable.persist_timetable()
def test_timetable_merge_past_timeslots_local(self):
print(self._testMethodName)
now = SU.timestamp()
ts1 = Timeslot(
id=1, repetition_id=None, start=now - 180, end=now - 120, show=None, episode=None
)
ts2 = Timeslot(
id=1, repetition_id=None, start=now - 120, end=now + 120, show=None, episode=None
)
# Only one local timeslot from the past and one currently playing
local_timeslots = [ts1, ts2]
remote_timeslots = []
self.timetable.timetable = local_timeslots
self.timetable.merge_timetable(remote_timeslots)
final_timeslots = self.timetable.timetable
self.assertEqual(1, len(final_timeslots))
# Dump to json file
self.timetable.persist_timetable()
def test_timetable_merge_past_timeslot_remote(self):
print(self._testMethodName)
now = SU.timestamp()
ts1 = Timeslot(
id=1, repetition_id=None, start=now - 180, end=now - 120, show=None, episode=None
)
# New timeslot introduced remotely, but past scheduling window
local_timeslots = []
remote_timeslots = [ts1]
self.timetable.timetable = local_timeslots
self.timetable.merge_timetable(remote_timeslots)
final_timeslots = self.timetable.timetable
self.assertEqual(1, len(final_timeslots))
# Dump to json file
self.timetable.persist_timetable()
def test_fetch_data(self):
print(self._testMethodName)
now = SU.timestamp()
ts1 = Timeslot(
id=1, repetition_id=None, start=now - 180, end=now - 120, show=None, episode=None
)
ts2 = Timeslot(
id=1, repetition_id=None, start=now - 120, end=now + 120, show=None, episode=None
)
ts3 = Timeslot(
id=1, repetition_id=None, start=now + 120, end=now + 180, show=None, episode=None
)
ts4 = Timeslot(
id=1, repetition_id=None, start=now + 180, end=now + 360, show=None, episode=None
)
pl = Playlist("999", "Playlist X", False)
alpha = PlaylistItem("alpha.flac", 20, 100, None)
beta = PlaylistItem("beta.flac", 100, 100, None)
gamma = PlaylistItem("gamma.flac", 20, 100, None)
pl.add(alpha)
pl.add(beta)
pl.add(gamma)
ts2.set_playlists(pl, None, None)
timeslots = [ts1, ts2, ts3, ts4]
fetcher = self.MockedApiFetcher(0, "Success", timeslots, None)
self.timetable.api_fetcher = fetcher
self.timetable.refresh()
def test_fetch_no_data(self):
print(self._testMethodName)
fetcher = self.MockedApiFetcher(1, "No timeslots", None, None)
self.timetable.api_fetcher = fetcher
self.timetable.refresh()
def test_fetch_exception(self):
print(self._testMethodName)
fetcher = self.MockedApiFetcher(1, "Some weird exception", None, "Quack")
self.timetable.api_fetcher = fetcher
self.timetable.refresh()
def test_terminate(self):
print(self._testMethodName)
fetcher = self.MockedApiFetcher(1, "", None, "")
self.timetable.terminate()
self.timetable.api_fetcher = fetcher
self.timetable.refresh()
self.timetable.terminate()
def test_merger_remove_timeslots_deleted_remotely(self):
print(self._testMethodName)
now = SU.timestamp()
ts1 = Timeslot(
id=1, repetition_id=None, start=now - 180, end=now - 120, show=None, episode=None
)
ts2 = Timeslot(
id=2, repetition_id=None, start=now - 120, end=now + 120, show=None, episode=None
)
ts3 = Timeslot(
id=3, repetition_id=None, start=now + 120, end=now + 180, show=None, episode=None
)
ts4 = Timeslot(
id=4, repetition_id=None, start=now + 180, end=now + 360, show=None, episode=None
)
pl = Playlist("999", "Playlist X", False)
alpha = PlaylistItem("alpha.flac", 20, 100, None)
beta = PlaylistItem("beta.flac", 100, 100, None)
gamma = PlaylistItem("gamma.flac", 20, 100, None)
pl.add(alpha)
pl.add(beta)
pl.add(gamma)
ts2.set_playlists(pl, None, None)
# Nothing changed
local_timeslots = [ts1, ts2, ts3, ts4]
remote_timeslots = [ts1, ts2, ts3, ts4]
merger = TimetableMerger()
final_timeslots = merger.merge(local_timeslots, remote_timeslots)
self.assertEqual(3, len(final_timeslots))
# One deleted remotely
local_timeslots = [ts1, ts2, ts3, ts4]
remote_timeslots = [ts1, ts2, ts4]
merger = TimetableMerger()
final_timeslots = merger.merge(local_timeslots, remote_timeslots)
self.assertEqual(2, len(final_timeslots))
# One in the past was deleted
local_timeslots = [ts1, ts2, ts3, ts4]
remote_timeslots = [ts2, ts3, ts4]
merger = TimetableMerger()
final_timeslots = merger.merge(local_timeslots, remote_timeslots)
self.assertEqual(3, len(final_timeslots))
# Two deleted remotely, while one started playing already => only one removed locally
local_timeslots = [ts1, ts2, ts3, ts4]
remote_timeslots = [ts1, ts4]
merger = TimetableMerger()
final_timeslots = merger.merge(local_timeslots, remote_timeslots)
self.assertEqual(2, len(final_timeslots))
# Three deleted remotely, while one started playing already
local_timeslots = [ts1, ts2, ts3, ts4]
remote_timeslots = [ts4]
merger = TimetableMerger()
final_timeslots = merger.merge(local_timeslots, remote_timeslots)
self.assertEqual(2, len(final_timeslots))
def test_merger_add_timeslots_remotely(self):
print(self._testMethodName)
now = SU.timestamp()
ts1 = Timeslot(
id=1, repetition_id=None, start=now - 180, end=now - 120, show=None, episode=None
)
ts2 = Timeslot(
id=2, repetition_id=None, start=now - 120, end=now + 120, show=None, episode=None
)
ts3 = Timeslot(
id=3, repetition_id=None, start=now + 120, end=now + 180, show=None, episode=None
)
ts4 = Timeslot(
id=4, repetition_id=None, start=now + 180, end=now + 360, show=None, episode=None
)
pl = Playlist("999", "Playlist X", False)
alpha = PlaylistItem("alpha.flac", 20, 100, None)
beta = PlaylistItem("beta.flac", 100, 100, None)
gamma = PlaylistItem("gamma.flac", 20, 100, None)
pl.add(alpha)
pl.add(beta)
pl.add(gamma)
ts2.set_playlists(pl, None, None)
# New Timeslot introduced remotely
ts5 = Timeslot(
id=5, repetition_id=None, start=now + 360, end=now + 420, show=None, episode=None
)
local_timeslots = [ts1, ts2, ts3, ts4]
remote_timeslots = [ts1, ts2, ts3, ts4, ts5]
merger = TimetableMerger()
final_timeslots = merger.merge(local_timeslots, remote_timeslots)
self.assertEqual(4, len(final_timeslots))
def test_merger_update_timeslot(self):
print(self._testMethodName)
now = SU.timestamp()
ts1 = Timeslot(
id=1, repetition_id=None, start=now - 180, end=now - 120, show=None, episode=None
)
ts2 = Timeslot(
id=2, repetition_id=None, start=now - 120, end=now + 120, show=None, episode=None
)
ts3 = Timeslot(
id=3, repetition_id=None, start=now + 120, end=now + 180, show=None, episode=None
)
ts4 = Timeslot(
id=4, repetition_id=None, start=now + 180, end=now + 360, show=None, episode=None
)
pl = Playlist("999", "Playlist X", False)
alpha = PlaylistItem("alpha.flac", 20, 100, None)
beta = PlaylistItem("beta.flac", 100, 100, None)
gamma = PlaylistItem("gamma.flac", 20, 100, None)
pl.add(alpha)
pl.add(beta)
pl.add(gamma)
ts3.set_playlists(pl, None, None)
# Create updated timeslot
show = Show(22, "Updated Show")
episode = Episode("123", "Updated Episode", "My memo")
ts3_updated = Timeslot(
id=3, repetition_id=1234, start=now + 120, end=now + 180, show=show, episode=episode
)
pl = Playlist("888", "Updated Playlist", False)
alpha = PlaylistItem("alpha_new.flac", 20, 100, None)
beta = PlaylistItem("beta_new.flac", 100, 100, None)
gamma = PlaylistItem("gamma_new.flac", 20, 100, None)
delta = PlaylistItem("delta_new.flac", 20, 100, None)
pl.add(alpha)
pl.add(beta)
pl.add(gamma)
pl.add(delta)
ts3_updated.set_playlists(pl, None, None)
local_timeslots = [ts1, ts2, ts3, ts4]
remote_timeslots = [ts1, ts2, ts3_updated, ts4]
merger = TimetableMerger()
# Check timeslot before merge
self.assertIsNone(ts3.get_show())
self.assertIsNone(ts3.get_episode())
self.assertEqual(None, ts3.get_repetition_id())
current_pl = ts3.get_current_playlist()
self.assertEqual("Playlist X", current_pl.get_description())
items: list[PlaylistItem] = current_pl.get_items()
self.assertEqual(3, len(items))
self.assertEqual("alpha.flac", items[0].source)
self.assertEqual("beta.flac", items[1].source)
self.assertEqual("gamma.flac", items[2].source)
final_timeslots = merger.merge(local_timeslots, remote_timeslots)
self.assertEqual(3, len(final_timeslots))
# Check timeslot after merge
self.assertIsNotNone(ts3.get_show())
self.assertEqual("Updated Show", ts3.get_show().name)
self.assertIsNotNone(ts3.get_episode())
self.assertEqual("Updated Episode", ts3.get_episode().title)
self.assertEqual(1234, ts3.get_repetition_id())
current_pl = ts3.get_current_playlist()
self.assertEqual("Updated Playlist", current_pl.get_description())
items: list[PlaylistItem] = current_pl.get_items()
self.assertEqual(4, len(items))
self.assertEqual("alpha_new.flac", items[0].source)
self.assertEqual("beta_new.flac", items[1].source)
self.assertEqual("gamma_new.flac", items[2].source)
self.assertEqual("delta_new.flac", items[3].source)
def test_persist_empty_timetable(self):
"""Try to store a empty timetable"""
# TODO: implement test
pass
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from aura_engine.base.config import AuraConfig
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.scheduling.domain import Playlist, PlaylistItem, Timeslot
from aura_engine.scheduling.timetable import TimetableService
from aura_engine.scheduling.utils import M3UPlaylistProcessor, TimetableRenderer
class MockedScheduler:
timetable: TimetableService
def __init__(self, timetable: list[Timeslot]):
self.timetable = timetable
def get_timetable(self) -> TimetableService:
return self.timetable
class TestSchedulingUtils(unittest.TestCase):
"""
Testing the scheduling utils.
"""
config = None
m3u = None
timetable: TimetableService
# Setup and teardown
def setUp(self):
self.config = AuraConfig.instance.config
self.m3u = M3UPlaylistProcessor()
self.m3u.playlist_folder = "./tests/m3u/"
cache_location = self.config.general.cache_dir
self.timetable = TimetableService(cache_location)
def tearDown(self):
pass
#
# Test Cases
#
def test_spread_m3u_playlist(self):
print(self._testMethodName)
item = PlaylistItem("m3u://sample.m3u", None, 100, PlaylistItem.Metadata("", "", ""))
items = self.m3u.spread(item)
self.assertIsNotNone(items)
self.assertEqual(4, len(items))
self.assertEqual("file://music-lib/Unser Album/Dein Lied.ogg", items[1].source)
self.assertEqual("Alle", items[1].metadata.artist)
self.assertEqual("Dein Lied", items[1].metadata.title)
def test_spread_no_m3u_item(self):
print(self._testMethodName)
item = PlaylistItem("file://no-m3u.mp3", None, 100, PlaylistItem.Metadata("", "", ""))
items = self.m3u.spread(item)
self.assertIsNotNone(items)
self.assertEqual(1, len(items))
self.assertEqual("file://no-m3u.mp3", items[0].source)
def test_spread_no_m3u_playlist(self):
print(self._testMethodName)
item = PlaylistItem("m3u://404.m3u", None, 100, PlaylistItem.Metadata("", "", ""))
items = self.m3u.spread(item)
self.assertEqual(0, len(items))
def test_timetable_renderer(self):
print(self._testMethodName)
# Build timeslots
now = SU.timestamp()
ts1 = Timeslot(
id=1, repetition_id=None, start=now - 180, end=now - 120, show=None, episode=None
)
ts2 = Timeslot(
id=1, repetition_id=None, start=now - 120, end=now + 120, show=None, episode=None
)
ts3 = Timeslot(
id=1, repetition_id=None, start=now + 120, end=now + 180, show=None, episode=None
)
ts4 = Timeslot(
id=1, repetition_id=None, start=now + 180, end=now + 360, show=None, episode=None
)
# Set playlist for timeslot 2 (currently playing)
pl = Playlist("999", "Playlist X", False)
alpha = PlaylistItem("alpha.flac", 20, 100, None)
beta = PlaylistItem("beta.flac", 100, 100, None)
gamma = PlaylistItem("gamma.flac", 20, 100, None)
delta = PlaylistItem("delta.flac", 200, 100, None)
epsilon = PlaylistItem("epsilon.flac", 300, 100, None)
pl.add(alpha)
pl.add(beta)
pl.add(gamma)
pl.add(delta)
pl.add(epsilon)
ts2.set_playlists(None, pl, None)
# Set playlist for timeslot 3 (playing next)
pl = Playlist("555", "Playlist Y", False)
alpha = PlaylistItem("red.flac", 20, 100, None)
beta = PlaylistItem("green.flac", 100, 100, None)
gamma = PlaylistItem("blue.flac", 20, 100, None)
pl.add(alpha)
pl.add(beta)
pl.add(gamma)
ts3.set_playlists(pl, None, None)
timeslots = [ts1, ts2, ts3, ts4]
self.timetable.timetable = timeslots
scheduler = MockedScheduler(self.timetable)
# Build and display report
tr = TimetableRenderer(scheduler)
report = tr.get_ascii_timeslots()
print(report)
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import unittest
from unittest import mock
import requests
from aura_engine.base.api import SimpleRestApi
from aura_engine.base.config import AuraConfig
class TestApi(unittest.TestCase):
"""
Test the simple API wrapper.
"""
config = None
api = None
#
# Mock
#
# Mock `requests.get` in `SimpleRestApi`
def mocked_requests_get(*args, **kwargs):
class MockResponse:
def __init__(self, json_data, status_code):
self.json_data = json_data
self.status_code = status_code
def json(self):
return self.json_data
print(f"Calling mocked 'requests.get' with '{args[0]}'")
if args[0] == "http://aura.test.available":
return MockResponse({"foo": "bar"}, 200)
if args[0] == "http://aura.test.available/bad-request":
return MockResponse({}, 400)
if args[0] == "http://aura.test.available/connection-error":
raise requests.exceptions.ConnectionError
if args[0] == "http://aura.test.available/timeout":
raise requests.exceptions.Timeout
if args[0] == "http://aura.test.available/exception":
raise Exception
if args[0] == "http://aura.test.available/not-found":
return MockResponse({}, 404)
if args[0] == "https://some.website.no.api":
return MockResponse({}, 405)
return MockResponse(None, 404)
# Mock `requests.put` in `SimpleRestApi`
def mocked_requests_put(*args, **kwargs):
class MockResponse:
def __init__(self, json_data, status_code):
self.json_data = json_data
self.status_code = status_code
def json(self):
return self.json_data
print(f"Calling mocked 'requests.put' with '{args[0]}'")
if args[0] == "http://aura.test.available/bad-request":
return MockResponse({}, 400)
if args[0] == "http://aura.test.available/not-found":
return MockResponse({}, 404)
return MockResponse(None, 404)
#
# Setup
#
def setUp(self):
self.config = AuraConfig.instance.config
self.api = SimpleRestApi()
#
# Tests
#
def test_clean_dict(self):
print(self._testMethodName)
data = {"foo": {"bar": None}, "foo2": None}
self.assertEqual(2, len(data.keys()))
data = self.api.clean_dictionary(data)
self.assertEqual(1, len(data.keys()))
self.assertEqual(["foo"], list(data.keys()))
def test_serialize_json(self):
print(self._testMethodName)
import re
data = {"foo": "bar", "fooBar2000": None}
json_string = '{"foo":"bar"}'
# Success
result = self.api.serialize_json(data)
result = re.sub(r'[^a-z0-9\{\}\:"]+', "", result, flags=re.IGNORECASE)
self.assertEqual(json_string, result)
# Check for cleaned up key
try:
print(result.index("fooBar2000"))
self.assertFalse("Error: Value found in dict")
except ValueError:
self.assertTrue("Value not found in dict")
@mock.patch("aura_engine.base.api.requests.get", side_effect=mocked_requests_get)
def test_get(self, mock_get):
print(self._testMethodName)
result = self.api.get("http://aura.test.available")
# Success
self.assertEqual(200, result.response.status_code)
self.assertEqual("bar", result.json["foo"])
@mock.patch("aura_engine.base.api.requests.get", side_effect=mocked_requests_get)
def test_get_bad_request(self, mock_get):
print(self._testMethodName)
result = self.api.get("http://aura.test.available/bad-request")
# Bad Request
self.assertEqual(400, result.response.status_code)
@mock.patch("aura_engine.base.api.requests.get", side_effect=mocked_requests_get)
def test_get_connection_error(self, mock_get):
print(self._testMethodName)
result = self.api.get("http://aura.test.available/connection-error")
# Bad Request
self.assertEqual(400, result.response.status_code)
@mock.patch("aura_engine.base.api.requests.post")
def test_post_success(self, mock_post):
mock_post.return_value.status_code = 201
mock_post.return_value.json.return_value = "mock response"
print(self._testMethodName)
data = {"foo": "bar"}
result = self.api.post("http://aura.test.available/api", data=data)
mock_post.assert_called_once_with(
"http://aura.test.available/api",
data=json.dumps({"foo": "bar"}, indent=4, sort_keys=True, default=str),
headers={"content-type": "application/json"},
timeout=5,
)
# print(result)
self.assertEqual(201, result.response.status_code)
@mock.patch("aura_engine.base.api.requests.put")
def test_put_success(self, mock_post):
mock_post.return_value.status_code = 200
mock_post.return_value.json.return_value = "mock response"
print(self._testMethodName)
data = {"foo": "bar"}
result = self.api.put("http://aura.test.available/api", data=data)
mock_post.assert_called_once_with(
"http://aura.test.available/api",
data=json.dumps({"foo": "bar"}, indent=4, sort_keys=True, default=str),
headers={"content-type": "application/json"},
timeout=5,
)
# print(result)
self.assertEqual(200, result.response.status_code)
@mock.patch("aura_engine.base.api.requests.put", side_effect=mocked_requests_put)
def test_put(self, mock_put):
print(self._testMethodName)
data = {"foo": "bar"}
# Bad request: Invalid URL
result = self.api.put("http://aura.test.available/bad-request", data=data)
# print(result)
self.assertEqual(400, result.response.status_code)
# Not found
result = self.api.put("http://aura.test.available/not-found", data=data)
# print(result)
self.assertEqual(404, result.response.status_code)
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import unittest
from unittest import mock
import confuse
from aura_engine.base.api import SimpleCachedRestApi, SimpleRestApi
from aura_engine.base.config import AuraConfig
class TestCachedApi(unittest.TestCase):
"""
Test the simple API wrapper.
"""
config: confuse.Configuration
api = None
#
# Mock
#
# Mock `requests.get` in `SimpleRestApi`
def mocked_requests_get(*args, **kwargs):
class MockResponse:
def __init__(self, json_data, status_code):
self.json_data = json_data
self.status_code = status_code
def json(self):
return self.json_data
print(f"Calling mocked 'requests.get' with '{args[0]}'")
if args[0] == "http://aura.test.available/dummy-api/v1/playout":
return MockResponse({"foo": "bar"}, 200)
elif args[0] == "http://aura.test.404/dummy-api/v1/playout":
return MockResponse(None, 404)
elif args[0] == "http://aura.test.not-json/dummy-api/v1/not-json":
return MockResponse("{-that's-definitely-not-json}", 200)
return MockResponse(None, 404)
#
# Setup
#
def setUp(self):
self.config = AuraConfig.instance.config
cache_location = self.config.general.cache_dir
self.api = SimpleCachedRestApi(SimpleRestApi(), cache_location)
#
# Test Cases
#
def test_build_filename(self):
url = "https://dashboard.aura.radio/steering/api/v1/playout"
filename = self.api.build_filename(url)
# Success
expected = "steering-api-v1-playout.json"
self.assertEqual(expected, filename)
def test_prune_cache_dir(self):
dir = self.api.cache_location
f = dir + "dummy-file"
with open(f, "a"):
os.utime(f, None)
count = len(os.listdir(dir))
self.assertNotEqual(0, count)
self.api.prune_cache_dir()
count = len(os.listdir(dir))
self.assertEqual(0, count)
@mock.patch("aura_engine.base.api.requests.get", side_effect=mocked_requests_get)
def test_get_from_network(self, mock_get):
print(self._testMethodName)
self.api.prune_cache_dir()
url = "http://aura.test.available/dummy-api/v1/playout"
result = self.api.get(url)
# 200 - Success
self.assertEqual(200, result.response.status_code)
self.assertEqual("bar", result.json.get("foo"))
@mock.patch("aura_engine.base.api.requests.get", side_effect=mocked_requests_get)
def test_get_from_cache(self, mock_get):
print(self._testMethodName)
self.api.prune_cache_dir()
# Ensure a cached response is created
self.api.get("http://aura.test.available/dummy-api/v1/playout")
# Read the same endpoint from an invalid to domain, enforcing a local cache response
result = self.api.get("http://aura.test.404/dummy-api/v1/playout")
# Read from local cache: 304 - Not Modified
self.assertEqual(304, result.response.status_code)
@mock.patch("aura_engine.base.api.requests.get", side_effect=mocked_requests_get)
def test_get_not_existing_from_cache(self, mock_get):
print(self._testMethodName)
self.api.prune_cache_dir()
# Read the same endpoint from an invalid to domain, enforcing a local cache response
result = self.api.get("http://aura.test.404/dummy-api/v1/404")
# Read from local cache: 404 - Not Found
self.assertEqual(404, result.response.status_code)
@mock.patch("aura_engine.base.api.requests.get", side_effect=mocked_requests_get)
def test_get_from_cache_with_invalid_json(self, mock_get):
print(self._testMethodName)
self.api.prune_cache_dir()
# Get response with invalid JSON data
result = self.api.get("http://aura.test.not-json/dummy-api/v1/not-json")
self.assertEqual(200, result.response.status_code)
# self.assertEqual(None, result.json)
self.assertEqual("{-that's-definitely-not-json}", result.json)
# Read the same endpoint from an invalid to domain, enforcing a local cache response
result = self.api.get("http://aura.test.404/dummy-api/v1/not-json")
# Read from local cache: 304 - Not Found
self.assertEqual(304, result.response.status_code)
self.assertEqual("{-that's-definitely-not-json}", result.json)
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import json
import unittest
from unittest import mock
import requests
from aura_engine.base.config import AuraConfig
from aura_engine.base.utils import SimpleUtil as SU
class TestSimpleUtil(unittest.TestCase):
"""
Test simple utility functions.
"""
config = None
#
# Setup
#
def setUp(self):
self.config = AuraConfig.instance.config
#
# Tests
#
def test_timestamp(self):
print(self._testMethodName)
ts = SU.timestamp()
self.assertIsNotNone(ts)
dt = datetime.datetime(2024, 3, 15, 14, 8, 33, 0)
ts = SU.timestamp(dt)
self.assertIsNotNone(ts)
self.assertEqual(1710508113.0, ts)
def test_timestamp_to_datetime(self):
print(self._testMethodName)
expected_dt = datetime.datetime(2024, 3, 15, 14, 8, 33, 0)
ts = 1710508113.0
dt = SU.timestamp_to_datetime(ts)
self.assertIsNotNone(dt)
self.assertEqual(expected_dt, dt)
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import threading
import time
import unittest
from aura_engine.base.lang import synchronized
class MessageHolder:
message = None
def __init__(self):
self.message = "no message"
def set_message(self, message: str, sleep_time: float):
print(f"Updating message to '{message}' in {sleep_time} seconds")
time.sleep(sleep_time)
self.message = message
# print("Message updated")
@synchronized
def set_synced_message(self, message: str, sleep_time: float):
print(f"Synced: Updating message to '{message}' in {sleep_time} seconds")
time.sleep(sleep_time)
self.message = message
def get_message(self):
return self.message
class TestSynchronized(unittest.TestCase):
"""
Testing the Logger.
"""
mh = None
def setUp(self):
self.mh = MessageHolder()
def test_not_synchronized(self):
print(self._testMethodName)
# Functions to update the message via threads
def fast_cat_1():
self.mh.set_message("fast cat 1", 0.05)
def sleepy_dog_1():
self.mh.set_message("sleepy dog 1", 0.5)
def fast_cat_2():
self.mh.set_message("fast cat 2", 0.1)
def sleepy_dog_2():
self.mh.set_message("sleepy dog 2", 1)
# CASE#0: Get initial message
msg = self.mh.get_message()
print(msg)
self.assertEqual("no message", msg)
# Start threads
thread1 = threading.Thread(target=fast_cat_1)
thread2 = threading.Thread(target=sleepy_dog_1)
thread3 = threading.Thread(target=sleepy_dog_2)
thread4 = threading.Thread(target=fast_cat_2)
thread1.start()
thread2.start()
thread3.start()
thread4.start()
# CASE#1: First thread quickly updates the message
time.sleep(0.08)
msg = self.mh.get_message()
print(msg)
self.assertEqual("fast cat 1", msg)
# CASE#2: Last thread has overtaken the two slow ones
time.sleep(0.12)
msg = self.mh.get_message()
print(msg)
self.assertEqual("fast cat 2", msg)
# # CASE#3: Slow one arrived
time.sleep(0.5)
msg = self.mh.get_message()
print(msg)
self.assertEqual("sleepy dog 1", msg)
# # CASE#3: The other slow one arrived
time.sleep(0.5)
msg = self.mh.get_message()
print(msg)
self.assertEqual("sleepy dog 2", msg)
thread1.join()
thread2.join()
thread3.join()
thread4.join()
# TODO Investigate and uncomment again
# @see https://gitlab.servus.at/aura/engine/-/issues/122
# def test_synchronized(self):
# print(self._testMethodName)
# # Functions to update the message via threads
# def fast_cat_1():
# self.mh.set_synced_message("fast cat 1", 0.1)
# def sleepy_dog_1():
# self.mh.set_synced_message("sleepy dog 1", 0.5)
# def sleepy_dog_2():
# self.mh.set_synced_message("sleepy dog 2", 0.5)
# def fast_cat_2():
# self.mh.set_synced_message("fast cat 2", 0.3)
# # CASE#0: Get initial message
# msg = self.mh.get_message()
# print(msg)
# self.assertEqual("no message", msg)
# # Start threads
# thread1 = threading.Thread(target=fast_cat_1)
# thread2 = threading.Thread(target=sleepy_dog_1)
# thread3 = threading.Thread(target=sleepy_dog_2)
# thread4 = threading.Thread(target=fast_cat_2)
# thread1.start()
# time.sleep(0.01)
# thread2.start()
# time.sleep(0.01)
# thread3.start()
# time.sleep(0.01)
# thread4.start()
# # CASE#1: First thread quickly updates the message
# time.sleep(0.2)
# msg = self.mh.get_message()
# print(msg)
# self.assertEqual("fast cat 1", msg)
# # # CASE#2: Any fast cat has to wait for this dog
# time.sleep(0.7)
# msg = self.mh.get_message()
# print(msg)
# self.assertEqual("sleepy dog 1", msg)
# # # CASE#3: And for the other dog too
# time.sleep(0.2)
# msg = self.mh.get_message()
# print(msg)
# self.assertEqual("sleepy dog 2", msg)
# # # CASE#3: Finally it's the fast cats turn
# time.sleep(0.4)
# msg = self.mh.get_message()
# print(msg)
# self.assertEqual("fast cat 2", msg)
# thread1.join()
# thread2.join()
# thread3.join()
# thread4.join()
if __name__ == "__main__":
unittest.main()