communicator.py 21.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#
#  engine
#
#  Playout Daemon for autoradio project
#
#
#  Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
#  This file is part of engine.
#
#  engine is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  any later version.
#
#  engine is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with engine. If not, see <http://www.gnu.org/licenses/>.
#

25
26
import time
import logging
27
28
import simplejson

29
from modules.communication.liquidsoap.playerclient import LiquidSoapPlayerClient
30
# from modules.communication.liquidsoap.recorderclient import LiquidSoapRecorderClient
31
from modules.communication.liquidsoap.initthread import LiquidSoapInitThread
32
33
from modules.communication.mail.mail import AuraMailer

34
from libraries.enum.auraenumerations import TerminalColors, ScheduleEntryType
35
from libraries.exceptions.auraexceptions import LQConnectionError
36
from libraries.database.broadcasts import TrackService
37
from libraries.exceptions.exception_logger import ExceptionLogger
38

39
40
41
42
""" 
    LiquidSoapCommunicator Class 
    Uses LiquidSoapClient, but introduces more complex commands, transactions and error handling
"""
43

44
class LiquidSoapCommunicator(ExceptionLogger):
45
    lqcr = None
46
    client = None
47
    logger = None
48
    transaction = 0
49
    channels = None
50
    scheduler = None
51
    error_data = None
52
    auramailer = None
53
    is_liquidsoap_running = False
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
54
    connection_attempts = 0
55
56

    # ------------------------------------------------------------------------------------------ #
57
    def __init__(self, config, logger=True):
58
        """
59
60
        Constructor
        """
61
        self.config = config
62
63
        self.logger = logging.getLogger("AuraEngine")

64
65
        self.client = LiquidSoapPlayerClient(config, "engine.sock")
        # self.lqcr = LiquidSoapRecorderClient(config, "record.sock")
66

67
        errors_file = self.config.get("install_dir") + "/errormessages/controller_error.js"
68
69
70
        f = open(errors_file)
        self.error_data = simplejson.load(f)
        f.close()
71

72
        self.auramailer = AuraMailer(self.config)
73

74
75
76
77
78
79
80
81
82
83
84
85
86
87
        self.is_liquidsoap_up_and_running()

    # ------------------------------------------------------------------------------------------ #
    def is_liquidsoap_up_and_running(self):
        try:
            self.uptime()
            self.is_liquidsoap_running = True
        except LQConnectionError as e:
            self.logger.info("Liquidsoap is not running so far")
            self.is_liquidsoap_running = False
        except Exception as e:
            self.logger.error("Cannot check if Liquidsoap is running. Reason: " + str(e))
            self.is_liquidsoap_running = False

88
89
    # ------------------------------------------------------------------------------------------ #
    def set_volume(self, mixernumber, volume):
90
        return self.__send_lqc_command__(self.client, "mixer", "volume", mixernumber, volume)
91
92
93

    # ------------------------------------------------------------------------------------------ #
    def get_active_mixer(self):
94
95
96
97
        """
        get active mixer in liquidsoap server
        :return:
        """
98
99
        activeinputs = []

100
        # enable more control over the connection
101
        self.enable_transaction()
102

103
        inputs = self.get_all_channels()
104
105
106

        cnt = 0
        for input in inputs:
107
            status = self.__get_mixer_status__(cnt)
108
109
110
111
112
113

            if "selected=true" in status:
                activeinputs.append(input)

            cnt = cnt + 1

114
        self.disable_transaction()
115
116
117

        return activeinputs

118
119
120
121
122
123
    # ------------------------------------------------------------------------------------------ #
    def get_active_channel(self):
        """
        gets active channel from programme
        :return:
        """
124
125
        active_entry = self.scheduler.get_active_entry()
        if active_entry is None:
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
126
            return ""
127
        return active_entry.type
128

129
130
    # ------------------------------------------------------------------------------------------ #
    def get_mixer_status(self):
131
132
        inputstate = {}

Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
133
        self.enable_transaction()
134

135
        inputs = self.get_all_channels()
136
137
138

        cnt = 0
        for input in inputs:
139
            inputstate[input] = self.__get_mixer_status__(cnt)
140
141
            cnt = cnt + 1

Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
142
        self.disable_transaction()
143
144
145

        return inputstate

146
147
148
149
150
151
152
153
    # ------------------------------------------------------------------------------------------ #
    def get_recorder_status(self):
        self.enable_transaction(self.lqcr)
        recorder_state = self.__send_lqc_command__(self.lqcr, "record", "status")
        self.disable_transaction(self.lqcr)

        return recorder_state

154
155
156
157
158
159
160
161
162
    # ------------------------------------------------------------------------------------------ #
    def http_start_stop(self, start):
        if start:
            cmd = "start"
        else:
            cmd = "stop"

        try:
            self.enable_transaction()
163
            self.__send_lqc_command__(self.client, "http", cmd)
164
165
166
167
168
            self.disable_transaction()
        except LQConnectionError as e:
            # we already caught and handled this error in __send_lqc_command__, but we do not want to execute this function further
            pass

169
170
171
172
173
174
175
176
177
178
179
180
    # ------------------------------------------------------------------------------------------ #
    def recorder_stop(self):
        self.enable_transaction()

        for i in range(5):
            if self.config.get("rec_" + str(i)) == "y":
                self.__send_lqc_command__(self.client, "recorder_" + str(i), "stop")

        self.disable_transaction()

    # ------------------------------------------------------------------------------------------ #
    def recorder_start(self, num=-1):
181
182
183
184
185
186
187
188
        if not self.is_liquidsoap_running:
            if num==-1:
                msg = "Want to start recorder, but LiquidSoap is not running"
            else:
                msg = "Want to start recorder " + str(num) + ", but LiquidSoap is not running"
            self.logger.warning(msg)
            return False

189
190
191
192
193
194
195
        self.enable_transaction()

        if num == -1:
            self.recorder_start_all()
        else:
            self.recorder_start_one(num)

196
197
        self.disable_transaction()

198
199
    # ------------------------------------------------------------------------------------------ #
    def recorder_start_all(self):
200
201
202
203
        if not self.is_liquidsoap_running:
            self.logger.warning("Want to start all recorder, but LiquidSoap is not running")
            return False

204
        self.enable_transaction()
205
206
        for i in range(5):
            self.recorder_start_one(i)
207
        self.disable_transaction()
208
209
210

    # ------------------------------------------------------------------------------------------ #
    def recorder_start_one(self, num):
211
212
        if not self.is_liquidsoap_running:
            return False
213

214
215
216
217
218
        if self.config.get("rec_" + str(num)) == "y":
            returnvalue = self.__send_lqc_command__(self.client, "recorder", str(num), "status")

            if returnvalue == "off":
                self.__send_lqc_command__(self.client, "recorder", str(num), "start")
219

220
    # ------------------------------------------------------------------------------------------ #
221
222
223
224
225
    def activate(self, new_entry):
        # grab the actual active entry
        old_entry = self.scheduler.get_active_entry()
        # determine its type
        old_type = old_entry.type
226

227
228
        try:
            # enable transaction
229
230
            self.enable_transaction()

231
            if old_type == new_entry.type:
232
                # push something to active channel
233
                self.activate_same_channel(new_entry)
234
235
            else:
                # switch to another channel
236
                self.activate_different_channel(new_entry, old_type)
237

238
            # disable conn
239
            self.disable_transaction()
240

241
            # insert playlist entry
242
            self.insert_track_service_entry(new_entry)
243
        except LQConnectionError as e:
244
            # we already caught and handled this error in __send_lqc_command__, but we do not want to execute this function further and pass the exception
245
            pass
246

247
    # ------------------------------------------------------------------------------------------ #
248
249
    def activate_same_channel(self, entry, activate_different_channel=False):
        if not activate_different_channel:
250
251
252
253
254
255
256
            self.logger.info(TerminalColors.PINK.value + entry.type.value + " already active!" + TerminalColors.ENDC.value)

        # push to fs or stream
        if entry.type == ScheduleEntryType.FILESYSTEM:
            self.playlist_push(entry.source)
        if entry.type == ScheduleEntryType.STREAM:
            self.set_http_url(entry.source)
257
            self.http_start_stop(True)
258
        # nothing to do when we are live => just leave it as is
259

260
261
262
263
        # set active channel to wanted volume
        if not activate_different_channel:
            self.channel_volume(entry.type.value, entry.volume)

264
265
266
    # ------------------------------------------------------------------------------------------ #
    def activate_different_channel(self, entry, active_type):
        self.logger.info(TerminalColors.PINK.value + "LiquidSoapCommunicator is activating " + entry.type.value + " & deactivating " + active_type.value + "!" + TerminalColors.ENDC.value)
267

268
        # reuse of this function, because activate_same_channel and activate_different_channel are doing pretty the same except setting of the volume
269
        self.activate_same_channel(entry, True)
270

271
        # set other channels to zero volume
272
        others = self.all_inputs_but(entry.getChannel())
273
274
        for o in others:
            self.channel_volume(o, 0)
275

276
277
        # set active channel to wanted volume
        self.channel_volume(entry.type.value, entry.volume)
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
278
279
280

    # ------------------------------------------------------------------------------------------ #
    def insert_track_service_entry(self, schedule_entry):
281
        # create trackservice entry
282
        trackservice_entry = TrackService()
283
284

        # set foreign keys
285
286
287
        trackservice_entry.playlist_id = schedule_entry.playlist_id
        trackservice_entry.entry_num = schedule_entry.entry_num
        trackservice_entry.source = schedule_entry.source
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
288

289
        # store
290
        trackservice_entry.store(add=True, commit=True)
291
292
293

    # ------------------------------------------------------------------------------------------ #
    def all_inputs_but(self, input_type):
294
        try:
295
            activemixer_copy = self.get_all_channels().copy()
296
            activemixer_copy.remove(input_type)
297
298
        except ValueError as e:
            self.logger.error("Requested channel (" + input_type + ") not in channellist. Reason: " + str(e))
299
300
        except AttributeError:
            self.logger.critical("Channellist is None")
301

302
303
304
305
        return activemixer_copy

    # ------------------------------------------------------------------------------------------ #
    def get_all_channels(self):
306
        if self.channels is None or len(self.channels) == 0:
307
            self.channels = self.__send_lqc_command__(self.client, "mixer", "inputs")
308
309

        return self.channels
310

311
312
313
314
315
    # ------------------------------------------------------------------------------------------ #
    def reload_channels(self):
        self.channels = None
        return self.get_all_channels()

316
317
    # ------------------------------------------------------------------------------------------ #
    def __get_mixer_status__(self, mixernumber):
318
        return self.__send_lqc_command__(self.client, "mixer", "status", mixernumber)
319
320

    # ------------------------------------------------------------------------------------------ #
321
    def init_player(self):
322
323
324
325
        t = LiquidSoapInitThread()
        t.liquidsoapcommunicator = self
        t.active_entry = self.scheduler.get_active_entry()
        t.start()
326
        return "LiquidSoapInitThread started!"
327

328
    # ------------------------------------------------------------------------------------------ #
329
330
331
332
333
334
    def channel_activate(self, channel, activate):
        channels = self.get_all_channels()

        try:
            index = channels.index(channel)
            if len(channel) < 1:
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
335
336
337
338
339
340
                self.logger.critical("Cannot activate channel. There are no channels!")
            else:
                message = self.__send_lqc_command__(self.client, "mixer", "select", index, activate)
                return message
        except Exception as e:
            self.logger.critical("Ran into exception when activating channel. Reason: " + str(e))
341

342
343
344
    # ------------------------------------------------------------------------------------------ #
    def channel_volume(self, channel, volume):
        """
345
        set volume of a channel
346
        @type     channel:  string
347
        @param    channel:  Channel
348
        @type     volume:   int
349
        @param    volume:   Volume between 0 and 100
350
        """
351

352
        try:
353
            channels = self.get_all_channels()
354
            index = channels.index(channel)
355

356
            if len(channel) < 1:
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
357
                self.logger.warning("Cannot set volume of channel " + channel + "! There are no channels.")
358
            else:
359
                message = self.__send_lqc_command__(self.client, "mixer", "volume", str(index), str(int(volume)))
360

361
                if message.find('volume=' + str(volume) + '%'):
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
362
                    self.logger.debug("Set volume of channel " + channel + " to " + str(volume))
363
                else:
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
364
                    self.logger.warning("Setting volume of channel " + channel + " gone wrong! Liquidsoap message: " + message)
365

366
                return message
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
367
        except (AttributeError, ValueError) as e: #(LQConnectionError, AttributeError):
368
            self.disable_transaction(force=True)
369
            self.logger.error("Ran into exception when setting volume of channel " + channel + ". Reason: " + str(e))
370

371
372
373
374
375
    # ------------------------------------------------------------------------------------------ #
    def auraengine_state(self):
        state = self.__send_lqc_command__(self.client, "auraengine", "state")
        return state

376
377
    # ------------------------------------------------------------------------------------------ #
    def liquidsoap_help(self):
378
        data = self.__send_lqc_command__(self.client, 'help')
379
        if not data:
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
380
            self.logger.warning("Could not get Liquidsoap's help")
381
        else:
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
382
383
            self.logger.debug("Got Liquidsoap's help")
        return data
384

385
386
    # ------------------------------------------------------------------------------------------ #
    def set_http_url(self, uri):
387
        return self.__send_lqc_command__(self.client, "http", "url", uri)
388

389
390
391
392
393
394
395
    # ------------------------------------------------------------------------------------------ #
    def playlist_push(self, uri):
        """
        Eine Uri in die Playlist einfügen
        @type   uri:   str
        @param  uri:   Die Uri
        """
396
        return self.__send_lqc_command__(self.client, "fs", "push", uri)
397
398
399
400

    # ------------------------------------------------------------------------------------------ #
    def playlist_seek(self, seconds_to_seek):
        return self.__send_lqc_command__(self.client, "fs", "seek", seconds_to_seek)
401
402
403
404
405
406

    # ------------------------------------------------------------------------------------------ #
    def version(self):
        """
        get version
        """
407
        data = self.__send_lqc_command__(self.client, "version", "")
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
408
409
        self.logger.debug("Got Liquidsoap's version")
        return data
410
411

    # ------------------------------------------------------------------------------------------ #
412
413
414
415
    def uptime(self):
        """
        get uptime
        """
416
417
        data = self.__send_lqc_command__(self.client, "uptime", "")
        self.logger.debug("Got Liquidsoap's uptime")
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
418
        return data
419
420

    # ------------------------------------------------------------------------------------------ #
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
421
    def __send_lqc_command__(self, lqs_instance, namespace, command, *args):
422
423
424
425
426
427
428
429
430
431
432
433
434
435
        """
        Ein Kommando an Liquidsoap senden
        @type  lqs_instance: object
        @param lqs_instance: Instance of LiquidSoap Client
        @type  namespace:    string
        @param namespace:    Namespace of function
        @type  command:      string
        @param command:      Function name
        @type args:          list
        @param args:         List of parameters
        @rtype:              string
        @return:             Response from LiquidSoap
        """
        try:
436
437
438
            if namespace == "recorder":
                self.logger.info("LiquidSoapCommunicator is calling " + str(namespace) + "_" + str(command) + "." + str(args))
            else:
439
440
441
442
                if command == "":
                    self.logger.info("LiquidSoapCommunicator is calling " + str(namespace) + str(args))
                else:
                    self.logger.info("LiquidSoapCommunicator is calling " + str(namespace) + "." + str(command) + str(args))
443

444
445
446
447
448
            # call wanted function ...
            func = getattr(lqs_instance, namespace)
            # ... and fetch the result
            result = func(command, *args)

449
450
            self.logger.debug("LiquidSoapCommunicator got response " + str(result))

Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
451
            self.connection_attempts = 0
452

453
454
            return result

455
        except LQConnectionError as e:
456
            self.logger.error("Connection Error when sending " + str(namespace) + "." + str(command) + str(args))
457
            if self.try_to_reconnect():
458
                time.sleep(0.2)
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
459
460
                self.connection_attempts += 1
                if self.connection_attempts < 5:
461
462
463
464
465
466
467
468
469
                    # reconnect
                    self.__open_conn(self.client)
                    self.logger.info("Trying to resend " + str(namespace) + "." + str(command) + str(args))
                    # grab return value
                    retval = self.__send_lqc_command__(lqs_instance, namespace, command, *args)
                    # disconnect
                    self.__close_conn(self.client)
                    # return the val
                    return retval
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
470
                else:
471
472
473
474
475
476
                    if command == "":
                        msg = "Rethrowing Exception while trying to send " + str(namespace) + str(args)
                    else:
                        msg = "Rethrowing Exception while trying to send " + str(namespace) + "." + str(command) + str(args)

                    self.logger.info(msg)
477
                    self.disable_transaction(socket=self.client, force=True)
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
478
                    raise e
479
            else:
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
480
                # also store when was last admin mail sent with which content...
Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
481
                self.logger.critical("SEND ADMIN MAIL AT THIS POINT")
482
483
                raise e

484
    # ------------------------------------------------------------------------------------------ #
485
486
487
488
489
490
491
492
    def try_to_reconnect(self):
        self.enable_transaction()
        return self.transaction > 0

    # ------------------------------------------------------------------------------------------ #
    def enable_transaction(self, socket=None):
        # set socket to playout if nothing else is given
        if socket is None:
493
            socket = self.client
494

495
496
        self.transaction = self.transaction + 1

497
        self.logger.debug(TerminalColors.WARNING.value + "ENabling transaction! cnt: " + str(self.transaction) + TerminalColors.ENDC.value)
498
499
500
501

        if self.transaction > 1:
            return

502
503
504
505
506
        try:
            self.__open_conn(socket)
        except FileNotFoundError:
            self.disable_transaction(socket=socket, force=True)

Gottfried Gaisbauer's avatar
Gottfried Gaisbauer committed
507
508
509
            msg = "socket file " + socket.socket_path + " not found. Is liquidsoap running?"
            self.logger.critical(TerminalColors.RED.value + msg + TerminalColors.ENDC.value)
            self.auramailer.send_admin_mail("CRITICAL Exception when connecting to Liquidsoap", msg)
510

511
    # ------------------------------------------------------------------------------------------ #
512
513
514
515
516
    def disable_transaction(self, socket=None, force=False):
        if not force:
            # nothing to disable
            if self.transaction == 0:
                return
517

518
519
            # decrease transaction counter
            self.transaction = self.transaction - 1
520

521
522
523
524
525
526
527
528
            # debug msg
            self.logger.debug(TerminalColors.WARNING.value + "DISabling transaction! cnt: " + str(self.transaction) + TerminalColors.ENDC.value)

            # return if connection is still needed
            if self.transaction > 0:
                return
        else:
            self.logger.debug(TerminalColors.WARNING.value + "Forcefully DISabling transaction! " + TerminalColors.ENDC.value)
529

530
531
        # close conn and set transactioncounter to 0
        self.__close_conn(socket)
532
        self.transaction = 0
533

534
    # ------------------------------------------------------------------------------------------ #
535
536
    def __open_conn(self, socket):
        # already connected
537
        if self.transaction > 1:
538
            return
539

540
        self.logger.debug(TerminalColors.GREEN.value + "LiquidSoapCommunicator opening conn" + TerminalColors.ENDC.value)
541

542
        # try to connect
543
544
        socket.connect()

545
    # ------------------------------------------------------------------------------------------ #
546
547
    def __close_conn(self, socket):
        # set socket to playout
548
        if socket is None:
549
            socket = self.client
550

551
        # do not disconnect if a transaction is going on
552
        if self.transaction > 0:
553
            return
554

555
        # say bye
556
        socket.byebye()
557

558
559
        # debug msg
        self.logger.debug(TerminalColors.BLUE.value + "LiquidSoapCommunicator closed conn" + TerminalColors.ENDC.value)