Skip to content
Snippets Groups Projects
monitor.py 13 KiB
Newer Older
  • Learn to ignore specific revisions
  • David Trattnig's avatar
    David Trattnig committed
    # Aura Engine (https://gitlab.servus.at/aura/engine)
    #
    # Copyright (C) 2017-2020 - The Aura Engine Team.
    
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    
    # You should have received a copy of the GNU Affero General Public License
    # along with this program.  If not, see <http://www.gnu.org/licenses/>.
    
    
    
    David Trattnig's avatar
    David Trattnig committed
    import datetime
    import json
    import logging
    import os
    import platform
    import threading
    import urllib
    import urllib.error
    import urllib.request

    from enum import Enum
    from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_BROADCAST

    import requests

    from modules.cli.redis.adapter import ClientRedisAdapter
    from modules.base.config import AuraConfig
    from modules.base.utils import SimpleUtil as SU
    from modules.base.mail import AuraMailer
    
    
    # Exceptions
    
    class EngineMalfunctionException(Exception):
        """
        Raised by the monitor during boot when the engine status is not valid.
        """
    
    
    # Status Codes
    
    
    David Trattnig's avatar
    David Trattnig committed
    class MonitorResponseCode(Enum):
        """
        Status codes the monitor writes into its status report.
        """
        # All checked parts of the engine are fine
        OK = "OK"
        # A check failed, or its data could not be retrieved or parsed
        INVALID_STATE = "INVALID-STATE"
    
    
    
    
    class AuraMonitor:
        """
        Engine Monitoring is in charge of:

        - Checking the overall status of all components and external API endpoints
        - Checking the vital parts, which are minimal requirements for running the engine
        - Sending a heartbeat to a defined server via socket
        """

        # Seconds to wait for an HTTP endpoint before treating it as unavailable.
        # Without a timeout a hanging endpoint would block boot or the heartbeat thread.
        HTTP_TIMEOUT = 10

        logger = None
        engine = None
        status = None
        already_invalid = None
        engine_id = None
        heartbeat_server = None
        heartbeat_port = None
        heartbeat_frequency = None
        heartbeat_socket = None
        heartbeat_running = None


        def __init__(self, engine):
            """
            Initialize Monitoring.

            Args:
                engine:     The engine to monitor. The monitor registers itself
                            in `engine.plugins` under the key "monitor".
            """
            self.logger = logging.getLogger("AuraEngine")
            self.config = AuraConfig.config()
            self.engine = engine
            self.mailer = AuraMailer(self.config)

            self.status = {
                "engine": {},
                "lqs": {},
                "redis": {},
                "api": {
                    "steering": {},
                    "tank": {},
                    "engine": {},
                },
            }
            self.already_invalid = False

            # Register as an engine plugin
            self.engine.plugins["monitor"] = self

            # Heartbeat settings
            self.heartbeat_running = False
            self.heartbeat_server = self.config.get("heartbeat_server")
            self.heartbeat_port = self.config.get("heartbeat_port")
            self.heartbeat_frequency = self.config.get("heartbeat_frequency")
            self.heartbeat_socket = socket(AF_INET, SOCK_DGRAM)

            self.engine_id = self.get_engine_id()


        #
        # EVENTS
        #

        def on_boot(self):
            """
            Called when the engine is booting.

            Requests the full status, logs it and posts the resulting health
            state to the Engine API.

            Raises:
                EngineMalfunctionException: If the engine status is not valid.
            """
            is_valid = self.has_valid_status(False)
            status = self.get_status()
            self.logger.info("Status Monitor:\n%s" % json.dumps(status, indent=4))

            if not is_valid:
                self.logger.info("Engine Status: " + SU.red(status["engine"]["status"]))
                self.post_health(status, False)
                raise EngineMalfunctionException

            self.logger.info("Engine Status: " + SU.green("[OK]"))
            self.post_health(status, True)


        def on_sick(self, data):
            """
            Called when the engine is in some unhealthy state.

            Args:
                data:   Status details, forwarded to `post_health()`
            """
            self.post_health(data, False)


        def on_resurrect(self, data):
            """
            Called when the engine turned healthy again after being sick.

            Args:
                data:   Status details, forwarded to `post_health()`
            """
            self.post_health(data, True)


        #
        # PUBLIC METHODS
        #

        def get_status(self):
            """
            Retrieves the current (cached) monitoring status, without refreshing it.
            """
            return self.status


        def has_valid_status(self, update_vitality_only):
            """
            Refreshes the status and checks if it is valid to run the engine.

            The status is valid when Liquidsoap is active, the mixer reports
            `in_filesystem_0`, Redis is reachable and the audio source folder
            exists. The result is also stored in `status["engine"]["status"]`.

            Args:
                update_vitality_only (Boolean): Refreshes only the vital parts required for the heartbeat,
                                                otherwise the status of all components is requested

            Returns:
                (Boolean):  `True` if the engine status is valid
            """
            is_valid = False

            if update_vitality_only:
                self.update_vitality_status()
            else:
                self.update_status()

            try:
                if self.status["lqs"]["active"] \
                    and self.status["lqs"]["mixer"]["in_filesystem_0"] \
                    and self.status["redis"]["active"] \
                    and self.status["audio_source"]["exists"]:

                    self.status["engine"]["status"] = MonitorResponseCode.OK.value
                    is_valid = True
                else:
                    self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value

            except Exception as e:
                # E.g. a status key is missing because the related check never delivered data
                self.logger.error("Exception while validating engine status: " + str(e))
                self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value

            return is_valid


        @staticmethod
        def _health_details(data):
            """
            Serializes health details to a string exactly once.

            Strings are passed through unchanged, since `heartbeat()` hands over
            an already serialized status; dumping it again would double-encode it.
            """
            if isinstance(data, str):
                return data
            return json.dumps(data, default=str)


        def post_health(self, data, is_healthy):
            """
            Post the given health state info to Engine API.

            Failures are logged and never raised, so a unreachable Engine API
            neither masks the boot result nor breaks the heartbeat.

            Args:
                data:                   Status details (dict or serialized String)
                is_healthy (Boolean):   The health state to report
            """
            body = {
                "log_time": datetime.datetime.now(),
                "is_healthy": is_healthy,
                "details": self._health_details(data),
            }
            json_data = json.dumps(body, default=str)

            url = self.config.get("api_engine_store_health")
            url = url.replace("${ENGINE_NUMBER}", str(self.config.get("api_engine_number")))
            headers = {'content-type': 'application/json'}

            try:
                r = requests.post(url, data=json_data, headers=headers, timeout=self.HTTP_TIMEOUT)
            except requests.RequestException as e:
                self.logger.error("Error while pushing health state to Engine API: %s" % str(e))
                return

            if r.status_code == 204:
                self.logger.info("Successfully posted healthy=%s state to Engine API!" % is_healthy)
            else:
                # Use the raw body: error responses are not guaranteed to be JSON
                self.logger.error("HTTP %s | Error while pushing health state to Engine API: %s" % (r.status_code, r.text))


        def update_status(self):
            """
            Requests the current status of all components
            """
            # NOTE(review): `meta` is not imported in the visible import block - confirm it is in scope
            self.status["engine"]["version"] = meta.__version__

            self.engine.player.connector.enable_transaction()
            try:
                self.status["lqs"]["version"]   = self.engine.version()
                self.status["lqs"]["uptime"]    = self.engine.uptime()
                self.status["lqs"]["io"]        = self.get_io_state()
                self.status["lqs"]["mixer"]     = self.engine.player.mixer.mixer_status()
                self.status["lqs"]["mixer_fallback"] = self.engine.player.mixer_fallback.mixer_status()
            finally:
                # Always end the transaction, even if one of the queries failed
                self.engine.player.connector.disable_transaction()

            self.status["api"]["steering"]["url"]       = self.config.get("api_steering_status")
            self.status["api"]["steering"]["available"] = self.validate_url_connection(self.config.get("api_steering_status"))

            self.status["api"]["tank"]["url"]           = self.config.get("api_tank_status")
            self.status["api"]["tank"]["available"]     = self.validate_url_connection(self.config.get("api_tank_status"))
            self.status["api"]["tank"]["status"]        = self.get_url_response(self.config.get("api_tank_status"))

            self.status["api"]["engine"]["url"]         = self.config.get("api_engine_status")
            self.status["api"]["engine"]["available"]   = self.validate_url_connection(self.config.get("api_engine_status"))

            self.update_vitality_status()


        def update_vitality_status(self):
            """
            Refreshes the vital status info which are required for the engine to survive.

            The first call also starts the heartbeat, if a positive
            `heartbeat_frequency` is configured.
            """
            self.engine.player.connector.enable_transaction()
            try:
                self.status["lqs"]["active"] = self.engine.is_active()
            finally:
                # Always end the transaction, even if the query failed
                self.engine.player.connector.disable_transaction()

            self.status["redis"]["active"] = self.validate_redis_connection()
            self.status["audio_source"] = self.validate_directory(self.config.get("audio_source_folder"))

            # After first update start the Heartbeat Monitor
            if not self.heartbeat_running:
                self.heartbeat_running = True
                frequency = self.config.get("heartbeat_frequency")
                # An unset frequency (`None`) disables the heartbeat instead of raising
                if frequency and frequency > 0:
                    self.heartbeat()


        def heartbeat(self):
            """
            Every `heartbeat_frequency` seconds the current vitality status is checked. If it's okay,
            a heartbeat is sent to the configured server.

            When the state flips between valid and invalid, the admin is mailed
            and the `on_resurrect` / `on_sick` event is dispatched. Errors during
            the check are logged, and the next check is scheduled in any case.
            """
            try:
                if self.has_valid_status(True):
                    self.heartbeat_socket.sendto(b"OK", (self.heartbeat_server, self.heartbeat_port))

                    # Engine resurrected into normal state
                    if self.already_invalid:
                        self.already_invalid = False
                        status = json.dumps(self.get_status())
                        self.logger.info(SU.green("OK - Engine turned back into some healthy state!")+"\n"+str(status))
                        self.mailer.send_admin_mail( \
                            "OK - Engine turned back into some HEALTHY STATE!", \
                            "Things seem fine again at '%s':\n\n%s" % (self.engine_id, status))
                        # Route call of event via event dispatcher to provide ability for additional hooks
                        self.engine.event_dispatcher.on_resurrect(status)

                # Engine turned into invalid state
                elif not self.already_invalid:
                    self.already_invalid = True
                    status = json.dumps(self.get_status())
                    self.logger.critical(SU.red("Engine turned into some INVALID STATE!")+"\n"+str(status))
                    self.mailer.send_admin_mail( \
                        "ERROR - Engine turned into some INVALID STATE!", \
                        "There's an issue with Aura Engine '%s':\n\n%s" % (self.engine_id, status))
                    # Route call of event via event dispatcher to provide ability for additional hooks
                    self.engine.event_dispatcher.on_sick(status)

            except Exception as e:
                self.logger.critical(SU.red("Error while processing heartbeat: " + str(e)))

            finally:
                # Re-arm the timer unconditionally, otherwise one failing check ends monitoring
                threading.Timer(self.config.get("heartbeat_frequency"), self.heartbeat).start()


        def get_io_state(self):
            """
            Retrieves all input and outputs provided by the engine.

            Returns:
                The parsed engine state, or the `INVALID-STATE` code if the
                state could not be parsed as JSON.
            """
            ios = self.engine.engine_state()

            try:
                ios = ios.replace('"connected":', '"connected": ""')
                ios = json.loads(ios, strict=False)
                return ios

            except Exception as e:
                self.logger.warning("Got invalid JSON from Liquidsoap - " + str(e))
                return MonitorResponseCode.INVALID_STATE.value


        def validate_url_connection(self, url):
            """
            Checks if connection to passed URL is successful.

            Args:
                url (String):   The URL to connect to

            Returns:
                (Boolean):      `True` if the URL could be opened and read
            """
            try:
                request = urllib.request.Request(url)
                # Context manager closes the connection after reading
                with urllib.request.urlopen(request, timeout=self.HTTP_TIMEOUT) as response:
                    response.read()
            except Exception:
                return False

            return True


        def validate_redis_connection(self):
            """
            Checks if the connection to Redis is successful.

            Returns:
                (Boolean):      `True` if a message could be published
            """
            try:
                cra = ClientRedisAdapter(self.config)
                cra.publish("aura", "status")
            except Exception:
                return False

            return True


        def validate_directory(self, dir_path):
            """
            Checks if a given directory is existing and holds content

            Args:
                dir_path (String):  The directory to check; `None` or an empty
                                    String is reported as not existing

            Returns:
                (dict):     The keys `path`, `exists` and `has_content`
            """
            exists = bool(dir_path) and os.path.isdir(dir_path)
            has_content = False

            if exists:
                # One entry is enough; the context manager releases the directory handle
                with os.scandir(dir_path) as entries:
                    has_content = next(entries, None) is not None

            return {
                "path": dir_path,
                "exists": exists,
                "has_content": has_content,
            }


        def get_url_response(self, url):
            """
            Fetches JSON data from the given URL.

            Args:
                url (String):       The API endpoint to call

            Returns:
                (dict[]):           A Python object representing the JSON structure,
                                    or the `INVALID-STATE` code if fetching or parsing failed
            """
            try:
                request = urllib.request.Request(url)
                # Context manager closes the connection after reading
                with urllib.request.urlopen(request, timeout=self.HTTP_TIMEOUT) as response:
                    data = response.read()
                return json.loads(data, strict=False)

            except (urllib.error.URLError, IOError, ValueError) as e:
                self.logger.error("Error while connecting to URL '%s' - %s" % (url, e))

            return MonitorResponseCode.INVALID_STATE.value


        def get_engine_id(self):
            """
            Retrieves a String identifier consisting of IP and Hostname to differentiate
            the engine in mails and status broadcasts.
            """
            host = platform.node()
            return "%s (%s)" % (self.get_ip(), host)


        def get_ip(self):
            """
            Returns the IP of the Engine instance.

            Returns:
                (String):   The local address of a socket connected to `<broadcast>`,
                            or "<UNKNOWN NETWORK>" if that fails
            """
            try:
                # Context manager closes the probe socket again
                with socket(AF_INET, SOCK_DGRAM) as s:
                    s.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
                    s.connect(('<broadcast>', 0))
                    return s.getsockname()[0]
            except Exception:
                self.logger.critical(SU.red("Error while accessing network via <broadcast>!"))
                return "<UNKNOWN NETWORK>"