#
# engine
#
# Playout Daemon for autoradio project
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#

import logging
import urllib.request

import simplejson

from modules.communication.liquidsoap.communicator import LiquidSoapCommunicator
from libraries.database.broadcasts import ScheduleEntry
from libraries.base.config import AuraConfig


class ConnectionTester(AuraConfig):
    """Probe the services the engine depends on and report their reachability.

    Every ``test_*`` method performs one probe and returns ``True`` when the
    probe finished without raising and ``False`` when it raised. None of them
    propagate probe errors to the caller.
    """

    def __init__(self):
        # NOTE(review): self.config is presumably provided by AuraConfig's
        # initializer -- confirm against libraries.base.config.
        super(ConnectionTester, self).__init__()

    def get_connection_status(self) -> str:
        """Run all enabled probes and return the results as a JSON string.

        Returns:
            A JSON object (serialized with ``simplejson``) mapping the keys
            ``db``, ``pv``, ``lqs``, ``lqsr``, ``tank`` and ``redis`` to
            booleans.
        """
        status = {
            "db": self.test_db_conn(),
            "pv": self.test_pv_conn(),
            "lqs": self.test_lqs_conn(),
            # The recorder probe is switched off here; the key is always
            # reported as False instead of calling self.test_lqsr_conn().
            "lqsr": False,
            "tank": self.test_tank_conn(),
            "redis": self.test_redis_conn(),
        }
        return simplejson.dumps(status)

    def test_db_conn(self) -> bool:
        """Return True if ``ScheduleEntry.select_all()`` runs without raising."""
        try:
            ScheduleEntry.select_all()
        except Exception:
            # Exception (not a bare except) so that KeyboardInterrupt and
            # SystemExit are not swallowed by a status check.
            return False
        return True

    def test_lqs_conn(self) -> bool:
        """Return True if the Liquidsoap mixer status can be queried."""
        try:
            lsc = LiquidSoapCommunicator(self.config)
            lsc.get_mixer_status()
        except Exception:
            return False
        return True

    def test_lqsr_conn(self) -> bool:
        """Return True if the Liquidsoap recorder status can be queried."""
        try:
            lsc = LiquidSoapCommunicator(self.config)
            lsc.get_recorder_status()
        except Exception:
            return False
        return True

    def test_pv_conn(self) -> bool:
        """Return True if the URL configured as ``calendarurl`` can be fetched."""
        return self.test_url_connection(self.config.get("calendarurl"))

    def test_tank_conn(self) -> bool:
        """Return True if playlist 1 can be fetched from ``importerurl``."""
        base_url = self.config.get("importerurl")
        if not base_url:
            # A missing setting must yield "not reachable" rather than a
            # TypeError from None + "1" aborting the whole status report.
            return False
        # test load of playlist 1
        return self.test_url_connection(base_url + "1")

    def test_redis_conn(self) -> bool:
        """Return True if a message can be published through the redis adapter."""
        # Imported here rather than at module level, as in the original code.
        from modules.communication.redis.adapter import ClientRedisAdapter

        try:
            cra = ClientRedisAdapter()
            cra.publish("aura", "status")
        except Exception:
            return False
        return True

    def test_url_connection(self, url) -> bool:
        """Return True if ``url`` can be opened and its body read completely.

        Args:
            url: The URL to request. Any value that makes the request fail
                (including ``None``) results in ``False``.
        """
        try:
            request = urllib.request.Request(url)
            # The context manager closes the response even if read() fails.
            with urllib.request.urlopen(request) as response:
                response.read()
        except Exception:
            return False
        return True