Skip to content
Snippets Groups Projects
filters.py 12.9 KiB
Newer Older
from django_filters import constants
from django_filters import rest_framework as filters
from django_filters import widgets

from django.db.models import Exists, OuterRef, Q, QuerySet
from django.utils import timezone
from program import models
from program.services import generate_program_entries


class StaticFilterHelpTextMixin:
    @classmethod
    def filter_for_field(cls, field, field_name, lookup_expr=None):
        _filter = super().filter_for_field(field, field_name, lookup_expr)
        if "help_text" not in _filter.extra:
            help_texts = getattr(cls.Meta, "help_texts", {})
            _filter.extra["help_text"] = help_texts.get(field_name, "")
        return _filter


class StaticQueryBooleanFilter(filters.BooleanFilter):
    def __init__(self, *args, query: Q | None = None, **kwargs):
        if query is None:
            raise ValueError("query must not be None or unset.")
        self.query = query
        super().__init__(*args, **kwargs)

    def filter(self, qs, value):
        if value is True:
            return qs.filter(self.query)
        if value is False:
            return qs.filter(~self.query)
        return qs


class IntegerInFilter(filters.BaseInFilter):
    class QueryArrayWidget(widgets.QueryArrayWidget):
        # see: https://github.com/carltongibson/django-filter/issues/1047
        def value_from_datadict(self, data, files, name):
            new_data = {}
            for key in data.keys():
                if len(data.getlist(key)) == 1 and "," in data[key]:
                    new_data[key] = data[key]
                else:
                    new_data[key] = data.getlist(key)
            return super().value_from_datadict(new_data, files, name)

    def __init__(self, *args, **kwargs):
        kwargs.setdefault("widget", self.QueryArrayWidget())
class ShowOrderingFilter(filters.OrderingFilter):
    def filter(self, qs: QuerySet, value):
        if value in constants.EMPTY_VALUES:
            return qs
        ordering = [self.get_ordering_value(param) for param in value]
        fields = (field.lstrip("-") for field in ordering)
        if "is_owner" in fields:
            _id = getattr(self.parent.request.user, "id", None)
            qs = qs.annotate(
                is_owner=Exists(
                    models.Show.owners.through.objects.filter(user=_id, show=OuterRef("pk")),
                )
            )
        return qs.order_by(*ordering)


class ShowFilterSet(StaticFilterHelpTextMixin, filters.FilterSet):
    ids = IntegerInFilter(
        field_name="id",
        help_text="Return only shows matching the specified id(s).",
    )
    order = ShowOrderingFilter(
        fields=["name", "slug", "id", "is_active", "is_owner", "updated_at", "updated_by"],
        help_text="Order shows by the given field(s).",
    )
    category_ids = IntegerInFilter(
        field_name="category",
        help_text="Return only shows of the given category or categories.",
    )
    category_slug = filters.CharFilter(
        field_name="category__slug",
        help_text="Return only shows of the given category slug.",
    host_ids = IntegerInFilter(
        help_text="Return only shows hosted by the given profile ID(s).",
    is_active = filters.BooleanFilter(
        help_text="Return only currently active/inactive shows.",
    is_writable = filters.BooleanFilter(
        method="filter_writable",
        help_text="Return only shows writable by the requesting authenticated user.",
    )
    is_public = filters.BooleanFilter(
        field_name="is_public",
        help_text="Return only shows that are public/non-public.",
    )
    language_ids = IntegerInFilter(
        field_name="language",
        help_text="Return only shows of the given language(s).",
    music_focus_ids = IntegerInFilter(
        field_name="music_focus",
        help_text="Return only shows with given music focus(es).",
    )
    music_focus_slug = filters.CharFilter(
        field_name="music_focus__slug",
        help_text="Return only shows with the give music focus slug.",
    owner_ids = IntegerInFilter(
        field_name="owners",
        help_text="Return only shows that belong to the given owner(s).",
    )
    topic_ids = IntegerInFilter(
        field_name="topic",
        help_text="Return only shows of the given topic(s).",
    )
    topic_slug = filters.CharFilter(
        field_name="topic__slug",
        help_text="Return only shows of the given topic slug.",
    type_id = IntegerInFilter(
        field_name="type",
        help_text="Return only shows of a given type.",
    )
    type_slug = filters.CharFilter(
        field_name="type__slug",
        help_text="Return only shows of the given type slug.",
    def filter_writable(self, queryset: QuerySet, _: str, value: bool) -> QuerySet:
        user = self.request.user if self.request.user.is_authenticated else None

        if value and user and (user.is_superuser or user.has_perm("program.update_show")):
            return queryset
        elif value and user:
            return queryset.filter(owners=user)
        else:
            return models.Show.objects.none()

    class Meta:
        model = models.Show
        fields = [
            "category_ids",
            "category_slug",
            "host_ids",
            "is_active",
            "is_public",
            "language_ids",
            "music_focus_ids",
            "music_focus_slug",
            "owner_ids",
            "topic_ids",
            "topic_slug",
            "type_id",
            "type_slug",
class ScheduleFilterSet(filters.FilterSet):
    ids = IntegerInFilter(
        field_name="id",
        help_text="Return only schedules matching the specified id(s).",
    )
    show_ids = IntegerInFilter(
        field_name="show",
        help_text="Return only schedules that belong to the specified show(s).",
    )
    exclude_inactive = filters.BooleanFilter(
        method="filter_exclude_inactive",
        help_text="Excludes all schedules that don’t have timeslots in the future.",
    )

    @staticmethod
    def filter_exclude_inactive(queryset: QuerySet, _: str, value: bool):
        if not value:
            return queryset
        return queryset.filter(
            Exists(
                models.TimeSlot.objects.filter(schedule=OuterRef("pk"), end__gte=timezone.now())
            )
        )
class TimeSlotFilterSet(filters.FilterSet):
    ids = IntegerInFilter(
        field_name="id",
        help_text="Return only timeslots matching the specified id(s).",
    )
    order = filters.OrderingFilter(
        fields=[field.name for field in models.TimeSlot._meta.get_fields()]
    )
    surrounding = filters.DateTimeFilter(
        method="filter_surrounding",
        label="Return surrounding timeslots",
        help_text=(
            "Returns the 10 nearest timeslots around the specified datetime. "
            "If specified without a datetime value the current date and time is assumed."
        ),
    )
    starts_before = filters.DateTimeFilter(
        field_name="start",
        lookup_expr="lt",
        help_text="Only returns timeslots that start before the specified datetime.",
    )
    starts_after = filters.DateTimeFilter(
        field_name="start",
        lookup_expr="gte",
        help_text="Only returns timeslots that start at or after the specified datetime.",
    ends_before = filters.DateTimeFilter(
        field_name="end",
        lookup_expr="lt",
        help_text="Only returns timeslots that end before the specified datetime.",
    ends_after = filters.DateTimeFilter(
        field_name="end",
        lookup_expr="gte",
        help_text="Only returns timeslots that end at or after the specified datetime.",
    )
    schedule_ids = IntegerInFilter(
        field_name="schedule",
        help_text="Return only timeslots that belong to the specified schedule(s).",
    )
    show_ids = IntegerInFilter(
        field_name="schedule__show",
        help_text="Return only timeslots that belong to the specified show(s).",
    )

    @staticmethod
    def filter_surrounding(queryset: QuerySet, _: str, value: datetime.datetime):
        nearest_timeslots_in_future = (
            models.TimeSlot.objects.filter(start__gte=value)
            .order_by("start")
            .values_list("id", flat=True)[:5]
        )
        nearest_timeslots_in_past = (
            models.TimeSlot.objects.filter(start__lt=value)
            .order_by("-start")
            .values_list("id", flat=True)[:5]
        )
        relevant_timeslot_ids = list(nearest_timeslots_in_future) + list(nearest_timeslots_in_past)
        return queryset.filter(id__in=relevant_timeslot_ids)

    def filter_queryset(self, queryset):
        queryset = super().filter_queryset(queryset)
        # This is for backwards compatibility as the surrounding-filter was formerly implemented
        # by just checking for the existence of the query parameter.
        if self.request.GET.get("surrounding", None) == "":
            queryset = self.filter_surrounding(queryset, "surrounding", timezone.now())
        return queryset

    class Meta:
        model = models.TimeSlot
        fields = [
            "order",
            "start",
            "end",
            "surrounding",
            "schedule_ids",
            "show_ids",
class EpisodeFilterSet(StaticFilterHelpTextMixin, filters.FilterSet):
    order = filters.OrderingFilter(
        fields=["title", "id", "updated_at", "updated_by", "has_timeslots"],
    )
        help_text="Return only episodes matching the specified id(s).",
    show_ids = IntegerInFilter(
        field_name="show",
        help_text="Return only episodes that belong to the specified show(s).",
    show_owner_ids = IntegerInFilter(
        field_name="show__owners",
        help_text="Return only episodes of the specified owner(s).",
    timeslot_ids = IntegerInFilter(
        field_name="timeslots",
        help_text="Return only episodes that belong to the specified timeslot(s).",
    has_timeslots = filters.BooleanFilter(
        label="Has timeslots",
        help_text="Returns only timeslots that either have or have not any timeslots.",
    )
    has_title = StaticQueryBooleanFilter(
        query=~Q(title=""),
        help_text=(
            "If true returns episodes with titles. If false returns episodes without titles."
        ),
    )
            "show_ids",
            "show_owner_ids",
            "timeslot_ids",
class ActiveFilterSet(StaticFilterHelpTextMixin, filters.FilterSet):
    is_active = filters.BooleanFilter(field_name="is_active")
            "is_active",


class VirtualTimeslotFilterSet(filters.FilterSet):
    start = filters.IsoDateTimeFilter(method="filter_noop")
    end = filters.IsoDateTimeFilter(method="filter_noop")
    cut_at_range_boundaries = filters.BooleanFilter(
        help_text=(
            "If true guarantees that the first and last program entry match the requested range "
            "even if these entries earlier or end later."
        ),
        method="filter_noop",
    )
    include_virtual = filters.BooleanFilter(
        help_text="Include virtual timeslot entries (default: false).",
        method="filter_noop",
    )

    # Filters using the noop are implemented in the generate_program_entries generator.
    # We do this, so that we have all the bells and whistles of the automatic value conversion,
    # but can still implement custom filter logic on top.
    def filter_noop(self, queryset: QuerySet, _: str, value: bool) -> QuerySet:
        return queryset

    def filter_queryset(self, queryset: QuerySet):
        queryset = super().filter_queryset(queryset)
        filter_data = self.form.cleaned_data
        return list(
            generate_program_entries(
                queryset,
                start=filter_data["start"],
                end=filter_data["end"],
                include_virtual=bool(filter_data["include_virtual"]),
                cut_at_range_boundaries=bool(filter_data["cut_at_range_boundaries"]),
            )
        )

    class Meta:
        model = models.TimeSlot
        fields = ["start", "end", "include_virtual"]
class PlaylistFilter(filters.FilterSet):
    contains_file_ids = IntegerInFilter(
        field_name="entries__file_id",
        help_text="Return only playlists that use to the specified file ID(s).",
    show_ids = IntegerInFilter(
        field_name="show_id",
        help_text="Return only playlists for the specified show ID.",
    )
        fields = ("contains_file_ids", "show_ids")
        model = models.Playlist