Newer
Older
import datetime
from django_filters import constants
from django_filters import rest_framework as filters
from django_filters import widgets
from django import forms
from django.db.models import Exists, OuterRef, QuerySet
from django.utils import timezone
from program import models
from program.services import generate_program_entries
class StaticFilterHelpTextMixin:
    """Supply missing filter help texts from a static ``Meta.help_texts`` mapping.

    Meant to be combined with a class whose parent implements the
    ``filter_for_field`` classmethod. Filters that already carry a
    ``help_text`` entry in their ``extra`` dict are left untouched.
    """

    @classmethod
    def filter_for_field(cls, field, field_name, lookup_expr=None):
        """Build the filter through the parent class and ensure it has a help text.

        The text is looked up by ``field_name`` in ``cls.Meta.help_texts``;
        if the mapping or the key is missing, an empty string is used.
        """
        generated = super().filter_for_field(field, field_name, lookup_expr)
        if "help_text" in generated.extra:
            return generated
        static_texts = getattr(cls.Meta, "help_texts", {})
        generated.extra["help_text"] = static_texts.get(field_name, "")
        return generated
class IntegerInFilter(filters.BaseInFilter):
    """``BaseInFilter`` over integer values with a data-normalizing default widget."""

    field_class = forms.IntegerField

    class QueryArrayWidget(widgets.QueryArrayWidget):
        """Widget that reshapes the submitted data before the parent widget parses it."""

        # see: https://github.com/carltongibson/django-filter/issues/1047
        def value_from_datadict(self, data, files, name):
            """Delegate to the parent widget with a normalized copy of ``data``.

            A key submitted exactly once with a comma in its value is passed
            on as that single string; every other key is passed on as the
            full list of its submitted values.
            """
            normalized = {
                key: (
                    data[key]
                    if len(data.getlist(key)) == 1 and "," in data[key]
                    else data.getlist(key)
                )
                for key in data.keys()
            }
            return super().value_from_datadict(normalized, files, name)

    def __init__(self, *args, **kwargs):
        """Use the nested ``QueryArrayWidget`` unless the caller passes a widget."""
        kwargs.setdefault("widget", self.QueryArrayWidget())
        super().__init__(*args, **kwargs)
class ShowOrderingFilter(filters.OrderingFilter):
    """Ordering filter for shows that also understands the computed ``is_owner`` key."""

    def filter(self, qs: QuerySet, value):
        """Order ``qs`` by the requested parameters.

        If ``is_owner`` is among the resolved ordering fields (with or without
        a leading ``-``), the queryset is first annotated with an ``is_owner``
        flag: whether a row linking the requesting user to the show exists in
        the ``Show.owners`` through table. Empty values return ``qs`` unchanged.
        """
        if value in constants.EMPTY_VALUES:
            return qs

        order_by_args = [self.get_ordering_value(param) for param in value]
        if any(arg.lstrip("-") == "is_owner" for arg in order_by_args):
            # Tolerate user objects without an ``id`` attribute by falling back to None.
            user_id = getattr(self.parent.request.user, "id", None)
            ownership_rows = models.Show.owners.through.objects.filter(
                user=user_id, show=OuterRef("pk")
            )
            qs = qs.annotate(is_owner=Exists(ownership_rows))
        return qs.order_by(*order_by_args)
# Filter set for `models.Show` (see `Meta.model` below). Each class attribute declares one
# query-parameter filter; `filter_writable` implements the `is_writable` filter.
# NOTE(review): several declarations in this class appear cut off in this view (attribute
# names and/or closing parentheses are missing around the "hosted by", "is_active",
# "is_public", "music_focus", "owners", "topic__slug" and "type__slug" filters, and the
# `Meta.fields` list is never closed) — confirm against the complete file before editing.
class ShowFilterSet(StaticFilterHelpTextMixin, filters.FilterSet):
ids = IntegerInFilter(
field_name="id",
help_text="Return only shows matching the specified id(s).",
)
# ShowOrderingFilter annotates `is_owner` onto the queryset when it is part of the ordering.
order = ShowOrderingFilter(
fields=["name", "slug", "id", "is_active", "is_owner", "updated_at", "updated_by"],
help_text="Order shows by the given field(s).",
)
category_ids = IntegerInFilter(
field_name="category",
help_text="Return only shows of the given category or categories.",
)
category_slug = filters.CharFilter(
field_name="category__slug",
help_text="Return only shows of the given category slug.",
help_text="Return only shows hosted by the given profile ID(s).",
field_name="is_active",
help_text="Return only currently active/inactive shows.",
is_writable = filters.BooleanFilter(
method="filter_writable",
help_text="Return only shows writable by the requesting authenticated user.",
)
field_name="is_public",
help_text="Return only shows that are public/non-public.",
)
language_ids = IntegerInFilter(
field_name="language",
help_text="Return only shows of the given language(s).",
field_name="music_focus",
help_text="Return only shows with given music focus(es).",
)
music_focus_slug = filters.CharFilter(
field_name="music_focus__slug",
help_text="Return only shows with the give music focus slug.",
field_name="owners",
help_text="Return only shows that belong to the given owner(s).",
)
topic_ids = IntegerInFilter(
field_name="topic",
help_text="Return only shows of the given topic(s).",
)
topic_slug = filters.CharFilter(
field_name="topic__slug",
help_text="Return only shows of the given topic slug.",
type_id = IntegerInFilter(
field_name="type",
help_text="Return only shows of a given type.",
)
type_slug = filters.CharFilter(
field_name="type__slug",
help_text="Return only shows of the given type slug.",
# Restrict `queryset` to the shows the requesting user may write to:
# - truthy value and the user is a superuser or has "program.update_show": unchanged queryset
# - truthy value and any other authenticated user: only shows that user owns
# - otherwise (falsy value or unauthenticated user): an empty Show queryset
# NOTE(review): a falsy value therefore yields no shows at all rather than the
# non-writable ones — confirm this is intended.
def filter_writable(self, queryset: QuerySet, _: str, value: bool) -> QuerySet:
# Unauthenticated requests are treated as having no user.
user = self.request.user if self.request.user.is_authenticated else None
if value and user and (user.is_superuser or user.has_perm("program.update_show")):
return queryset
elif value and user:
return queryset.filter(owners=user)
else:
return models.Show.objects.none()
class Meta:
model = models.Show
# Names of the filters listed for this filter set (the list is not closed in this view).
fields = [
"category_ids",
"category_slug",
"host_ids",
"is_active",
"is_public",
"language_ids",
"music_focus_ids",
"music_focus_slug",
"owner_ids",
"topic_ids",
"topic_slug",
"type_id",
"type_slug",
class ScheduleFilterSet(filters.FilterSet):
    """Query-parameter filters for schedules: by id, by show, and by activity."""

    ids = IntegerInFilter(
        help_text="Return only schedules matching the specified id(s).",
        field_name="id",
    )
    show_ids = IntegerInFilter(
        help_text="Return only schedules that belong to the specified show(s).",
        field_name="show",
    )
    exclude_inactive = filters.BooleanFilter(
        help_text="Excludes all schedules that don’t have timeslots in the future.",
        method="filter_exclude_inactive",
    )

    @staticmethod
    def filter_exclude_inactive(queryset: QuerySet, _: str, value: bool):
        """Keep only schedules having a timeslot that ends now or later.

        A falsy ``value`` leaves the queryset unchanged.
        """
        if value:
            upcoming_timeslots = models.TimeSlot.objects.filter(
                schedule=OuterRef("pk"),
                end__gte=timezone.now(),
            )
            return queryset.filter(Exists(upcoming_timeslots))
        return queryset
# Filter set for `models.TimeSlot` (see `Meta.model` below): id, ordering, start/end bounds
# and the "surrounding" lookup.
# NOTE(review): some declarations appear cut off in this view (`starts_after` and
# `ends_before` are not closed, the two filters on "schedule" and "schedule__show" have no
# attribute name, and the `Meta.fields` list is never closed) — confirm against the
# complete file before editing.
class TimeSlotFilterSet(filters.FilterSet):
ids = IntegerInFilter(
field_name="id",
help_text="Return only timeslots matching the specified id(s).",
)
# Ordering is offered for every field name reported by `TimeSlot._meta.get_fields()`.
order = filters.OrderingFilter(
fields=[field.name for field in models.TimeSlot._meta.get_fields()]
)
surrounding = filters.DateTimeFilter(
method="filter_surrounding",
label="Return surrounding timeslots",
help_text=(
"Returns the 10 nearest timeslots around the specified datetime. "
"If specified without a datetime value the current date and time is assumed."
),
)
starts_before = filters.DateTimeFilter(
field_name="start",
lookup_expr="lt",
help_text="Only returns timeslots that start before the specified datetime.",
)
starts_after = filters.DateTimeFilter(
field_name="start",
lookup_expr="gte",
help_text="Only returns timeslots that start at or after the specified datetime.",
ends_before = filters.DateTimeFilter(
field_name="end",
lookup_expr="lt",
help_text="Only returns timeslots that end before the specified datetime.",
ends_after = filters.DateTimeFilter(
field_name="end",
lookup_expr="gte",
help_text="Only returns timeslots that end at or after the specified datetime.",
)
field_name="schedule",
help_text="Return only timeslots that belong to the specified schedule(s).",
)
field_name="schedule__show",
help_text="Return only timeslots that belong to the specified show(s).",
)
# Keep the rows of `queryset` whose id is among the 5 timeslots starting at or after `value`
# and the 5 timeslots starting before it. The candidate ids are taken from all TimeSlot
# objects, not from `queryset`, so other active filters do not change which ids are candidates.
@staticmethod
def filter_surrounding(queryset: QuerySet, _: str, value: datetime.datetime):
# Up to five ids, earliest start first, beginning at `value`.
nearest_timeslots_in_future = (
models.TimeSlot.objects.filter(start__gte=value)
.order_by("start")
.values_list("id", flat=True)[:5]
)
# Up to five ids, latest start first, strictly before `value`.
nearest_timeslots_in_past = (
models.TimeSlot.objects.filter(start__lt=value)
.order_by("-start")
.values_list("id", flat=True)[:5]
)
relevant_timeslot_ids = list(nearest_timeslots_in_future) + list(nearest_timeslots_in_past)
return queryset.filter(id__in=relevant_timeslot_ids)
# Apply the declared filters, then handle a `surrounding` parameter given as an empty string.
def filter_queryset(self, queryset):
queryset = super().filter_queryset(queryset)
# This is for backwards compatibility as the surrounding-filter was formerly implemented
# by just checking for the existence of the query parameter.
if self.request.GET.get("surrounding", None) == "":
# An empty value means "around now".
queryset = self.filter_surrounding(queryset, "surrounding", timezone.now())
return queryset
class Meta:
model = models.TimeSlot
# Names listed for this filter set (the list is not closed in this view).
fields = [
"order",
"start",
"end",
"surrounding",
# Filter set for `models.Note` (see `Meta.model` below).
# NOTE(review): the three filters on "timeslot__show", "timeslot__show__owners" and
# "timeslot" have lost their attribute names and filter classes in this view — presumably
# they are the `show_ids`, `show_owner_ids` and `timeslot_ids` entries listed in
# `Meta.fields`; confirm against the complete file. The `Meta.fields` list is also not closed.
class NoteFilterSet(StaticFilterHelpTextMixin, filters.FilterSet):
ids = IntegerInFilter(
field_name="id",
help_text="Return only notes matching the specified id(s).",
)
field_name="timeslot__show",
help_text="Return only notes that belong to the specified show(s).",
)
field_name="timeslot__show__owners",
help_text="Return only notes by show the specified owner(s): all notes the user may edit.",
)
field_name="timeslot",
help_text="Return only notes that belong to the specified timeslot(s).",
)
class Meta:
model = models.Note
# Names of the filters listed for this filter set.
fields = [
"ids",
"show_ids",
"show_owner_ids",
"timeslot_ids",
# Filter set declaring a single boolean filter on the `is_active` field.
# NOTE(review): the `Meta` class is truncated in this view right after `fields = [`
# (no model and no field names are visible) — confirm against the complete file.
class ActiveFilterSet(StaticFilterHelpTextMixin, filters.FilterSet):
is_active = filters.BooleanFilter(field_name="is_active")
class Meta:
fields = [
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
class VirtualTimeslotFilterSet(filters.FilterSet):
    """Filter set whose result is a list of program entries instead of a queryset.

    The declared filters only validate and convert the query parameters; the
    converted values are handed to ``generate_program_entries``.
    """

    start = filters.IsoDateTimeFilter(method="filter_noop")
    end = filters.IsoDateTimeFilter(method="filter_noop")
    include_virtual = filters.BooleanFilter(
        method="filter_noop",
        help_text="Include virtual timeslot entries (default: false).",
    )

    # Filters using the noop are implemented in the generate_program_entries generator.
    # We do this, so that we have all the bells and whistles of the automatic value conversion,
    # but can still implement custom filter logic on top.
    def filter_noop(self, queryset: QuerySet, _: str, value: bool) -> QuerySet:
        """Return ``queryset`` untouched; the value is consumed in ``filter_queryset``."""
        return queryset

    def filter_queryset(self, queryset: QuerySet):
        """Run the regular filtering, then build the list of program entries.

        ``start``, ``end`` and ``include_virtual`` are read from the form's
        cleaned data and passed to ``generate_program_entries`` together with
        the filtered queryset. The generator is fully consumed into a list.
        """
        filtered = super().filter_queryset(queryset)
        cleaned = self.form.cleaned_data
        entries = generate_program_entries(
            filtered,
            start=cleaned["start"],
            end=cleaned["end"],
            include_virtual=cleaned["include_virtual"],
        )
        return list(entries)

    class Meta:
        model = models.TimeSlot
        fields = ["start", "end", "include_virtual"]