#
#  engine
#
#  Playout Daemon for autoradio project
#
#
#  Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
#  This file is part of engine.
#
#  engine is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  any later version.
#
#  engine is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with engine. If not, see <http://www.gnu.org/licenses/>.
#

import urllib
# `import urllib` alone does not load the `request` submodule; import it
# explicitly so `urllib.request.*` is guaranteed to resolve.
import urllib.request
import logging
import json

from modules.core.engine import SoundSystem
#from modules.database.model import ScheduleEntry
from modules.base.config import AuraConfig


# ------------------------------------------------------------------------------------------ #
class ConnectionTester(AuraConfig):
    """Probe the services the engine depends on and report their reachability.

    Every ``test_*`` method returns ``True`` when its probe completed without
    raising and ``False`` otherwise. The probes read their settings from
    ``self.config``, which is not defined in this class (presumably provided
    by ``AuraConfig`` -- confirm against the base class).
    """

    # ------------------------------------------------------------------------------------------ #
    def __init__(self):
        super().__init__()

    # ------------------------------------------------------------------------------------------ #
    def get_connection_status(self):
        """Run the enabled probes and return their results as a JSON string.

        Returns:
            A JSON object (as ``str``) with the boolean keys ``db``, ``pv``,
            ``lqs``, ``lqsr``, ``tank`` and ``redis``. ``db`` and ``lqsr`` are
            currently hard-wired to ``False`` because their probes are
            disabled.
        """
        status = {
            "db": False,  # self.test_db_conn()
            "pv": self.test_pv_conn(),
            "lqs": self.test_lqs_conn(),
            "lqsr": False,  # self.test_lqsr_conn()
            "tank": self.test_tank_conn(),
            "redis": self.test_redis_conn(),
        }
        return json.dumps(status)

    # ------------------------------------------------------------------------------------------ #
    # def test_db_conn(self):
    #     try:
    #         ScheduleEntry.select_all()
    #     except:
    #         return False
    #
    #     return True

    # ------------------------------------------------------------------------------------------ #
    def test_lqs_conn(self):
        """Return ``True`` if a ``SoundSystem`` can be built and queried for its mixer status."""
        try:
            # The class is imported as `SoundSystem`; the previous lowercase
            # `soundsystem` raised NameError, which the handler below turned
            # into a permanent False.
            lsc = SoundSystem(self.config)
            lsc.mixer_status()
        except Exception:
            return False
        return True

    # ------------------------------------------------------------------------------------------ #
    def test_lqsr_conn(self):
        """Return ``True`` if a ``SoundSystem`` can be built and queried for its recorder status."""
        try:
            # Same NameError fix as in test_lqs_conn.
            lsc = SoundSystem(self.config)
            lsc.get_recorder_status()
        except Exception:
            return False
        return True

    # ------------------------------------------------------------------------------------------ #
    def test_pv_conn(self):
        """Return ``True`` if the URL configured as ``calendarurl`` can be fetched."""
        return self.test_url_connection(self.config.get("calendarurl"))

    # ------------------------------------------------------------------------------------------ #
    def test_tank_conn(self):
        """Return ``True`` if playlist 1 can be fetched from the URL configured as ``importerurl``."""
        base_url = self.config.get("importerurl")
        if not base_url:
            # A missing setting means "not reachable"; concatenating onto
            # None would raise TypeError and abort the whole status report.
            return False
        # test load of playlist 1
        return self.test_url_connection(base_url + "1")

    # ------------------------------------------------------------------------------------------ #
    def test_redis_conn(self):
        """Return ``True`` if a ``status`` message can be published on the ``aura`` channel."""
        # Imported at call time rather than at module level, as in the
        # original code (reason not visible here -- possibly to avoid an
        # import cycle).
        from modules.communication.redis.adapter import ClientRedisAdapter

        try:
            cra = ClientRedisAdapter()
            cra.publish("aura", "status")
        except Exception:
            # `Exception` rather than a bare `except:` so that
            # KeyboardInterrupt / SystemExit still propagate.
            return False
        return True

    # ------------------------------------------------------------------------------------------ #
    def test_url_connection(self, url):
        """Return ``True`` if ``url`` can be opened and its body read completely.

        Args:
            url: The URL to request.

        Returns:
            ``False`` if building the request, opening it, or reading the
            response raises any ``Exception`` (this includes an invalid or
            missing URL); ``True`` otherwise.
        """
        try:
            request = urllib.request.Request(url)
            # The context manager closes the connection even if read() fails.
            with urllib.request.urlopen(request) as response:
                response.read()
        except Exception:
            return False
        return True