# # Aura Engine # # Copyright (C) 2020 David Trattnig <david.trattnig@subsquare.at> # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import sys import urllib import logging import json import os.path import threading from os import path from enum import Enum from socket import socket, AF_INET, SOCK_DGRAM from time import time, ctime, sleep import meta from modules.communication.redis.adapter import ClientRedisAdapter class MonitorResponseCode(Enum): OK = "OK" INVALID_STATE = "INVALID-STATE" class Monitoring: """ Engine Monitoring is in charge of: - Checking the overall status of all components and external API endpoints - Checking the vital parts, which are minimal requirements for running the engine - Sending a heartbeat to a defined server via socket """ logger = None soundsystem = None status = None heartbeat_server = None heartbeat_port = None heartbeat_frequency = None heartbeat_socket = None heartbeat_running = None def __init__(self, config, soundsystem): """ Initialize Monitoring """ self.logger = logging.getLogger("AuraEngine") self.config = config self.soundsystem = soundsystem self.status = dict() self.status["engine"] = dict() self.status["soundsystem"] = dict() self.status["api"] = dict() self.status["api"]["steering"] = dict() self.status["api"]["tank"] = dict() self.status["api"]["engine"] = dict() # Heartbeat settings self.heartbeat_running = False self.heartbeat_server = config.get("heartbeat_server") self.heartbeat_port = config.get("heartbeat_port") self.heartbeat_frequency = config.get("heartbeat_frequency") self.heartbeat_socket = socket(AF_INET, SOCK_DGRAM) # # PUBLIC METHODS # def get_status(self): """ Retrieves the current monitoring status. """ return self.status def has_valid_status(self, update_vitality_only): """ Checks if the current status is valid to run engine. By default it does not request new status information, rather using the cached one. To request new data either call `get_status()` before or use the `update_vital` parameter. Args: update_vitality_only (Boolean): Refreshes only the vital parts required for the heartbeat """ is_valid = False if update_vitality_only: self.update_vitality_status() else: self.update_status() try: if self.status["soundsystem"]["active"] \ and self.status["soundsystem"]["mixer"]["in_filesystem_0"] \ and self.status["redis_ready"] \ and self.status["audio_store"]["exists"]: self.status["engine"]["status"] = MonitorResponseCode.OK.value is_valid = True else: self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value except Exception as e: self.logger.error("Exception while validating engine status: " + str(e)) self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value return is_valid # # PRIVATE METHODS # def update_status(self): """ Requests the current status of all components """ self.status["engine"]["version"] = meta.__version__ self.soundsystem.enable_transaction(self.soundsystem.client) self.status["soundsystem"]["version"] = self.soundsystem.version() self.status["soundsystem"]["uptime"] = self.soundsystem.uptime() self.status["soundsystem"]["io"] = self.get_io_state() self.status["soundsystem"]["mixer"] = self.soundsystem.get_mixer_status() #self.status["soundsystem"]["recorder"] = self.soundsystem.get_recorder_status() self.soundsystem.disable_transaction(self.soundsystem.client) self.status["api"]["steering"]["url"] = self.config.get("api_steering_status") self.status["api"]["steering"]["available"] = self.validate_url_connection(self.config.get("api_steering_status")) self.status["api"]["tank"]["url"] = self.config.get("api_tank_status") self.status["api"]["tank"]["available"] = self.validate_url_connection(self.config.get("api_tank_status")) self.status["api"]["tank"]["status"] = self.get_url_response(self.config.get("api_tank_status")) self.status["api"]["engine"]["url"] = self.config.get("exposed_api_url") self.status["api"]["engine"]["available"] = self.validate_url_connection(self.config.get("exposed_api_url")) self.update_vitality_status() def update_vitality_status(self): """ Refreshes the vital status info which are required for the engine to survive. """ self.soundsystem.enable_transaction(self.soundsystem.client) self.status["soundsystem"]["active"] = self.soundsystem.is_active() self.soundsystem.disable_transaction(self.soundsystem.client) self.status["redis_ready"] = self.validate_redis_connection() self.status["audio_store"] = self.validate_directory(self.config.get("audiofolder")) # After first update start the Heartbeat Monitor if not self.heartbeat_running: self.heartbeat_running = True if self.config.get("heartbeat_frequency") > 0: self.heartbeat() def heartbeat(self): """ Every `heartbeat_frequency` seconds the current vitality status is checked. If it's okay, a heartbeat is sent to the configured server. """ if self.has_valid_status(True): self.heartbeat_socket.sendto(str.encode("OK"), (self.heartbeat_server, self.heartbeat_port)) threading.Timer(self.config.get("heartbeat_frequency"), self.heartbeat).start() def get_io_state(self): """ Retrieves all input and outputs provided by the engine. """ ios = self.soundsystem.engine_state() try: ios = ios.replace('"connected":', '"connected": ""') ios = json.loads(ios, strict=False) return ios except Exception as e: self.logger.warn("Got invalid JSON from soundsystem - " + str(e)) return MonitorResponseCode.INVALID_STATE.value def validate_url_connection(self, url): """ Checks if connection to passed URL is successful. """ try: request = urllib.request.Request(url) response = urllib.request.urlopen(request) response.read() except Exception: return False return True def validate_redis_connection(self): """ Checks if the connection to Redis is successful. """ try: cra = ClientRedisAdapter(self.config) cra.publish("aura", "status") except: return False return True def validate_directory(self, dir_path): """ Checks if a given directory is existing and holds content """ status = dict() status["exists"] = path.exists(dir_path) and os.path.isdir(dir_path) status["has_content"] = False if status["exists"]: status["has_content"] = any([True for _ in os.scandir(dir_path)]) return status def get_url_response(self, url): """ Fetches JSON data from the given URL. Args: url (String): The API endpoint to call Returns: (dict[]): A Python object representing the JSON structure """ data = None try: request = urllib.request.Request(url) response = urllib.request.urlopen(request) data = response.read() return json.loads(data, strict=False) except (urllib.error.URLError, IOError, ValueError) as e: self.logger.error("Error while connecting to URL '%s' - %s" % (url, e)) return MonitorResponseCode.INVALID_STATE.value