Skip to content
Snippets Groups Projects
filters.py 11.6 KiB
Newer Older
  • Learn to ignore specific revisions
    import datetime

    from django_filters import constants
    from django_filters import rest_framework as filters
    from django_filters import widgets

    from django.db.models import Exists, OuterRef, QuerySet
    from django.utils import timezone
    from program import models
    from program.services import generate_program_entries
    
    
    
    class StaticFilterHelpTextMixin:
        """Fill in ``help_text`` for generated filters from ``Meta.help_texts``.

        Meant to be mixed into a class whose parent provides the
        ``filter_for_field`` classmethod.
        """

        @classmethod
        def filter_for_field(cls, field, field_name, lookup_expr=None):
            """Return the parent's filter, ensuring ``extra["help_text"]`` is set.

            A help text already present on the generated filter is kept. Otherwise
            the text is looked up by ``field_name`` in the optional
            ``Meta.help_texts`` mapping, falling back to an empty string.
            """
            generated = super().filter_for_field(field, field_name, lookup_expr)
            if "help_text" in generated.extra:
                return generated
            static_texts = getattr(cls.Meta, "help_texts", {})
            generated.extra["help_text"] = static_texts.get(field_name, "")
            return generated
    
    
    
    class IntegerInFilter(filters.BaseInFilter):
        """``BaseInFilter`` that defaults to a query-array widget.

        NOTE(review): nothing visible in this class restricts values to integers
        despite the name — confirm whether a ``field_class`` setting is missing.
        """

        class QueryArrayWidget(widgets.QueryArrayWidget):
            # see: https://github.com/carltongibson/django-filter/issues/1047
            def value_from_datadict(self, data, files, name):
                """Normalise ``data`` into a plain dict before delegating to the parent.

                A key with exactly one value that contains a comma is passed on as
                that raw string; every other key is passed on as its list of values.
                """
                new_data = {}
                for key in data.keys():
                    values = data.getlist(key)
                    if len(values) == 1 and "," in data[key]:
                        new_data[key] = data[key]
                    else:
                        new_data[key] = values
                return super().value_from_datadict(new_data, files, name)

        def __init__(self, *args, **kwargs):
            """Initialise the filter, using ``QueryArrayWidget`` unless a widget is given."""
            kwargs.setdefault("widget", self.QueryArrayWidget())
            # Without this call the keyword arguments (field_name, help_text, widget, ...)
            # would be discarded and the base filter would never be set up.
            super().__init__(*args, **kwargs)
    
    class ShowOrderingFilter(filters.OrderingFilter):
        """Ordering filter for shows that can also order by an ``is_owner`` flag."""

        def filter(self, qs: QuerySet, value):
            """Order ``qs`` by the requested parameters.

            Empty values leave the queryset untouched. When ``is_owner`` is among the
            requested orderings (ascending or descending), the queryset is first
            annotated with an ``is_owner`` flag telling whether the requesting user
            appears in the show's owners relation.
            """
            if value in constants.EMPTY_VALUES:
                return qs

            order_by_args = [self.get_ordering_value(item) for item in value]
            wants_owner_flag = any(arg.lstrip("-") == "is_owner" for arg in order_by_args)
            if wants_owner_flag:
                # The user object may not expose an id, in which case None is used.
                user_id = getattr(self.parent.request.user, "id", None)
                ownership = models.Show.owners.through.objects.filter(
                    user=user_id, show=OuterRef("pk")
                )
                qs = qs.annotate(is_owner=Exists(ownership))
            return qs.order_by(*order_by_args)
    
    
    
    class ShowFilterSet(StaticFilterHelpTextMixin, filters.FilterSet):
        """Filters and orderings available when listing shows.

        NOTE(review): in the visible source several filter declarations were missing
        their closing parenthesis and ``Meta.fields`` its closing bracket. Only those
        delimiters have been added here; other lines may still be missing, so compare
        against the upstream file before relying on this class.
        """

        ids = IntegerInFilter(
            field_name="id",
            help_text="Return only shows matching the specified id(s).",
        )

        order = ShowOrderingFilter(
            fields=["name", "slug", "id", "is_active", "is_owner", "updated_at", "updated_by"],
            help_text="Order shows by the given field(s).",
        )

        category_ids = IntegerInFilter(
            field_name="category",
            help_text="Return only shows of the given category or categories.",
        )

        category_slug = filters.CharFilter(
            field_name="category__slug",
            help_text="Return only shows of the given category slug.",
        )

        # NOTE(review): no field_name is visible for this filter; presumably
        # django-filter then falls back to the attribute name ("host_ids") —
        # confirm the intended model field.
        host_ids = IntegerInFilter(
            help_text="Return only shows hosted by the given profile ID(s).",
        )

        is_active = filters.BooleanFilter(
            help_text="Return only currently active/inactive shows.",
        )

        is_writable = filters.BooleanFilter(
            method="filter_writable",
            help_text="Return only shows writable by the requesting authenticated user.",
        )

        is_public = filters.BooleanFilter(
            field_name="is_public",
            help_text="Return only shows that are public/non-public.",
        )

        language_ids = IntegerInFilter(
            field_name="language",
            help_text="Return only shows of the given language(s).",
        )

        music_focus_ids = IntegerInFilter(
            field_name="music_focus",
            help_text="Return only shows with given music focus(es).",
        )

        music_focus_slug = filters.CharFilter(
            field_name="music_focus__slug",
            help_text="Return only shows with the give music focus slug.",
        )

        owner_ids = IntegerInFilter(
            field_name="owners",
            help_text="Return only shows that belong to the given owner(s).",
        )

        topic_ids = IntegerInFilter(
            field_name="topic",
            help_text="Return only shows of the given topic(s).",
        )

        topic_slug = filters.CharFilter(
            field_name="topic__slug",
            help_text="Return only shows of the given topic slug.",
        )

        type_id = IntegerInFilter(
            field_name="type",
            help_text="Return only shows of a given type.",
        )

        type_slug = filters.CharFilter(
            field_name="type__slug",
            help_text="Return only shows of the given type slug.",
        )

        def filter_writable(self, queryset: QuerySet, _: str, value: bool) -> QuerySet:
            """Restrict ``queryset`` to shows the requesting user may write.

            Returns the unchanged queryset for authenticated superusers and users
            holding the ``program.update_show`` permission, the shows owned by the
            user for any other authenticated user, and an empty ``Show`` queryset in
            every other case (falsy ``value`` or unauthenticated request).
            """
            user = self.request.user if self.request.user.is_authenticated else None

            if value and user and (user.is_superuser or user.has_perm("program.update_show")):
                return queryset
            elif value and user:
                return queryset.filter(owners=user)
            else:
                return models.Show.objects.none()

        class Meta:
            model = models.Show
            fields = [
                "category_ids",
                "category_slug",
                "host_ids",
                "is_active",
                "is_public",
                "language_ids",
                "music_focus_ids",
                "music_focus_slug",
                "owner_ids",
                "topic_ids",
                "topic_slug",
                "type_id",
                "type_slug",
            ]
    
    class ScheduleFilterSet(filters.FilterSet):
        """Filters available when listing schedules."""

        ids = IntegerInFilter(
            help_text="Return only schedules matching the specified id(s).",
            field_name="id",
        )
        show_ids = IntegerInFilter(
            help_text="Return only schedules that belong to the specified show(s).",
            field_name="show",
        )
        exclude_inactive = filters.BooleanFilter(
            help_text="Excludes all schedules that don’t have timeslots in the future.",
            method="filter_exclude_inactive",
        )

        @staticmethod
        def filter_exclude_inactive(queryset: QuerySet, _: str, value: bool):
            """Keep only schedules with a timeslot ending now or later.

            The queryset is returned unchanged when ``value`` is falsy.
            """
            if value:
                upcoming_timeslots = models.TimeSlot.objects.filter(
                    schedule=OuterRef("pk"), end__gte=timezone.now()
                )
                return queryset.filter(Exists(upcoming_timeslots))
            return queryset
    
    class TimeSlotFilterSet(filters.FilterSet):
        """Filters and orderings available when listing timeslots.

        NOTE(review): in the visible source ``starts_after`` and ``ends_before`` were
        missing their closing parenthesis and ``Meta.fields`` its closing bracket.
        Only those delimiters have been added here; other lines may still be
        missing, so compare against the upstream file.
        """

        ids = IntegerInFilter(
            field_name="id",
            help_text="Return only timeslots matching the specified id(s).",
        )

        # Every field reported by the TimeSlot model's _meta API can be used for ordering.
        order = filters.OrderingFilter(
            fields=[field.name for field in models.TimeSlot._meta.get_fields()]
        )

        surrounding = filters.DateTimeFilter(
            method="filter_surrounding",
            label="Return surrounding timeslots",
            help_text=(
                "Returns the 10 nearest timeslots around the specified datetime. "
                "If specified without a datetime value the current date and time is assumed."
            ),
        )

        starts_before = filters.DateTimeFilter(
            field_name="start",
            lookup_expr="lt",
            help_text="Only returns timeslots that start before the specified datetime.",
        )

        starts_after = filters.DateTimeFilter(
            field_name="start",
            lookup_expr="gte",
            help_text="Only returns timeslots that start at or after the specified datetime.",
        )

        ends_before = filters.DateTimeFilter(
            field_name="end",
            lookup_expr="lt",
            help_text="Only returns timeslots that end before the specified datetime.",
        )

        ends_after = filters.DateTimeFilter(
            field_name="end",
            lookup_expr="gte",
            help_text="Only returns timeslots that end at or after the specified datetime.",
        )

        schedule_ids = IntegerInFilter(
            field_name="schedule",
            help_text="Return only timeslots that belong to the specified schedule(s).",
        )

        show_ids = IntegerInFilter(
            field_name="schedule__show",
            help_text="Return only timeslots that belong to the specified show(s).",
        )

        @staticmethod
        def filter_surrounding(queryset: QuerySet, _: str, value: datetime.datetime):
            """Restrict ``queryset`` to the timeslots nearest to ``value``.

            The candidates are the five timeslots starting at or after ``value`` and
            the five starting before it. They are looked up across all ``TimeSlot``
            rows, not only those in ``queryset``, so fewer than ten rows may remain.
            """
            nearest_timeslots_in_future = (
                models.TimeSlot.objects.filter(start__gte=value)
                .order_by("start")
                .values_list("id", flat=True)[:5]
            )
            nearest_timeslots_in_past = (
                models.TimeSlot.objects.filter(start__lt=value)
                .order_by("-start")
                .values_list("id", flat=True)[:5]
            )

            relevant_timeslot_ids = list(nearest_timeslots_in_future) + list(nearest_timeslots_in_past)

            return queryset.filter(id__in=relevant_timeslot_ids)

        def filter_queryset(self, queryset):
            """Apply the declared filters, then the legacy bare ``surrounding`` parameter."""
            queryset = super().filter_queryset(queryset)
            # This is for backwards compatibility as the surrounding-filter was formerly implemented
            # by just checking for the existence of the query parameter.
            if self.request.GET.get("surrounding", None) == "":
                queryset = self.filter_surrounding(queryset, "surrounding", timezone.now())
            return queryset

        class Meta:
            model = models.TimeSlot
            fields = [
                "order",
                "start",
                "end",
                "surrounding",
                "schedule_ids",
                "show_ids",
            ]
    
    class NoteFilterSet(StaticFilterHelpTextMixin, filters.FilterSet):
        """Filters available when listing notes.

        NOTE(review): the visible source had lost the opening line of the first
        filter and the ``class Meta`` / ``fields = [`` lines around the trailing
        field names. They are reconstructed below from the pattern used by the other
        filter sets in this file — verify against the upstream file.
        """

        # NOTE(review): name and filter class are inferred from the sibling
        # ``ids`` declarations; only the keyword arguments were visible.
        ids = IntegerInFilter(
            field_name="id",
            help_text="Return only notes matching the specified id(s).",
        )

        show_ids = IntegerInFilter(
            field_name="timeslot__show",
            help_text="Return only notes that belong to the specified show(s).",
        )

        show_owner_ids = IntegerInFilter(
            field_name="timeslot__show__owners",
            help_text="Return only notes by show the specified owner(s): all notes the user may edit.",
        )

        timeslot_ids = IntegerInFilter(
            field_name="timeslot",
            help_text="Return only notes that belong to the specified timeslot(s).",
        )

        class Meta:
            # NOTE(review): the ``model = ...`` line is not visible in this view of
            # the file — TODO restore it from the original.
            fields = [
                "show_ids",
                "show_owner_ids",
                "timeslot_ids",
            ]
    
    class ActiveFilterSet(StaticFilterHelpTextMixin, filters.FilterSet):
        """Filter set exposing a single boolean ``is_active`` filter.

        NOTE(review): the visible source had lost the ``class Meta`` / ``fields = [``
        lines around the trailing ``"is_active"`` entry; they are reconstructed
        below — verify against the upstream file.
        """

        is_active = filters.BooleanFilter(field_name="is_active")

        class Meta:
            # NOTE(review): no ``model = ...`` line is visible in this view of the
            # file — TODO confirm whether one belongs here.
            fields = [
                "is_active",
            ]
    
    
    
    class VirtualTimeslotFilterSet(filters.FilterSet):
        """Parse program query parameters and expand a queryset into program entries.

        None of the declared filters narrows the queryset itself. Their cleaned
        values are handed to ``generate_program_entries`` in ``filter_queryset``.
        """

        start = filters.IsoDateTimeFilter(method="filter_noop")
        end = filters.IsoDateTimeFilter(method="filter_noop")

        cut_at_range_boundaries = filters.BooleanFilter(
            help_text=(
                # The trailing space keeps the two concatenated literals from fusing
                # into "rangeeven".
                "If true guarantees that the first and last program entry match the requested range "
                "even if these entries earlier or end later."
            ),
            method="filter_noop",
        )

        include_virtual = filters.BooleanFilter(
            help_text="Include virtual timeslot entries (default: false).",
            method="filter_noop",
        )

        # Filters using the noop are implemented in the generate_program_entries generator.
        # We do this, so that we have all the bells and whistles of the automatic value conversion,
        # but can still implement custom filter logic on top.
        def filter_noop(self, queryset: QuerySet, _: str, value: bool) -> QuerySet:
            """Return ``queryset`` unchanged; the value is consumed in ``filter_queryset``."""
            return queryset

        def filter_queryset(self, queryset: QuerySet):
            """Run the regular filtering, then build the list of program entries.

            Returns a ``list`` (not a queryset) holding everything yielded by
            ``generate_program_entries`` for the cleaned ``start``/``end`` values.
            The two boolean options are coerced with ``bool()``, so an omitted
            parameter counts as false.
            """
            queryset = super().filter_queryset(queryset)
            filter_data = self.form.cleaned_data
            entries = generate_program_entries(
                queryset,
                start=filter_data["start"],
                end=filter_data["end"],
                include_virtual=bool(filter_data["include_virtual"]),
                cut_at_range_boundaries=bool(filter_data["cut_at_range_boundaries"]),
            )
            return list(entries)

        class Meta:
            model = models.TimeSlot
            fields = ["start", "end", "include_virtual"]