#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import json
import urllib
# `import urllib` alone does not load the `request` submodule; import it
# explicitly so `urllib.request.*` is guaranteed to resolve.
import urllib.request

from src.core.engine import Engine
from src.base.config import AuraConfig


# ------------------------------------------------------------------------------------------ #
class ConnectionTester(AuraConfig):
    """Probe the services used by the engine and report which ones respond.

    NOTE(review): the methods read ``self.config`` and call ``.get(key)`` on
    it; that attribute is presumably provided by ``AuraConfig`` -- confirm
    against the base class.
    """

    # ------------------------------------------------------------------------------------------ #
    def __init__(self):
        super().__init__()

    # ------------------------------------------------------------------------------------------ #
    def get_connection_status(self):
        """Run all enabled probes and return the results as a JSON string.

        Returns:
            A JSON object (``str``) with the boolean keys ``db``, ``pv``,
            ``lqs``, ``lqsr`` and ``tank``. ``db`` and ``lqsr`` are currently
            hard-wired to ``False`` because their probes are disabled.
        """
        status = {
            "db": False,  # self.test_db_conn()
            "pv": self.test_pv_conn(),
            "lqs": self.test_lqs_conn(),
            "lqsr": False,  # self.test_lqsr_conn()
            "tank": self.test_tank_conn(),
        }
        return json.dumps(status)

    # ------------------------------------------------------------------------------------------ #
    # def test_db_conn(self):
    #     try:
    #         ScheduleEntry.select_all()
    #     except:
    #         return False
    #
    #     return True

    # ------------------------------------------------------------------------------------------ #
    def test_lqs_conn(self):
        """Return ``True`` if a mixer status request through ``Engine`` succeeds.

        A new ``Engine`` is created and ``player.mixer.mixer_status()`` is
        called on it; any exception raised on the way counts as "not
        connected".
        """
        try:
            lsc = Engine()
            lsc.player.mixer.mixer_status()
        except Exception:
            # Deliberately broad: this is a best-effort health probe and any
            # failure simply means the connection is unavailable.
            return False
        return True

    # ------------------------------------------------------------------------------------------ #
    def test_pv_conn(self):
        """Return ``True`` if the URL configured as ``calendarurl`` can be fetched."""
        return self.test_url_connection(self.config.get("calendarurl"))

    # ------------------------------------------------------------------------------------------ #
    def test_tank_conn(self):
        """Return ``True`` if playlist 1 can be fetched from ``importerurl``.

        Returns ``False`` (instead of raising ``TypeError`` on the string
        concatenation) when ``importerurl`` is missing or empty.
        """
        base_url = self.config.get("importerurl")
        if not base_url:
            return False

        # test load of playlist 1
        return self.test_url_connection(base_url + "1")

    def test_url_connection(self, url, timeout=None):
        """Return ``True`` if ``url`` can be opened and its body read.

        Args:
            url: The URL to request.
            timeout: Optional timeout in seconds passed to ``urlopen``. When
                ``None`` (the default) no timeout argument is passed, so the
                previous behaviour is unchanged.

        Returns:
            ``True`` when the request and read complete without an exception,
            ``False`` otherwise.
        """
        options = {} if timeout is None else {"timeout": timeout}
        try:
            request = urllib.request.Request(url)
            # The context manager closes the response (and its socket) even
            # when reading the body fails.
            with urllib.request.urlopen(request, **options) as response:
                response.read()
        except Exception:
            # Best-effort probe: invalid URLs, network errors and HTTP errors
            # all mean "not reachable".
            return False
        return True