Newer
Older
import datetime
from django_filters import constants
from django_filters import rest_framework as filters
from django_filters import widgets
from django import forms
from django.db.models import Exists, OuterRef, Q, QuerySet
from django.utils import timezone
from program import models
from program.services import generate_program_entries
class StaticFilterHelpTextMixin:
@classmethod
def filter_for_field(cls, field, field_name, lookup_expr=None):
_filter = super().filter_for_field(field, field_name, lookup_expr)
if "help_text" not in _filter.extra:
help_texts = getattr(cls.Meta, "help_texts", {})
_filter.extra["help_text"] = help_texts.get(field_name, "")
return _filter
class StaticQueryBooleanFilter(filters.BooleanFilter):
def __init__(self, *args, query: Q | None = None, **kwargs):
if query is None:
raise ValueError("query must not be None or unset.")
self.query = query
super().__init__(*args, **kwargs)
def filter(self, qs, value):
if value is True:
return qs.filter(self.query)
if value is False:
return qs.filter(~self.query)
return qs
class IntegerInFilter(filters.BaseInFilter):
class QueryArrayWidget(widgets.QueryArrayWidget):
# see: https://github.com/carltongibson/django-filter/issues/1047
def value_from_datadict(self, data, files, name):
new_data = {}
for key in data.keys():
if len(data.getlist(key)) == 1 and "," in data[key]:
new_data[key] = data[key]
else:
new_data[key] = data.getlist(key)
return super().value_from_datadict(new_data, files, name)
field_class = forms.IntegerField
def __init__(self, *args, **kwargs):
kwargs.setdefault("widget", self.QueryArrayWidget())
super().__init__(*args, **kwargs)
class ShowOrderingFilter(filters.OrderingFilter):
def filter(self, qs: QuerySet, value):
if value in constants.EMPTY_VALUES:
return qs
ordering = [self.get_ordering_value(param) for param in value]
fields = (field.lstrip("-") for field in ordering)
if "is_owner" in fields:
_id = getattr(self.parent.request.user, "id", None)
qs = qs.annotate(
is_owner=Exists(
models.Show.owners.through.objects.filter(user=_id, show=OuterRef("pk")),
)
)
return qs.order_by(*ordering)
class ShowFilterSet(StaticFilterHelpTextMixin, filters.FilterSet):
ids = IntegerInFilter(
field_name="id",
help_text="Return only shows matching the specified id(s).",
)
order = ShowOrderingFilter(
fields=["name", "slug", "id", "is_active", "is_owner", "updated_at", "updated_by"],
help_text="Order shows by the given field(s).",
)
category_ids = IntegerInFilter(
field_name="category",
help_text="Return only shows of the given category or categories.",
)
category_slug = filters.CharFilter(
field_name="category__slug",
help_text="Return only shows of the given category slug.",
help_text="Return only shows hosted by the given profile ID(s).",
field_name="is_active",
help_text="Return only currently active/inactive shows.",
is_writable = filters.BooleanFilter(
method="filter_writable",
help_text="Return only shows writable by the requesting authenticated user.",
)
field_name="is_public",
help_text="Return only shows that are public/non-public.",
)
language_ids = IntegerInFilter(
field_name="language",
help_text="Return only shows of the given language(s).",
field_name="music_focus",
help_text="Return only shows with given music focus(es).",
)
music_focus_slug = filters.CharFilter(
field_name="music_focus__slug",
help_text="Return only shows with the give music focus slug.",
field_name="owners",
help_text="Return only shows that belong to the given owner(s).",
)
topic_ids = IntegerInFilter(
field_name="topic",
help_text="Return only shows of the given topic(s).",
)
topic_slug = filters.CharFilter(
field_name="topic__slug",
help_text="Return only shows of the given topic slug.",
type_id = IntegerInFilter(
field_name="type",
help_text="Return only shows of a given type.",
)
type_slug = filters.CharFilter(
field_name="type__slug",
help_text="Return only shows of the given type slug.",
def filter_writable(self, queryset: QuerySet, _: str, value: bool) -> QuerySet:
user = self.request.user if self.request.user.is_authenticated else None
if value and user and (user.is_superuser or user.has_perm("program.update_show")):
return queryset
elif value and user:
return queryset.filter(owners=user)
else:
return models.Show.objects.none()
class Meta:
model = models.Show
fields = [
"category_ids",
"category_slug",
"host_ids",
"is_active",
"is_public",
"language_ids",
"music_focus_ids",
"music_focus_slug",
"owner_ids",
"topic_ids",
"topic_slug",
"type_id",
"type_slug",
class ScheduleFilterSet(filters.FilterSet):
ids = IntegerInFilter(
field_name="id",
help_text="Return only schedules matching the specified id(s).",
)
show_ids = IntegerInFilter(
field_name="show",
help_text="Return only schedules that belong to the specified show(s).",
)
exclude_inactive = filters.BooleanFilter(
method="filter_exclude_inactive",
help_text="Excludes all schedules that don’t have timeslots in the future.",
)
@staticmethod
def filter_exclude_inactive(queryset: QuerySet, _: str, value: bool):
if not value:
return queryset
return queryset.filter(
Exists(
models.TimeSlot.objects.filter(schedule=OuterRef("pk"), end__gte=timezone.now())
)
)
class TimeSlotFilterSet(filters.FilterSet):
ids = IntegerInFilter(
field_name="id",
help_text="Return only timeslots matching the specified id(s).",
)
order = filters.OrderingFilter(
fields=[field.name for field in models.TimeSlot._meta.get_fields()]
)
surrounding = filters.DateTimeFilter(
method="filter_surrounding",
label="Return surrounding timeslots",
help_text=(
"Returns the 10 nearest timeslots around the specified datetime. "
"If specified without a datetime value the current date and time is assumed."
),
)
starts_before = filters.DateTimeFilter(
field_name="start",
lookup_expr="lt",
help_text="Only returns timeslots that start before the specified datetime.",
)
starts_after = filters.DateTimeFilter(
field_name="start",
lookup_expr="gte",
help_text="Only returns timeslots that start at or after the specified datetime.",
ends_before = filters.DateTimeFilter(
field_name="end",
lookup_expr="lt",
help_text="Only returns timeslots that end before the specified datetime.",
ends_after = filters.DateTimeFilter(
field_name="end",
lookup_expr="gte",
help_text="Only returns timeslots that end at or after the specified datetime.",
)
field_name="schedule",
help_text="Return only timeslots that belong to the specified schedule(s).",
)
field_name="schedule__show",
help_text="Return only timeslots that belong to the specified show(s).",
)
@staticmethod
def filter_surrounding(queryset: QuerySet, _: str, value: datetime.datetime):
nearest_timeslots_in_future = (
models.TimeSlot.objects.filter(start__gte=value)
.order_by("start")
.values_list("id", flat=True)[:5]
)
nearest_timeslots_in_past = (
models.TimeSlot.objects.filter(start__lt=value)
.order_by("-start")
.values_list("id", flat=True)[:5]
)
relevant_timeslot_ids = list(nearest_timeslots_in_future) + list(nearest_timeslots_in_past)
return queryset.filter(id__in=relevant_timeslot_ids)
def filter_queryset(self, queryset):
queryset = super().filter_queryset(queryset)
# This is for backwards compatibility as the surrounding-filter was formerly implemented
# by just checking for the existence of the query parameter.
if self.request.GET.get("surrounding", None) == "":
queryset = self.filter_surrounding(queryset, "surrounding", timezone.now())
return queryset
class Meta:
model = models.TimeSlot
fields = [
"order",
"start",
"end",
"surrounding",
class EpisodeFilterSet(StaticFilterHelpTextMixin, filters.FilterSet):
order = filters.OrderingFilter(
fields=["title", "id", "updated_at", "updated_by", "has_timeslots"],
)
ids = IntegerInFilter(
field_name="id",
help_text="Return only episodes matching the specified id(s).",
field_name="show",
help_text="Return only episodes that belong to the specified show(s).",
field_name="show__owners",
help_text="Return only episodes of the specified owner(s).",
field_name="timeslots",
help_text="Return only episodes that belong to the specified timeslot(s).",
has_timeslots = filters.BooleanFilter(
label="Has timeslots",
help_text="Returns only timeslots that either have or have not any timeslots.",
)
has_title = StaticQueryBooleanFilter(
query=~Q(title=""),
help_text=(
"If true returns episodes with titles. If false returns episodes without titles."
),
)
class Meta:
model = models.Episode
fields = [
"ids",
"show_ids",
"show_owner_ids",
"timeslot_ids",
class ActiveFilterSet(StaticFilterHelpTextMixin, filters.FilterSet):
is_active = filters.BooleanFilter(field_name="is_active")
class Meta:
fields = [
class VirtualTimeslotFilterSet(filters.FilterSet):
start = filters.IsoDateTimeFilter(method="filter_noop")
end = filters.IsoDateTimeFilter(method="filter_noop")
cut_at_range_boundaries = filters.BooleanFilter(
help_text=(
"If true guarantees that the first and last program entry match the requested range "
"even if these entries earlier or end later."
),
method="filter_noop",
)
include_virtual = filters.BooleanFilter(
help_text="Include virtual timeslot entries (default: false).",
method="filter_noop",
)
# Filters using the noop are implemented in the generate_program_entries generator.
# We do this, so that we have all the bells and whistles of the automatic value conversion,
# but can still implement custom filter logic on top.
def filter_noop(self, queryset: QuerySet, _: str, value: bool) -> QuerySet:
return queryset
def filter_queryset(self, queryset: QuerySet):
queryset = super().filter_queryset(queryset)
filter_data = self.form.cleaned_data
return list(
generate_program_entries(
queryset,
start=filter_data["start"],
end=filter_data["end"],
include_virtual=bool(filter_data["include_virtual"]),
cut_at_range_boundaries=bool(filter_data["cut_at_range_boundaries"]),
)
)
class Meta:
model = models.TimeSlot
fields = ["start", "end", "include_virtual"]
class PlaylistFilter(filters.FilterSet):
contains_file_ids = IntegerInFilter(
field_name="entries__file_id",
help_text="Return only playlists that use to the specified file ID(s).",
show_ids = IntegerInFilter(
field_name="show_id",
help_text="Return only playlists for the specified show ID.",
)
fields = ("contains_file_ids", "show_ids")
model = models.Playlist