#
# steering, Programme/schedule management for AURA
#
# Copyright (C) 2011-2017, 2020, Ernesto Rico Schmidt
# Copyright (C) 2017-2019, Ingo Leindecker
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from datetime import date, datetime, time, timedelta
from itertools import pairwise
from textwrap import dedent
from django_filters.rest_framework import DjangoFilterBackend
from drf_spectacular.utils import OpenApiResponse, extend_schema, extend_schema_view
from rest_framework import filters as drf_filters
from rest_framework import mixins, permissions, status, viewsets
from rest_framework.pagination import LimitOffsetPagination
from rest_framework.response import Response
from django.contrib.auth.models import User
from django.db import IntegrityError
from django.http import HttpResponse, JsonResponse
from django.shortcuts import get_object_or_404
from django.utils import timezone
from django.utils.translation import gettext as _
from program import filters
from program.models import (
Category,
FundingCategory,
Host,
ScheduleConflictResponseSerializer,
ScheduleCreateUpdateRequestSerializer,
ScheduleDryRunResponseSerializer,
ScheduleResponseSerializer,
from program.services import get_timerange_timeslots, resolve_conflicts
from program.utils import (
DisabledObjectPermissionCheckMixin,
NestedObjectFinderMixin,
get_values,
parse_date,
)
logger = logging.getLogger(__name__)

def timeslot_entry(*, timeslot: TimeSlot) -> dict:
    """Return a timeslot entry as a dict.

    The entry combines the timeslot's own ids and times with data of its schedule and show.
    `start` and `end` are rendered as timezone naive `%Y-%m-%dT%H:%M:%S` strings.
    """

    schedule = timeslot.schedule
    show = schedule.show
    playlist_id = timeslot.playlist_id

    # repetitions get the translated "REP" tag appended to the show name
    title = show_name = f"{show.name} {_('REP')}" if schedule.is_repetition else show.name

    # start and end are emitted as timezone naive datetime objects
    start = timezone.make_naive(timeslot.start).strftime("%Y-%m-%dT%H:%M:%S")
    end = timezone.make_naive(timeslot.end).strftime("%Y-%m-%dT%H:%M:%S")

    return {
        "end": end,
        "id": timeslot.id,
        "playlistId": playlist_id,
        # `Timeslot.repetition_of` is a foreign key that can be null
        "repetitionOfId": timeslot.repetition_of.id if timeslot.repetition_of else None,
        "scheduleDefaultPlaylistId": schedule.default_playlist_id,
        "scheduleId": schedule.id,
        "showCategories": ", ".join(show.category.values_list("name", flat=True)),
        "showDefaultPlaylistId": show.default_playlist_id,
        # `Show.funding_category` is a foreign key that can be null
        "showFundingCategory": show.funding_category.name if show.funding_category_id else "",
        "showHosts": ", ".join(show.hosts.values_list("name", flat=True)),
        "showId": show.id,
        "showLanguages": ", ".join(show.language.values_list("name", flat=True)),
        "showMusicFocus": ", ".join(show.music_focus.values_list("name", flat=True)),
        "showName": show_name,
        "showTopics": ", ".join(show.topic.values_list("name", flat=True)),
        # `Show.type` is a foreign key that can be null
        "showType": show.type.name if show.type_id else "",
        "start": start,
        "title": title,
    }
def gap_entry(*, gap_start: datetime, gap_end: datetime) -> dict:
    """Return a virtual timeslot to fill the gap between `gap_start` and `gap_end` as a dict.

    Timezone aware values are converted with `timezone.make_naive` before formatting, the same
    way `timeslot_entry` renders its `start` and `end`, so that gaps and timeslots in one
    schedule use the same wall-clock time. Naive values are formatted unchanged.
    """

    def as_naive_string(value: datetime) -> str:
        # only aware datetimes carry an offset that has to be resolved first
        if value.utcoffset() is not None:
            value = timezone.make_naive(value)

        return value.strftime("%Y-%m-%dT%H:%M:%S")

    return {
        "end": as_naive_string(gap_end),
        "start": as_naive_string(gap_start),
        "virtual": True,
    }
def json_day_schedule(request, year=None, month=None, day=None):
    """Return the timeslots of a single day as a JSON array.

    If `year`, `month` and `day` are all omitted the current day is used. Every element carries
    the `start` and `end` of a timeslot together with the `title` (show name) and `id` (show id)
    of its show, for the 24 hours following midnight of the requested day.
    """

    if year is None and month is None and day is None:
        # `timezone.now()` is UTC when time zone support is enabled, so taking its date can
        # select the wrong day around midnight; `timezone.localdate()` is the local date.
        requested_day = timezone.localdate()
    else:
        requested_day = date(year, month, day)

    # datetime.combine returns a timezone naive datetime object
    start = timezone.make_aware(datetime.combine(requested_day, time(0, 0)))
    end = start + timedelta(hours=24)

    timeslots = get_timerange_timeslots(start, end).select_related("schedule")

    # NOTE(review): unlike `timeslot_entry`, these values are formatted without
    # `timezone.make_naive`; confirm which time zone the consumers expect here.
    schedule = [
        {
            "start": ts.start.strftime("%Y-%m-%d_%H:%M:%S"),
            "end": ts.end.strftime("%Y-%m-%d_%H:%M:%S"),
            "title": ts.show.name,
            "id": ts.show.id,
        }
        for ts in timeslots
    ]

    return HttpResponse(
        json.dumps(schedule, ensure_ascii=False).encode("utf8"),
        content_type="application/json; charset=utf-8",
    )
def json_playout(request):
    """
    Return a JSON representation of the scheduled playout.

    Expects GET parameters `start` (date), `end` (date), and `includeVirtual` (boolean).

    - `start` is today by default.
    - `end` is one week after the start date by default.
    - `includeVirtual` is false by default.

    The schedule will include virtual timeslots to fill unscheduled gaps if requested.
    A range without any timeslot yields a single virtual timeslot covering the whole range.

    Called by
    - engine (playout) to retrieve timeslots within a given timerange
    - internal calendar to retrieve all timeslots for a week
    """

    start_param = request.GET.get("start")
    end_param = request.GET.get("end")

    # datetime.now and datetime.combine return timezone naive datetime objects
    if start_param is None:
        schedule_start = timezone.make_aware(datetime.combine(datetime.now(), time(0, 0)))
    else:
        schedule_start = timezone.make_aware(
            datetime.combine(parse_date(start_param), time(0, 0))
        )

    if end_param is None:
        schedule_end = schedule_start + timedelta(days=7)
    else:
        # the end date is inclusive, so the range ends at midnight of the following day
        schedule_end = timezone.make_aware(
            datetime.combine(parse_date(end_param) + timedelta(days=1), time(0, 0))
        )

    # `includeVirtual` is the documented name; `include_virtual` is still accepted because it
    # is the name that was actually evaluated before
    include_virtual = "true" in (
        request.GET.get("includeVirtual"),
        request.GET.get("include_virtual"),
    )

    # evaluate the queryset once instead of issuing separate first/last/iteration queries
    timeslots = list(
        get_timerange_timeslots(schedule_start, schedule_end).select_related("schedule")
    )

    schedule = []

    if not timeslots:
        # nothing is scheduled: the whole range is one gap
        if include_virtual:
            schedule.append(gap_entry(gap_start=schedule_start, gap_end=schedule_end))
    else:
        first_timeslot = timeslots[0]
        last_timeslot = timeslots[-1]

        if include_virtual and first_timeslot.start > schedule_start:
            schedule.append(gap_entry(gap_start=schedule_start, gap_end=first_timeslot.start))

        for current, upcoming in pairwise(timeslots):
            schedule.append(timeslot_entry(timeslot=current))

            if include_virtual and current.end != upcoming.start:
                schedule.append(gap_entry(gap_start=current.end, gap_end=upcoming.start))

        # pairwise never yields the last timeslot as `current`, so it is appended here
        schedule.append(timeslot_entry(timeslot=last_timeslot))

        if include_virtual and last_timeslot.end < schedule_end:
            schedule.append(gap_entry(gap_start=last_timeslot.end, gap_end=schedule_end))

    return HttpResponse(
        json.dumps(schedule, ensure_ascii=False).encode("utf8"),
        content_type="application/json; charset=utf-8",
    )
@extend_schema_view(
    create=extend_schema(summary="Create a new user."),
    retrieve=extend_schema(
        summary="Retrieve a single user.",
        description="Non-admin users may only retrieve their own user record.",
    ),
    update=extend_schema(
        summary="Update an existing user.",
        description="Non-admin users may only update their own user record.",
    ),
    partial_update=extend_schema(
        summary="Partially update an existing user.",
        description="Non-admin users may only update their own user record.",
    ),
    list=extend_schema(
        summary="List all users.",
        description=(
            "The returned list of records will only contain a single record "
            "for non-admin users which is their own user account."
        ),
    ),
)
# NOTE(review): the `class` line was missing from this section; the name is inferred from the
# naming of the sibling viewsets (`APIImageViewSet`, `APIShowViewSet`, ...) — confirm.
class APIUserViewSet(
    DisabledObjectPermissionCheckMixin,
    mixins.CreateModelMixin,
    mixins.RetrieveModelMixin,
    mixins.UpdateModelMixin,
    mixins.ListModelMixin,
    viewsets.GenericViewSet,
):
    serializer_class = UserSerializer
    filter_backends = [drf_filters.SearchFilter]
    search_fields = ["username", "first_name", "last_name", "email"]

    def get_queryset(self):
        """The queryset is empty if the user is not authenticated, contains all the users if the
        method is safe or the requesting user is a superuser, otherwise it only contains the
        requesting user."""

        user = self.request.user

        if not user.is_authenticated:
            return User.objects.none()
        elif self.request.method in permissions.SAFE_METHODS or user.is_superuser:
            return User.objects.all()
        else:
            return User.objects.filter(pk=user.id)

    def create(self, request, *args, **kwargs):
        # NOTE(review): `data=request.data` and `serializer.save()` were missing from this
        # section and are restored to match the other `create` methods of this module — confirm.
        serializer = UserSerializer(
            # FIXME: the method get_serializer_context should be used but it does seem to get lost
            context={"request": request},  # the serializer needs the request in the context
            data=request.data,
        )

        # raises a validation error (HTTP 400) for invalid data
        serializer.is_valid(raise_exception=True)
        serializer.save()

        return Response(serializer.data, status=status.HTTP_201_CREATED)
@extend_schema_view(
    create=extend_schema(summary="Create a new image."),
    destroy=extend_schema(summary="Delete an existing image."),
    list=extend_schema(summary="List all images."),
    partial_update=extend_schema(
        summary="Partially update an existing image.",
        description="Only `alt_text`, `credits`, and `ppoi` can be updated.",
    ),
    retrieve=extend_schema(summary="Retrieve a single image."),
    update=extend_schema(
        summary="Update an existing image.",
        description="Only `alt_text`, `credits`, and `ppoi` can be updated.",
    ),
)
class APIImageViewSet(viewsets.ModelViewSet):
    serializer_class = ImageSerializer
    pagination_class = LimitOffsetPagination
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    def get_queryset(self):
        """The queryset contains all the images if the method is safe, otherwise it only contains
        the images owned by the requesting user."""

        if self.request.method in permissions.SAFE_METHODS:
            return Image.objects.all()
        else:
            # ownership is matched against the username of the requesting user
            return Image.objects.filter(owner=self.request.user.username)

    def create(self, request, *args, **kwargs):
        """Create an Image instance. Any user can create an image."""

        serializer = ImageSerializer(
            context={"request": request},  # the serializer needs the request in the context
            data=request.data,
        )

        # raises a validation error (HTTP 400) for invalid data
        serializer.is_valid(raise_exception=True)
        serializer.save()

        return Response(serializer.data, status=status.HTTP_201_CREATED)

    def update(self, request, *args, **kwargs):
        """Update an Image instance. Only the creator can update an image."""

        # `get_object` looks the image up in `get_queryset`, which is restricted to the images
        # owned by the requesting user for unsafe methods
        image = self.get_object()

        # NOTE(review): the `partial` kwarg is not forwarded to the serializer here, unlike in
        # `APIShowViewSet.update` and `APIHostViewSet.update` — confirm this is intended.
        serializer = ImageSerializer(
            image,
            context={"request": request},  # the serializer needs the request in the context
            data=request.data,
        )

        serializer.is_valid(raise_exception=True)
        serializer.save()

        return Response(serializer.data)

    def destroy(self, request, *args, **kwargs):
        """Destroy an Image instance. Only the owner can delete an image."""

        image = self.get_object()
        image.delete()

        return Response(status=status.HTTP_204_NO_CONTENT)
@extend_schema_view(
    create=extend_schema(summary="Create a new show."),
    retrieve=extend_schema(summary="Retrieve a single show."),
    update=extend_schema(summary="Update an existing show."),
    partial_update=extend_schema(summary="Partially update an existing show."),
    destroy=extend_schema(summary="Delete an existing show."),
    list=extend_schema(summary="List all shows."),
)
class APIShowViewSet(DisabledObjectPermissionCheckMixin, viewsets.ModelViewSet):
    queryset = Show.objects.all()
    serializer_class = ShowSerializer
    pagination_class = LimitOffsetPagination
    filter_backends = [DjangoFilterBackend, drf_filters.SearchFilter]
    filterset_class = filters.ShowFilterSet
    search_fields = ["name", "slug", "short_description", "description"]

    @staticmethod
    def _duplicate_slug_response():
        # error response returned when saving a show violates a database constraint
        data = {"slug": {"code": "unique", "message": "show with this slug already exists."}}

        return Response(data, status=status.HTTP_400_BAD_REQUEST)

    def get_object(self):
        """Return the show addressed by the URL, looked up by id or by slug.

        A lookup value that can be converted to an integer is matched against the lookup field,
        any other value against `slug`. Responds with 404 if no show matches.
        """

        queryset = self.filter_queryset(self.get_queryset())
        lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field
        lookup_arg = self.kwargs[lookup_url_kwarg]

        # allow object retrieval through id or slug
        try:
            filter_kwargs = {self.lookup_field: int(lookup_arg)}
        except ValueError:
            filter_kwargs = {"slug": lookup_arg}

        obj = get_object_or_404(queryset, **filter_kwargs)
        self.check_object_permissions(self.request, obj)

        return obj

    def create(self, request, *args, **kwargs):
        serializer = ShowSerializer(
            # FIXME: the method get_serializer_context should be used but it does seem to get lost
            context={"request": request},  # the serializer needs the request in the context
            data=request.data,
        )

        # raises a validation error (HTTP 400) for invalid data
        serializer.is_valid(raise_exception=True)

        try:
            serializer.save()
        except IntegrityError:
            return self._duplicate_slug_response()

        return Response(serializer.data, status=status.HTTP_201_CREATED)

    def update(self, request, *args, **kwargs):
        partial = kwargs.get("partial", False)
        show = self.get_object()

        serializer = ShowSerializer(
            # FIXME: the method get_serializer_context should be used but it does seem to get lost
            context={"request": request},  # the serializer needs the request in the context
            data=request.data,
            instance=show,
            partial=partial,
        )

        serializer.is_valid(raise_exception=True)

        try:
            serializer.save()
        except IntegrityError:
            return self._duplicate_slug_response()

        return Response(serializer.data)

    def partial_update(self, request, *args, **kwargs):
        kwargs["partial"] = True

        return self.update(request, *args, **kwargs)
@extend_schema_view(
    list=extend_schema(summary="List all rrule."),
    retrieve=extend_schema(summary="Retrieve a single rrule."),
)
class APIRRuleViewSet(viewsets.ModelViewSet):
    # endpoints for recurrence rules, backed by every `RRule` record
    serializer_class = RRuleSerializer
    queryset = RRule.objects.all()
@extend_schema_view(
create=extend_schema(
summary="Create a new schedule.",
request=ScheduleCreateUpdateRequestSerializer,
responses={
status.HTTP_201_CREATED: OpenApiResponse(
response=ScheduleResponseSerializer,
description=(
"Signals the successful creation of the schedule and of the projected "
"timeslots."
),
),
status.HTTP_202_ACCEPTED: OpenApiResponse(
response=ScheduleDryRunResponseSerializer,
description=(
"Returns the list of timeslots that would be created, updated and deleted if "
"the schedule request would not have been sent with the dryrun flag."
),
),
status.HTTP_400_BAD_REQUEST: OpenApiResponse(
response=ErrorSerializer(many=True),
description=dedent(
"""
Returned in case the request contained invalid data.
This may happen if:
* the last date is before the start date (`no-start-after-end`),
in which case you should correct either the start or until date.
* The start and last date are the same (`no-same-day-start-and-end`).
This is only allowed for single timeslots with the recurrence rule
set to `once`. You should fix either the start or until date.
* The number of conflicts and solutions aren’t the same
(`one-solution-per-conflict`). Only one solution is allowed per conflict,
so you either offered too many or not enough solutions for any reported
conflicts.
* The referenced recurrence rule does not exist.
"""
),
),
status.HTTP_403_FORBIDDEN: OpenApiResponse(
response=ErrorSerializer,
description=(
"Returned in case the request contained no or invalid authenticated data "
"or the authenticated user does not have authorization to perform the "
"requested operation."
),
),
status.HTTP_409_CONFLICT: OpenApiResponse(
response=ScheduleConflictResponseSerializer,
description=dedent(
"""
Returns the list of projected timeslots and any collisions that may have
been found for existing timeslots.
Errors on projected timeslots may include:
* 'This change on the timeslot is not allowed.'
When adding: There was a change in the schedule's data during conflict
resolution.
When updating: Fields 'start', 'end', 'by_weekday' or 'rrule' have changed,
which is not allowed.
* 'No solution given': No solution was provided for the conflict in
`solutions`. Provide a value of `solution_choices`.
* 'Given solution is not accepted for this conflict.':
The solution has a value which is not part of `solution_choices`.
Provide a value of `solution_choices` (at least `ours` or `theirs`).
"""
),
),
},
),
retrieve=extend_schema(summary="Retrieve a single schedule."),
update=extend_schema(
summary="Update an existing schedule.",
request=ScheduleCreateUpdateRequestSerializer,
),
partial_update=extend_schema(
summary="Partially update an existing schedule.",
request=ScheduleCreateUpdateRequestSerializer,
),
destroy=extend_schema(summary="Delete an existing schedule."),
list=extend_schema(summary="List all schedules."),
)
class APIScheduleViewSet(
    DisabledObjectPermissionCheckMixin,
    NestedObjectFinderMixin,
    viewsets.ModelViewSet,
):
    # presumably consumed by `NestedObjectFinderMixin` to map URL kwargs to lookups — verify
    ROUTE_FILTER_LOOKUPS = {
        "show_pk": "show",
    }

    filterset_class = filters.ScheduleFilterSet
    pagination_class = LimitOffsetPagination
    queryset = Schedule.objects.all()
    serializer_class = ScheduleSerializer

    def get_serializer_class(self):
        # writing actions take the request serializer, everything else the default one
        if self.action in ("create", "update", "partial_update"):
            return ScheduleCreateUpdateRequestSerializer

        return super().get_serializer_class()

    def create(self, request, *args, **kwargs):
        """
        Create a schedule, generate timeslots, test for collisions and resolve them
        (including notes).

        Note that creating or updating a schedule is the only way to create timeslots.

        The projected timeslots defined by the schedule are matched against existing
        timeslots. The API will return an object that contains

        * the schedule's data,
        * projected timeslots,
        * detected collisions,
        * and possible solutions.

        As long as no `solutions` object has been set or unresolved collisions exist,
        no data is written to the database. A schedule is only created if at least
        one timeslot was generated by it.

        In order to resolve any possible conflicts, the client must submit a new request with
        a solution for each conflict. Possible solutions are listed as part of the projected
        timeslot in the `solution_choices` array. In a best-case scenario with no detected
        conflicts an empty solutions object will suffice. For more details on the individual
        types of solutions see the SolutionChoicesEnum.

        **Please note**:
        If there's more than one collision for a projected timeslot, only `theirs` and `ours`
        are currently supported as solutions.
        """

        pk, show_pk = get_values(self.kwargs, "pk", "show_pk")

        # Only allow creating with the `schedule` JSON object. This is checked before the object
        # is read below, so that a request without it is answered with 400 instead of failing.
        if "schedule" not in request.data:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        if show_pk is None:
            show_pk = request.data.get("schedule").get("show_id")

        # FIXME: this is wrong
        if show_pk == 0:
            show_pk = request.data.get("show_id")

        # Only allow creating when calling /shows/{show_pk}/schedules/ or when the show is
        # given in the request data
        if show_pk is None:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        try:
            resolution = resolve_conflicts(request.data, pk, show_pk)
        # FIXME: Find a better way to do this.
        # The exception is thrown by the instantiate_upcoming_schedule function.
        except RRule.DoesNotExist as exc:
            return JsonResponse({"rruleId": str(exc)}, status=status.HTTP_400_BAD_REQUEST)
        except ScheduleConflictError as exc:
            return Response(exc.conflicts, status.HTTP_409_CONFLICT)

        if all(key in resolution for key in ["create", "update", "delete"]):
            # this is a dry-run
            return Response(resolution, status=status.HTTP_202_ACCEPTED)

        return Response(resolution, status=status.HTTP_201_CREATED)

    def update(self, request, *args, **kwargs):
        """
        Update a schedule, generate timeslots, test for collisions and resolve
        them including notes.
        """

        # NOTE(review): an authenticated user without permission would usually be answered
        # with 403; the status is left as is because clients may depend on it.
        if not request.user.is_superuser:
            return Response(status=status.HTTP_401_UNAUTHORIZED)

        # Only allow updating with the `schedule` JSON object
        if "schedule" not in request.data:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        schedule = self.get_object()
        schedule_data = request.data.get("schedule")

        # If default playlist id or repetition are given, just update
        if default_playlist_id := schedule_data.get("default_playlist_id"):
            schedule.default_playlist_id = int(default_playlist_id)
            schedule.save()

            serializer = ScheduleSerializer(schedule)
            return Response(serializer.data)

        # NOTE(review): this branch is only taken for truthy values, so `is_repetition` can be
        # switched on but not off through this shortcut — confirm this is intended.
        if is_repetition := schedule_data.get("is_repetition"):
            schedule.is_repetition = bool(is_repetition)
            schedule.save()

            serializer = ScheduleSerializer(schedule)
            return Response(serializer.data)

        try:
            resolution = resolve_conflicts(request.data, schedule.pk, schedule.show.pk)
        except ScheduleConflictError as exc:
            return Response(exc.conflicts, status.HTTP_409_CONFLICT)

        return Response(resolution)
# TODO: Create is currently not implemented because timeslots are supposed to be inserted
# by creating or updating a schedule.
# There might be a use case for adding a single timeslot without any conflicts though.
@extend_schema_view(
    retrieve=extend_schema(summary="Retrieve a single timeslot."),
    update=extend_schema(summary="Update an existing timeslot."),
    partial_update=extend_schema(summary="Partially update an existing timeslot."),
    destroy=extend_schema(summary="Delete an existing timeslot."),
    list=extend_schema(
        summary="List all timeslots.",
        description=dedent(
            """
            By default, only timeslots ranging from now + 60 days will be displayed.
            You may override this default overriding start and/or end parameter.
            """
        ),
    ),
)
class APITimeSlotViewSet(
    DisabledObjectPermissionCheckMixin,
    NestedObjectFinderMixin,
    mixins.RetrieveModelMixin,
    mixins.UpdateModelMixin,
    mixins.DestroyModelMixin,
    mixins.ListModelMixin,
    viewsets.GenericViewSet,
):
    # presumably consumed by `NestedObjectFinderMixin` to map URL kwargs to lookups — verify
    ROUTE_FILTER_LOOKUPS = {
        "show_pk": "schedule__show",
        "schedule_pk": "schedule",
    }

    filterset_class = filters.TimeSlotFilterSet
    pagination_class = LimitOffsetPagination
    queryset = TimeSlot.objects.all().order_by("-start")
    serializer_class = TimeSlotSerializer

    def update(self, request, *args, **kwargs):
        show_pk = get_values(self.kwargs, "show_pk")
        timeslot = self.get_object()

        # NOTE(review): the `partial` kwarg is not forwarded to the serializer, so a partial
        # update is validated like a full update — confirm this is intended.
        serializer = TimeSlotSerializer(timeslot, data=request.data)

        # raises a validation error (HTTP 400) for invalid data
        serializer.is_valid(raise_exception=True)
        serializer.save()

        # Return the next repetition
        # We do this because the Dashboard needs to update the repetition timeslot as well
        # but with another playlist containing the recording instead of the original playlist
        if first_repetition := TimeSlot.objects.filter(
            schedule__show=show_pk, start__gt=timeslot.start, repetition_of=timeslot
        ).first():
            serializer = TimeSlotSerializer(first_repetition)

        # ...or the updated timeslot itself if there is no repetition
        return Response(serializer.data, status=status.HTTP_200_OK)
@extend_schema_view(
    create=extend_schema(summary="Create a new note."),
    retrieve=extend_schema(summary="Retrieve a single note."),
    update=extend_schema(summary="Update an existing note."),
    partial_update=extend_schema(
        summary="Partially update an existing note.",
        description="Only admins can partially update existing notes.",
    ),
    destroy=extend_schema(summary="Delete an existing note."),
    list=extend_schema(summary="List all notes."),
)
class APINoteViewSet(
    DisabledObjectPermissionCheckMixin,
    NestedObjectFinderMixin,
    viewsets.ModelViewSet,
):
    # presumably consumed by `NestedObjectFinderMixin` to map URL kwargs to lookups — verify
    ROUTE_FILTER_LOOKUPS = {
        "show_pk": "timeslot__show",
        "timeslot_pk": "timeslot",
    }

    filterset_class = filters.NoteFilterSet
    pagination_class = LimitOffsetPagination
    serializer_class = NoteSerializer

    def get_queryset(self):
        """Return all the notes if the method is safe or the requesting user has the
        `program.update_note` permission, otherwise only the notes for shows owned by the
        requesting user."""

        requester = self.request.user
        is_safe_method = self.request.method in permissions.SAFE_METHODS

        # the permission is only looked up for unsafe methods
        if is_safe_method or requester.has_perm("program.update_note"):
            return Note.objects.all()

        return Note.objects.filter(timeslot__schedule__show__owners=requester)
class ActiveFilterMixin:
    # Mixin for viewsets whose records are filtered with `filters.ActiveFilterSet`.
    #
    # The other viewsets of this module configure their filter set through `filterset_class`;
    # the mixin now does the same. `filter_class` is kept as an alias of the same value for
    # code that still reads the old attribute name.
    filterset_class = filters.ActiveFilterSet
    filter_class = filterset_class
@extend_schema_view(
    list=extend_schema(summary="List all categories."),
    retrieve=extend_schema(summary="Retrieve a single category."),
    create=extend_schema(summary="Create a new category."),
    update=extend_schema(summary="Update an existing category."),
    partial_update=extend_schema(summary="Partially update an existing category."),
    destroy=extend_schema(summary="Delete an existing category."),
)
class APICategoryViewSet(ActiveFilterMixin, viewsets.ModelViewSet):
    # endpoints for categories, backed by every `Category` record
    serializer_class = CategorySerializer
    queryset = Category.objects.all()
@extend_schema_view(
    list=extend_schema(summary="List all types."),
    retrieve=extend_schema(summary="Retrieve a single type."),
    create=extend_schema(summary="Create a new type."),
    update=extend_schema(summary="Update an existing type."),
    partial_update=extend_schema(summary="Partially update an existing type."),
    destroy=extend_schema(summary="Delete an existing type."),
)
class APITypeViewSet(ActiveFilterMixin, viewsets.ModelViewSet):
    # endpoints for types, backed by every `Type` record
    serializer_class = TypeSerializer
    queryset = Type.objects.all()
@extend_schema_view(
    list=extend_schema(summary="List all topics."),
    retrieve=extend_schema(summary="Retrieve a single topic."),
    create=extend_schema(summary="Create a new topic."),
    update=extend_schema(summary="Update an existing topic."),
    partial_update=extend_schema(summary="Partially update an existing topic."),
    destroy=extend_schema(summary="Delete an existing topic."),
)
class APITopicViewSet(ActiveFilterMixin, viewsets.ModelViewSet):
    # endpoints for topics, backed by every `Topic` record
    serializer_class = TopicSerializer
    queryset = Topic.objects.all()
@extend_schema_view(
    list=extend_schema(summary="List all music focuses."),
    retrieve=extend_schema(summary="Retrieve a single music focus."),
    create=extend_schema(summary="Create a new music focus."),
    update=extend_schema(summary="Update an existing music focus."),
    partial_update=extend_schema(summary="Partially update an existing music focus."),
    destroy=extend_schema(summary="Delete an existing music focus."),
)
class APIMusicFocusViewSet(ActiveFilterMixin, viewsets.ModelViewSet):
    # endpoints for music focuses, backed by every `MusicFocus` record
    serializer_class = MusicFocusSerializer
    queryset = MusicFocus.objects.all()
@extend_schema_view(
    list=extend_schema(summary="List all funding categories."),
    retrieve=extend_schema(summary="Retrieve a single funding category."),
    create=extend_schema(summary="Create a new funding category."),
    update=extend_schema(summary="Update an existing funding category."),
    partial_update=extend_schema(summary="Partially update an existing funding category."),
    destroy=extend_schema(summary="Delete an existing funding category."),
)
class APIFundingCategoryViewSet(ActiveFilterMixin, viewsets.ModelViewSet):
    # endpoints for funding categories, backed by every `FundingCategory` record
    serializer_class = FundingCategorySerializer
    queryset = FundingCategory.objects.all()
@extend_schema_view(
    list=extend_schema(summary="List all languages."),
    retrieve=extend_schema(summary="Retrieve a single language."),
    create=extend_schema(summary="Create a new language."),
    update=extend_schema(summary="Update an existing language."),
    partial_update=extend_schema(summary="Partially update an existing language."),
    destroy=extend_schema(summary="Delete an existing language."),
)
class APILanguageViewSet(ActiveFilterMixin, viewsets.ModelViewSet):
    # endpoints for languages, backed by every `Language` record
    serializer_class = LanguageSerializer
    queryset = Language.objects.all()
@extend_schema_view(
    list=extend_schema(summary="List all hosts."),
    retrieve=extend_schema(summary="Retrieve a single host."),
    create=extend_schema(summary="Create a new host."),
    update=extend_schema(summary="Update an existing host."),
    partial_update=extend_schema(summary="Partially update an existing host."),
    destroy=extend_schema(summary="Delete an existing host."),
)
class APIHostViewSet(ActiveFilterMixin, viewsets.ModelViewSet):
    # endpoints for hosts, backed by every `Host` record, with limit/offset pagination
    serializer_class = HostSerializer
    queryset = Host.objects.all()
    pagination_class = LimitOffsetPagination

    def create(self, request, *args, **kwargs):
        # FIXME: the method get_serializer_context should be used but it does seem to get lost
        # the serializer needs the request in the context
        serializer_context = {"request": request}
        host_serializer = HostSerializer(context=serializer_context, data=request.data)

        # invalid data raises here, so the lines below only run for valid data
        host_serializer.is_valid(raise_exception=True)
        host_serializer.save()

        return Response(host_serializer.data, status=status.HTTP_201_CREATED)

    def update(self, request, *args, **kwargs):
        is_partial = kwargs.get("partial", False)
        instance = self.get_object()

        # FIXME: the method get_serializer_context should be used but it does seem to get lost
        # the serializer needs the request in the context
        serializer_context = {"request": request}
        host_serializer = HostSerializer(
            context=serializer_context,
            data=request.data,
            instance=instance,
            partial=is_partial,
        )

        # invalid data raises here, so the lines below only run for valid data
        host_serializer.is_valid(raise_exception=True)
        host_serializer.save()

        return Response(host_serializer.data, status=status.HTTP_200_OK)

    def partial_update(self, request, *args, **kwargs):
        # a partial update is a regular update with the `partial` flag set
        kwargs["partial"] = True

        return self.update(request, *args, **kwargs)
@extend_schema_view(
    list=extend_schema(summary="List all link types."),
    retrieve=extend_schema(summary="Retrieve a single link type."),
    create=extend_schema(summary="Create a new link type."),
    update=extend_schema(summary="Update an existing link type."),
    partial_update=extend_schema(summary="Partially update an existing link type."),
    destroy=extend_schema(summary="Delete an existing link type."),
)
class APILinkTypeViewSet(viewsets.ModelViewSet):
    # endpoints for link types, backed by every `LinkType` record
    serializer_class = LinkTypeSerializer
    queryset = LinkType.objects.all()

    def destroy(self, request, *args, **kwargs):
        """Destroying a link type just makes is inactive.
        This is needed to preserve all the links that reference a link type."""

        # the record is kept and only flagged, nothing is deleted from the database here
        instance = self.get_object()
        instance.is_active = False
        instance.save()

        return Response(status=status.HTTP_204_NO_CONTENT)
@extend_schema_view(
    list=extend_schema(summary="List all license types."),
    retrieve=extend_schema(summary="Retrieve a single license type."),
    create=extend_schema(summary="Create a new license type."),
    update=extend_schema(summary="Update an existing license type."),
    partial_update=extend_schema(summary="Partially update an existing license type."),
    destroy=extend_schema(summary="Delete an existing license type."),
)
class APILicenseViewSet(viewsets.ModelViewSet):
    # endpoints for license types, backed by every `License` record
    serializer_class = LicenseSerializer
    queryset = License.objects.all()