Newer
Older
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import logging
import os
import platform
import threading
import urllib.error
import urllib.request
from enum import Enum
from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_BROADCAST

from modules.cli.redis.adapter import ClientRedisAdapter
from modules.base.utils import SimpleUtil as SU
from modules.base.mail import AuraMailer
# Exceptions
class EngineMalfunctionException(Exception):
    """
    Signals that the engine is not in a state which allows it to run.
    It is raised while booting when the status check does not succeed.
    """
# Status Codes
class MonitorResponseCode(Enum):
    """
    Status codes reported by the engine monitor. The string values are
    what ends up in the status data.
    """
    # All checked requirements for running the engine are met
    OK = "OK"
    # A requirement is not met or the status could not be evaluated
    INVALID_STATE = "INVALID-STATE"
Engine Monitoring is in charge of:
- Checking the overall status of all components and external API endpoints
- Checking the vital parts, which are minimal requirements for running the engine
- Sending a heartbeat to a defined server via socket
"""
# Default values of the monitor attributes. All of them are assigned in `__init__`.
# NOTE(review): presumably class-level attributes - the enclosing class header is
# not visible in this excerpt.

# Logger instance, `logging.getLogger("AuraEngine")`
logger = None
# The soundsystem passed to `__init__`; it is queried for the status information
soundsystem = None
# `True` while the engine is known to be in an invalid state, so that the
# notification is only issued when the state changes
already_invalid = None
# Identifier of this engine instance as built by `get_engine_id()`
engine_id = None
# Host the heartbeat is sent to (config `heartbeat_server`)
heartbeat_server = None
# Port the heartbeat is sent to (config `heartbeat_port`)
heartbeat_port = None
# Config value `heartbeat_frequency`
heartbeat_frequency = None
# Datagram socket (`AF_INET`, `SOCK_DGRAM`) used for sending the heartbeat
heartbeat_socket = None
# `True` as soon as the heartbeat has been triggered once
heartbeat_running = None
def __init__(self, config, soundsystem):
    """
    Initialize Monitoring

    Args:
        config: The configuration, read via `config.get(<key>)`
        soundsystem: The soundsystem to be monitored; the monitor registers
            itself as `soundsystem.plugins["monitor"]`
    """
    self.logger = logging.getLogger("AuraEngine")
    self.config = config
    self.soundsystem = soundsystem
    self.mailer = AuraMailer(self.config)

    # Create the whole status skeleton before anything is written to it.
    # Every sub-dictionary the status updates assign into has to exist,
    # otherwise the first update fails on a missing key.
    self.status = dict()
    self.status["engine"] = dict()
    self.status["soundsystem"] = dict()
    self.status["api"] = dict()
    self.status["api"]["steering"] = dict()
    self.status["api"]["tank"] = dict()
    self.status["api"]["engine"] = dict()
    self.status["redis"] = dict()
    self.status["audio_store"] = dict()
    self.already_invalid = False

    # Register as an engine plugin
    self.soundsystem.plugins["monitor"] = self

    # Heartbeat settings
    self.heartbeat_running = False
    self.heartbeat_server = config.get("heartbeat_server")
    self.heartbeat_port = config.get("heartbeat_port")
    self.heartbeat_frequency = config.get("heartbeat_frequency")
    self.heartbeat_socket = socket(AF_INET, SOCK_DGRAM)
    self.engine_id = self.get_engine_id()
#
# EVENTS
#
def on_boot(self):
    """
    Boot hook of the engine: requests a full status update, logs the
    resulting status and stops the boot if the engine is not valid.

    Raises:
        EngineMalfunctionException: If the status check does not report a valid state
    """
    # Start Monitoring
    engine_ok = self.has_valid_status(False)
    report = self.get_status()
    self.logger.info("Status Monitor:\n%s" % json.dumps(report, indent=4))

    if engine_ok:
        self.logger.info("Engine Status: " + SU.green("[OK]"))
        return

    self.logger.info("Engine Status: " + SU.red(report["engine"]["status"]))
    raise EngineMalfunctionException
#
# PUBLIC METHODS
#
def get_status(self):
    """
    Provides the monitoring status as it is currently stored. No
    component is queried by this call.

    Returns:
        (dict): The status dictionary of the monitor
    """
    current_status = self.status
    return current_status
def has_valid_status(self, update_vitality_only):
    """
    Checks if the current status is valid to run engine. The status is
    always refreshed before it is evaluated; `update_vitality_only`
    selects how much of it. The outcome is also stored in
    `status["engine"]["status"]`.

    Args:
        update_vitality_only (Boolean): If `True` only the vital parts required for the
            heartbeat are refreshed, otherwise the complete status is requested

    Returns:
        (Boolean): `True` if the engine is in a valid state, `False` if a
            requirement is not met or the status could not be evaluated
    """
    if update_vitality_only:
        self.update_vitality_status()
    else:
        self.update_status()

    # Callers branch on the result, so it has to be returned explicitly
    is_valid = False
    try:
        if (self.status["soundsystem"]["active"]
                and self.status["soundsystem"]["mixer"]["in_filesystem_0"]
                and self.status["audio_store"]["exists"]):
            self.status["engine"]["status"] = MonitorResponseCode.OK.value
            is_valid = True
        else:
            self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value
    except Exception as e:
        # A missing status entry counts as an invalid state
        self.logger.error("Exception while validating engine status: " + str(e))
        self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value

    return is_valid
def update_status(self):
    """
    Collects the status of all components and stores it in `self.status`:
    the engine version, the soundsystem details and the availability of
    the Steering, Tank and Engine API endpoints.
    """
    self.status["engine"]["version"] = meta.__version__

    # Soundsystem details are requested between enabling and disabling the transaction
    sound = self.soundsystem
    sound.enable_transaction(sound.client)
    sound_status = self.status["soundsystem"]
    sound_status["version"] = sound.version()
    sound_status["uptime"] = sound.uptime()
    sound_status["io"] = self.get_io_state()
    sound_status["mixer"] = sound.mixer_status()
    #self.status["soundsystem"]["recorder"] = self.soundsystem.get_recorder_status()
    sound.disable_transaction(sound.client)

    api_status = self.status["api"]

    # Steering API
    steering = api_status["steering"]
    steering["url"] = self.config.get("api_steering_status")
    steering["available"] = self.validate_url_connection(self.config.get("api_steering_status"))

    # Tank API, including the status document it delivers
    tank = api_status["tank"]
    tank["url"] = self.config.get("api_tank_status")
    tank["available"] = self.validate_url_connection(self.config.get("api_tank_status"))
    tank["status"] = self.get_url_response(self.config.get("api_tank_status"))

    # API exposed by the engine itself
    engine_api = api_status["engine"]
    engine_api["url"] = self.config.get("exposed_api_url")
    engine_api["available"] = self.validate_url_connection(self.config.get("exposed_api_url"))
# NOTE(review): the `def` line and the docstring quotes of this method are not part
# of this excerpt. `has_valid_status()` calls it as `self.update_vitality_status()`.
Refreshes the vital status info which are required for the engine to survive.
# The soundsystem is queried between enabling and disabling the transaction
self.soundsystem.enable_transaction(self.soundsystem.client)
self.status["soundsystem"]["active"] = self.soundsystem.is_active()
self.soundsystem.disable_transaction(self.soundsystem.client)
self.status["redis"]["active"] = self.validate_redis_connection()
# Replaces the whole `audio_store` entry with a dict holding `exists` and `has_content`
self.status["audio_store"] = self.validate_directory(self.config.get("audiofolder"))
# After first update start the Heartbeat Monitor
if not self.heartbeat_running:
# The flag is set even if no heartbeat is sent, so this branch runs only once
self.heartbeat_running = True
# A frequency of zero or below disables the heartbeat
if self.config.get("heartbeat_frequency") > 0:
self.heartbeat()
# NOTE(review): the `def` line, the rest of the docstring and the `if` statement the
# `else:` below belongs to are not part of this excerpt. The method is referenced
# as `self.heartbeat` - confirm the missing condition against the full file.
Every `heartbeat_frequency` seconds the current vitality status is checked. If it's okay,
# Send "OK" to the configured heartbeat server via the datagram socket
self.heartbeat_socket.sendto(str.encode("OK"), (self.heartbeat_server, self.heartbeat_port))
# Engine resurrected into normal state
if self.already_invalid:
# Reset the flag, so the recovery is logged and mailed only once
self.already_invalid = False
status = json.dumps(self.get_status())
self.logger.info(SU.green("OK - Engine turned back into some healthy state!")+"\n"+str(status))
self.mailer.send_admin_mail( \
"OK - Engine turned back into some HEALTHY STATE!", \
"Things seem fine again at '%s':\n\n%s" % (self.engine_id, status))
else:
# Engine turned into invalid state
if not self.already_invalid:
# Remember the invalid state, so the error is logged and mailed only once
self.already_invalid = True
status = json.dumps(self.get_status())
self.logger.critical(SU.red("Engine turned into some INVALID STATE!")+"\n"+str(status))
self.mailer.send_admin_mail( \
"ERROR - Engine turned into some INVALID STATE!", \
"There's an issue with Aura Engine '%s':\n\n%s" % (self.engine_id, status))
# Schedule the next run of the heartbeat on a timer thread
threading.Timer(self.config.get("heartbeat_frequency"), self.heartbeat).start()
# NOTE(review): the `def` line, the docstring quotes, the statement which first
# assigns `ios` and the `except` clause binding `e` are not part of this excerpt.
# `update_status()` calls this method as `self.get_io_state()`.
Retrieves all input and outputs provided by the engine.
try:
# presumably the soundsystem delivers `"connected":` without a value and the empty
# string is inserted to make the payload parseable - TODO confirm
ios = ios.replace('"connected":', '"connected": ""')
# `strict=False` accepts control characters inside of JSON strings
ios = json.loads(ios, strict=False)
return ios
# NOTE(review): `Logger.warn` is a deprecated alias of `Logger.warning`
self.logger.warn("Got invalid JSON from soundsystem - " + str(e))
# If the payload cannot be parsed the status code string is returned in its place
return MonitorResponseCode.INVALID_STATE.value
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
def validate_url_connection(self, url):
    """
    Checks if connection to passed URL is successful.

    Args:
        url (String): The URL to connect to

    Returns:
        (Boolean): `True` if the URL could be opened and its response read,
            `False` on any error
    """
    try:
        request = urllib.request.Request(url)
        # The `with` block closes the response, also if reading fails.
        # Before, every check left the connection open.
        with urllib.request.urlopen(request) as response:
            response.read()
    except Exception:
        return False
    return True
def validate_redis_connection(self):
    """
    Checks if the connection to Redis is successful.

    Returns:
        (Boolean): `True` if a message could be published via the
            `ClientRedisAdapter`, `False` if that raised an error
    """
    try:
        cra = ClientRedisAdapter(self.config)
        cra.publish("aura", "status")
    except Exception:
        # `Exception` instead of a bare `except`, so that `KeyboardInterrupt`
        # and `SystemExit` are not swallowed by the check
        return False
    return True
def validate_directory(self, dir_path):
    """
    Checks if a given directory is existing and holds content

    Args:
        dir_path (String): Path of the directory to check; `None` or an
            empty path is reported as not existing

    Returns:
        (dict): `exists` is `True` if the path is a directory,
            `has_content` is `True` if it holds at least one entry
    """
    status = dict()
    # `isdir` is already `False` for paths which do not exist. The truth test
    # keeps an unset path from raising a `TypeError`.
    status["exists"] = bool(dir_path) and os.path.isdir(dir_path)
    status["has_content"] = False
    if status["exists"]:
        # Stop at the first entry instead of listing the whole directory;
        # the `with` block releases the directory handle right away
        with os.scandir(dir_path) as entries:
            status["has_content"] = any(True for _ in entries)
    return status
def get_url_response(self, url):
    """
    Fetches JSON data from the given URL.

    Args:
        url (String): The API endpoint to call

    Returns:
        (dict[]): A Python object representing the JSON structure, or `None`
            if the URL could not be read or did not deliver valid JSON
    """
    try:
        request = urllib.request.Request(url)
        # The `with` block closes the response as soon as the body is read
        with urllib.request.urlopen(request) as response:
            data = response.read()
        # Invalid JSON raises a `ValueError` subclass and is handled below
        return json.loads(data, strict=False)
    except (urllib.error.URLError, IOError, ValueError) as e:
        self.logger.error("Error while connecting to URL '%s' - %s" % (url, e))
    return None
def get_engine_id(self):
    """
    Builds the identifier of this engine instance in the form
    `<ip> (<hostname>)`. It is used to tell engines apart in mails and
    status broadcasts.

    Returns:
        (String): The identifier
    """
    hostname = platform.node()
    ip_address = self.get_ip()
    return "%s (%s)" % (ip_address, hostname)
def get_ip(self):
    """
    Returns the IP of the Engine instance.

    Returns:
        (String): The local address of a datagram socket connected to
            `<broadcast>`, or `None` if the network could not be accessed
    """
    try:
        # The `with` block closes the socket; before, every call left one open
        with socket(AF_INET, SOCK_DGRAM) as s:
            s.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
            s.connect(('<broadcast>', 0))
            return s.getsockname()[0]
    except OSError:
        # Socket errors are `OSError`s; a bare `except` would also swallow
        # `KeyboardInterrupt` and `SystemExit`
        self.logger.critical(SU.red("Error while accessing network via <broadcast>!"))
    return None