#
#  engine
#
#  Playout Daemon for autoradio project
#
#
#  Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
#  This file is part of engine.
#
#  engine is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  any later version.
#
#  engine is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with engine. If not, see <http://www.gnu.org/licenses/>.
#

25
26
import time
import logging
27
28
import simplejson

29
from modules.communication.liquidsoap.playerclient import LiquidSoapPlayerClient
30
# from modules.communication.liquidsoap.recorderclient import LiquidSoapRecorderClient
31
from modules.communication.liquidsoap.initthread import LiquidSoapInitThread
32
33
from modules.communication.mail.mail import AuraMailer

34
from libraries.enum.auraenumerations import TerminalColors, ScheduleEntryType
35
from libraries.exceptions.auraexceptions import LQConnectionError
36
from libraries.database.broadcasts import TrackService
37
from libraries.exceptions.exception_logger import ExceptionLogger
38

39
40
41
42
""" 
    LiquidSoapCommunicator Class 
    Uses LiquidSoapClient, but introduces more complex commands, transactions and error handling
"""
43

44
class LiquidSoapCommunicator(ExceptionLogger):
    # Class-level defaults; instances overwrite most of them in __init__ or while running.

    # player socket client (LiquidSoapPlayerClient), created in __init__
    client = None
    # recorder socket client; its creation is commented out in __init__, so it stays None
    lqcr = None
    # logging.Logger "AuraEngine", assigned in __init__
    logger = None
    # nesting depth of enable_transaction()/disable_transaction()
    transaction = 0
    # cached answer of the "mixer inputs" command (see get_all_channels)
    channels = None
    # not assigned anywhere in this class - presumably injected by the owner;
    # must offer get_active_entry()
    scheduler = None
    # content of errormessages/controller_error.js, loaded in __init__
    error_data = None
    # AuraMailer used for admin notifications
    auramailer = None
    aborttransaction = False
    # consecutive failed send attempts, reset after every successful command
    connection_attempts = 0
55
56

    # ------------------------------------------------------------------------------------------ #
57
    def __init__(self, config, logger=True):
        """
        Constructor

        Creates the player socket client, loads the error message catalogue
        from <install_dir>/errormessages/controller_error.js and sets up the
        admin mailer.

        @type    config: object
        @param   config: Configuration; must offer get("install_dir")
        @type    logger: bool
        @param   logger: Not used by this constructor; kept so existing callers keep working
        """
        self.config = config
        self.logger = logging.getLogger("AuraEngine")

        self.client = LiquidSoapPlayerClient(config, "engine.sock")
        # self.lqcr = LiquidSoapRecorderClient(config, "record.sock")

        errors_file = self.config.get("install_dir") + "/errormessages/controller_error.js"

        # the context manager closes the file even when parsing the JSON raises
        with open(errors_file) as f:
            self.error_data = simplejson.load(f)

        self.auramailer = AuraMailer(self.config)
73

74
75
    # ------------------------------------------------------------------------------------------ #
    def set_volume(self, mixernumber, volume):
76
        return self.__send_lqc_command__(self.client, "mixer", "volume", mixernumber, volume)
77
78
79

    # ------------------------------------------------------------------------------------------ #
    def get_active_mixer(self):
80
81
82
83
        """
        get active mixer in liquidsoap server
        :return:
        """
84
85
        activeinputs = []

86
        # enable more control over the connection
87
        self.enable_transaction()
88

89
        inputs = self.get_all_channels()
90
91
92

        cnt = 0
        for input in inputs:
93
            status = self.__get_mixer_status__(cnt)
94
95
96
97
98
99

            if "selected=true" in status:
                activeinputs.append(input)

            cnt = cnt + 1

100
        self.disable_transaction()
101
102
103

        return activeinputs

104
105
106
107
108
109
    # ------------------------------------------------------------------------------------------ #
    def get_active_channel(self):
        """
        gets active channel from programme
        :return:
        """
110
111
        active_entry = self.scheduler.get_active_entry()
        if active_entry is None:
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
112
            return ""
113
        return active_entry.type
114

115
116
    # ------------------------------------------------------------------------------------------ #
    def get_mixer_status(self):
117
118
        inputstate = {}

Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
119
        self.enable_transaction()
120

121
        inputs = self.get_all_channels()
122
123
124

        cnt = 0
        for input in inputs:
125
            inputstate[input] = self.__get_mixer_status__(cnt)
126
127
            cnt = cnt + 1

Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
128
        self.disable_transaction()
129
130
131

        return inputstate

132
133
134
135
136
137
138
139
    # ------------------------------------------------------------------------------------------ #
    def get_recorder_status(self):
        self.enable_transaction(self.lqcr)
        recorder_state = self.__send_lqc_command__(self.lqcr, "record", "status")
        self.disable_transaction(self.lqcr)

        return recorder_state

140
141
142
143
144
145
146
147
148
    # ------------------------------------------------------------------------------------------ #
    def http_start_stop(self, start):
        if start:
            cmd = "start"
        else:
            cmd = "stop"

        try:
            self.enable_transaction()
149
            self.__send_lqc_command__(self.client, "http", cmd)
150
151
152
153
154
            self.disable_transaction()
        except LQConnectionError as e:
            # we already caught and handled this error in __send_lqc_command__, but we do not want to execute this function further
            pass

155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
    # ------------------------------------------------------------------------------------------ #
    def recorder_stop(self):
        self.enable_transaction()

        for i in range(5):
            if self.config.get("rec_" + str(i)) == "y":
                self.__send_lqc_command__(self.client, "recorder_" + str(i), "stop")

        self.disable_transaction()

    # ------------------------------------------------------------------------------------------ #
    def recorder_start(self, num=-1):
        self.enable_transaction()

        if num == -1:
            self.recorder_start_all()
        else:
            self.recorder_start_one(num)

    # ------------------------------------------------------------------------------------------ #
    def recorder_start_all(self):
176
        self.enable_transaction()
177
178
        for i in range(5):
            self.recorder_start_one(i)
179
        self.disable_transaction()
180
181
182

    # ------------------------------------------------------------------------------------------ #
    def recorder_start_one(self, num):
183

184
185
186
187
188
        if self.config.get("rec_" + str(num)) == "y":
            returnvalue = self.__send_lqc_command__(self.client, "recorder", str(num), "status")

            if returnvalue == "off":
                self.__send_lqc_command__(self.client, "recorder", str(num), "start")
189

190
    # ------------------------------------------------------------------------------------------ #
191
    def activate(self, entry):
192
193
        active_type = self.scheduler.get_active_entry().type

194
195
        try:
            # enable transaction
196
197
            self.enable_transaction()

198
199
200
201
202
203
            if active_type == entry.type:
                # push something to active channel
                self.activate_same_channel(entry)
            else:
                # switch to another channel
                self.activate_different_channel(entry, active_type)
204

205
            # disable conn
206
            self.disable_transaction()
207

208
            # insert playlist entry
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
209
            self.insert_track_service_entry(entry)
210
211
212
        except LQConnectionError as e:
            # we already caught and handled this error in __send_lqc_command__, but we do not want to execute this function further
            pass
213

214
215
216
217
218
219
220
221
222
223
224
225
    # ------------------------------------------------------------------------------------------ #
    def activate_same_channel(self, entry, silent=False):
        """
        Hand the entry's source to the channel of its own type.

        @param entry:  Schedule entry; `type` selects the channel, `source` is what gets played
        @type  silent: bool
        @param silent: When True the "already active" log line is skipped
        """
        if not silent:
            self.logger.info(TerminalColors.PINK.value + entry.type.value + " already active!" + TerminalColors.ENDC.value)

        entry_type = entry.type
        if entry_type == ScheduleEntryType.FILESYSTEM:
            # filesystem entries are queued
            self.playlist_push(entry.source)
        elif entry_type == ScheduleEntryType.STREAM:
            # streams: start the http input, then point it to the url
            self.http_start_stop(True)
            self.set_http_url(entry.source)
        # any other type (live): nothing to do, just leave it as is
226

227
228
229
    # ------------------------------------------------------------------------------------------ #
    def activate_different_channel(self, entry, active_type):
        """
        Switch from the channel of `active_type` to the channel of `entry`.

        The entry's source is handed over first, then all other channels are
        set to volume 0 and the entry's channel to the entry's volume.

        @param entry:       Schedule entry to switch to
        @param active_type: Type of the entry that was active so far (only used for logging)
        """
        announcement = "LiquidSoapCommunicator is activating " + entry.type.value + " & deactivating " + active_type.value + "!"
        self.logger.info(TerminalColors.PINK.value + announcement + TerminalColors.ENDC.value)

        self.activate_same_channel(entry, True)

        # mute everything except the channel of the new entry
        for other_channel in self.all_inputs_but(entry.getChannel()):
            self.channel_volume(other_channel, 0)

        # bring the new channel to the wanted volume
        self.channel_volume(entry.type.value, entry.volume)
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
239
240
241

    # ------------------------------------------------------------------------------------------ #
    def insert_track_service_entry(self, schedule_entry):
        """
        Store a TrackService record for the given schedule entry.

        @param schedule_entry: Entry whose playlist_id, entry_num and source are copied
        """
        record = TrackService()

        # copy the identifying fields over unchanged
        for field in ("playlist_id", "entry_num", "source"):
            setattr(record, field, getattr(schedule_entry, field))

        record.store(add=True, commit=True)
248
249
250

    # ------------------------------------------------------------------------------------------ #
    def all_inputs_but(self, input_type):
251
        try:
252
            activemixer_copy = self.get_all_channels().copy()
253
            activemixer_copy.remove(input_type)
254
255
        except ValueError as e:
            self.logger.error("Requested channel (" + input_type + ") not in channellist. Reason: " + str(e))
256
257
        except AttributeError:
            self.logger.critical("Channellist is None")
258

259
260
261
262
        return activemixer_copy

    # ------------------------------------------------------------------------------------------ #
    def get_all_channels(self):
263
        if self.channels is None or len(self.channels) == 0:
264
            self.channels = self.__send_lqc_command__(self.client, "mixer", "inputs")
265
266

        return self.channels
267

268
269
270
271
272
    # ------------------------------------------------------------------------------------------ #
    def reload_channels(self):
        self.channels = None
        return self.get_all_channels()

273
274
    # ------------------------------------------------------------------------------------------ #
    def __get_mixer_status__(self, mixernumber):
275
        return self.__send_lqc_command__(self.client, "mixer", "status", mixernumber)
276
277

    # ------------------------------------------------------------------------------------------ #
278
    def init_player(self):
        """
        Start a LiquidSoapInitThread that gets this communicator and the
        scheduler's currently active entry.

        @rtype:  string
        @return: fixed confirmation text
        """
        init_thread = LiquidSoapInitThread()
        init_thread.liquidsoapcommunicator = self
        init_thread.active_entry = self.scheduler.get_active_entry()
        init_thread.start()

        return "LiquidSoapInitThread started!"
284

285
    # ------------------------------------------------------------------------------------------ #
286
287
288
289
290
291
    def channel_activate(self, channel, activate):
        channels = self.get_all_channels()

        try:
            index = channels.index(channel)
            if len(channel) < 1:
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
292
293
294
295
296
297
                self.logger.critical("Cannot activate channel. There are no channels!")
            else:
                message = self.__send_lqc_command__(self.client, "mixer", "select", index, activate)
                return message
        except Exception as e:
            self.logger.critical("Ran into exception when activating channel. Reason: " + str(e))
298

299
300
301
    # ------------------------------------------------------------------------------------------ #
    def channel_volume(self, channel, volume):
        """
302
        set volume of a channel
303
        @type     channel:  string
304
        @param    channel:  Channel
305
        @type     volume:   int
306
        @param    volume:   Volume between 0 and 100
307
        """
308

309
        try:
310
            channels = self.get_all_channels()
311
            index = channels.index(channel)
312

313
            if len(channel) < 1:
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
314
                self.logger.warning("Cannot set volume of channel " + channel + "! There are no channels.")
315
            else:
316
                message = self.__send_lqc_command__(self.client, "mixer", "volume", str(index), str(int(volume)))
317

318
                if message.find('volume=' + str(volume) + '%'):
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
319
                    self.logger.debug("Set volume of channel " + channel + " to " + str(volume))
320
                else:
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
321
                    self.logger.warning("Setting volume of channel " + channel + " gone wrong! Liquidsoap message: " + message)
322

323
                return message
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
324
        except (AttributeError, ValueError) as e: #(LQConnectionError, AttributeError):
325
            self.disable_transaction(force=True)
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
326
            self.logger.critical("Ran into exception when setting volume of channel " + channel + ". Reason: " + str(e))
327

328
329
330
331
332
    # ------------------------------------------------------------------------------------------ #
    def auraengine_state(self):
        state = self.__send_lqc_command__(self.client, "auraengine", "state")
        return state

333
334
    # ------------------------------------------------------------------------------------------ #
    def liquidsoap_help(self):
335
        data = self.__send_lqc_command__(self.client, 'help')
336
        if not data:
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
337
            self.logger.warning("Could not get Liquidsoap's help")
338
        else:
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
339
340
            self.logger.debug("Got Liquidsoap's help")
        return data
341

342
343
    # ------------------------------------------------------------------------------------------ #
    def set_http_url(self, uri):
344
        return self.__send_lqc_command__(self.client, "http", "url", uri)
345

346
347
348
349
350
351
352
    # ------------------------------------------------------------------------------------------ #
    def playlist_push(self, uri):
        """
        Eine Uri in die Playlist einfügen
        @type   uri:   str
        @param  uri:   Die Uri
        """
353
        return self.__send_lqc_command__(self.client, "fs", "push", uri)
354
#        self.notifyClient()
355
356
357
358
359
360

    # ------------------------------------------------------------------------------------------ #
    def version(self):
        """
        get version
        """
361
        data = self.__send_lqc_command__(self.client, "version")
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
362
363
        self.logger.debug("Got Liquidsoap's version")
        return data
364
365

    # ------------------------------------------------------------------------------------------ #
366
367
368
369
    def uptime(self):
        """
        get uptime
        """
370
        data = self.__send_lqc_command__(self.client, "uptime")
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
371
372
        self.logger.debug("Got Liquidsoap's help")
        return data
373
374

    # ------------------------------------------------------------------------------------------ #
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
375
    def __send_lqc_command__(self, lqs_instance, namespace, command, *args):
376
377
378
379
380
381
382
383
384
385
386
387
388
389
        """
        Ein Kommando an Liquidsoap senden
        @type  lqs_instance: object
        @param lqs_instance: Instance of LiquidSoap Client
        @type  namespace:    string
        @param namespace:    Namespace of function
        @type  command:      string
        @param command:      Function name
        @type args:          list
        @param args:         List of parameters
        @rtype:              string
        @return:             Response from LiquidSoap
        """
        try:
390
391
392
393
            if namespace == "recorder":
                self.logger.info("LiquidSoapCommunicator is calling " + str(namespace) + "_" + str(command) + "." + str(args))
            else:
                self.logger.info("LiquidSoapCommunicator is calling " + str(namespace) + "." + str(command) + str(args))
394

395
396
397
398
399
            # call wanted function ...
            func = getattr(lqs_instance, namespace)
            # ... and fetch the result
            result = func(command, *args)

400
401
            self.logger.debug("LiquidSoapCommunicator got response " + str(result))

Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
402
            self.connection_attempts = 0
403

404
405
            return result

406
        except LQConnectionError as e:
407
            self.logger.info("Connection Error when sending " + str(namespace) + "." + str(command) + str(args))
408
            if self.try_to_reconnect():
409
                time.sleep(0.2)
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
410
411
                self.connection_attempts += 1
                if self.connection_attempts < 5:
412
413
414
415
416
417
418
419
420
                    # reconnect
                    self.__open_conn(self.client)
                    self.logger.info("Trying to resend " + str(namespace) + "." + str(command) + str(args))
                    # grab return value
                    retval = self.__send_lqc_command__(lqs_instance, namespace, command, *args)
                    # disconnect
                    self.__close_conn(self.client)
                    # return the val
                    return retval
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
421
                else:
422
423
                    self.logger.info("Rethrowing Exception while trying to send " + str(namespace) + "." + str(command) + str(args))
                    self.disable_transaction(socket=self.client, force=True)
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
424
                    raise e
425
            else:
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
426
                # also store when was last admin mail sent with which content...
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
427
                self.logger.critical("SEND ADMIN MAIL AT THIS POINT")
428
429
                raise e

430
    # ------------------------------------------------------------------------------------------ #
431
432
433
434
435
436
437
438
    def try_to_reconnect(self):
        self.enable_transaction()
        return self.transaction > 0

    # ------------------------------------------------------------------------------------------ #
    def enable_transaction(self, socket=None):
        """
        Enter a transaction: count it and open the connection when it is the
        outermost one.

        When the socket file is missing, the transaction is forcefully
        disabled again, the problem is logged and an admin mail is sent.

        @param socket: Client to connect; the player client is used when None
        """
        # fall back to the playout socket if nothing else is given
        target = self.client if socket is None else socket

        self.transaction += 1
        self.logger.debug(TerminalColors.WARNING.value + "ENabling transaction! cnt: " + str(self.transaction) + TerminalColors.ENDC.value)

        # nested transactions share the connection of the outermost one
        if self.transaction <= 1:
            try:
                self.__open_conn(target)
            except FileNotFoundError:
                self.disable_transaction(socket=target, force=True)

                msg = "socket file " + target.socket_path + " not found. Is liquidsoap running?"
                self.logger.critical(TerminalColors.RED.value + msg + TerminalColors.ENDC.value)
                self.auramailer.send_admin_mail("CRITICAL Exception when connecting to Liquidsoap", msg)
456

457
    # ------------------------------------------------------------------------------------------ #
458
459
460
461
462
    def disable_transaction(self, socket=None, force=False):
        if not force:
            # nothing to disable
            if self.transaction == 0:
                return
463

464
465
            # decrease transaction counter
            self.transaction = self.transaction - 1
466

467
468
469
470
471
472
473
474
            # debug msg
            self.logger.debug(TerminalColors.WARNING.value + "DISabling transaction! cnt: " + str(self.transaction) + TerminalColors.ENDC.value)

            # return if connection is still needed
            if self.transaction > 0:
                return
        else:
            self.logger.debug(TerminalColors.WARNING.value + "Forcefully DISabling transaction! " + TerminalColors.ENDC.value)
475

476
477
        # close conn and set transactioncounter to 0
        self.__close_conn(socket)
478
        self.transaction = 0
479

480
    # ------------------------------------------------------------------------------------------ #
481
482
    def __open_conn(self, socket):
        # already connected
483
        if self.transaction > 1:
484
            return
485

486
        self.logger.debug(TerminalColors.GREEN.value + "LiquidSoapCommunicator opening conn" + TerminalColors.ENDC.value)
487

488
        # try to connect
489
490
        socket.connect()

491
    # ------------------------------------------------------------------------------------------ #
492
493
    def __close_conn(self, socket):
        # set socket to playout
494
        if socket is None:
495
            socket = self.client
496

497
        # do not disconnect if a transaction is going on
498
        if self.transaction > 0:
499
            return
500

501
        # say bye
502
        socket.byebye()
503

504
505
        # debug msg
        self.logger.debug(TerminalColors.BLUE.value + "LiquidSoapCommunicator closed conn" + TerminalColors.ENDC.value)