Forked from
AURA / engine
1704 commits behind, 668 commits ahead of the upstream repository.
-
David Trattnig authoredDavid Trattnig authored
monitor.py 11.42 KiB
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import urllib
import logging
import json
import threading
import platform
from enum import Enum
from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_BROADCAST
import meta
from modules.cli.redis.adapter import ClientRedisAdapter
from modules.base.utils import SimpleUtil as SU
from modules.base.mail import AuraMailer
# Exceptions
class EngineMalfunctionException(Exception):
pass
# Status Codes
class MonitorResponseCode(Enum):
OK = "OK"
INVALID_STATE = "INVALID-STATE"
class AuraMonitor:
"""
Engine Monitoring is in charge of:
- Checking the overall status of all components and external API endpoints
- Checking the vital parts, which are minimal requirements for running the engine
- Sending a heartbeat to a defined server via socket
"""
logger = None
soundsystem = None
mailer = None
status = None
already_invalid = None
engine_id = None
heartbeat_server = None
heartbeat_port = None
heartbeat_frequency = None
heartbeat_socket = None
heartbeat_running = None
def __init__(self, config, soundsystem):
"""
Initialize Monitoring
"""
self.logger = logging.getLogger("AuraEngine")
self.config = config
self.soundsystem = soundsystem
self.mailer = AuraMailer(self.config)
self.status = dict()
self.status["engine"] = dict()
self.status["soundsystem"] = dict()
self.status["redis"] = dict()
self.status["api"] = dict()
self.status["api"]["steering"] = dict()
self.status["api"]["tank"] = dict()
self.status["api"]["engine"] = dict()
self.already_invalid = False
# Register as an engine plugin
self.soundsystem.plugins["monitor"] = self
# Heartbeat settings
self.heartbeat_running = False
self.heartbeat_server = config.get("heartbeat_server")
self.heartbeat_port = config.get("heartbeat_port")
self.heartbeat_frequency = config.get("heartbeat_frequency")
self.heartbeat_socket = socket(AF_INET, SOCK_DGRAM)
self.engine_id = self.get_engine_id()
#
# EVENTS
#
def on_boot(self):
"""
Called when the engine is booting.
"""
# Start Monitoring
is_valid = self.has_valid_status(False)
status = self.get_status()
self.logger.info("Status Monitor:\n%s" % json.dumps(status, indent=4))
if not is_valid:
self.logger.info("Engine Status: " + SU.red(status["engine"]["status"]))
raise EngineMalfunctionException
else:
self.logger.info("Engine Status: " + SU.green("[OK]"))
#
# PUBLIC METHODS
#
def get_status(self):
"""
Retrieves the current monitoring status.
"""
return self.status
def has_valid_status(self, update_vitality_only):
"""
Checks if the current status is valid to run engine. By default it
does not request new status information, rather using the cached one.
To request new data either call `get_status()` before or use the
`update_vital` parameter.
Args:
update_vitality_only (Boolean): Refreshes only the vital parts required for the heartbeat
"""
is_valid = False
if update_vitality_only:
self.update_vitality_status()
else:
self.update_status()
try:
if self.status["soundsystem"]["active"] \
and self.status["soundsystem"]["mixer"]["in_filesystem_0"] \
and self.status["redis"]["active"] \
and self.status["audio_store"]["exists"]:
self.status["engine"]["status"] = MonitorResponseCode.OK.value
is_valid = True
else:
self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value
except Exception as e:
self.logger.error("Exception while validating engine status: " + str(e))
self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value
return is_valid
#
# PRIVATE METHODS
#
def update_status(self):
"""
Requests the current status of all components
"""
self.status["engine"]["version"] = meta.__version__
self.soundsystem.enable_transaction(self.soundsystem.client)
self.status["soundsystem"]["version"] = self.soundsystem.version()
self.status["soundsystem"]["uptime"] = self.soundsystem.uptime()
self.status["soundsystem"]["io"] = self.get_io_state()
self.status["soundsystem"]["mixer"] = self.soundsystem.mixer_status()
#self.status["soundsystem"]["recorder"] = self.soundsystem.get_recorder_status()
self.soundsystem.disable_transaction(self.soundsystem.client)
self.status["api"]["steering"]["url"] = self.config.get("api_steering_status")
self.status["api"]["steering"]["available"] = self.validate_url_connection(self.config.get("api_steering_status"))
self.status["api"]["tank"]["url"] = self.config.get("api_tank_status")
self.status["api"]["tank"]["available"] = self.validate_url_connection(self.config.get("api_tank_status"))
self.status["api"]["tank"]["status"] = self.get_url_response(self.config.get("api_tank_status"))
self.status["api"]["engine"]["url"] = self.config.get("exposed_api_url")
self.status["api"]["engine"]["available"] = self.validate_url_connection(self.config.get("exposed_api_url"))
self.update_vitality_status()
def update_vitality_status(self):
"""
Refreshes the vital status info which are required for the engine to survive.
"""
self.soundsystem.enable_transaction(self.soundsystem.client)
self.status["soundsystem"]["active"] = self.soundsystem.is_active()
self.soundsystem.disable_transaction(self.soundsystem.client)
self.status["redis"]["active"] = self.validate_redis_connection()
self.status["audio_store"] = self.validate_directory(self.config.get("audiofolder"))
# After first update start the Heartbeat Monitor
if not self.heartbeat_running:
self.heartbeat_running = True
if self.config.get("heartbeat_frequency") > 0:
self.heartbeat()
def heartbeat(self):
"""
Every `heartbeat_frequency` seconds the current vitality status is checked. If it's okay,
a heartbeat is sent to the configured server.
"""
if self.has_valid_status(True):
self.heartbeat_socket.sendto(str.encode("OK"), (self.heartbeat_server, self.heartbeat_port))
# Engine resurrected into normal state
if self.already_invalid:
self.already_invalid = False
status = json.dumps(self.get_status())
self.logger.info(SU.green("OK - Engine turned back into some healthy state!")+"\n"+str(status))
self.mailer.send_admin_mail( \
"OK - Engine turned back into some HEALTHY STATE!", \
"Things seem fine again at '%s':\n\n%s" % (self.engine_id, status))
else:
# Engine turned into invalid state
if not self.already_invalid:
self.already_invalid = True
status = json.dumps(self.get_status())
self.logger.critical(SU.red("Engine turned into some INVALID STATE!")+"\n"+str(status))
self.mailer.send_admin_mail( \
"ERROR - Engine turned into some INVALID STATE!", \
"There's an issue with Aura Engine '%s':\n\n%s" % (self.engine_id, status))
threading.Timer(self.config.get("heartbeat_frequency"), self.heartbeat).start()
def get_io_state(self):
"""
Retrieves all input and outputs provided by the engine.
"""
ios = self.soundsystem.engine_state()
try:
ios = ios.replace('"connected":', '"connected": ""')
ios = json.loads(ios, strict=False)
return ios
except Exception as e:
self.logger.warn("Got invalid JSON from soundsystem - " + str(e))
return MonitorResponseCode.INVALID_STATE.value
def validate_url_connection(self, url):
"""
Checks if connection to passed URL is successful.
"""
try:
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
response.read()
except Exception:
return False
return True
def validate_redis_connection(self):
"""
Checks if the connection to Redis is successful.
"""
try:
cra = ClientRedisAdapter(self.config)
cra.publish("aura", "status")
except:
return False
return True
def validate_directory(self, dir_path):
"""
Checks if a given directory is existing and holds content
"""
status = dict()
status["exists"] = os.path.exists(dir_path) and os.path.isdir(dir_path)
status["has_content"] = False
if status["exists"]:
status["has_content"] = any([True for _ in os.scandir(dir_path)])
return status
def get_url_response(self, url):
"""
Fetches JSON data from the given URL.
Args:
url (String): The API endpoint to call
Returns:
(dict[]): A Python object representing the JSON structure
"""
data = None
try:
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
data = response.read()
return json.loads(data, strict=False)
except (urllib.error.URLError, IOError, ValueError) as e:
self.logger.error("Error while connecting to URL '%s' - %s" % (url, e))
return MonitorResponseCode.INVALID_STATE.value
def get_engine_id(self):
"""
Retrieves a String identifier consisting of IP and Hostname to differentiate
the engine in mails and status broadcasts.
"""
host = platform.node()
return "%s (%s)" % (self.get_ip(), host)
def get_ip(self):
"""
Returns the IP of the Engine instance.
"""
try:
s = socket(AF_INET, SOCK_DGRAM)
s.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
s.connect(('<broadcast>', 0))
return s.getsockname()[0]
except:
self.logger.critical(SU.red("Error while accessing network via <broadcast>!"))
return "<UNKNOWN NETWORK>"