    #
    # Aura Engine (https://gitlab.servus.at/aura/engine)
    #
    # Copyright (C) 2017-2020 - The Aura Engine Team.
    
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    
    # You should have received a copy of the GNU Affero General Public License
    # along with this program.  If not, see <http://www.gnu.org/licenses/>.
    
    
    
    import json
    import logging
    import os
    import platform
    import threading
    import urllib
    import urllib.error
    import urllib.request

    from enum import Enum
    from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_BROADCAST

    from modules.cli.redis.adapter import ClientRedisAdapter
    from modules.base.utils import SimpleUtil as SU
    from modules.base.mail import AuraMailer
    
    
    # Exceptions
    
    class EngineMalfunctionException(Exception):
        """
        Raised by `AuraMonitor.on_boot()` when the engine status is not valid.
        """
    
    
    # Status Codes


    class MonitorResponseCode(Enum):
        """
        Status codes reported by the engine monitor.

        The string values are what ends up in the status report.
        """
        # The checked component or the engine as a whole is fine
        OK = "OK"
        # A check failed or the state could not be determined
        INVALID_STATE = "INVALID-STATE"
    
    
    
    
    class AuraMonitor:
        """
        Engine Monitoring is in charge of:

        - Checking the overall status of all components and external API endpoints
        - Checking the vital parts, which are minimal requirements for running the engine
        - Sending a heartbeat to a defined server via socket

        The collected information is kept in the nested `status` dictionary, which
        is refreshed by `update_status()` and `update_vitality_status()`.
        """
        logger = None
        soundsystem = None
        status = None
        already_invalid = None
        engine_id = None

        heartbeat_server = None
        heartbeat_port = None
        heartbeat_frequency = None
        heartbeat_socket = None
        heartbeat_running = None


        def __init__(self, config, soundsystem):
            """
            Initialize Monitoring

            Args:
                config:         The engine configuration, read via `config.get(<key>)`
                soundsystem:    The soundsystem connector; the monitor registers itself
                                in its `plugins` dictionary under the key `monitor`
            """
            self.logger = logging.getLogger("AuraEngine")
            self.config = config
            self.soundsystem = soundsystem
            self.mailer = AuraMailer(self.config)

            # Skeleton of the status report; the values are filled in by the `update_*` methods.
            # The `audio_store` entry is added by `update_vitality_status()`.
            self.status = {
                "engine": {},
                "soundsystem": {},
                "redis": {},
                "api": {
                    "steering": {},
                    "tank": {},
                    "engine": {}
                }
            }
            self.already_invalid = False

            # Register as an engine plugin
            self.soundsystem.plugins["monitor"] = self

            # Heartbeat settings
            self.heartbeat_running = False
            self.heartbeat_server = config.get("heartbeat_server")
            self.heartbeat_port = config.get("heartbeat_port")
            self.heartbeat_frequency = config.get("heartbeat_frequency")
            self.heartbeat_socket = socket(AF_INET, SOCK_DGRAM)

            self.engine_id = self.get_engine_id()


        #
        # EVENTS
        #

        def on_boot(self):
            """
            Called when the engine is booting.

            Requests a full status update, logs the resulting report and the verdict.

            Raises:
                (EngineMalfunctionException):   If the engine is not in a valid state
            """
            is_valid = self.has_valid_status(False)
            status = self.get_status()
            self.logger.info("Status Monitor:\n%s" % json.dumps(status, indent=4))

            if not is_valid:
                self.logger.info("Engine Status: " + SU.red(status["engine"]["status"]))
                raise EngineMalfunctionException

            self.logger.info("Engine Status: " + SU.green("[OK]"))


        #
        # PUBLIC METHODS
        #

        def get_status(self):
            """
            Retrieves the current monitoring status.

            Returns:
                (dict):     The status dictionary as of the last update; no refresh is triggered
            """
            return self.status


        def has_valid_status(self, update_vitality_only):
            """
            Refreshes the status and checks if it is valid to run the engine.

            The status is valid when the soundsystem is active, the mixer status holds
            a truthy `in_filesystem_0` entry, Redis is reachable and the audio store
            directory exists. The verdict is also written to `status["engine"]["status"]`.

            Note that the mixer status is only refreshed by a full update. When it is
            missing from the status the engine is reported as invalid.

            Args:
                update_vitality_only (Boolean): If `True` only the vital parts required for
                                                the heartbeat are refreshed, otherwise
                                                the complete status

            Returns:
                (Boolean):  `True` if the engine is in a valid state
            """
            if update_vitality_only:
                self.update_vitality_status()
            else:
                self.update_status()

            is_valid = False
            try:
                is_valid = bool(
                    self.status["soundsystem"]["active"]
                    and self.status["soundsystem"]["mixer"]["in_filesystem_0"]
                    and self.status["redis"]["active"]
                    and self.status["audio_store"]["exists"])
            except Exception as e:
                # A missing key in the status report counts as an invalid state
                self.logger.error("Exception while validating engine status: " + str(e))

            if is_valid:
                self.status["engine"]["status"] = MonitorResponseCode.OK.value
            else:
                self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value

            return is_valid


        def update_status(self):
            """
            Requests the current status of all components
            """
            # NOTE(review): `meta` is not imported in the visible part of this file - confirm
            self.status["engine"]["version"] = meta.__version__

            self.soundsystem.enable_transaction(self.soundsystem.client)
            try:
                self.status["soundsystem"]["version"]   = self.soundsystem.version()
                self.status["soundsystem"]["uptime"]    = self.soundsystem.uptime()
                self.status["soundsystem"]["io"]        = self.get_io_state()
                self.status["soundsystem"]["mixer"]     = self.soundsystem.mixer_status()
                #self.status["soundsystem"]["recorder"] = self.soundsystem.get_recorder_status()
            finally:
                # Always end the transaction, even if one of the queries failed
                self.soundsystem.disable_transaction(self.soundsystem.client)

            steering_url = self.config.get("api_steering_status")
            self.status["api"]["steering"]["url"]       = steering_url
            self.status["api"]["steering"]["available"] = self.validate_url_connection(steering_url)

            tank_url = self.config.get("api_tank_status")
            self.status["api"]["tank"]["url"]           = tank_url
            self.status["api"]["tank"]["available"]     = self.validate_url_connection(tank_url)
            self.status["api"]["tank"]["status"]        = self.get_url_response(tank_url)

            engine_url = self.config.get("exposed_api_url")
            self.status["api"]["engine"]["url"]         = engine_url
            self.status["api"]["engine"]["available"]   = self.validate_url_connection(engine_url)

            self.update_vitality_status()


        def update_vitality_status(self):
            """
            Refreshes the vital status info which are required for the engine to survive.

            On its first invocation this also starts the heartbeat, given the
            `heartbeat_frequency` setting is greater than zero.
            """
            self.soundsystem.enable_transaction(self.soundsystem.client)
            try:
                self.status["soundsystem"]["active"] = self.soundsystem.is_active()
            finally:
                # Always end the transaction, even if the query failed
                self.soundsystem.disable_transaction(self.soundsystem.client)

            self.status["redis"]["active"] = self.validate_redis_connection()
            self.status["audio_store"] = self.validate_directory(self.config.get("audiofolder"))

            # After first update start the Heartbeat Monitor
            if not self.heartbeat_running:
                self.heartbeat_running = True
                frequency = self.config.get("heartbeat_frequency")
                # A missing setting disables the heartbeat instead of raising a `TypeError`
                if frequency is not None and frequency > 0:
                    self.heartbeat()


        def heartbeat(self):
            """
            Every `heartbeat_frequency` seconds the current vitality status is checked. If it's okay,
            a heartbeat is sent to the configured server.

            A change between valid and invalid state is logged and mailed to the admin
            once per transition. Any exception raised while processing a beat is logged,
            and the next beat is scheduled regardless, so a single failure does not
            stop the heartbeat.
            """
            try:
                if self.has_valid_status(True):
                    self.heartbeat_socket.sendto(str.encode("OK"), (self.heartbeat_server, self.heartbeat_port))

                    # Engine resurrected into normal state
                    if self.already_invalid:
                        self.already_invalid = False
                        status = json.dumps(self.get_status())
                        self.logger.info(SU.green("OK - Engine turned back into some healthy state!")+"\n"+str(status))
                        self.mailer.send_admin_mail(
                            "OK - Engine turned back into some HEALTHY STATE!",
                            "Things seem fine again at '%s':\n\n%s" % (self.engine_id, status))
                else:
                    # Engine turned into invalid state
                    if not self.already_invalid:
                        self.already_invalid = True
                        status = json.dumps(self.get_status())
                        self.logger.critical(SU.red("Engine turned into some INVALID STATE!")+"\n"+str(status))
                        self.mailer.send_admin_mail(
                            "ERROR - Engine turned into some INVALID STATE!",
                            "There's an issue with Aura Engine '%s':\n\n%s" % (self.engine_id, status))
            except Exception as e:
                self.logger.error("Exception while processing heartbeat: " + str(e))
            finally:
                # Schedule the next beat in any case to keep the heartbeat alive
                threading.Timer(self.config.get("heartbeat_frequency"), self.heartbeat).start()


        def get_io_state(self):
            """
            Retrieves all input and outputs provided by the engine.

            Returns:
                The parsed JSON structure returned by the soundsystem, or the
                `INVALID-STATE` code if the response cannot be parsed
            """
            ios = self.soundsystem.engine_state()

            try:
                # NOTE(review): presumably the soundsystem emits `"connected":` without
                # a value, which is patched to an empty string here - confirm
                ios = ios.replace('"connected":', '"connected": ""')
                ios = json.loads(ios, strict=False)
                return ios
            except Exception as e:
                self.logger.warning("Got invalid JSON from soundsystem - " + str(e))
                return MonitorResponseCode.INVALID_STATE.value


        def validate_url_connection(self, url):
            """
            Checks if connection to passed URL is successful.

            Args:
                url (String):   The URL to connect to

            Returns:
                (Boolean):  `True` if the URL could be opened and read, `False` on any error
            """
            try:
                request = urllib.request.Request(url)
                # The context manager closes the connection after reading
                with urllib.request.urlopen(request) as response:
                    response.read()
            except Exception:
                return False

            return True


        def validate_redis_connection(self):
            """
            Checks if the connection to Redis is successful.

            Returns:
                (Boolean):  `True` if a message could be published, `False` on any error
            """
            try:
                cra = ClientRedisAdapter(self.config)
                cra.publish("aura", "status")
            except Exception:
                # `Exception` instead of a bare `except`, so that `KeyboardInterrupt`
                # and `SystemExit` are not swallowed
                return False

            return True


        def validate_directory(self, dir_path):
            """
            Checks if a given directory is existing and holds content

            Args:
                dir_path (String):  Path of the directory; `None` or an empty string
                                    is treated as a non-existing directory

            Returns:
                (dict):     The Boolean entries `exists` and `has_content`
            """
            status = dict()
            status["exists"] = bool(dir_path) and os.path.isdir(dir_path)
            status["has_content"] = False

            if status["exists"]:
                try:
                    # Only the first entry is needed; the context manager closes the iterator
                    with os.scandir(dir_path) as entries:
                        status["has_content"] = next(entries, None) is not None
                except OSError as e:
                    self.logger.error("Error while reading directory '%s' - %s" % (dir_path, e))

            return status


        def get_url_response(self, url):
            """
            Fetches JSON data from the given URL.

            Args:
                url (String):       The API endpoint to call

            Returns:
                (dict[]):           A Python object representing the JSON structure, or the
                                    `INVALID-STATE` code if the request or the parsing failed
            """
            try:
                request = urllib.request.Request(url)
                # The context manager closes the connection after reading
                with urllib.request.urlopen(request) as response:
                    data = response.read()
                return json.loads(data, strict=False)
            except (urllib.error.URLError, IOError, ValueError) as e:
                self.logger.error("Error while connecting to URL '%s' - %s" % (url, e))

            return MonitorResponseCode.INVALID_STATE.value


        def get_engine_id(self):
            """
            Retrieves a String identifier consisting of IP and Hostname to differentiate
            the engine in mails and status broadcasts.

            Returns:
                (String):   The identifier in the form `<ip> (<hostname>)`
            """
            host = platform.node()
            return "%s (%s)" % (self.get_ip(), host)


        def get_ip(self):
            """
            Returns the IP of the Engine instance.

            The address is read from the local end of a datagram socket connected to
            the broadcast address.

            Returns:
                (String):   The IP address, or `<UNKNOWN NETWORK>` if it cannot be determined
            """
            try:
                # The context manager closes the probing socket again
                with socket(AF_INET, SOCK_DGRAM) as s:
                    s.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
                    s.connect(('<broadcast>', 0))
                    return s.getsockname()[0]
            except OSError:
                self.logger.critical(SU.red("Error while accessing network via <broadcast>!"))
                return "<UNKNOWN NETWORK>"