Skip to content
Snippets Groups Projects
monitor.py 8.76 KiB
Newer Older
  • Learn to ignore specific revisions
  • #
    # Aura Engine
    #
    # Copyright (C) 2020    David Trattnig <david.trattnig@subsquare.at>
    
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    
    # You should have received a copy of the GNU Affero General Public License
    # along with this program.  If not, see <http://www.gnu.org/licenses/>.
    
    
    
    David Trattnig's avatar
    David Trattnig committed
    
    import sys
    
    import urllib
    import logging
    
    import json
    
    import os.path
    
    David Trattnig's avatar
    David Trattnig committed
    import threading
    
    from os import path
    
    David Trattnig's avatar
    David Trattnig committed
    from enum import Enum
    
    David Trattnig's avatar
    David Trattnig committed
    from socket import socket, AF_INET, SOCK_DGRAM
    from time import time, ctime, sleep
    
    David Trattnig's avatar
    David Trattnig committed
    import meta
    
    from modules.communication.redis.adapter import ClientRedisAdapter
    
    
    
    David Trattnig's avatar
    David Trattnig committed
    class MonitorResponseCode(Enum):
        OK = "OK"
        INVALID_STATE = "INVALID-STATE"
    
    
    
    
    class Monitoring:
        """
    
    David Trattnig's avatar
    David Trattnig committed
        Engine Monitoring is in charge of:
    
        - Checking the overall status of all components and external API endpoints
        - Checking the vital parts, which are minimal requirements for running the engine
        - Sending a heartbeat to a defined server via socket
    
    
        """
        logger = None
        soundsystem = None
        status = None
    
    
    David Trattnig's avatar
    David Trattnig committed
        heartbeat_server = None
        heartbeat_port = None
        heartbeat_frequency = None
        heartbeat_socket = None
        heartbeat_running = None
    
    
    
        def __init__(self, config, soundsystem):
            """
            Initialize Monitoring
            """
            self.logger = logging.getLogger("AuraEngine")
            self.config = config
            self.soundsystem = soundsystem
            self.status = dict()
    
    David Trattnig's avatar
    David Trattnig committed
            self.status["engine"] = dict()
    
            self.status["soundsystem"] = dict()
    
            self.status["redis"] = dict()
    
            self.status["api"] = dict()
    
            self.status["api"]["steering"] = dict()
            self.status["api"]["tank"] = dict()
    
    David Trattnig's avatar
    David Trattnig committed
            self.status["api"]["engine"] = dict()
    
    David Trattnig's avatar
    David Trattnig committed
            # Heartbeat settings
            self.heartbeat_running = False
            self.heartbeat_server = config.get("heartbeat_server")
            self.heartbeat_port = config.get("heartbeat_port")
            self.heartbeat_frequency = config.get("heartbeat_frequency")
            self.heartbeat_socket = socket(AF_INET, SOCK_DGRAM)
    
    
    
        #
        # PUBLIC METHODS
        #
    
    
        def get_status(self):
            """
            Retrieves the current monitoring status.
            """
            return self.status
    
    
    
        def has_valid_status(self, update_vitality_only):
            """
            Checks if the current status is valid to run engine. By default it
            does not request new status information, rather using the cached one.
            To request new data either call `get_status()` before or use the
            `update_vital` parameter.
    
            Args:
                update_vitality_only (Boolean): Refreshes only the vital parts required for the heartbeat
            """
    
    David Trattnig's avatar
    David Trattnig committed
            is_valid = False
    
    
    David Trattnig's avatar
    David Trattnig committed
            if update_vitality_only:
                self.update_vitality_status()
            else:
                self.update_status()
                
            try:
                if self.status["soundsystem"]["active"] \
                    and self.status["soundsystem"]["mixer"]["in_filesystem_0"] \
    
                    and self.status["redis"]["active"] \
    
    David Trattnig's avatar
    David Trattnig committed
                    and self.status["audio_store"]["exists"]:
    
                    self.status["engine"]["status"] = MonitorResponseCode.OK.value
    
    David Trattnig's avatar
    David Trattnig committed
                    is_valid = True
    
    David Trattnig's avatar
    David Trattnig committed
                else:
                    self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value
    
            except Exception as e:
                self.logger.error("Exception while validating engine status: " + str(e))
                self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value
            
    
    David Trattnig's avatar
    David Trattnig committed
            return is_valid
    
    
        def update_status(self):
            """
            Requests the current status of all components
            """
    
    David Trattnig's avatar
    David Trattnig committed
            self.status["engine"]["version"] = meta.__version__
    
            self.soundsystem.enable_transaction(self.soundsystem.client)
            self.status["soundsystem"]["version"]   = self.soundsystem.version()
            self.status["soundsystem"]["uptime"]    = self.soundsystem.uptime()
            self.status["soundsystem"]["io"]        = self.get_io_state()
            self.status["soundsystem"]["mixer"]     = self.soundsystem.get_mixer_status()
    
            #self.status["soundsystem"]["recorder"] = self.soundsystem.get_recorder_status()
    
            self.soundsystem.disable_transaction(self.soundsystem.client)
        
    
    David Trattnig's avatar
    David Trattnig committed
            self.status["api"]["steering"]["url"]       = self.config.get("api_steering_status")
    
            self.status["api"]["steering"]["available"] = self.validate_url_connection(self.config.get("api_steering_status"))
    
    David Trattnig's avatar
    David Trattnig committed
            self.status["api"]["tank"]["url"]           = self.config.get("api_tank_status")
            self.status["api"]["tank"]["available"]     = self.validate_url_connection(self.config.get("api_tank_status"))
            self.status["api"]["tank"]["status"]        = self.get_url_response(self.config.get("api_tank_status"))
    
    
    David Trattnig's avatar
    David Trattnig committed
            self.status["api"]["engine"]["url"]         = self.config.get("exposed_api_url")
    
    David Trattnig's avatar
    David Trattnig committed
            self.status["api"]["engine"]["available"]   = self.validate_url_connection(self.config.get("exposed_api_url"))
    
    
    David Trattnig's avatar
    David Trattnig committed
            self.update_vitality_status()
    
    David Trattnig's avatar
    David Trattnig committed
        def update_vitality_status(self):
    
    David Trattnig's avatar
    David Trattnig committed
            Refreshes the vital status info which are required for the engine to survive.
    
    David Trattnig's avatar
    David Trattnig committed
            self.soundsystem.enable_transaction(self.soundsystem.client)
    
            self.status["soundsystem"]["active"]  = self.soundsystem.is_active()
    
    David Trattnig's avatar
    David Trattnig committed
            self.soundsystem.disable_transaction(self.soundsystem.client)
    
    David Trattnig's avatar
    David Trattnig committed
    
    
            self.status["redis"]["active"] = self.validate_redis_connection()
            self.status["audio_store"] = self.validate_directory(self.config.get("audiofolder"))
    
    David Trattnig's avatar
    David Trattnig committed
            # After first update start the Heartbeat Monitor
    
    David Trattnig's avatar
    David Trattnig committed
            if not self.heartbeat_running:
                self.heartbeat_running = True
                if self.config.get("heartbeat_frequency") > 0:
                    self.heartbeat()
    
    David Trattnig's avatar
    David Trattnig committed
    
        def heartbeat(self):
    
    David Trattnig's avatar
    David Trattnig committed
            Every `heartbeat_frequency` seconds the current vitality status is checked. If it's okay,
    
    David Trattnig's avatar
    David Trattnig committed
            a heartbeat is sent to the configured server.
    
    David Trattnig's avatar
    David Trattnig committed
            if self.has_valid_status(True):
    
    David Trattnig's avatar
    David Trattnig committed
                self.heartbeat_socket.sendto(str.encode("OK"), (self.heartbeat_server, self.heartbeat_port))
    
    David Trattnig's avatar
    David Trattnig committed
    
            threading.Timer(self.config.get("heartbeat_frequency"), self.heartbeat).start()
    
    David Trattnig's avatar
    David Trattnig committed
        def get_io_state(self):
    
    David Trattnig's avatar
    David Trattnig committed
            Retrieves all input and outputs provided by the engine.
    
    David Trattnig's avatar
    David Trattnig committed
            ios = self.soundsystem.engine_state()
    
    David Trattnig's avatar
    David Trattnig committed
            try:
                ios = ios.replace('"connected":', '"connected": ""')
                ios = json.loads(ios, strict=False)
                return ios
    
            except Exception as e:
    
    David Trattnig's avatar
    David Trattnig committed
                self.logger.warn("Got invalid JSON from soundsystem - " + str(e))
                return MonitorResponseCode.INVALID_STATE.value
    
    
    
    
        def validate_url_connection(self, url):
            """
            Checks if connection to passed URL is successful.
            """
            try:
                request = urllib.request.Request(url)
                response = urllib.request.urlopen(request)
                response.read()
            except Exception:
                return False
    
            return True
    
    
    
        def validate_redis_connection(self):
            """
            Checks if the connection to Redis is successful.
            """
            try:
                cra = ClientRedisAdapter(self.config)
                cra.publish("aura", "status")
            except:
                return False
    
            return True
    
    
    
        def validate_directory(self, dir_path):
            """
            Checks if a given directory is existing and holds content
            """
            status = dict()
            status["exists"] = path.exists(dir_path) and os.path.isdir(dir_path)
            status["has_content"] = False
    
            if status["exists"]:
                status["has_content"] = any([True for _ in os.scandir(dir_path)])
    
    
            return status
    
    
    
    David Trattnig's avatar
    David Trattnig committed
    
    
        def get_url_response(self, url):
            """
            Fetches JSON data from the given URL.
    
            Args:
                url (String):       The API endpoint to call
            
            Returns:
                (dict[]):           A Python object representing the JSON structure
            """
            data = None
    
            try:
                request = urllib.request.Request(url)
                response = urllib.request.urlopen(request)
                data = response.read()
    
    David Trattnig's avatar
    David Trattnig committed
                return json.loads(data, strict=False)
    
            except (urllib.error.URLError, IOError, ValueError) as e:
                self.logger.error("Error while connecting to URL '%s' - %s" % (url, e))
    
    
    David Trattnig's avatar
    David Trattnig committed
            return MonitorResponseCode.INVALID_STATE.value