api.py 10.6 KB
Newer Older
David Trattnig's avatar
David Trattnig committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.



22
import logging
23
import requests
24
25
import queue
import threading
26

27
28

from src.base.utils import SimpleUtil as SU
29
from src.scheduling.utils import TimeslotFilter
30

31

David Trattnig's avatar
David Trattnig committed
32

33
class ApiFetcher(threading.Thread):
David Trattnig's avatar
David Trattnig committed
34
    """
35
    Fetches the timeslots, playlists and playlist entries as JSON
36
    via the API endpoints of Steering and Tank.
David Trattnig's avatar
David Trattnig committed
37
    """
38
39
    config = None
    logging = None
40
    queue = None
41
    has_already_fetched = False
42
    fetched_timeslot_data = None
43
    stop_event = None
David Trattnig's avatar
David Trattnig committed
44

45
46
47
48
49
50
    # Config for API Endpoints
    steering_calendar_url = None
    tank_playlist_url = None
    tank_session = None
    tank_secret = None

51

David Trattnig's avatar
David Trattnig committed
52

53
    def __init__(self, config):
David Trattnig's avatar
David Trattnig committed
54
55
56
        """
        Constructor
        """
57
        self.config = config
58
        self.logger = logging.getLogger("AuraEngine")
59
60
61
62
        self.steering_calendar_url = self.config.get("api_steering_calendar")
        self.tank_playlist_url = self.config.get("api_tank_playlist")
        self.tank_session = self.config.get("api_tank_session")
        self.tank_secret = self.config.get("api_tank_secret")
63
64
        self.queue = queue.Queue()
        self.stop_event = threading.Event()
65
        threading.Thread.__init__(self)
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94



    def run(self):
        """
        Fetch timeslot data from the API.

        Returns
            Timeslot ([dict]):  An array of retrieved timeslots dictionary
        """
        try:
            fetched_timeslots = self.fetch()
            self.logger.debug("Timeslot data fetched from API: " + str(fetched_timeslots))

            # If nothing is fetched, return
            if not fetched_timeslots:
                self.queue.put("fetching_aborted Nothing fetched")
                return None

            # Release the mutex
            self.queue.put(fetched_timeslots)
        except Exception as e:
            # Release the mutex
            self.logger.warning("Fetching aborted due to: %s" % str(e), e)
            self.queue.put("fetching_aborted " + str(e))

        # Terminate the thread
        return

David Trattnig's avatar
David Trattnig committed
95

David Trattnig's avatar
David Trattnig committed
96

David Trattnig's avatar
David Trattnig committed
97
    #
98
    #   METHODS
David Trattnig's avatar
David Trattnig committed
99
100
    #

101

102
103
104
105
106
107
108
109
    def get_fetched_data(self):
        """
        Retrieves the fetched data from the queue.
        """
        return self.queue.get()



110
    def fetch(self):
David Trattnig's avatar
David Trattnig committed
111
112
113
        """
        Retrieve all required data from the API.
        """
114
        return_data = []
David Trattnig's avatar
David Trattnig committed
115

116
117
118
119
        self.logger.debug("Fetching timeslots from STEERING")
        self.fetched_timeslot_data = self.fetch_timeslot_data()
        if not self.fetched_timeslot_data:
            self.logger.critical(SU.red("No timeslots fetched from API!"))
120
            return None
121

122
123
        for timeslot in self.fetched_timeslot_data:

124
125
            if "schedule_default_playlist_id" in timeslot:
                timeslot["default_schedule_playlist_id"] = timeslot["schedule_default_playlist_id"]
126
                timeslot["schedule_fallback_id"] = None
127
128
            if "show_default_playlist_id" in timeslot:
                timeslot["default_show_playlist_id"] = timeslot["show_default_playlist_id"]
129
                timeslot["show_fallback_id"] = None
130

131
132
        self.logger.debug("Fetching playlists from TANK")
        self.fetch_playlists()
133
134

        try:
135
            for timeslot in self.fetched_timeslot_data:
136

137
138
139
140
141
142
143
                # Skip timeslot if no start or end is given
                if "start" not in timeslot:
                    self.logger.warning("No start of timeslot given. Skipping timeslot: " + str(timeslot))
                    timeslot = None
                if "end" not in timeslot:
                    self.logger.warning("No end of timeslot given. Skipping timeslot: " + str(timeslot))
                    timeslot = None
144

145
146
                if timeslot:
                    return_data.append(timeslot)
147
148
        except TypeError:
            self.logger.error(SU.red("Nothing fetched ..."))
149
            self.fetched_timeslot_data = None
150
            return None
151
152
153

        return return_data

David Trattnig's avatar
David Trattnig committed
154
155


156
    def fetch_timeslot_data(self):
David Trattnig's avatar
David Trattnig committed
157
        """
158
        Fetches timeslot data from Steering.
David Trattnig's avatar
David Trattnig committed
159
160

        Returns:
161
            ([Timeslot]):   An array of timeslots
David Trattnig's avatar
David Trattnig committed
162
        """
163
        timeslots = None
164
        headers = { "content-type": "application/json" }
165

166
        try:
167
            self.logger.debug("Fetch timeslots from Steering API...")
168
169
            response = requests.get(self.steering_calendar_url, data=None, headers=headers)
            if not response.status_code == 200:
170
                self.logger.critical(SU.red("HTTP Status: %s | Timeslots could not be fetched! Response: %s" % \
171
                    (str(response.status_code), response.text)))
172
173
                return None
            timeslots = response.json()
174
        except Exception as e:
175
            self.logger.critical(SU.red("Error while requesting timeslots from Steering!"), e)
176

177
178
        if not timeslots:
            self.logger.error(SU.red("Got no timeslots via Playout API (Steering)!"))
179
            return None
180

181
        return self.polish_timeslots(timeslots)
182

David Trattnig's avatar
David Trattnig committed
183
184


185
    def fetch_playlists(self):
David Trattnig's avatar
David Trattnig committed
186
        """
187
188
189
        Fetches all playlists including fallback playlists for every timeslot.
        This method used the class member `fetched_timeslot_data`` to iterate
        over and extend timeslot data.
David Trattnig's avatar
David Trattnig committed
190
        """
191
192
193
        # store fetched entries => do not have to fetch playlist_id more than once
        fetched_entries=[]

194
        try:
195
            for timeslot in self.fetched_timeslot_data:
196

197
                # Get IDs of specific, default and fallback playlists
198
                playlist_id = self.get_playlist_id(timeslot, "playlist_id")
199
                default_schedule_playlist_id = self.get_playlist_id(timeslot, "default_schedule_playlist_id")
200
201
                default_show_playlist_id = self.get_playlist_id(timeslot, "default_show_playlist_id")
                schedule_fallback_id = self.get_playlist_id(timeslot, "schedule_fallback_id")
202
203
                show_fallback_id = self.get_playlist_id(timeslot, "show_fallback_id")
                station_fallback_id = self.get_playlist_id(timeslot, "station_fallback_id")
204

205
                # Retrieve playlist, default and the fallback playlists for every timeslot.
David Trattnig's avatar
David Trattnig committed
206
                # If a playlist (like station_fallback) is already fetched, it is not fetched again but reused
207
208
209
                timeslot["playlist"] = self.fetch_playlist(playlist_id, fetched_entries)
                timeslot["default_schedule_playlist"] = self.fetch_playlist(default_schedule_playlist_id, fetched_entries)
                timeslot["default_show_playlist"] = self.fetch_playlist(default_show_playlist_id, fetched_entries)
210
                timeslot["schedule_fallback"] = self.fetch_playlist(schedule_fallback_id, fetched_entries)
211
212
                timeslot["show_fallback"] = self.fetch_playlist(show_fallback_id, fetched_entries)
                timeslot["station_fallback"]  = self.fetch_playlist(station_fallback_id, fetched_entries)
213

214
        except Exception as e:
215
            self.logger.error("Error while fetching playlists from API endpoints: " + str(e), e)
216

David Trattnig's avatar
David Trattnig committed
217
218


219
    def fetch_playlist(self, playlist_id, fetched_playlists):
David Trattnig's avatar
David Trattnig committed
220
        """
221
        Fetches the playlist for a given timeslot.
David Trattnig's avatar
David Trattnig committed
222
223

        Args:
224
225
            playlist_id (String):       The ID of the playlist
            fetched_playlists ([dict]): Previously fetched playlists to avoid re-fetching
David Trattnig's avatar
David Trattnig committed
226
227

        Returns:
228
            (Playlist):             Playlist for `playlist_id`
David Trattnig's avatar
David Trattnig committed
229
        """
230
231
        if not playlist_id:
            return None
232

233
        playlist = None
234
        url = self.tank_playlist_url.replace("${ID}", playlist_id)
235
        headers = {
236
            "Authorization": "Bearer %s:%s" % (self.tank_session, self.tank_secret),
237
238
            "content-type": "application/json"
        }
239

240
        # If playlist is already fetched in this round, use the existing one
David Trattnig's avatar
David Trattnig committed
241
        for playlist in fetched_playlists:
242
243
            if playlist["id"] == playlist_id:
                self.logger.debug("Playlist #%s already fetched" % playlist_id)
David Trattnig's avatar
David Trattnig committed
244
                return playlist
245

246
        try:
247
            self.logger.debug("Fetch playlist from Tank API...")
248
249
250
251
            response = requests.get(url, data=None, headers=headers)
            if not response.status_code == 200:
                self.logger.critical(SU.red("HTTP Status: %s | Playlist #%s could not be fetched or is not available! Response: %s" % \
                    (str(response.status_code), str(playlist_id), response.text)))
252
253
                return None
            playlist = response.json()
254
        except Exception as e:
255
256
            self.logger.critical(SU.red("Error while requesting playlist #%s from Tank" % str(playlist_id)), e)
            return None
257

258
259
        fetched_playlists.append(playlist)
        return playlist
260

261
262


263
    def get_playlist_id(self, timeslot, id_name):
264
        """
265
        Extracts the playlist ID for a given playlist (fallback) type.
266
267
268
269
270
271
272

        Args:
            timeslot (dict):    The timeslot dictionary
            id_name (String):   The dictionary key holding the playlist ID

        Returns:
            (Integer):  The playlist ID
273
        """
274
275
276
        if not id_name in timeslot:
            return None

277
        playlist_id = str(timeslot[id_name])
278
        if not playlist_id or playlist_id == "None":
279
            self.logger.debug("No value defined for '%s' in timeslot '#%s'" % (id_name, timeslot["id"]))
280
            return None
281

282
        return playlist_id
283
284


285

286
    def polish_timeslots(self, timeslots):
287
        """
288
289
        Removes all timeslots which are not relevant for further processing,
        and transparent timeslot ID assigment for more expressive use.
290
        """
291
        count_before = len(timeslots)
292
293
        timeslots = TimeslotFilter.filter_24h(timeslots)
        timeslots = TimeslotFilter.filter_past(timeslots)
294
295
296
297
298
299
        count_after = len(timeslots)
        self.logger.debug("Removed %d unnecessary timeslots from response. Timeslots left: %d" % ((count_before - count_after), count_after))

        for t in timeslots:
            t["timeslot_id"] = t["id"]
        return timeslots
300

301
302


303
304
305
306
307
308
    def terminate(self):
        """
        Terminates the thread.
        """
        self.logger.info("Shutting down API fetcher...")
        self.stop_event.set()