# Forked from AURA / engine
# 1704 commits behind, 780 commits ahead of the upstream repository.
# David Trattnig authored
# monitor.py 12.55 KiB
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import json
import logging
import os
import platform
import threading
import urllib
import urllib.error
import urllib.request
from enum import Enum
from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_BROADCAST

import requests

import meta
from src.base.config import AuraConfig
from src.base.utils import SimpleUtil as SU
from src.base.mail import AuraMailer
# Exceptions
class EngineMalfunctionException(Exception):
    """
    Signals that the engine is not in a state which allows it to run.

    Raised by the monitor during boot when the status validation fails.
    """
# Status Codes
class MonitorResponseCode(Enum):
    """
    Status codes the monitor stores in its status dictionary.
    """

    # Everything required to run the engine is in place
    OK = "OK"
    # A check failed or a component delivered unusable data
    INVALID_STATE = "INVALID-STATE"
class AuraMonitor:
    """
    Engine Monitoring is in charge of:

    - Checking the overall status of all components and external API endpoints
    - Checking the vital parts, which are minimal requirements for running the engine
    - Sending a heartbeat to a defined server via socket
    """

    # Upper bound in seconds for every single HTTP call done by the monitor.
    # Without it an unresponsive endpoint blocks the boot sequence or the
    # heartbeat thread forever.
    http_timeout = 10

    logger = None
    engine = None
    mailer = None
    status = None
    already_invalid = None
    engine_id = None
    heartbeat_server = None
    heartbeat_port = None
    heartbeat_frequency = None
    heartbeat_socket = None
    heartbeat_running = None

    def __init__(self, engine):
        """
        Initialize Monitoring.

        Args:
            engine: The engine instance to be monitored. The monitor registers
                    itself in `engine.plugins` using the key "monitor".
        """
        self.logger = logging.getLogger("AuraEngine")
        self.config = AuraConfig.config()
        self.engine = engine
        self.mailer = AuraMailer(self.config)

        # Skeleton of the status report, filled by the `update_*` methods
        self.status = {
            "engine": {},
            "lqs": {},
            "api": {
                "steering": {},
                "tank": {},
                "engine": {},
            },
        }
        self.already_invalid = False

        # Register as an engine plugin
        self.engine.plugins["monitor"] = self

        # Heartbeat settings
        self.heartbeat_running = False
        self.heartbeat_server = self.config.get("heartbeat_server")
        self.heartbeat_port = self.config.get("heartbeat_port")
        self.heartbeat_frequency = self.config.get("heartbeat_frequency")
        self.heartbeat_socket = socket(AF_INET, SOCK_DGRAM)

        self.engine_id = self.get_engine_id()

    #
    # EVENTS
    #

    def on_boot(self):
        """
        Called when the engine is booting.

        Refreshes the complete status, logs it and posts the resulting health
        state to the Engine API.

        Raises:
            (EngineMalfunctionException): If the status is not valid for running the engine
        """
        # Start Monitoring
        is_valid = self.has_valid_status(False)
        status = self.get_status()
        self.logger.info("Status Monitor:\n%s" % json.dumps(status, indent=4))

        if not is_valid:
            self.logger.info("Engine Status: " + SU.red(status["engine"]["status"]))
            self.post_health(status, False)
            raise EngineMalfunctionException

        self.logger.info("Engine Status: " + SU.green("[OK]"))
        self.post_health(status, True)

    def on_sick(self, data):
        """
        Called when the engine is in some unhealthy state.

        Args:
            data: The status details to be posted to the Engine API
        """
        self.post_health(data, False)

    def on_resurrect(self, data):
        """
        Called when the engine turned healthy again after being sick.

        Args:
            data: The status details to be posted to the Engine API
        """
        self.post_health(data, True)

    #
    # PUBLIC METHODS
    #

    def get_status(self):
        """
        Retrieves the current monitoring status.

        Returns:
            (dict): The status as collected by the most recent update. No new
                    data is requested by this call.
        """
        return self.status

    def has_valid_status(self, update_vitality_only):
        """
        Refreshes the status and checks if it is valid to run the engine.

        The status is always refreshed before it is validated: either the vital
        parts only or the complete status, depending on `update_vitality_only`.
        The outcome is also stored in `status["engine"]["status"]`.

        Args:
            update_vitality_only (Boolean): If `True` only the vital parts required for the
                                            heartbeat are refreshed, otherwise the complete status

        Returns:
            (Boolean): `True` if all vital parts are fine
        """
        is_valid = False

        if update_vitality_only:
            self.update_vitality_status()
        else:
            self.update_status()

        try:
            # A missing key (e.g. no mixer info before the first complete update)
            # ends up in the exception handler and therefore counts as invalid.
            if self.status["lqs"]["active"] \
                    and self.status["lqs"]["mixer"]["in_filesystem_0"] \
                    and self.status["audio_source"]["exists"]:
                self.status["engine"]["status"] = MonitorResponseCode.OK.value
                is_valid = True
            else:
                self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value
        except Exception as e:
            self.logger.error("Exception while validating engine status: " + str(e))
            self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value

        return is_valid

    #
    # PRIVATE METHODS
    #

    def post_health(self, data, is_healthy):
        """
        Post the health state info to the Engine API.

        Failures are logged and never raised: reporting the health state must
        neither break the boot sequence nor the heartbeat.

        Args:
            data: The status details, serialized to JSON (non-serializable values are stringified)
            is_healthy (Boolean): Whether the engine is reported as healthy
        """
        body = {
            "log_time": datetime.datetime.now(),
            "is_healthy": is_healthy,
            "details": json.dumps(data, default=str),
        }
        json_data = json.dumps(body, default=str)

        url = self.config.get("api_engine_store_health")
        url = url.replace("${ENGINE_NUMBER}", str(self.config.get("api_engine_number")))
        headers = {'content-type': 'application/json'}

        try:
            r = requests.post(url, data=json_data, headers=headers, timeout=self.http_timeout)
        except requests.RequestException as e:
            self.logger.error("Error while pushing health state to Engine API '%s': %s" % (url, e))
            return

        if r.status_code == 204:
            self.logger.info("Successfully posted healthy=%s state to Engine API!" % is_healthy)
        else:
            # Log the raw body: an error response is not guaranteed to hold JSON
            self.logger.error("HTTP %s | Error while pushing health state to Engine API: %s" % (r.status_code, r.text))

    def update_status(self):
        """
        Requests the current status of all components.

        This covers the engine version, the "lqs" details, the availability of
        the configured API endpoints and, at last, the vital parts.
        """
        self.status["engine"]["version"] = meta.__version__

        # NOTE(review): the transaction presumably groups the following queries
        # on one connection - confirm against the connector implementation.
        self.engine.player.connector.enable_transaction()
        try:
            self.status["lqs"]["version"] = self.engine.version()
            self.status["lqs"]["uptime"] = self.engine.uptime()
            self.status["lqs"]["io"] = self.get_io_state()
            self.status["lqs"]["mixer"] = self.engine.player.mixer.mixer_status()
            self.status["lqs"]["mixer_fallback"] = self.engine.player.mixer_fallback.mixer_status()
        finally:
            # Always leave the transaction, even if one of the queries failed
            self.engine.player.connector.disable_transaction()

        steering_url = self.config.get("api_steering_status")
        self.status["api"]["steering"]["url"] = steering_url
        self.status["api"]["steering"]["available"] = self.validate_url_connection(steering_url)

        tank_url = self.config.get("api_tank_status")
        self.status["api"]["tank"]["url"] = tank_url
        self.status["api"]["tank"]["available"] = self.validate_url_connection(tank_url)
        self.status["api"]["tank"]["status"] = self.get_url_response(tank_url)

        engine_url = self.config.get("api_engine_status")
        self.status["api"]["engine"]["url"] = engine_url
        self.status["api"]["engine"]["available"] = self.validate_url_connection(engine_url)

        self.update_vitality_status()

    def update_vitality_status(self):
        """
        Refreshes the vital status info which are required for the engine to survive.

        On its first call this also starts the heartbeat, given that the
        configured `heartbeat_frequency` is greater than zero.
        """
        self.engine.player.connector.enable_transaction()
        try:
            self.status["lqs"]["active"] = self.engine.is_connected()
        finally:
            # Always leave the transaction, even if the query failed
            self.engine.player.connector.disable_transaction()

        self.status["audio_source"] = self.validate_directory(self.config.get("audio_source_folder"))

        # After first update start the Heartbeat Monitor
        if not self.heartbeat_running:
            # Set the flag first: `heartbeat()` calls back into this method
            self.heartbeat_running = True
            frequency = self.config.get("heartbeat_frequency")
            if frequency and frequency > 0:
                self.heartbeat()

    def heartbeat(self):
        """
        Every `heartbeat_frequency` seconds the current vitality status is checked. If it's okay,
        a heartbeat is sent to the configured server.

        When the state flips between valid and invalid, the administrator is informed
        by mail and the matching event is dispatched. Any error raised while doing so is
        logged only, and the next heartbeat is scheduled in any case. Otherwise a single
        failure (socket, mail, event handler) would stop the heartbeat for good.
        """
        try:
            if self.has_valid_status(True):
                self.heartbeat_socket.sendto(str.encode("OK"), (self.heartbeat_server, self.heartbeat_port))

                # Engine resurrected into normal state
                if self.already_invalid:
                    self.already_invalid = False
                    status = json.dumps(self.get_status())
                    self.logger.info(SU.green("OK - Engine turned back into some healthy state!")+"\n"+str(status))
                    self.mailer.send_admin_mail(
                        "OK - Engine turned back into some HEALTHY STATE!",
                        "Things seem fine again at '%s':\n\n%s" % (self.engine_id, status))
                    # Route call of event via event dispatcher to provide ability for additional hooks
                    self.engine.event_dispatcher.on_resurrect(status)

            # Engine turned into invalid state
            elif not self.already_invalid:
                self.already_invalid = True
                status = json.dumps(self.get_status())
                self.logger.critical(SU.red("Engine turned into some INVALID STATE!")+"\n"+str(status))
                self.mailer.send_admin_mail(
                    "ERROR - Engine turned into some INVALID STATE!",
                    "There's an issue with Aura Engine '%s':\n\n%s" % (self.engine_id, status))
                # Route call of event via event dispatcher to provide ability for additional hooks
                self.engine.event_dispatcher.on_sick(status)
        except Exception as e:
            self.logger.error("Exception while processing heartbeat: " + str(e))
        finally:
            threading.Timer(self.config.get("heartbeat_frequency"), self.heartbeat).start()

    def get_io_state(self):
        """
        Retrieves all input and outputs provided by the engine.

        Returns:
            The parsed JSON structure, or the `INVALID_STATE` code if the engine
            state could not be parsed
        """
        ios = self.engine.engine_state()

        try:
            # NOTE(review): presumably works around a "connected" key delivered
            # without any value - confirm against the actual engine state output.
            ios = ios.replace('"connected":', '"connected": ""')
            ios = json.loads(ios, strict=False)
            return ios
        except Exception as e:
            self.logger.warning("Got invalid JSON from Liquidsoap - " + str(e))
            return MonitorResponseCode.INVALID_STATE.value

    def validate_url_connection(self, url):
        """
        Checks if connection to passed URL is successful.

        Args:
            url (String): The URL to be requested

        Returns:
            (Boolean): `True` if the URL could be opened and read within `http_timeout`
                       seconds, `False` on any error (including an invalid or missing URL)
        """
        try:
            request = urllib.request.Request(url)
            # The context manager closes the connection after reading
            with urllib.request.urlopen(request, timeout=self.http_timeout) as response:
                response.read()
        except Exception:
            return False

        return True

    def validate_directory(self, dir_path):
        """
        Checks if a given directory is existing and holds content.

        Args:
            dir_path (String): Path of the directory, a missing path (`None` or empty) counts as not existing

        Returns:
            (dict): Holding the keys "path", "exists" and "has_content"
        """
        status = dict()
        status["path"] = dir_path
        status["exists"] = bool(dir_path) and os.path.isdir(dir_path)
        status["has_content"] = False

        if status["exists"]:
            try:
                # One entry is enough to know there is content, no need to list them all
                with os.scandir(dir_path) as entries:
                    status["has_content"] = next(entries, None) is not None
            except OSError as e:
                # Directory vanished or is not readable, report it as empty
                self.logger.warning("Unable to read directory '%s' - %s" % (dir_path, e))

        return status

    def get_url_response(self, url):
        """
        Fetches JSON data from the given URL.

        Args:
            url (String): The API endpoint to call

        Returns:
            (dict[]): A Python object representing the JSON structure, or the
                      `INVALID_STATE` code if the request or the parsing failed
        """
        try:
            request = urllib.request.Request(url)
            # The context manager closes the connection after reading
            with urllib.request.urlopen(request, timeout=self.http_timeout) as response:
                data = response.read()
            return json.loads(data, strict=False)
        except (urllib.error.URLError, IOError, ValueError) as e:
            self.logger.error("Error while connecting to URL '%s' - %s" % (url, e))

        return MonitorResponseCode.INVALID_STATE.value

    def get_engine_id(self):
        """
        Retrieves a String identifier consisting of IP and Hostname to differentiate
        the engine in mails and status broadcasts.

        Returns:
            (String): The identifier in the form "<ip> (<hostname>)"
        """
        host = platform.node()
        return "%s (%s)" % (self.get_ip(), host)

    def get_ip(self):
        """
        Returns the IP of the Engine instance.

        Returns:
            (String): The local address of a UDP socket connected to `<broadcast>`,
                      or "<UNKNOWN NETWORK>" if the network can't be accessed
        """
        try:
            # The socket is only needed to look up its local address, close it right after
            with socket(AF_INET, SOCK_DGRAM) as s:
                s.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
                s.connect(('<broadcast>', 0))
                return s.getsockname()[0]
        except Exception:
            self.logger.critical(SU.red("Error while accessing network via <broadcast>!"))
            return "<UNKNOWN NETWORK>"