Skip to content
Snippets Groups Projects
filters.py 11.08 KiB
import datetime

from django_filters import constants
from django_filters import rest_framework as filters
from django_filters import widgets

from django import forms
from django.db.models import Exists, OuterRef, Q, QuerySet
from django.utils import timezone
from program import models


class StaticFilterHelpTextMixin:
    """Mixin that supplies static help texts for filters generated from fields.

    The help texts are read from an optional ``help_texts`` mapping on the
    class's ``Meta``, keyed by field name.
    """

    @classmethod
    def filter_for_field(cls, field, field_name, lookup_expr=None):
        """Build the filter via the parent class and ensure it has a help text.

        A ``help_text`` already present in the filter's ``extra`` mapping is
        kept. Otherwise it is set from ``Meta.help_texts[field_name]``, falling
        back to an empty string when no entry (or no mapping) exists.
        """
        generated = super().filter_for_field(field, field_name, lookup_expr)
        if "help_text" in generated.extra:
            return generated
        static_texts = getattr(cls.Meta, "help_texts", {})
        generated.extra["help_text"] = static_texts.get(field_name, "")
        return generated


class IntegerInFilter(filters.BaseInFilter):
    """``in``-style filter whose individual values are validated as integers.

    Uses a customised query-array widget by default so that values can be
    passed either as one comma-separated parameter or as a repeated parameter.
    """

    class QueryArrayWidget(widgets.QueryArrayWidget):
        """Query-array widget that normalises the submitted data first."""

        # see: https://github.com/carltongibson/django-filter/issues/1047
        def value_from_datadict(self, data, files, name):
            """Rebuild ``data`` as a plain dict and delegate to the parent widget.

            A key submitted exactly once with a value containing a comma keeps
            that single string; every other key is mapped to the full list of
            its submitted values.
            """

            def _normalise(key):
                submitted = data.getlist(key)
                if len(submitted) == 1 and "," in data[key]:
                    return data[key]
                return submitted

            normalised = {key: _normalise(key) for key in data.keys()}
            return super().value_from_datadict(normalised, files, name)

    field_class = forms.IntegerField

    def __init__(self, *args, **kwargs):
        """Initialise the filter, defaulting ``widget`` to ``QueryArrayWidget``."""
        kwargs.setdefault("widget", self.QueryArrayWidget())
        super().__init__(*args, **kwargs)


class ShowOrderingFilter(filters.OrderingFilter):
    """Ordering filter for shows that also supports the computed ``is_owner`` key."""

    def filter(self, qs: QuerySet, value):
        """Order ``qs`` by the requested parameters.

        Empty values leave the queryset untouched. When ``is_owner`` is among
        the ordering terms (ascending or descending), the queryset is first
        annotated with an ``is_owner`` flag telling whether the requesting
        user appears in the show's owners relation.
        """
        if value in constants.EMPTY_VALUES:
            return qs

        ordering = [self.get_ordering_value(param) for param in value]
        sorts_by_ownership = any(term.lstrip("-") == "is_owner" for term in ordering)
        if not sorts_by_ownership:
            return qs.order_by(*ordering)

        # Users without an ``id`` attribute are looked up as ``None``.
        user_id = getattr(self.parent.request.user, "id", None)
        ownership = models.Show.owners.through.objects.filter(user=user_id, show=OuterRef("pk"))
        return qs.annotate(is_owner=Exists(ownership)).order_by(*ordering)


class ShowFilterSet(StaticFilterHelpTextMixin, filters.FilterSet):
    """Filters and ordering options for ``Show`` querysets."""

    order = ShowOrderingFilter(
        fields=["name", "slug", "id", "is_active", "is_owner", "updated_at", "updated_by"],
        help_text="Order shows by the given field(s).",
    )
    category_ids = IntegerInFilter(
        field_name="category",
        help_text="Return only shows of the given category or categories.",
    )
    category_slug = filters.CharFilter(
        field_name="category__slug",
        help_text="Return only shows of the given category slug.",
    )
    host_ids = IntegerInFilter(
        field_name="hosts",
        help_text="Return only shows assigned to the given host(s).",
    )
    is_active = filters.BooleanFilter(
        field_name="is_active",
        method="filter_active",
        help_text=(
            "Return only currently running shows (with timeslots in the future) if true "
            "or past or upcoming shows if false."
        ),
    )
    is_writable = filters.BooleanFilter(
        method="filter_writable",
        help_text="Return only shows writable by the requesting authenticated user.",
    )
    is_public = filters.BooleanFilter(
        field_name="is_public",
        help_text="Return only shows that are public/non-public.",
    )
    language_ids = IntegerInFilter(
        field_name="language",
        help_text="Return only shows of the given language(s).",
    )
    music_focus_ids = IntegerInFilter(
        field_name="music_focus",
        help_text="Return only shows with given music focus(es).",
    )
    music_focus_slug = filters.CharFilter(
        field_name="music_focus__slug",
        help_text="Return only shows with the give music focus slug.",
    )
    owner_ids = IntegerInFilter(
        field_name="owners",
        help_text="Return only shows that belong to the given owner(s).",
    )
    topic_ids = IntegerInFilter(
        field_name="topic",
        help_text="Return only shows of the given topic(s).",
    )
    topic_slug = filters.CharFilter(
        field_name="topic__slug",
        help_text="Return only shows of the given topic slug.",
    )
    type_id = IntegerInFilter(
        field_name="type",
        help_text="Return only shows of a given type.",
    )
    type_slug = filters.CharFilter(
        field_name="type__slug",
        help_text="Return only shows of the given type slug.",
    )

    def filter_active(self, queryset: QuerySet, name: str, value: bool):
        """Filter shows by whether they are currently running.

        A show counts as running when its ``is_active`` flag is set and it has
        at least one matching schedule: either a recurring schedule
        (``rrule__freq > 0``) whose first/last date range contains the current
        time, or a "once" schedule (``rrule__freq == 0``) whose first date is
        not in the past.

        :param queryset: the show queryset to narrow down.
        :param name: the filter's field name (unused).
        :param value: ``True`` keeps only running shows, ``False`` excludes them.
        :return: the filtered queryset.
        """
        # Filter currently running shows
        # Get currently running schedules to filter by first
        # For single dates we test if there'll be one in the future (and ignore the until date)
        # TODO: Really consider first_date? (=currently active, not just upcoming ones)
        # Add limit for future?

        # Use one timestamp for every comparison so all conditions of the query
        # refer to the same instant.
        now = timezone.now()
        show_ids = (
            models.Schedule.objects.filter(
                # not "once" schedules with first_date in the past and last_date in the future
                Q(
                    rrule__freq__gt=0,
                    first_date__lte=now,
                    last_date__gte=now,
                )
                # "once" schedules with first_date in the future
                | Q(rrule__freq=0, first_date__gte=now)
            )
            .distinct()
            .values_list("show_id", flat=True)
        )
        if value:
            # Filter active shows based on timeslots as well as on the is_active flag
            # Even if there are future timeslots but is_active=False the show will be
            # considered as inactive
            return queryset.filter(id__in=show_ids, is_active=True)
        else:
            return queryset.exclude(id__in=show_ids, is_active=True)

    def filter_writable(self, queryset: QuerySet, _: str, value: bool) -> QuerySet:
        """Filter shows down to those the requesting user may write to.

        Superusers keep the whole queryset, other authenticated users keep the
        shows they own. Anonymous requests and a false ``value`` yield an empty
        queryset.

        :param queryset: the show queryset to narrow down.
        :param _: the filter's field name (unused).
        :param value: whether only writable shows were requested.
        :return: the filtered queryset.
        """
        user = self.request.user if self.request.user.is_authenticated else None

        # user_is_privileged = (
        #     user.groups.filter(name=settings.PRIVILEGED_GROUP).exists() if user else False
        # )

        # Check for the anonymous case before touching any user attribute:
        # ``user`` is None for unauthenticated requests.
        if not value or user is None:
            return models.Show.objects.none()
        if user.is_superuser:
            return queryset
        return queryset.filter(owners=user)

    class Meta:
        model = models.Show
        fields = [
            "order",
            "category_ids",
            "category_slug",
            "host_ids",
            "is_active",
            "is_public",
            "is_writable",
            "language_ids",
            "music_focus_ids",
            "music_focus_slug",
            "owner_ids",
            "topic_ids",
            "topic_slug",
            "type_id",
            "type_slug",
        ]


class ScheduleFilterSet(filters.FilterSet):
    """Filters for schedule querysets."""

    show_ids = IntegerInFilter(
        field_name="show",
        help_text="Return only schedules that belong to the specified show(s).",
    )
    exclude_inactive = filters.BooleanFilter(
        method="filter_exclude_inactive",
        help_text="Excludes all schedules that don’t have timeslots in the future.",
    )

    def filter_exclude_inactive(self, queryset: QuerySet, name: str, value: bool):
        """Keep only schedules with a timeslot ending now or later.

        A false ``value`` returns the queryset unchanged.
        """
        if value:
            upcoming_timeslots = models.TimeSlot.objects.filter(
                schedule=OuterRef("pk"),
                end__gte=timezone.now(),
            )
            return queryset.filter(Exists(upcoming_timeslots))
        return queryset


class TimeSlotFilterSet(filters.FilterSet):
    """Filters and ordering options for timeslot querysets."""

    order = filters.OrderingFilter(
        fields=[field.name for field in models.TimeSlot._meta.get_fields()]
    )
    surrounding = filters.DateTimeFilter(
        method="filter_surrounding",
        label="Return surrounding timeslots",
        help_text=(
            "Returns the 10 nearest timeslots around the specified datetime. "
            "If specified without a datetime value the current date and time is assumed."
        ),
    )
    starts_before = filters.DateTimeFilter(
        field_name="start",
        lookup_expr="lt",
        help_text="Only returns timeslots that start before the specified datetime.",
    )
    starts_after = filters.DateTimeFilter(
        field_name="start",
        lookup_expr="gte",
        help_text="Only returns timeslots that start at or after the specified datetime.",
    )
    ends_before = filters.DateTimeFilter(
        field_name="end",
        lookup_expr="lt",
        help_text="Only returns timeslots that end before the specified datetime.",
    )
    ends_after = filters.DateTimeFilter(
        field_name="end",
        lookup_expr="gte",
        help_text="Only returns timeslots that end at or after the specified datetime.",
    )
    schedule_ids = IntegerInFilter(
        field_name="schedule",
        help_text="Return only timeslots that belong to the specified schedule(s).",
    )
    show_ids = IntegerInFilter(
        field_name="schedule__show",
        help_text="Return only timeslots that belong to the specified show(s).",
    )

    def filter_surrounding(self, queryset: QuerySet, name: str, value: datetime.datetime):
        """Restrict ``queryset`` to the timeslots closest to ``value``.

        The candidates are looked up among all timeslots: up to five starting
        at or after ``value`` and up to five starting before it.
        """
        all_timeslots = models.TimeSlot.objects
        upcoming_ids = (
            all_timeslots.filter(start__gte=value)
            .order_by("start")
            .values_list("id", flat=True)[:5]
        )
        preceding_ids = (
            all_timeslots.filter(start__lt=value)
            .order_by("-start")
            .values_list("id", flat=True)[:5]
        )
        # Unpacking evaluates both sliced querysets into one list of ids.
        return queryset.filter(id__in=[*upcoming_ids, *preceding_ids])

    def filter_queryset(self, queryset):
        """Apply the declared filters plus the legacy bare ``surrounding`` flag."""
        filtered = super().filter_queryset(queryset)
        # This is for backwards compatibility as the surrounding-filter was formerly implemented
        # by just checking for the existence of the query parameter.
        if self.request.GET.get("surrounding", None) != "":
            return filtered
        return self.filter_surrounding(filtered, "surrounding", timezone.now())

    class Meta:
        model = models.TimeSlot
        fields = [
            "order",
            "start",
            "end",
            "surrounding",
            "schedule_ids",
            "show_ids",
        ]


class NoteFilterSet(StaticFilterHelpTextMixin, filters.FilterSet):
    """Filters for note querysets.

    Every filter accepts one or more integer ids and matches notes by their
    own id, their timeslot, the timeslot's show, or the owners of that show.
    """

    # Matches on the note's own primary key.
    ids = IntegerInFilter(
        field_name="id",
        help_text="Return only notes matching the specified id(s).",
    )
    # Follows the note's timeslot to its show.
    show_ids = IntegerInFilter(
        field_name="timeslot__show",
        help_text="Return only notes that belong to the specified show(s).",
    )
    # Follows the note's timeslot and show to the show's owners.
    show_owner_ids = IntegerInFilter(
        field_name="timeslot__show__owners",
        help_text="Return only notes by show the specified owner(s): all notes the user may edit.",
    )
    # Matches on the note's timeslot.
    timeslot_ids = IntegerInFilter(
        field_name="timeslot",
        help_text="Return only notes that belong to the specified timeslot(s).",
    )

    class Meta:
        model = models.Note
        # Only the filters declared above are exposed.
        fields = [
            "ids",
            "show_ids",
            "show_owner_ids",
            "timeslot_ids",
        ]


class ActiveFilterSet(StaticFilterHelpTextMixin, filters.FilterSet):
    """Filter set exposing a single boolean ``is_active`` filter."""

    # Filters directly on the ``is_active`` field of the filtered queryset.
    is_active = filters.BooleanFilter(field_name="is_active")

    class Meta:
        # NOTE(review): no ``model`` is declared here; presumably this filter set is
        # reused for several models that have an ``is_active`` field — confirm.
        fields = [
            "is_active",
        ]