#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import os
import datetime
import urllib
# `import urllib` alone does not load these submodules; import them explicitly.
import urllib.error
import urllib.request
import logging
import json
import requests
import threading
import platform

from enum import Enum
from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_BROADCAST

import meta
from modules.cli.redis.adapter import ClientRedisAdapter
from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil as SU
from modules.base.mail import AuraMailer


# Exceptions

class EngineMalfunctionException(Exception):
    """Raised on boot when the engine is not in a valid state to run."""
    pass


# Status Codes

class MonitorResponseCode(Enum):
    """Status values written to the monitoring status dictionary."""
    OK = "OK"
    INVALID_STATE = "INVALID-STATE"


class AuraMonitor:
    """
    Engine Monitoring is in charge of:

    - Checking the overall status of all components and external API endpoints
    - Checking the vital parts, which are minimal requirements for running the engine
    - Sending a heartbeat to a defined server via socket
    """
    logger = None
    config = None
    engine = None
    mailer = None
    status = None
    already_invalid = None
    engine_id = None

    heartbeat_server = None
    heartbeat_port = None
    heartbeat_frequency = None
    heartbeat_socket = None
    heartbeat_running = None

    def __init__(self, engine):
        """
        Initialize Monitoring.

        Args:
            engine: The engine to monitor. The monitor registers itself as
                `engine.plugins["monitor"]`.
        """
        self.logger = logging.getLogger("AuraEngine")
        self.config = AuraConfig.config()
        self.engine = engine
        self.mailer = AuraMailer(self.config)

        self.status = dict()
        self.status["engine"] = dict()
        self.status["lqs"] = dict()
        self.status["redis"] = dict()
        self.status["api"] = dict()
        self.status["api"]["steering"] = dict()
        self.status["api"]["tank"] = dict()
        self.status["api"]["engine"] = dict()
        self.already_invalid = False

        # Register as an engine plugin
        self.engine.plugins["monitor"] = self

        # Heartbeat settings
        self.heartbeat_running = False
        self.heartbeat_server = self.config.get("heartbeat_server")
        self.heartbeat_port = self.config.get("heartbeat_port")
        self.heartbeat_frequency = self.config.get("heartbeat_frequency")
        self.heartbeat_socket = socket(AF_INET, SOCK_DGRAM)

        self.engine_id = self.get_engine_id()

    #
    # EVENTS
    #

    def on_boot(self):
        """
        Called when the engine is booting.

        Performs a full status check, logs it and posts the health state.

        Raises:
            EngineMalfunctionException: If the engine is not in a valid state.
        """
        # Start Monitoring
        is_valid = self.has_valid_status(False)
        status = self.get_status()
        self.logger.info("Status Monitor:\n%s" % json.dumps(status, indent=4))
        if not is_valid:
            self.logger.info("Engine Status: " + SU.red(status["engine"]["status"]))
            self.post_health(status, False)
            raise EngineMalfunctionException
        else:
            self.logger.info("Engine Status: " + SU.green("[OK]"))
            self.post_health(status, True)

    def on_sick(self, data):
        """
        Called when the engine is in some unhealthy state.
        """
        self.post_health(data, False)

    def on_resurrect(self, data):
        """
        Called when the engine turned healthy again after being sick.
        """
        self.post_health(data, True)

    #
    # PUBLIC METHODS
    #

    def get_status(self):
        """
        Retrieves the current (cached) monitoring status dictionary.
        """
        return self.status

    def has_valid_status(self, update_vitality_only):
        """
        Refreshes the status and checks whether it is valid to run the engine.

        The engine is valid when Liquidsoap is active, the mixer's
        `in_filesystem_0` channel is present, Redis is reachable and the
        audio source folder exists. The result is also stored in
        `status["engine"]["status"]`.

        Args:
            update_vitality_only (Boolean): If True, refresh only the vital parts
                required for the heartbeat; otherwise refresh the full status.

        Returns:
            (Boolean): True if the engine status is valid.
        """
        is_valid = False

        if update_vitality_only:
            self.update_vitality_status()
        else:
            self.update_status()

        try:
            if self.status["lqs"]["active"] \
                and self.status["lqs"]["mixer"]["in_filesystem_0"] \
                and self.status["redis"]["active"] \
                and self.status["audio_source"]["exists"]:

                self.status["engine"]["status"] = MonitorResponseCode.OK.value
                is_valid = True
            else:
                self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value

        except Exception as e:
            # E.g. a KeyError when the full status was never fetched yet.
            self.logger.error("Exception while validating engine status: " + str(e))
            self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value

        return is_valid

    #
    # PRIVATE METHODS
    #

    def post_health(self, data, is_healthy):
        """
        Post the current health state (healthy or unhealthy) to the Engine API.

        Posting is best-effort. Request failures are logged instead of raised, so
        an unreachable Engine API cannot abort the boot or stop the heartbeat.

        Args:
            data (dict|str): Status details. A string is assumed to be already
                JSON-encoded (the heartbeat passes `json.dumps(status)` through
                the event dispatcher) and is forwarded as-is to avoid double encoding.
            is_healthy (Boolean): Whether the engine is considered healthy
        """
        body = dict()
        body["log_time"] = datetime.datetime.now()
        body["is_healthy"] = is_healthy
        if isinstance(data, str):
            body["details"] = data
        else:
            body["details"] = json.dumps(data, default=str)
        json_data = json.dumps(body, default=str)

        url = self.config.get("api_engine_store_health")
        url = url.replace("${ENGINE_NUMBER}", str(self.config.get("api_engine_number")))
        headers = {'content-type': 'application/json'}

        try:
            r = requests.post(url, data=json_data, headers=headers)
        except requests.exceptions.RequestException as e:
            self.logger.error("Error while pushing health state to Engine API: %s" % str(e))
            return

        if r.status_code == 204:
            self.logger.info("Successfully posted healthy=%s state to Engine API!" % is_healthy)
        else:
            # An error response is not guaranteed to carry a JSON body.
            try:
                response_details = str(r.json())
            except ValueError:
                response_details = r.text
            self.logger.error("HTTP %s | Error while pushing health state to Engine API: %s" % (r.status_code, response_details))

    def update_status(self):
        """
        Requests the current status of all components.
        """
        self.status["engine"]["version"] = meta.__version__

        self.engine.player.connector.enable_transaction()
        try:
            self.status["lqs"]["version"] = self.engine.version()
            self.status["lqs"]["uptime"] = self.engine.uptime()
            self.status["lqs"]["io"] = self.get_io_state()
            self.status["lqs"]["mixer"] = self.engine.player.mixer.mixer_status()
            self.status["lqs"]["mixer_fallback"] = self.engine.player.mixer_fallback.mixer_status()
        finally:
            # Always close the transaction, even if a Liquidsoap query failed.
            self.engine.player.connector.disable_transaction()

        self.status["api"]["steering"]["url"] = self.config.get("api_steering_status")
        self.status["api"]["steering"]["available"] = self.validate_url_connection(self.config.get("api_steering_status"))

        self.status["api"]["tank"]["url"] = self.config.get("api_tank_status")
        self.status["api"]["tank"]["available"] = self.validate_url_connection(self.config.get("api_tank_status"))
        self.status["api"]["tank"]["status"] = self.get_url_response(self.config.get("api_tank_status"))

        self.status["api"]["engine"]["url"] = self.config.get("api_engine_status")
        self.status["api"]["engine"]["available"] = self.validate_url_connection(self.config.get("api_engine_status"))

        self.update_vitality_status()

    def update_vitality_status(self):
        """
        Refreshes the vital status info which are required for the engine to survive.

        On the first call the heartbeat is started, if `heartbeat_frequency` > 0.
        """
        self.engine.player.connector.enable_transaction()
        try:
            self.status["lqs"]["active"] = self.engine.is_active()
        finally:
            # Always close the transaction, even if the Liquidsoap query failed.
            self.engine.player.connector.disable_transaction()

        self.status["redis"]["active"] = self.validate_redis_connection()
        self.status["audio_source"] = self.validate_directory(self.config.get("audio_source_folder"))

        # After first update start the Heartbeat Monitor
        if not self.heartbeat_running:
            self.heartbeat_running = True
            if self.config.get("heartbeat_frequency") > 0:
                self.heartbeat()

    def heartbeat(self):
        """
        Every `heartbeat_frequency` seconds the current vitality status is checked.
        If it's okay, a heartbeat is sent to the configured server.

        On a transition between healthy and invalid state an admin mail is sent
        and the `on_resurrect` / `on_sick` event is dispatched.

        The next run is always scheduled, even if this run raised an exception
        (e.g. socket, mail or Engine API failures). Otherwise a single failure
        would silently stop the heartbeat for good.
        """
        try:
            if self.has_valid_status(True):
                self.heartbeat_socket.sendto(str.encode("OK"), (self.heartbeat_server, self.heartbeat_port))

                # Engine resurrected into normal state
                if self.already_invalid:
                    self.already_invalid = False
                    status = json.dumps(self.get_status())
                    self.logger.info(SU.green("OK - Engine turned back into some healthy state!")+"\n"+str(status))
                    self.mailer.send_admin_mail(
                        "OK - Engine turned back into some HEALTHY STATE!",
                        "Things seem fine again at '%s':\n\n%s" % (self.engine_id, status))
                    # Route call of event via event dispatcher to provide ability for additional hooks
                    self.engine.event_dispatcher.on_resurrect(status)
            else:
                # Engine turned into invalid state
                if not self.already_invalid:
                    self.already_invalid = True
                    status = json.dumps(self.get_status())
                    self.logger.critical(SU.red("Engine turned into some INVALID STATE!")+"\n"+str(status))
                    self.mailer.send_admin_mail(
                        "ERROR - Engine turned into some INVALID STATE!",
                        "There's an issue with Aura Engine '%s':\n\n%s" % (self.engine_id, status))
                    # Route call of event via event dispatcher to provide ability for additional hooks
                    self.engine.event_dispatcher.on_sick(status)
        except Exception as e:
            self.logger.exception(SU.red("Error during heartbeat: %s" % str(e)))

        threading.Timer(self.config.get("heartbeat_frequency"), self.heartbeat).start()

    def get_io_state(self):
        """
        Retrieves all input and outputs provided by the engine.

        Returns:
            (dict): The parsed IO state, or `INVALID-STATE` if it was not valid JSON.
        """
        ios = self.engine.engine_state()

        try:
            # Work around the Liquidsoap output, which lacks a value after `"connected":`.
            ios = ios.replace('"connected":', '"connected": ""')
            ios = json.loads(ios, strict=False)
            return ios
        except Exception as e:
            self.logger.warning("Got invalid JSON from Liquidsoap - " + str(e))
            return MonitorResponseCode.INVALID_STATE.value

    def validate_url_connection(self, url):
        """
        Checks if connection to passed URL is successful.

        Returns:
            (Boolean): True if the URL could be opened and read.
        """
        try:
            request = urllib.request.Request(url)
            # Use a context manager so the response/connection is closed.
            with urllib.request.urlopen(request) as response:
                response.read()
        except Exception:
            return False

        return True

    def validate_redis_connection(self):
        """
        Checks if the connection to Redis is successful.

        Returns:
            (Boolean): True if a message could be published to the `aura` channel.
        """
        try:
            cra = ClientRedisAdapter(self.config)
            cra.publish("aura", "status")
        except Exception:
            return False

        return True

    def validate_directory(self, dir_path):
        """
        Checks if a given directory is existing and holds content.

        Returns:
            (dict): Keys `path`, `exists` and `has_content`.
        """
        status = dict()
        status["path"] = dir_path
        status["exists"] = os.path.exists(dir_path) and os.path.isdir(dir_path)
        status["has_content"] = False

        if status["exists"]:
            try:
                # Close the scandir iterator and stop at the first entry.
                with os.scandir(dir_path) as entries:
                    status["has_content"] = any(True for _ in entries)
            except OSError as e:
                self.logger.error("Error while reading directory '%s' - %s" % (dir_path, e))

        return status

    def get_url_response(self, url):
        """
        Fetches JSON data from the given URL.

        Args:
            url (String): The API endpoint to call

        Returns:
            (dict[]): A Python object representing the JSON structure,
                or `INVALID-STATE` if the request or parsing failed.
        """
        try:
            request = urllib.request.Request(url)
            with urllib.request.urlopen(request) as response:
                data = response.read()
            return json.loads(data, strict=False)
        except (urllib.error.URLError, IOError, ValueError) as e:
            self.logger.error("Error while connecting to URL '%s' - %s" % (url, e))

        return MonitorResponseCode.INVALID_STATE.value

    def get_engine_id(self):
        """
        Retrieves a String identifier consisting of IP and Hostname to differentiate
        the engine in mails and status broadcasts.
        """
        host = platform.node()
        return "%s (%s)" % (self.get_ip(), host)

    def get_ip(self):
        """
        Returns the IP of the Engine instance.

        The local address is taken from a UDP socket "connected" to the broadcast
        address. Returns `<UNKNOWN NETWORK>` if that fails.
        """
        try:
            with socket(AF_INET, SOCK_DGRAM) as s:
                s.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
                s.connect(('<broadcast>', 0))
                return s.getsockname()[0]
        except OSError:
            self.logger.critical(SU.red("Error while accessing network via <broadcast>!"))
            return "<UNKNOWN NETWORK>"