# # Aura Engine (https://gitlab.servus.at/aura/engine) # # Copyright (C) 2017-2020 - The Aura Engine Team. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import meta import sys import urllib import logging import json import os.path import threading import platform from os import path from enum import Enum from time import time, ctime, sleep from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_BROADCAST from modules.communication.redis.adapter import ClientRedisAdapter from modules.base.utils import SimpleUtil class MonitorResponseCode(Enum): OK = "OK" INVALID_STATE = "INVALID-STATE" class Monitoring: """ Engine Monitoring is in charge of: - Checking the overall status of all components and external API endpoints - Checking the vital parts, which are minimal requirements for running the engine - Sending a heartbeat to a defined server via socket """ logger = None soundsystem = None mailer = None status = None already_invalid = None engine_id = None heartbeat_server = None heartbeat_port = None heartbeat_frequency = None heartbeat_socket = None heartbeat_running = None def __init__(self, config, soundsystem, mailer): """ Initialize Monitoring """ self.logger = logging.getLogger("AuraEngine") self.config = config self.soundsystem = soundsystem self.mailer = mailer self.status = dict() self.status["engine"] = dict() self.status["soundsystem"] = dict() self.status["redis"] = dict() self.status["api"] = dict() self.status["api"]["steering"] = dict() self.status["api"]["tank"] = dict() self.status["api"]["engine"] = dict() self.already_invalid = False # Heartbeat settings self.heartbeat_running = False self.heartbeat_server = config.get("heartbeat_server") self.heartbeat_port = config.get("heartbeat_port") self.heartbeat_frequency = config.get("heartbeat_frequency") self.heartbeat_socket = socket(AF_INET, SOCK_DGRAM) self.engine_id = self.get_engine_id() # # PUBLIC METHODS # def get_status(self): """ Retrieves the current monitoring status. """ return self.status def has_valid_status(self, update_vitality_only): """ Checks if the current status is valid to run engine. By default it does not request new status information, rather using the cached one. To request new data either call `get_status()` before or use the `update_vital` parameter. Args: update_vitality_only (Boolean): Refreshes only the vital parts required for the heartbeat """ is_valid = False if update_vitality_only: self.update_vitality_status() else: self.update_status() try: if self.status["soundsystem"]["active"] \ and self.status["soundsystem"]["mixer"]["in_filesystem_0"] \ and self.status["redis"]["active"] \ and self.status["audio_store"]["exists"]: self.status["engine"]["status"] = MonitorResponseCode.OK.value is_valid = True else: self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value except Exception as e: self.logger.error("Exception while validating engine status: " + str(e)) self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value return is_valid # # PRIVATE METHODS # def update_status(self): """ Requests the current status of all components """ self.status["engine"]["version"] = meta.__version__ self.soundsystem.enable_transaction(self.soundsystem.client) self.status["soundsystem"]["version"] = self.soundsystem.version() self.status["soundsystem"]["uptime"] = self.soundsystem.uptime() self.status["soundsystem"]["io"] = self.get_io_state() self.status["soundsystem"]["mixer"] = self.soundsystem.mixer_status() #self.status["soundsystem"]["recorder"] = self.soundsystem.get_recorder_status() self.soundsystem.disable_transaction(self.soundsystem.client) self.status["api"]["steering"]["url"] = self.config.get("api_steering_status") self.status["api"]["steering"]["available"] = self.validate_url_connection(self.config.get("api_steering_status")) self.status["api"]["tank"]["url"] = self.config.get("api_tank_status") self.status["api"]["tank"]["available"] = self.validate_url_connection(self.config.get("api_tank_status")) self.status["api"]["tank"]["status"] = self.get_url_response(self.config.get("api_tank_status")) self.status["api"]["engine"]["url"] = self.config.get("exposed_api_url") self.status["api"]["engine"]["available"] = self.validate_url_connection(self.config.get("exposed_api_url")) self.update_vitality_status() def update_vitality_status(self): """ Refreshes the vital status info which are required for the engine to survive. """ self.soundsystem.enable_transaction(self.soundsystem.client) self.status["soundsystem"]["active"] = self.soundsystem.is_active() self.soundsystem.disable_transaction(self.soundsystem.client) self.status["redis"]["active"] = self.validate_redis_connection() self.status["audio_store"] = self.validate_directory(self.config.get("audiofolder")) # After first update start the Heartbeat Monitor if not self.heartbeat_running: self.heartbeat_running = True if self.config.get("heartbeat_frequency") > 0: self.heartbeat() def heartbeat(self): """ Every `heartbeat_frequency` seconds the current vitality status is checked. If it's okay, a heartbeat is sent to the configured server. """ if self.has_valid_status(True): self.heartbeat_socket.sendto(str.encode("OK"), (self.heartbeat_server, self.heartbeat_port)) # Engine resurrected into normal state if self.already_invalid: self.already_invalid = False status = json.dumps(self.get_status()) self.logger.info(SimpleUtil.green("OK - Engine turned back into some healthy state!")+"\n"+str(status)) self.mailer.send_admin_mail( \ "OK - Engine turned back into some HEALTHY STATE!", \ "Things seem fine again at '%s':\n\n%s" % (self.engine_id, status)) else: # Engine turned into invalid state if not self.already_invalid: self.already_invalid = True status = json.dumps(self.get_status()) self.logger.critical(SimpleUtil.red("Engine turned into some INVALID STATE!")+"\n"+str(status)) self.mailer.send_admin_mail( \ "ERROR - Engine turned into some INVALID STATE!", \ "There's an issue with Aura Engine '%s':\n\n%s" % (self.engine_id, status)) threading.Timer(self.config.get("heartbeat_frequency"), self.heartbeat).start() def get_io_state(self): """ Retrieves all input and outputs provided by the engine. """ ios = self.soundsystem.engine_state() try: ios = ios.replace('"connected":', '"connected": ""') ios = json.loads(ios, strict=False) return ios except Exception as e: self.logger.warn("Got invalid JSON from soundsystem - " + str(e)) return MonitorResponseCode.INVALID_STATE.value def validate_url_connection(self, url): """ Checks if connection to passed URL is successful. """ try: request = urllib.request.Request(url) response = urllib.request.urlopen(request) response.read() except Exception: return False return True def validate_redis_connection(self): """ Checks if the connection to Redis is successful. """ try: cra = ClientRedisAdapter(self.config) cra.publish("aura", "status") except: return False return True def validate_directory(self, dir_path): """ Checks if a given directory is existing and holds content """ status = dict() status["exists"] = path.exists(dir_path) and os.path.isdir(dir_path) status["has_content"] = False if status["exists"]: status["has_content"] = any([True for _ in os.scandir(dir_path)]) return status def get_url_response(self, url): """ Fetches JSON data from the given URL. Args: url (String): The API endpoint to call Returns: (dict[]): A Python object representing the JSON structure """ data = None try: request = urllib.request.Request(url) response = urllib.request.urlopen(request) data = response.read() return json.loads(data, strict=False) except (urllib.error.URLError, IOError, ValueError) as e: self.logger.error("Error while connecting to URL '%s' - %s" % (url, e)) return MonitorResponseCode.INVALID_STATE.value def get_engine_id(self): """ Retrieves a String identifier consisting of IP and Hostname to differentiate the engine in mails and status broadcasts. """ host = platform.node() return "%s (%s)" % (self.get_ip(), host) def get_ip(self): """ Returns the IP of the Engine instance. """ try: s = socket(AF_INET, SOCK_DGRAM) s.setsockopt(SOL_SOCKET, SO_BROADCAST, 1) s.connect(('<broadcast>', 0)) return s.getsockname()[0] except: self.logger.critical(SimpleUtil.red("Error while accessing network via <broadcast>!")) return "<UNKNOWN NETWORK>"