Newer
Older
#
# Aura Engine
#
# Copyright (C) 2020 David Trattnig <david.trattnig@subsquare.at>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urllib
import logging
from modules.communication.redis.adapter import ClientRedisAdapter
class MonitorResponseCode(Enum):
OK = "OK"
INVALID_STATE = "INVALID-STATE"
class Monitoring:
"""
Engine Monitoring
"""
logger = None
soundsystem = None
status = None
def __init__(self, config, soundsystem):
"""
Initialize Monitoring
"""
self.logger = logging.getLogger("AuraEngine")
self.config = config
self.soundsystem = soundsystem
self.status = dict()
self.status["api"]["steering"] = dict()
self.status["api"]["tank"] = dict()
def update_status(self):
"""
Requests the current status of all components
"""
self.status["engine"]["version"] = meta.__version__
self.soundsystem.enable_transaction(self.soundsystem.client)
self.status["soundsystem"]["version"] = self.soundsystem.version()
self.status["soundsystem"]["active"] = self.soundsystem.is_active()
self.status["soundsystem"]["uptime"] = self.soundsystem.uptime()
self.status["soundsystem"]["io"] = self.get_io_state()
self.status["soundsystem"]["mixer"] = self.soundsystem.get_mixer_status()
#self.status["soundsystem"]["recorder"] = self.soundsystem.get_recorder_status()
self.soundsystem.disable_transaction(self.soundsystem.client)
self.status["api"]["steering"]["url"] = self.config.get("api_steering_status")
self.status["api"]["steering"]["available"] = self.validate_url_connection(self.config.get("api_steering_status"))
self.status["api"]["tank"]["url"] = self.config.get("api_tank_status")
self.status["api"]["tank"]["available"] = self.validate_url_connection(self.config.get("api_tank_status"))
self.status["api"]["tank"]["status"] = self.get_url_response(self.config.get("api_tank_status"))
self.status["api"]["engine"]["url"] = self.config.get("exposed_api_url")
self.status["api"]["engine"]["available"] = self.validate_url_connection(self.config.get("exposed_api_url"))
self.status["redis_ready"] = self.validate_redis_connection()
self.status["audio_store"] = self.validate_directory(self.config.get("audiofolder"))
self.status["engine"]["status"] = MonitorResponseCode.OK.value
self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value
def get_io_state(self):
"""
Retrieves all input and outputs provided by the engine.
"""
ios = self.soundsystem.engine_state()
try:
ios = ios.replace('"connected":', '"connected": ""')
ios = json.loads(ios, strict=False)
return ios
except Exception as e:
self.logger.warn("Got invalid JSON from soundsystem - " + str(e))
def get_status(self):
"""
Retrieves the current monitoring status.
"""
self.update_status()
return self.status
def has_valid_status(self):
"""
Checks if the current status is valid to run engine
"""
try:
if self.status["soundsystem"]["mixer"]["in_filesystem_0"] \
and self.status["redis_ready"] \
and self.status["audio_store"]["exists"]:
return True
return False
except Exception as e:
self.logger.error("Exception while validating engine status: " + str(e))
return False
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def validate_url_connection(self, url):
"""
Checks if connection to passed URL is successful.
"""
try:
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
response.read()
except Exception:
return False
return True
def validate_redis_connection(self):
"""
Checks if the connection to Redis is successful.
"""
try:
cra = ClientRedisAdapter(self.config)
cra.publish("aura", "status")
except:
return False
return True
def validate_directory(self, dir_path):
"""
Checks if a given directory is existing and holds content
"""
status = dict()
status["exists"] = path.exists(dir_path) and os.path.isdir(dir_path)
status["has_content"] = False
if status["exists"]:
status["has_content"] = any([True for _ in os.scandir(dir_path)])
def get_url_response(self, url):
"""
Fetches JSON data from the given URL.
Args:
url (String): The API endpoint to call
Returns:
(dict[]): A Python object representing the JSON structure
"""
data = None
try:
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
data = response.read()
except (urllib.error.URLError, IOError, ValueError) as e:
self.logger.error("Error while connecting to URL '%s' - %s" % (url, e))