#
# Aura Engine
#
# Copyright (C) 2020    David Trattnig <david.trattnig@subsquare.at>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import urllib
# `import urllib` alone does not guarantee that the submodules are loaded;
# import the ones used below explicitly.
import urllib.error
import urllib.request
import logging
import json
import os.path

from os import path

from modules.communication.redis.adapter import ClientRedisAdapter


class Monitoring:
    """
    Engine Monitoring

    Collects the state of the soundsystem, the configured API endpoints,
    the Redis connection and the audio store into a single status dictionary.
    """
    logger = None
    soundsystem = None
    status = None

    def __init__(self, config, soundsystem):
        """
        Initialize Monitoring

        Args:
            config:         Configuration object; read via `config.get(key)`
            soundsystem:    Soundsystem object which is queried for its status
        """
        self.logger = logging.getLogger("AuraEngine")
        self.config = config
        self.soundsystem = soundsystem

        # Pre-build the nested structure which gets filled by `update_status()`
        self.status = dict()
        self.status["soundsystem"] = dict()
        self.status["api"] = dict()
        self.status["api"]["steering"] = dict()
        self.status["api"]["tank"] = dict()

    def update_status(self):
        """
        Requests the current status of all components and stores the
        result in `self.status`, including the overall `engine_status`
        ("OK" or "INVALID").
        """
        self.soundsystem.enable_transaction(self.soundsystem.client)
        try:
            self.status["soundsystem"]["version"] = self.soundsystem.version()
            self.status["soundsystem"]["active"] = self.soundsystem.is_active()
            self.status["soundsystem"]["uptime"] = self.soundsystem.uptime()
            self.status["soundsystem"]["io"] = self.get_io_state()
            self.status["soundsystem"]["mixer"] = self.soundsystem.get_mixer_status()
            #self.status["soundsystem"]["recorder"] = self.soundsystem.get_recorder_status()
        finally:
            # Always end the transaction, even if one of the queries above failed
            self.soundsystem.disable_transaction(self.soundsystem.client)

        steering_url = self.config.get("api_steering_status")
        tank_url = self.config.get("api_tank_status")

        self.status["api"]["steering"]["available"] = self.validate_url_connection(steering_url)
        self.status["api"]["tank"]["available"] = self.validate_url_connection(tank_url)
        self.status["api"]["tank"]["status"] = self.get_url_response(tank_url)
        self.status["redis_ready"] = self.validate_redis_connection()
        self.status["audio_store"] = self.validate_directory(self.config.get("audiofolder"))

        # Set overall status
        if self.has_valid_status():
            self.status["engine_status"] = "OK"
        else:
            self.status["engine_status"] = "INVALID"

    def get_io_state(self):
        """
        Retrieves all input and outputs provided by the engine.

        Returns:
            The parsed JSON structure reported by the soundsystem, or the
            string "[ERROR]" if the reported state could not be parsed.
        """
        ios = self.soundsystem.engine_state()

        try:
            # Insert an empty string after every `"connected":` key before
            # parsing (kept exactly as in the original implementation).
            ios = ios.replace('"connected":', '"connected": ""')
            ios = json.loads(ios, strict=False)
            return ios
        except Exception as e:
            self.logger.warning("Got invalid JSON from soundsystem - " + str(e))
            return "[ERROR]"

    def get_status(self):
        """
        Retrieves the current monitoring status.

        Returns:
            (dict): The status dictionary, refreshed via `update_status()`
        """
        self.update_status()
        return self.status

    def has_valid_status(self):
        """
        Checks if the current status is valid to run engine

        Returns:
            (bool): True if the mixer reports `in_filesystem_0`, Redis is
                    ready and the audio store exists. False otherwise, or
                    if the status structure is incomplete.
        """
        try:
            return bool(
                self.status["soundsystem"]["mixer"]["in_filesystem_0"]
                and self.status["redis_ready"]
                and self.status["audio_store"]["exists"]
            )
        except Exception as e:
            self.logger.error("Exception while validating engine status: " + str(e))
            return False

    def validate_url_connection(self, url):
        """
        Checks if connection to passed URL is successful.

        Args:
            url (String):   The URL to probe

        Returns:
            (bool): True if the URL could be opened and read, False otherwise
        """
        try:
            request = urllib.request.Request(url)
            # Context manager ensures the connection is closed again
            with urllib.request.urlopen(request) as response:
                response.read()
        except Exception:
            # Any failure (invalid URL, network error, HTTP error) means "not available"
            return False

        return True

    def validate_redis_connection(self):
        """
        Checks if the connection to Redis is successful.

        Returns:
            (bool): True if a message could be published, False otherwise
        """
        try:
            cra = ClientRedisAdapter(self.config)
            cra.publish("aura", "status")
        except Exception:
            # Not a bare `except:` - SystemExit/KeyboardInterrupt must pass through
            return False

        return True

    def validate_directory(self, dir_path):
        """
        Checks if a given directory is existing and holds content

        Args:
            dir_path (String):  Path of the directory; `None` is treated as missing

        Returns:
            (dict): `{"exists": bool, "has_content": bool}`
        """
        status = dict()
        # `isdir()` is only true for existing paths; guard against an unset path
        status["exists"] = dir_path is not None and os.path.isdir(dir_path)
        status["has_content"] = False

        if status["exists"]:
            # Stop at the first entry and release the directory handle afterwards
            with os.scandir(dir_path) as entries:
                status["has_content"] = next(entries, None) is not None

        return status

    def get_url_response(self, url):
        """
        Fetches JSON data from the given URL.

        Args:
            url (String):       The API endpoint to call

        Returns:
            (dict[]):           A Python object representing the JSON structure,
                                or `None` if the URL could not be read or did
                                not return valid JSON
        """
        try:
            request = urllib.request.Request(url)
            with urllib.request.urlopen(request) as response:
                data = response.read()
        except (urllib.error.URLError, IOError, ValueError) as e:
            self.logger.error("Error while connecting to URL '%s' - %s", url, e)
            return None

        try:
            return json.loads(data, strict=False)
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors
            self.logger.error("Got invalid JSON from URL '%s' - %s", url, e)
            return None