import datetime from django_filters import constants from django_filters import rest_framework as filters from django_filters import widgets from django import forms from django.db.models import Exists, OuterRef, Q, QuerySet from django.utils import timezone from program import models from program.services import generate_program_entries class StaticFilterHelpTextMixin: @classmethod def filter_for_field(cls, field, field_name, lookup_expr=None): _filter = super().filter_for_field(field, field_name, lookup_expr) if "help_text" not in _filter.extra: help_texts = getattr(cls.Meta, "help_texts", {}) _filter.extra["help_text"] = help_texts.get(field_name, "") return _filter class StaticQueryBooleanFilter(filters.BooleanFilter): def __init__(self, *args, query: Q | None = None, **kwargs): if query is None: raise ValueError("query must not be None or unset.") self.query = query super().__init__(*args, **kwargs) def filter(self, qs, value): if value is True: return qs.filter(self.query) if value is False: return qs.filter(~self.query) return qs class IntegerInFilter(filters.BaseInFilter): class QueryArrayWidget(widgets.QueryArrayWidget): # see: https://github.com/carltongibson/django-filter/issues/1047 def value_from_datadict(self, data, files, name): new_data = {} for key in data.keys(): if len(data.getlist(key)) == 1 and "," in data[key]: new_data[key] = data[key] else: new_data[key] = data.getlist(key) return super().value_from_datadict(new_data, files, name) field_class = forms.IntegerField def __init__(self, *args, **kwargs): kwargs.setdefault("widget", self.QueryArrayWidget()) super().__init__(*args, **kwargs) class ShowOrderingFilter(filters.OrderingFilter): def filter(self, qs: QuerySet, value): if value in constants.EMPTY_VALUES: return qs ordering = [self.get_ordering_value(param) for param in value] fields = (field.lstrip("-") for field in ordering) if "is_owner" in fields: _id = getattr(self.parent.request.user, "id", None) qs = qs.annotate( is_owner=Exists( models.Show.owners.through.objects.filter(user=_id, show=OuterRef("pk")), ) ) return qs.order_by(*ordering) class ShowFilterSet(StaticFilterHelpTextMixin, filters.FilterSet): ids = IntegerInFilter( field_name="id", help_text="Return only shows matching the specified id(s).", ) order = ShowOrderingFilter( fields=["name", "slug", "id", "is_active", "is_owner", "updated_at", "updated_by"], help_text="Order shows by the given field(s).", ) category_ids = IntegerInFilter( field_name="category", help_text="Return only shows of the given category or categories.", ) category_slug = filters.CharFilter( field_name="category__slug", help_text="Return only shows of the given category slug.", ) host_ids = IntegerInFilter( field_name="hosts", help_text="Return only shows hosted by the given profile ID(s).", ) is_active = filters.BooleanFilter( field_name="is_active", help_text="Return only currently active/inactive shows.", ) is_writable = filters.BooleanFilter( method="filter_writable", help_text="Return only shows writable by the requesting authenticated user.", ) is_public = filters.BooleanFilter( field_name="is_public", help_text="Return only shows that are public/non-public.", ) language_ids = IntegerInFilter( field_name="language", help_text="Return only shows of the given language(s).", ) music_focus_ids = IntegerInFilter( field_name="music_focus", help_text="Return only shows with given music focus(es).", ) music_focus_slug = filters.CharFilter( field_name="music_focus__slug", help_text="Return only shows with the give music focus slug.", ) owner_ids = IntegerInFilter( field_name="owners", help_text="Return only shows that belong to the given owner(s).", ) topic_ids = IntegerInFilter( field_name="topic", help_text="Return only shows of the given topic(s).", ) topic_slug = filters.CharFilter( field_name="topic__slug", help_text="Return only shows of the given topic slug.", ) type_id = IntegerInFilter( field_name="type", help_text="Return only shows of a given type.", ) type_slug = filters.CharFilter( field_name="type__slug", help_text="Return only shows of the given type slug.", ) def filter_writable(self, queryset: QuerySet, _: str, value: bool) -> QuerySet: user = self.request.user if self.request.user.is_authenticated else None if value and user and (user.is_superuser or user.has_perm("program.update_show")): return queryset elif value and user: return queryset.filter(owners=user) else: return models.Show.objects.none() class Meta: model = models.Show fields = [ "ids", "order", "category_ids", "category_slug", "host_ids", "is_active", "is_public", "is_writable", "language_ids", "music_focus_ids", "music_focus_slug", "owner_ids", "topic_ids", "topic_slug", "type_id", "type_slug", ] class ScheduleFilterSet(filters.FilterSet): ids = IntegerInFilter( field_name="id", help_text="Return only schedules matching the specified id(s).", ) show_ids = IntegerInFilter( field_name="show", help_text="Return only schedules that belong to the specified show(s).", ) exclude_inactive = filters.BooleanFilter( method="filter_exclude_inactive", help_text="Excludes all schedules that don’t have timeslots in the future.", ) @staticmethod def filter_exclude_inactive(queryset: QuerySet, _: str, value: bool): if not value: return queryset return queryset.filter( Exists( models.TimeSlot.objects.filter(schedule=OuterRef("pk"), end__gte=timezone.now()) ) ) class TimeSlotFilterSet(filters.FilterSet): ids = IntegerInFilter( field_name="id", help_text="Return only timeslots matching the specified id(s).", ) order = filters.OrderingFilter( fields=[field.name for field in models.TimeSlot._meta.get_fields()] ) surrounding = filters.DateTimeFilter( method="filter_surrounding", label="Return surrounding timeslots", help_text=( "Returns the 10 nearest timeslots around the specified datetime. " "If specified without a datetime value the current date and time is assumed." ), ) starts_before = filters.DateTimeFilter( field_name="start", lookup_expr="lt", help_text="Only returns timeslots that start before the specified datetime.", ) starts_after = filters.DateTimeFilter( field_name="start", lookup_expr="gte", help_text="Only returns timeslots that start at or after the specified datetime.", ) ends_before = filters.DateTimeFilter( field_name="end", lookup_expr="lt", help_text="Only returns timeslots that end before the specified datetime.", ) ends_after = filters.DateTimeFilter( field_name="end", lookup_expr="gte", help_text="Only returns timeslots that end at or after the specified datetime.", ) schedule_ids = IntegerInFilter( field_name="schedule", help_text="Return only timeslots that belong to the specified schedule(s).", ) show_ids = IntegerInFilter( field_name="schedule__show", help_text="Return only timeslots that belong to the specified show(s).", ) @staticmethod def filter_surrounding(queryset: QuerySet, _: str, value: datetime.datetime): nearest_timeslots_in_future = ( models.TimeSlot.objects.filter(start__gte=value) .order_by("start") .values_list("id", flat=True)[:5] ) nearest_timeslots_in_past = ( models.TimeSlot.objects.filter(start__lt=value) .order_by("-start") .values_list("id", flat=True)[:5] ) relevant_timeslot_ids = list(nearest_timeslots_in_future) + list(nearest_timeslots_in_past) return queryset.filter(id__in=relevant_timeslot_ids) def filter_queryset(self, queryset): queryset = super().filter_queryset(queryset) # This is for backwards compatibility as the surrounding-filter was formerly implemented # by just checking for the existence of the query parameter. if self.request.GET.get("surrounding", None) == "": queryset = self.filter_surrounding(queryset, "surrounding", timezone.now()) return queryset class Meta: model = models.TimeSlot fields = [ "ids", "order", "start", "end", "surrounding", "schedule_ids", "show_ids", ] class EpisodeFilterSet(StaticFilterHelpTextMixin, filters.FilterSet): order = filters.OrderingFilter( fields=["title", "id", "updated_at", "updated_by", "has_timeslots"], ) ids = IntegerInFilter( field_name="id", help_text="Return only episodes matching the specified id(s).", ) show_ids = IntegerInFilter( field_name="show", help_text="Return only episodes that belong to the specified show(s).", ) show_owner_ids = IntegerInFilter( field_name="show__owners", help_text="Return only episodes of the specified owner(s).", ) timeslot_ids = IntegerInFilter( field_name="timeslots", help_text="Return only episodes that belong to the specified timeslot(s).", ) has_timeslots = filters.BooleanFilter( label="Has timeslots", help_text="Returns only timeslots that either have or have not any timeslots.", ) has_title = StaticQueryBooleanFilter( query=~Q(title=""), help_text=( "If true returns episodes with titles. If false returns episodes without titles." ), ) class Meta: model = models.Episode fields = [ "ids", "show_ids", "show_owner_ids", "timeslot_ids", "order", ] class ActiveFilterSet(StaticFilterHelpTextMixin, filters.FilterSet): is_active = filters.BooleanFilter(field_name="is_active") class Meta: fields = [ "is_active", ] class VirtualTimeslotFilterSet(filters.FilterSet): start = filters.IsoDateTimeFilter(method="filter_noop") end = filters.IsoDateTimeFilter(method="filter_noop") cut_at_range_boundaries = filters.BooleanFilter( help_text=( "If true guarantees that the first and last program entry match the requested range " "even if these entries earlier or end later." ), method="filter_noop", ) include_virtual = filters.BooleanFilter( help_text="Include virtual timeslot entries (default: false).", method="filter_noop", ) # Filters using the noop are implemented in the generate_program_entries generator. # We do this, so that we have all the bells and whistles of the automatic value conversion, # but can still implement custom filter logic on top. def filter_noop(self, queryset: QuerySet, _: str, value: bool) -> QuerySet: return queryset def filter_queryset(self, queryset: QuerySet): queryset = super().filter_queryset(queryset) filter_data = self.form.cleaned_data return list( generate_program_entries( queryset, start=filter_data["start"], end=filter_data["end"], include_virtual=bool(filter_data["include_virtual"]), cut_at_range_boundaries=bool(filter_data["cut_at_range_boundaries"]), ) ) class Meta: model = models.TimeSlot fields = ["start", "end", "include_virtual"] class PlaylistFilter(filters.FilterSet): contains_file_ids = IntegerInFilter( field_name="entries__file_id", help_text="Return only playlists that use to the specified file ID(s).", ) show_ids = IntegerInFilter( field_name="show_id", help_text="Return only playlists for the specified show ID.", ) class Meta: fields = ("contains_file_ids", "show_ids") model = models.Playlist