#
# steering, Programme/schedule management for AURA
#
# Copyright (C) 2011-2017, 2020, Ernesto Rico Schmidt
# Copyright (C) 2017-2019, Ingo Leindecker
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

import json
import logging
from datetime import date, datetime, time, timedelta
from itertools import pairwise
from textwrap import dedent

from django_filters.rest_framework import DjangoFilterBackend
from drf_spectacular.utils import OpenApiResponse, extend_schema, extend_schema_view
from rest_framework import filters as drf_filters
from rest_framework import mixins, permissions, status, viewsets
from rest_framework.pagination import LimitOffsetPagination
from rest_framework.response import Response

from django.contrib.auth.models import User
from django.db import IntegrityError
from django.http import HttpResponse, JsonResponse
from django.shortcuts import get_object_or_404
from django.utils import timezone
from django.utils.translation import gettext as _
from program import filters
from program.models import (
    Category,
    FundingCategory,
    Host,
    Image,
    Language,
    License,
    LinkType,
    MusicFocus,
    Note,
    RRule,
    Schedule,
    ScheduleConflictError,
    Show,
    TimeSlot,
    Topic,
    Type,
)
from program.serializers import (
    CategorySerializer,
    ErrorSerializer,
    FundingCategorySerializer,
    HostSerializer,
    ImageSerializer,
    LanguageSerializer,
    LicenseSerializer,
    LinkTypeSerializer,
    MusicFocusSerializer,
    NoteSerializer,
    RRuleSerializer,
    ScheduleConflictResponseSerializer,
    ScheduleCreateUpdateRequestSerializer,
    ScheduleDryRunResponseSerializer,
    ScheduleResponseSerializer,
    ScheduleSerializer,
    ShowSerializer,
    TimeSlotSerializer,
    TopicSerializer,
    TypeSerializer,
    UserSerializer,
)
from program.services import get_timerange_timeslots, resolve_conflicts
from program.utils import (
    DisabledObjectPermissionCheckMixin,
    NestedObjectFinderMixin,
    get_values,
    parse_date,
)

logger = logging.getLogger(__name__)

# Timestamp format shared by all entries of the playout schedule.
_PLAYOUT_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S"


def _format_playout_datetime(value: datetime) -> str:
    """Format `value` for the playout schedule.

    Aware datetimes are first converted to naive datetimes in the current time zone, so that
    every timestamp of the playout schedule is expressed in the same (local) time. Naive
    datetimes are formatted as they are.
    """

    if timezone.is_aware(value):
        value = timezone.make_naive(value)

    return value.strftime(_PLAYOUT_DATETIME_FORMAT)


def timeslot_entry(*, timeslot: TimeSlot) -> dict:
    """Return a timeslot entry of the playout schedule as a dict.

    The entry combines data of the timeslot itself, of its schedule and of the show the schedule
    belongs to. Many-to-many relations of the show are rendered as comma separated names.
    """

    schedule = timeslot.schedule
    show = schedule.show
    playlist_id = timeslot.playlist_id
    title = show_name = f"{show.name} {_('REP')}" if schedule.is_repetition else show.name

    return {
        # start and end are emitted as timezone naive (local) timestamps
        "end": _format_playout_datetime(timeslot.end),
        "id": timeslot.id,
        "playlistId": playlist_id,
        # `Timeslot.repetition_of` is a foreign key that can be null; reading the raw id
        # column avoids loading the referenced timeslot just to get its id
        "repetitionOfId": timeslot.repetition_of_id,
        "scheduleDefaultPlaylistId": schedule.default_playlist_id,
        "scheduleId": schedule.id,
        "showCategories": ", ".join(show.category.values_list("name", flat=True)),
        "showDefaultPlaylistId": show.default_playlist_id,
        # `Show.funding_category` is a foreign key that can be null
        "showFundingCategory": show.funding_category.name if show.funding_category_id else "",
        "showHosts": ", ".join(show.hosts.values_list("name", flat=True)),
        "showId": show.id,
        "showLanguages": ", ".join(show.language.values_list("name", flat=True)),
        "showMusicFocus": ", ".join(show.music_focus.values_list("name", flat=True)),
        "showName": show_name,
        "showTopics": ", ".join(show.topic.values_list("name", flat=True)),
        # `Show.type` is a foreign key that can be null
        "showType": show.type.name if show.type_id else "",
        "start": _format_playout_datetime(timeslot.start),
        "title": title,
    }


def gap_entry(*, gap_start: datetime, gap_end: datetime) -> dict:
    """Return a virtual timeslot to fill the gap in between `gap_start` and `gap_end` as a dict.

    Both boundaries are formatted exactly like the boundaries of real timeslot entries, so a gap
    lines up with the entries surrounding it.
    """

    return {
        "end": _format_playout_datetime(gap_end),
        "start": _format_playout_datetime(gap_start),
        "virtual": True,
    }


def json_day_schedule(request, year=None, month=None, day=None):
    """Return the timeslots of a single day as JSON.

    The day is given by `year`, `month` and `day`. If none of them is given, the current day is
    used. Each entry holds the start and end of the timeslot and the name and id of its show.
    """

    # datetime.combine returns a timezone naive datetime object
    if year is None and month is None and day is None:
        start = timezone.make_aware(datetime.combine(timezone.now(), time(0, 0)))
    else:
        start = timezone.make_aware(datetime.combine(date(year, month, day), time(0, 0)))

    end = start + timedelta(hours=24)
    timeslots = get_timerange_timeslots(start, end).select_related("schedule")

    # NOTE(review): unlike the playout schedule, start and end are formatted without converting
    # them to local time first -- confirm with the consumers of this endpoint if that is intended
    schedule = [
        {
            "start": ts.start.strftime("%Y-%m-%d_%H:%M:%S"),
            "end": ts.end.strftime("%Y-%m-%d_%H:%M:%S"),
            "title": ts.show.name,
            "id": ts.show.id,
        }
        for ts in timeslots
    ]

    return HttpResponse(
        json.dumps(schedule, ensure_ascii=False).encode("utf8"),
        content_type="application/json; charset=utf-8",
    )


def json_playout(request):
    """
    Return a JSON representation of the scheduled playout.

    Expects GET parameters `start` (date), `end` (date), and `includeVirtual` (boolean).

    - `start` is today by default.
    - `end` is one week after the start date by default.
    - `includeVirtual` is false by default. The legacy spelling `include_virtual` is accepted
      as well.

    The schedule will include virtual timeslots to fill unscheduled gaps if requested. If nothing
    is scheduled in the requested range, a single virtual timeslot covers the whole range.

    Called by
    - engine (playout) to retrieve timeslots within a given timerange
    - internal calendar to retrieve all timeslots for a week
    """

    # datetime.now and datetime.combine return timezone naive datetime objects
    start_param = request.GET.get("start")
    if start_param is None:
        schedule_start = timezone.make_aware(datetime.combine(datetime.now(), time(0, 0)))
    else:
        schedule_start = timezone.make_aware(
            datetime.combine(parse_date(start_param), time(0, 0))
        )

    end_param = request.GET.get("end")
    if end_param is None:
        schedule_end = schedule_start + timedelta(days=7)
    else:
        # the end date is inclusive, so the range ends at midnight of the following day
        schedule_end = timezone.make_aware(
            datetime.combine(parse_date(end_param) + timedelta(days=1), time(0, 0))
        )

    # the parameter is documented as `includeVirtual` but was historically read as
    # `include_virtual`, so both spellings are honoured
    include_virtual = any(
        request.GET.get(name) == "true" for name in ("includeVirtual", "include_virtual")
    )

    # evaluate the queryset once: the first, the last and every pair of timeslots are needed
    timeslots = list(
        get_timerange_timeslots(schedule_start, schedule_end).select_related("schedule")
    )

    schedule = []

    if timeslots:
        first_timeslot = timeslots[0]
        last_timeslot = timeslots[-1]

        if include_virtual and first_timeslot.start > schedule_start:
            schedule.append(gap_entry(gap_start=schedule_start, gap_end=first_timeslot.start))

        for current, upcoming in pairwise(timeslots):
            schedule.append(timeslot_entry(timeslot=current))

            if include_virtual and current.end != upcoming.start:
                schedule.append(gap_entry(gap_start=current.end, gap_end=upcoming.start))

        # pairwise never yields the last timeslot as `current`, so it is appended here
        schedule.append(timeslot_entry(timeslot=last_timeslot))

        if include_virtual and last_timeslot.end < schedule_end:
            schedule.append(gap_entry(gap_start=last_timeslot.end, gap_end=schedule_end))
    elif include_virtual:
        # nothing is scheduled at all: the whole range is one gap
        schedule.append(gap_entry(gap_start=schedule_start, gap_end=schedule_end))

    return HttpResponse(
        json.dumps(schedule, ensure_ascii=False).encode("utf8"),
        content_type="application/json; charset=utf-8",
    )


# Users can be created, retrieved, updated and listed, but not deleted through the API.
@extend_schema_view(
    create=extend_schema(summary="Create a new user."),
    retrieve=extend_schema(
        summary="Retrieve a single user.",
        description="Non-admin users may only retrieve their own user record.",
    ),
    update=extend_schema(
        summary="Update an existing user.",
        description="Non-admin users may only update their own user record.",
    ),
    partial_update=extend_schema(
        summary="Partially update an existing user.",
        description="Non-admin users may only update their own user record.",
    ),
    list=extend_schema(
        summary="List all users.",
        description=(
            "The returned list of records will only contain a single record "
            "for non-admin users which is their own user account."
        ),
    ),
)
class APIUserViewSet(
    DisabledObjectPermissionCheckMixin,
    mixins.CreateModelMixin,
    mixins.RetrieveModelMixin,
    mixins.UpdateModelMixin,
    mixins.ListModelMixin,
    viewsets.GenericViewSet,
):
    serializer_class = UserSerializer
    filter_backends = [drf_filters.SearchFilter]
    search_fields = ["username", "first_name", "last_name", "email"]

    def get_queryset(self):
        """The queryset is empty if the user is not authenticated, contains all the users if the
        method is safe or the requesting user is a superuser, otherwise it only contains the
        requesting user."""

        user = self.request.user

        if not user.is_authenticated:
            return User.objects.none()
        elif self.request.method in permissions.SAFE_METHODS or user.is_superuser:
            return User.objects.all()
        else:
            return User.objects.filter(pk=user.id)

    def create(self, request, *args, **kwargs):
        serializer = UserSerializer(
            # FIXME: the method get_serializer_context should be used but it does seem to get lost
            context={"request": request},  # the serializer needs the request in the context
            data=request.data,
        )

        # raises a validation error (HTTP 400) if the data is invalid
        serializer.is_valid(raise_exception=True)
        serializer.save()

        return Response(serializer.data, status=status.HTTP_201_CREATED)


# Images can be read by anyone; writing requires authentication and is limited to own images.
@extend_schema_view(
    create=extend_schema(summary="Create a new image."),
    destroy=extend_schema(summary="Delete an existing image."),
    list=extend_schema(summary="List all images."),
    partial_update=extend_schema(
        summary="Partially update an existing image.",
        description="Only `alt_text`, `credits`, and `ppoi` can be updated.",
    ),
    retrieve=extend_schema(summary="Retrieve a single image."),
    update=extend_schema(
        summary="Update an existing image.",
        description="Only `alt_text`, `credits`, and `ppoi` can be updated.",
    ),
)
class APIImageViewSet(viewsets.ModelViewSet):
    serializer_class = ImageSerializer
    pagination_class = LimitOffsetPagination
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    def get_queryset(self):
        """The queryset contains all the images if the method is safe, otherwise it only contains
        the images owned by the requesting user."""

        if self.request.method in permissions.SAFE_METHODS:
            return Image.objects.all()
        else:
            return Image.objects.filter(owner=self.request.user.username)

    def create(self, request, *args, **kwargs):
        """Create an Image instance. Any user can create an image."""

        serializer = ImageSerializer(
            context={"request": request},  # the serializer needs the request in the context
            data=request.data,
        )

        # raises a validation error (HTTP 400) if the data is invalid
        serializer.is_valid(raise_exception=True)
        serializer.save()

        return Response(serializer.data, status=status.HTTP_201_CREATED)

    def update(self, request, *args, **kwargs):
        """Update an Image instance. Only the creator can update an image."""

        # `partial_update` of the update mixin calls this method with `partial=True`
        partial = kwargs.get("partial", False)
        image = self.get_object()

        serializer = ImageSerializer(
            image,
            context={"request": request},  # the serializer needs the request in the context
            data=request.data,
            partial=partial,
        )

        # raises a validation error (HTTP 400) if the data is invalid
        serializer.is_valid(raise_exception=True)
        serializer.save()

        return Response(serializer.data)

    def destroy(self, request, *args, **kwargs):
        """Destroy an Image instance. Only the owner can delete an image."""

        image = self.get_object()
        image.delete()

        return Response(status=status.HTTP_204_NO_CONTENT)


# Shows can be addressed by their numeric id or by their slug.
@extend_schema_view(
    create=extend_schema(summary="Create a new show."),
    retrieve=extend_schema(summary="Retrieve a single show."),
    update=extend_schema(summary="Update an existing show."),
    partial_update=extend_schema(summary="Partially update an existing show."),
    destroy=extend_schema(summary="Delete an existing show."),
    list=extend_schema(summary="List all shows."),
)
class APIShowViewSet(DisabledObjectPermissionCheckMixin, viewsets.ModelViewSet):
    queryset = Show.objects.all()
    serializer_class = ShowSerializer
    pagination_class = LimitOffsetPagination
    filter_backends = [DjangoFilterBackend, drf_filters.SearchFilter]
    filterset_class = filters.ShowFilterSet
    search_fields = ["name", "slug", "short_description", "description"]

    def get_object(self):
        queryset = self.filter_queryset(self.get_queryset())
        lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field
        lookup_arg = self.kwargs[lookup_url_kwarg]

        # allow object retrieval through id or slug
        try:
            filter_kwargs = {self.lookup_field: int(lookup_arg)}
        except ValueError:
            filter_kwargs = {"slug": lookup_arg}

        obj = get_object_or_404(queryset, **filter_kwargs)
        self.check_object_permissions(self.request, obj)

        return obj

    def create(self, request, *args, **kwargs):
        serializer = ShowSerializer(
            # FIXME: the method get_serializer_context should be used but it does seem to get lost
            context={"request": request},  # the serializer needs the request in the context
            data=request.data,
        )

        # raises a validation error (HTTP 400) if the data is invalid
        serializer.is_valid(raise_exception=True)

        try:
            serializer.save()
        except IntegrityError:
            # reported to the client as a duplicate slug
            data = {
                "slug": {"code": "unique", "message": "show with this slug already exists."}
            }

            return Response(data, status=status.HTTP_400_BAD_REQUEST)

        return Response(serializer.data, status=status.HTTP_201_CREATED)

    def update(self, request, *args, **kwargs):
        partial = kwargs.get("partial", False)
        show = self.get_object()

        serializer = ShowSerializer(
            # FIXME: the method get_serializer_context should be used but it does seem to get lost
            context={"request": request},  # the serializer needs the request in the context
            data=request.data,
            instance=show,
            partial=partial,
        )

        # raises a validation error (HTTP 400) if the data is invalid
        serializer.is_valid(raise_exception=True)

        try:
            serializer.save()
        except IntegrityError:
            # reported to the client as a duplicate slug
            data = {
                "slug": {"code": "unique", "message": "show with this slug already exists."}
            }

            return Response(data, status=status.HTTP_400_BAD_REQUEST)

        return Response(serializer.data)

    def partial_update(self, request, *args, **kwargs):
        kwargs["partial"] = True

        return self.update(request, *args, **kwargs)


@extend_schema_view(
    retrieve=extend_schema(summary="Retrieve a single rrule."),
    list=extend_schema(summary="List all rrule."),
)
class APIRRuleViewSet(viewsets.ModelViewSet):
    queryset = RRule.objects.all()
    serializer_class = RRuleSerializer


# Creating or updating a schedule is the only way to create timeslots.
@extend_schema_view(
    create=extend_schema(
        summary="Create a new schedule.",
        request=ScheduleCreateUpdateRequestSerializer,
        responses={
            status.HTTP_201_CREATED: OpenApiResponse(
                response=ScheduleResponseSerializer,
                description=(
                    "Signals the successful creation of the schedule and of the projected "
                    "timeslots."
                ),
            ),
            status.HTTP_202_ACCEPTED: OpenApiResponse(
                response=ScheduleDryRunResponseSerializer,
                description=(
                    "Returns the list of timeslots that would be created, updated and deleted if "
                    "the schedule request would not have been sent with the dryrun flag."
                ),
            ),
            status.HTTP_400_BAD_REQUEST: OpenApiResponse(
                response=ErrorSerializer(many=True),
                description=dedent(
                    """
                    Returned in case the request contained invalid data.

                    This may happen if:
                    * the last date is before the start date (`no-start-after-end`),
                      in which case you should correct either the start or until date.
                    * The start and last date are the same (`no-same-day-start-and-end`).
                      This is only allowed for single timeslots with the recurrence rule
                      set to `once`. You should fix either the start or until date.
                    * The number of conflicts and solutions aren’t the same
                      (`one-solution-per-conflict`). Only one solution is allowed per conflict,
                      so you either offered too many or not enough solutions for any reported
                      conflicts.
                    * The referenced recurrence rule does not exist.
                    """
                ),
            ),
            status.HTTP_403_FORBIDDEN: OpenApiResponse(
                response=ErrorSerializer,
                description=(
                    "Returned in case the request contained no or invalid authenticated data "
                    "or the authenticated user does not have authorization to perform the "
                    "requested operation."
                ),
            ),
            status.HTTP_409_CONFLICT: OpenApiResponse(
                response=ScheduleConflictResponseSerializer,
                description=dedent(
                    """
                    Returns the list of projected timeslots and any collisions that may have
                    been found for existing timeslots.

                    Errors on projected timeslots may include:
                    * 'This change on the timeslot is not allowed.'
                      When adding: There was a change in the schedule's data during conflict
                      resolution.
                      When updating: Fields 'start', 'end', 'by_weekday' or 'rrule' have changed,
                      which is not allowed.
                    * 'No solution given': No solution was provided for the conflict in
                      `solutions`. Provide a value of `solution_choices`.
                    * 'Given solution is not accepted for this conflict.':
                      The solution has a value which is not part of `solution_choices`. Provide
                      a value of `solution_choices` (at least `ours` or `theirs`).
                    """
                ),
            ),
        },
    ),
    retrieve=extend_schema(summary="Retrieve a single schedule."),
    update=extend_schema(
        summary="Update an existing schedule.",
        request=ScheduleCreateUpdateRequestSerializer,
    ),
    partial_update=extend_schema(
        summary="Partially update an existing schedule.",
        request=ScheduleCreateUpdateRequestSerializer,
    ),
    destroy=extend_schema(summary="Delete an existing schedule."),
    list=extend_schema(summary="List all schedules."),
)
class APIScheduleViewSet(
    DisabledObjectPermissionCheckMixin,
    NestedObjectFinderMixin,
    viewsets.ModelViewSet,
):
    ROUTE_FILTER_LOOKUPS = {
        "show_pk": "show",
    }

    filterset_class = filters.ScheduleFilterSet
    pagination_class = LimitOffsetPagination
    queryset = Schedule.objects.all()
    serializer_class = ScheduleSerializer

    def get_serializer_class(self):
        if self.action in ("create", "update", "partial_update"):
            return ScheduleCreateUpdateRequestSerializer

        return super().get_serializer_class()

    def create(self, request, *args, **kwargs):
        """
        Create a schedule, generate timeslots, test for collisions and resolve them
        (including notes).

        Note that creating or updating a schedule is the only way to create timeslots.

        The projected timeslots defined by the schedule are matched against existing
        timeslots. The API will return an object that contains

        * the schedule's data,
        * projected timeslots,
        * detected collisions,
        * and possible solutions.

        As long as no `solutions` object has been set or unresolved collisions exist,
        no data is written to the database. A schedule is only created if at least
        one timeslot was generated by it.

        In order to resolve any possible conflicts, the client must submit a new request with
        a solution for each conflict. Possible solutions are listed as part of the projected
        timeslot in the `solution_choices` array. In a best-case scenario with no detected
        conflicts an empty solutions object will suffice. For more details on the individual
        types of solutions see the SolutionChoicesEnum.

        **Please note**:
        If there's more than one collision for a projected timeslot, only `theirs` and `ours`
        are currently supported as solutions.
        """

        pk, show_pk = get_values(self.kwargs, "pk", "show_pk")
        schedule_data = request.data.get("schedule")

        # the `schedule` object may be missing or malformed; in that case the request is
        # rejected below instead of failing on the attribute access
        if show_pk is None and isinstance(schedule_data, dict):
            show_pk = schedule_data.get("show_id")

        # FIXME: this is wrong
        if show_pk == 0:
            show_pk = request.data.get("show_id")

        # Only allow creating when calling /shows/{show_pk}/schedules/ and with the `schedule`
        # JSON object
        if show_pk is None or "schedule" not in request.data:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        try:
            resolution = resolve_conflicts(request.data, pk, show_pk)
        # FIXME: Find a better way to do this.
        # The exception is thrown by the instantiate_upcoming_schedule function.
        except RRule.DoesNotExist as exc:
            return JsonResponse({"rruleId": str(exc)}, status=status.HTTP_400_BAD_REQUEST)
        except ScheduleConflictError as exc:
            return Response(exc.conflicts, status.HTTP_409_CONFLICT)

        if all(key in resolution for key in ["create", "update", "delete"]):
            # this is a dry-run
            return Response(resolution, status=status.HTTP_202_ACCEPTED)

        return Response(resolution, status=status.HTTP_201_CREATED)

    def update(self, request, *args, **kwargs):
        """
        Update a schedule, generate timeslots, test for collisions and resolve them
        including notes.
        """

        # NOTE(review): 403 would be the conventional status for an authenticated user without
        # permission; 401 is kept because clients may depend on it
        if not request.user.is_superuser:
            return Response(status=status.HTTP_401_UNAUTHORIZED)

        # Only allow updating when with the `schedule` JSON object
        if "schedule" not in request.data:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        schedule = self.get_object()
        schedule_data = request.data.get("schedule")

        # If default playlist id or repetition are given, just update
        if default_playlist_id := schedule_data.get("default_playlist_id"):
            schedule.default_playlist_id = int(default_playlist_id)
            schedule.save()

            serializer = ScheduleSerializer(schedule)

            return Response(serializer.data)

        if is_repetition := schedule_data.get("is_repetition"):
            schedule.is_repetition = bool(is_repetition)
            schedule.save()

            serializer = ScheduleSerializer(schedule)

            return Response(serializer.data)

        try:
            resolution = resolve_conflicts(request.data, schedule.pk, schedule.show.pk)
        except ScheduleConflictError as exc:
            return Response(exc.conflicts, status.HTTP_409_CONFLICT)

        return Response(resolution)


# TODO: Create is currently not implemented because timeslots are supposed to be inserted
#       by creating or updating a schedule.
#       There might be a use case for adding a single timeslot without any conflicts though.
@extend_schema_view(
    retrieve=extend_schema(summary="Retrieve a single timeslot."),
    update=extend_schema(summary="Update an existing timeslot."),
    partial_update=extend_schema(summary="Partially update an existing timeslot."),
    destroy=extend_schema(summary="Delete an existing timeslot."),
    list=extend_schema(
        summary="List all timeslots.",
        description=dedent(
            """
            By default, only timeslots ranging from now + 60 days will be displayed.
            You may override this default overriding start and/or end parameter.
            """
        ),
    ),
)
class APITimeSlotViewSet(
    DisabledObjectPermissionCheckMixin,
    NestedObjectFinderMixin,
    mixins.RetrieveModelMixin,
    mixins.UpdateModelMixin,
    mixins.DestroyModelMixin,
    mixins.ListModelMixin,
    viewsets.GenericViewSet,
):
    ROUTE_FILTER_LOOKUPS = {
        "show_pk": "schedule__show",
        "schedule_pk": "schedule",
    }

    filterset_class = filters.TimeSlotFilterSet
    pagination_class = LimitOffsetPagination
    queryset = TimeSlot.objects.all().order_by("-start")
    serializer_class = TimeSlotSerializer

    def update(self, request, *args, **kwargs):
        # `partial_update` of the update mixin calls this method with `partial=True`
        partial = kwargs.get("partial", False)
        show_pk = get_values(self.kwargs, "show_pk")
        timeslot = self.get_object()

        serializer = TimeSlotSerializer(timeslot, data=request.data, partial=partial)

        # raises a validation error (HTTP 400) if the data is invalid
        serializer.is_valid(raise_exception=True)
        serializer.save()

        # Return the next repetition
        # We do this because the Dashboard needs to update the repetition timeslot as well
        # but with another playlist containing the recording instead of the original playlist
        first_repetition = TimeSlot.objects.filter(
            schedule__show=show_pk, start__gt=timeslot.start, repetition_of=timeslot
        ).first()

        if first_repetition:
            serializer = TimeSlotSerializer(first_repetition)

        # ...or the updated timeslot itself if there isn't one
        return Response(serializer.data, status=status.HTTP_200_OK)


@extend_schema_view(
    create=extend_schema(summary="Create a new note."),
    retrieve=extend_schema(summary="Retrieve a single note."),
    update=extend_schema(summary="Update an existing note."),
    partial_update=extend_schema(
        summary="Partially update an existing note.",
        description="Only admins can partially update existing notes.",
    ),
    destroy=extend_schema(summary="Delete an existing note."),
    list=extend_schema(summary="List all notes."),
)
class APINoteViewSet(
    DisabledObjectPermissionCheckMixin,
    NestedObjectFinderMixin,
    viewsets.ModelViewSet,
):
    ROUTE_FILTER_LOOKUPS = {
        "show_pk": "timeslot__show",
        "timeslot_pk": "timeslot",
    }

    filterset_class = filters.NoteFilterSet
    pagination_class = LimitOffsetPagination
    serializer_class = NoteSerializer

    def get_queryset(self):
        """The queryset contains all the notes if the method is safe or the requesting user has
        the `program.update_note` permission, otherwise it only contains the notes for shows
        owned by the requesting user."""

        user = self.request.user

        if self.request.method in permissions.SAFE_METHODS or user.has_perm("program.update_note"):
            return Note.objects.all()
        else:
            return Note.objects.filter(timeslot__schedule__show__owners=user)


# Shared by the viewsets below so that they all use the same filter set.
class ActiveFilterMixin:
    # django-filter reads `filterset_class` from the view, like every other viewset in this
    # module; the attribute was previously spelled `filter_class`
    filterset_class = filters.ActiveFilterSet


@extend_schema_view(
    create=extend_schema(summary="Create a new category."),
    retrieve=extend_schema(summary="Retrieve a single category."),
    update=extend_schema(summary="Update an existing category."),
    partial_update=extend_schema(summary="Partially update an existing category."),
    destroy=extend_schema(summary="Delete an existing category."),
    list=extend_schema(summary="List all categories."),
)
class APICategoryViewSet(ActiveFilterMixin, viewsets.ModelViewSet):
    queryset = Category.objects.all()
    serializer_class = CategorySerializer


@extend_schema_view(
    create=extend_schema(summary="Create a new type."),
    retrieve=extend_schema(summary="Retrieve a single type."),
    update=extend_schema(summary="Update an existing type."),
    partial_update=extend_schema(summary="Partially update an existing type."),
    destroy=extend_schema(summary="Delete an existing type."),
    list=extend_schema(summary="List all types."),
)
class APITypeViewSet(ActiveFilterMixin, viewsets.ModelViewSet):
    queryset = Type.objects.all()
    serializer_class = TypeSerializer


@extend_schema_view(
    create=extend_schema(summary="Create a new topic."),
    retrieve=extend_schema(summary="Retrieve a single topic."),
    update=extend_schema(summary="Update an existing topic."),
    partial_update=extend_schema(summary="Partially update an existing topic."),
    destroy=extend_schema(summary="Delete an existing topic."),
    list=extend_schema(summary="List all topics."),
)
class APITopicViewSet(ActiveFilterMixin, viewsets.ModelViewSet):
    queryset = Topic.objects.all()
    serializer_class = TopicSerializer


@extend_schema_view(
    create=extend_schema(summary="Create a new music focus."),
    retrieve=extend_schema(summary="Retrieve a single music focus."),
    update=extend_schema(summary="Update an existing music focus."),
    partial_update=extend_schema(summary="Partially update an existing music focus."),
    destroy=extend_schema(summary="Delete an existing music focus."),
    list=extend_schema(summary="List all music focuses."),
)
class APIMusicFocusViewSet(ActiveFilterMixin, viewsets.ModelViewSet):
    queryset = MusicFocus.objects.all()
    serializer_class = MusicFocusSerializer


@extend_schema_view(
    create=extend_schema(summary="Create a new funding category."),
    retrieve=extend_schema(summary="Retrieve a single funding category."),
    update=extend_schema(summary="Update an existing funding category."),
    partial_update=extend_schema(summary="Partially update an existing funding category."),
    destroy=extend_schema(summary="Delete an existing funding category."),
    list=extend_schema(summary="List all funding categories."),
)
class APIFundingCategoryViewSet(ActiveFilterMixin, viewsets.ModelViewSet):
    queryset = FundingCategory.objects.all()
    serializer_class = FundingCategorySerializer


@extend_schema_view(
    create=extend_schema(summary="Create a new language."),
    retrieve=extend_schema(summary="Retrieve a single language."),
    update=extend_schema(summary="Update an existing language."),
    partial_update=extend_schema(summary="Partially update an existing language."),
    destroy=extend_schema(summary="Delete an existing language."),
    list=extend_schema(summary="List all languages."),
)
class APILanguageViewSet(ActiveFilterMixin, viewsets.ModelViewSet):
    queryset = Language.objects.all()
    serializer_class = LanguageSerializer


@extend_schema_view(
    create=extend_schema(summary="Create a new host."),
    retrieve=extend_schema(summary="Retrieve a single host."),
    update=extend_schema(summary="Update an existing host."),
    partial_update=extend_schema(summary="Partially update an existing host."),
    destroy=extend_schema(summary="Delete an existing host."),
    list=extend_schema(summary="List all hosts."),
)
class APIHostViewSet(ActiveFilterMixin, viewsets.ModelViewSet):
    queryset = Host.objects.all()
    serializer_class = HostSerializer
    pagination_class = LimitOffsetPagination

    def create(self, request, *args, **kwargs):
        serializer = HostSerializer(
            # FIXME: the method get_serializer_context should be used but it does seem to get lost
            context={"request": request},  # the serializer needs the request in the context
            data=request.data,
        )

        # raises a validation error (HTTP 400) if the data is invalid
        serializer.is_valid(raise_exception=True)
        serializer.save()

        return Response(serializer.data, status=status.HTTP_201_CREATED)

    def update(self, request, *args, **kwargs):
        partial = kwargs.get("partial", False)
        host = self.get_object()

        serializer = HostSerializer(
            # FIXME: the method get_serializer_context should be used but it does seem to get lost
            context={"request": request},  # the serializer needs the request in the context
            data=request.data,
            instance=host,
            partial=partial,
        )

        # raises a validation error (HTTP 400) if the data is invalid
        serializer.is_valid(raise_exception=True)
        serializer.save()

        return Response(serializer.data, status=status.HTTP_200_OK)

    def partial_update(self, request, *args, **kwargs):
        kwargs["partial"] = True

        return self.update(request, *args, **kwargs)


@extend_schema_view(
    create=extend_schema(summary="Create a new link type."),
    retrieve=extend_schema(summary="Retrieve a single link type."),
    update=extend_schema(summary="Update an existing link type."),
    partial_update=extend_schema(summary="Partially update an existing link type."),
    destroy=extend_schema(summary="Delete an existing link type."),
    list=extend_schema(summary="List all link types."),
)
class APILinkTypeViewSet(viewsets.ModelViewSet):
    queryset = LinkType.objects.all()
    serializer_class = LinkTypeSerializer

    def destroy(self, request, *args, **kwargs):
        """Destroying a link type just makes it inactive.

        This is needed to preserve all the links that reference a link type."""

        link_type = self.get_object()
        link_type.is_active = False
        link_type.save()

        return Response(status=status.HTTP_204_NO_CONTENT)


@extend_schema_view(
    create=extend_schema(summary="Create a new license type."),
    retrieve=extend_schema(summary="Retrieve a single license type."),
    update=extend_schema(summary="Update an existing license type."),
    partial_update=extend_schema(summary="Partially update an existing license type."),
    destroy=extend_schema(summary="Delete an existing license type."),
    list=extend_schema(summary="List all license types."),
)
class APILicenseViewSet(viewsets.ModelViewSet):
    queryset = License.objects.all()
    serializer_class = LicenseSerializer