Newer
Older
#
# Aura Engine
#
# Copyright (C) 2020 David Trattnig <david.trattnig@subsquare.at>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urllib
import logging
import os.path
from os import path
from modules.communication.redis.adapter import ClientRedisAdapter
class Monitoring:
"""
Engine Monitoring
"""
logger = None
soundsystem = None
status = None
def __init__(self, config, soundsystem):
"""
Initialize Monitoring
"""
self.logger = logging.getLogger("AuraEngine")
self.config = config
self.soundsystem = soundsystem
self.status = dict()
self.status["soundsystem"] = dict()
self.status["api"]["steering"] = dict()
self.status["api"]["tank"] = dict()
def update_status(self):
"""
Requests the current status of all components
"""
self.soundsystem.enable_transaction(self.soundsystem.client)
self.status["soundsystem"]["version"] = self.soundsystem.version()
self.status["soundsystem"]["active"] = self.soundsystem.is_active()
self.status["soundsystem"]["uptime"] = self.soundsystem.uptime()
self.status["soundsystem"]["io"] = self.get_io_state()
self.status["soundsystem"]["mixer"] = self.soundsystem.get_mixer_status()
#self.status["soundsystem"]["recorder"] = self.soundsystem.get_recorder_status()
self.soundsystem.disable_transaction(self.soundsystem.client)
self.status["api"]["steering"]["available"] = self.validate_url_connection(self.config.get("api_steering_status"))
self.status["api"]["tank"]["available"] = self.validate_url_connection(self.config.get("api_tank_status"))
self.status["api"]["tank"]["status"] = self.get_url_response(self.config.get("api_tank_status"))
self.status["redis_ready"] = self.validate_redis_connection()
self.status["audio_store"] = self.validate_directory(self.config.get("audiofolder"))
# Set overall status
if self.has_valid_status():
self.status["engine_status"] = "OK"
else:
self.status["engine_status"] = "INVALID"
def get_io_state(self):
"""
Retrieves all input and outputs provided by the engine.
"""
ios = self.soundsystem.engine_state()
# ios = ios.replace("\\", "")
try:
# ios = ios.replace(": },", "''},")
ios = ios.replace('"connected":', '"connected": ""')
ios = json.loads(ios, strict=False)
return ios
except Exception as e:
self.logger.warn("Got invalid JSON from soundsystem - " + str(e))
return "[ERROR]"
def get_status(self):
"""
Retrieves the current monitoring status.
"""
self.update_status()
return self.status
def has_valid_status(self):
"""
Checks if the current status is valid to run engine
"""
try:
if self.status["soundsystem"]["mixer"]["in_filesystem_0"] \
and self.status["redis_ready"] \
and self.status["audio_store"]["exists"]:
return True
return False
except Exception as e:
self.logger.error("Exception while validating engine status: " + str(e))
return False
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def validate_url_connection(self, url):
"""
Checks if connection to passed URL is successful.
"""
try:
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
response.read()
except Exception:
return False
return True
def validate_redis_connection(self):
"""
Checks if the connection to Redis is successful.
"""
try:
cra = ClientRedisAdapter(self.config)
cra.publish("aura", "status")
except:
return False
return True
def validate_directory(self, dir_path):
"""
Checks if a given directory is existing and holds content
"""
status = dict()
status["exists"] = path.exists(dir_path) and os.path.isdir(dir_path)
status["has_content"] = False
if status["exists"]:
status["has_content"] = any([True for _ in os.scandir(dir_path)])
return status
def get_url_response(self, url):
"""
Fetches JSON data from the given URL.
Args:
url (String): The API endpoint to call
Returns:
(dict[]): A Python object representing the JSON structure
"""
data = None
try:
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
data = response.read()
except (urllib.error.URLError, IOError, ValueError) as e:
self.logger.error("Error while connecting to URL '%s' - %s" % (url, e))
return json.loads(data, strict=False)