api.py 10 KB
Newer Older
David Trattnig's avatar
David Trattnig committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.



22
import logging
23
import requests
24
25
import queue
import threading
26

27
28

from src.base.utils import SimpleUtil as SU
29
from src.scheduling.utils import TimeslotFilter
30

31

David Trattnig's avatar
David Trattnig committed
32

33
class ApiFetcher(threading.Thread):
    """
    Fetches the timeslots, playlists and playlist entries as JSON
    via the API endpoints of Steering and Tank.

    The fetcher is a thread: `run()` performs one fetch and puts the outcome
    on an internal queue, from which `get_fetched_data()` retrieves it. The
    outcome is either the list of timeslot dictionaries or a string starting
    with "fetching_aborted" describing why nothing is available.
    """
    config = None
    logger = None
    # Not used within this class (the instance logger is `self.logger`);
    # retained so existing references to the attribute keep working.
    logging = None
    queue = None
    has_already_fetched = False
    fetched_timeslot_data = None
    stop_event = None

    # Config for API Endpoints
    steering_calendar_url = None
    tank_playlist_url = None
    tank_session = None
    tank_secret = None


    def __init__(self, config):
        """
        Constructor

        Args:
            config:     Configuration object providing `get(key)` for the keys
                        `api_steering_calendar`, `api_tank_playlist`,
                        `api_tank_session` and `api_tank_secret`
        """
        # Initialise the thread machinery before touching instance state
        super().__init__()
        self.config = config
        self.logger = logging.getLogger("AuraEngine")
        self.steering_calendar_url = self.config.get("api_steering_calendar")
        self.tank_playlist_url = self.config.get("api_tank_playlist")
        self.tank_session = self.config.get("api_tank_session")
        self.tank_secret = self.config.get("api_tank_secret")
        self.queue = queue.Queue()
        self.stop_event = threading.Event()



    def run(self):
        """
        Fetch timeslot data from the API and put the outcome on the queue.

        Exactly one item is queued per invocation: the list of fetched
        timeslot dictionaries on success, or a string starting with
        "fetching_aborted" if nothing was fetched or an error occurred.
        """
        try:
            fetched_timeslots = self.fetch()
            self.logger.debug("Timeslot data fetched from API: %s", fetched_timeslots)

            # If nothing is fetched, notify the consumer and terminate
            if not fetched_timeslots:
                self.queue.put("fetching_aborted Nothing fetched")
                return

            # Hand the result over to the consumer waiting on the queue
            self.queue.put(fetched_timeslots)
        except Exception as e:
            # Always queue something, otherwise a consumer blocked in
            # `get_fetched_data()` would wait forever
            self.logger.warning("Fetching aborted due to: %s", e)
            self.queue.put("fetching_aborted " + str(e))


    #
    #   METHODS
    #


    def get_fetched_data(self):
        """
        Retrieves the fetched data from the queue.

        Blocks until an item is available.

        Returns:
            The item queued by `run()`: a list of timeslot dictionaries or a
            string starting with "fetching_aborted"
        """
        return self.queue.get()



    def fetch(self):
        """
        Retrieve all required data from the API.

        Timeslots lacking a `start` or `end`, or carrying none of the playlist
        keys, are skipped individually with a warning.

        Returns:
            ([dict]):   The valid timeslots (possibly empty), or `None` if no
                        timeslots could be fetched at all
        """
        self.logger.debug("Fetching timeslots from STEERING")
        self.fetched_timeslot_data = self.fetch_timeslot_data()
        if not self.fetched_timeslot_data:
            self.logger.critical(SU.red("No timeslots fetched from API!"))
            return None

        self.logger.debug("Fetching playlists from TANK")
        self.fetch_playlists()

        playlist_keys = ("playlist", "show_fallback", "schedule_fallback", "station_fallback")
        return_data = []

        try:
            for timeslot in self.fetched_timeslot_data:
                # Skip only the offending timeslot; the remaining ones stay usable
                if "start" not in timeslot:
                    self.logger.warning("No start of timeslot given. Skipping timeslot: %s", timeslot)
                    continue
                if "end" not in timeslot:
                    self.logger.warning("No end of timeslot given. Skipping timeslot: %s", timeslot)
                    continue
                if not any(key in timeslot for key in playlist_keys):
                    self.logger.warning("No playlist for timeslot given. Skipping timeslot: %s", timeslot)
                    continue

                return_data.append(timeslot)
        except TypeError:
            # The fetched data (or one of its items) is not a container
            self.logger.error(SU.red("Nothing fetched ..."))
            self.fetched_timeslot_data = None
            return None

        return return_data



    def fetch_timeslot_data(self):
        """
        Fetches timeslot data from Steering.

        Returns:
            ([Timeslot]):   An array of timeslots as returned by
                            `polish_timeslots()`, or `None` if the request
                            failed or delivered no timeslots
        """
        timeslots = None
        headers = {"content-type": "application/json"}

        try:
            self.logger.debug("Fetch timeslots from Steering API...")
            # NOTE(review): no `timeout` is passed, so a stalled server blocks this thread
            response = requests.get(self.steering_calendar_url, data=None, headers=headers)
            if response.status_code != 200:
                self.logger.critical(SU.red("HTTP Status: %s | Timeslots could not be fetched! Response: %s" % \
                    (str(response.status_code), response.text)))
                return None
            timeslots = response.json()
        except Exception as e:
            # Attach the exception via `exc_info`; passing it as a positional
            # argument breaks the formatting of the log record
            self.logger.critical(SU.red("Error while requesting timeslots from Steering!"), exc_info=e)

        if not timeslots:
            self.logger.error(SU.red("Got no timeslots via Playout API (Steering)!"))
            return None

        return self.polish_timeslots(timeslots)



    def fetch_playlists(self):
        """
        Fetches all playlists including fallback playlists for every timeslot.
        This method uses the class member `fetched_timeslot_data` to iterate
        over and extend timeslot data.

        Every timeslot gets the keys `playlist`, `schedule_fallback`,
        `show_fallback` and `station_fallback` assigned; the value is `None`
        where no playlist is referenced or it could not be fetched.
        """
        # store fetched entries => do not have to fetch playlist_id more than once
        fetched_entries = []

        try:
            for timeslot in self.fetched_timeslot_data:

                # Get IDs of playlists
                playlist_id = self.get_playlist_id(timeslot, "playlist_id")
                schedule_fallback_id = self.get_playlist_id(timeslot, "schedule_fallback_id")
                show_fallback_id = self.get_playlist_id(timeslot, "show_fallback_id")
                station_fallback_id = self.get_playlist_id(timeslot, "station_fallback_id")

                # Retrieve playlist and the fallback playlists for every timeslot.
                # If a playlist (like station_fallback) is already fetched, it is not fetched again but reused
                timeslot["playlist"]          = self.fetch_playlist(playlist_id,          fetched_entries)
                timeslot["schedule_fallback"] = self.fetch_playlist(schedule_fallback_id, fetched_entries)
                timeslot["show_fallback"]     = self.fetch_playlist(show_fallback_id,     fetched_entries)
                timeslot["station_fallback"]  = self.fetch_playlist(station_fallback_id,  fetched_entries)

        except Exception as e:
            self.logger.error("Error while fetching playlists from API endpoints: %s", e)



    def fetch_playlist(self, playlist_id, fetched_playlists):
        """
        Fetches the playlist with the given ID from Tank.

        Args:
            playlist_id (String):   The ID of the playlist to fetch
            fetched_playlists ([]): Previously fetched playlists to avoid re-fetching.
                                    A newly fetched playlist is appended to this list.

        Returns:
            (Playlist):             The playlist as parsed from the JSON response,
                                    or `None` if no ID is given or fetching failed
        """
        if not playlist_id:
            return None

        # IDs are compared as strings, since the ID extracted from a timeslot
        # is a string while the `id` in a JSON response may be numeric
        playlist_id = str(playlist_id)

        # If playlist is already fetched in this round, use the existing one
        for cached in fetched_playlists:
            if not isinstance(cached, dict):
                continue
            cached_id = cached.get("id")
            if cached_id is not None and str(cached_id) == playlist_id:
                self.logger.debug("Playlist #%s already fetched", playlist_id)
                return cached

        url = self.tank_playlist_url.replace("${ID}", playlist_id)
        headers = {
            "Authorization": "Bearer %s:%s" % (self.tank_session, self.tank_secret),
            "content-type": "application/json"
        }

        try:
            self.logger.debug("Fetch playlist from Tank API...")
            # NOTE(review): no `timeout` is passed, so a stalled server blocks this thread
            response = requests.get(url, data=None, headers=headers)
            if response.status_code != 200:
                self.logger.critical(SU.red("HTTP Status: %s | Playlist #%s could not be fetched or is not available! Response: %s" % \
                    (str(response.status_code), playlist_id, response.text)))
                return None
            playlist = response.json()
        except Exception as e:
            self.logger.critical(SU.red("Error while requesting playlist #%s from Tank" % playlist_id), exc_info=e)
            return None

        fetched_playlists.append(playlist)
        return playlist



    def get_playlist_id(self, timeslot, id_name):
        """
        Extracts the playlist ID for a given playlist (fallback) type.

        Args:
            timeslot (dict):    The timeslot to read the ID from
            id_name (String):   The key holding the ID (e.g. `playlist_id`)

        Returns:
            (String):           The ID as string, or `None` if the key is
                                missing or holds no value
        """
        # A missing key is treated like an undefined value instead of raising
        playlist_id = str(timeslot.get(id_name))
        if not playlist_id or playlist_id == "None":
            self.logger.debug("No value defined for '%s' in timeslot '#%s'", id_name, timeslot.get("id"))
            return None

        return playlist_id



    def polish_timeslots(self, timeslots):
        """
        Removes all timeslots which are not relevant for further processing,
        and adds a transparent timeslot ID assignment for more expressive use.

        Args:
            timeslots ([dict]): The timeslots as retrieved from Steering

        Returns:
            ([dict]):           The timeslots left after applying
                                `TimeslotFilter.filter_24h` and
                                `TimeslotFilter.filter_past`, each with
                                `timeslot_id` set to the value of its `id`
        """
        count_before = len(timeslots)
        timeslots = TimeslotFilter.filter_24h(timeslots)
        timeslots = TimeslotFilter.filter_past(timeslots)
        count_after = len(timeslots)
        self.logger.debug("Removed %d unnecessary timeslots from response. Timeslots left: %d", (count_before - count_after), count_after)

        for t in timeslots:
            t["timeslot_id"] = t["id"]
        return timeslots



    def terminate(self):
        """
        Signals termination by setting the stop event.

        NOTE(review): nothing within this class checks `stop_event`, so a
        fetch which is already in progress is not interrupted by this call.
        """
        self.logger.info("Shutting down API fetcher...")
        self.stop_event.set()