Skip to content
Snippets Groups Projects
filters.py 7.42 KiB
Newer Older
  • Learn to ignore specific revisions
  • from django_filters import rest_framework as filters
    from django_filters import widgets
    
    
    from django.contrib.auth.models import User
    from django.db.models import Q, QuerySet
    from django.utils import timezone
    from program import models
    
    
    class StaticFilterHelpTextMixin:
        """Mixin that fills in filter help texts from a static ``Meta.help_texts`` mapping.

        Intended to be mixed in before a class that provides ``filter_for_field``
        (the call is delegated via ``super()``) and a ``Meta`` inner class.
        """

        @classmethod
        def filter_for_field(cls, field, field_name, lookup_expr=None):
            """Return the parent's filter for ``field`` with a ``help_text`` entry in ``extra``.

            A help text that the parent already put into ``extra`` is left untouched.
            Otherwise the text registered for ``field_name`` in ``Meta.help_texts`` is
            used, falling back to an empty string when there is no entry (or no
            ``help_texts`` mapping at all).
            """
            generated = super().filter_for_field(field, field_name, lookup_expr)
            if "help_text" in generated.extra:
                return generated
            static_texts = getattr(cls.Meta, "help_texts", {})
            generated.extra["help_text"] = static_texts.get(field_name, "")
            return generated
    
    
    class ModelMultipleChoiceFilter(filters.ModelMultipleChoiceFilter):
        """Model multiple-choice filter that defaults to a CSV widget and an ``in`` lookup."""

        def __init__(self, *args, **kwargs):
            """Set up the filter.

            The widget defaults to ``widgets.CSVWidget()`` but may be overridden by the
            caller; ``lookup_expr`` is always forced to ``"in"``.
            """
            options = {"widget": widgets.CSVWidget(), **kwargs, "lookup_expr": "in"}
            super().__init__(*args, **options)

        def get_filter_predicate(self, v):
            """Build the predicate for ``v`` from a one-element list holding its primary key."""
            # Workaround: ModelMultipleChoiceFilter combined with the CSVWidget
            # produces lookups that fail, so the parent is handed ``[v.pk]``
            # instead of the object itself.
            # Possibly related to: https://github.com/carltongibson/django-filter/issues/1103
            pk_list = [v.pk]
            return super().get_filter_predicate(pk_list)
    
    
    class ShowFilterSet(StaticFilterHelpTextMixin, filters.FilterSet):
        """Filter set for ``models.Show`` querysets.

        Declares filters for activity, hosts, music focus, owners, categories,
        languages, topics and public visibility; ``type`` is generated from the
        model field and gets its help text from ``Meta.help_texts``.
        """

        active = filters.BooleanFilter(
            field_name="is_active",
            method="filter_active",
            # Must be a plain string: a trailing comma inside the parentheses would
            # turn the help text into a one-element tuple.
            help_text=(
                "Return only currently running shows if true or past or upcoming shows if false."
            ),
        )
        host = ModelMultipleChoiceFilter(
            queryset=models.Host.objects.all(),
            field_name="hosts",
            help_text="Return only shows assigned to the given host(s).",
        )
        # TODO: replace `musicfocus` with `music_focus` when dashboard is updated
        musicfocus = ModelMultipleChoiceFilter(
            queryset=models.MusicFocus.objects.all(),
            field_name="music_focus",
            help_text="Return only shows with given music focus(es).",
        )
        owner = ModelMultipleChoiceFilter(
            queryset=User.objects.all(),
            field_name="owners",
            help_text="Return only shows that belong to the given owner(s).",
        )
        category = ModelMultipleChoiceFilter(
            queryset=models.Category.objects.all(),
            help_text="Return only shows of the given category or categories.",
        )
        language = ModelMultipleChoiceFilter(
            queryset=models.Language.objects.all(),
            help_text="Return only shows of the given language(s).",
        )
        topic = ModelMultipleChoiceFilter(
            queryset=models.Topic.objects.all(),
            help_text="Return only shows of the given topic(s).",
        )
        public = filters.BooleanFilter(
            field_name="is_public",
            help_text="Return only shows that are public/non-public.",
        )

        def filter_active(self, queryset: QuerySet, name: str, value: bool):
            """Filter shows by whether they are currently active.

            A show counts as active when it has ``is_active=True`` *and* at least
            one matching schedule: either a schedule with ``rrule_id > 1`` whose
            ``first_date``/``last_date`` range contains the current time, or a
            schedule with ``rrule_id == 1`` whose ``first_date`` is not in the past.

            :param queryset: the show queryset to filter
            :param name: name of the filtered field (unused)
            :param value: ``True`` to keep only active shows, ``False`` to keep
                everything that is not active
            :return: the filtered queryset
            """
            # Filter currently running shows
            # Get currently running schedules to filter by first
            # For single dates we test if there'll be one in the future (and ignore the until date)
            # TODO: Really consider first_date? (=currently active, not just upcoming ones)
            # Add limit for future?

            # Evaluate the clock once so every condition compares against the same instant.
            now = timezone.now()
            show_ids = (
                models.Schedule.objects.filter(
                    Q(
                        rrule_id__gt=1,
                        first_date__lte=now,
                        last_date__gte=now,
                    )
                    | Q(rrule_id=1, first_date__gte=now)
                )
                .distinct()
                .values_list("show_id", flat=True)
            )
            if value:
                # Filter active shows based on timeslots as well as on the is_active flag.
                # Even if there are future timeslots, a show with is_active=False is
                # considered inactive.
                return queryset.filter(id__in=show_ids, is_active=True)
            else:
                # Everything that does not satisfy both conditions at once.
                return queryset.exclude(id__in=show_ids, is_active=True)

        class Meta:
            model = models.Show
            # Static help texts for filters generated from model fields.
            help_texts = {
                "type": "Return only shows of a given type.",
            }
            fields = [
                "active",
                "category",
                "host",
                "language",
                "musicfocus",
                "owner",
                "public",
                "topic",
                "type",
            ]
    
    
    
    class TimeSlotFilterSet(filters.FilterSet):
        """Filter set for ``models.TimeSlot`` querysets.

        Offers ordering, a ``start``/``end`` date window (always applied, see the
        form defined in ``Meta``) and a ``surrounding`` filter that narrows the
        result to the timeslots nearest to a given datetime.
        """

        order = filters.OrderingFilter(
            fields=[field.name for field in models.TimeSlot._meta.get_fields()]
        )
        surrounding = filters.DateTimeFilter(
            method="filter_surrounding",
            label="Return surrounding timeslots",
            help_text=(
                "Returns the 10 nearest timeslots around the specified datetime. "
                "If specified without a datetime value the current date and time is assumed."
            ),
        )
        # The start/end filters will always be applied even if no query parameter has been set.
        # This is because we enforce a value in the clean_start and clean_end methods
        # of the filterset form.
        start = filters.DateFilter(
            method="filter_start",
            help_text=(
                "Only returns timeslots that start on or after the specified date. "
                "By default, this is set to the current date."
            ),
        )
        end = filters.DateFilter(
            method="filter_end",
            help_text=(
                "Only returns timeslots that end on or before the specified date. "
                "By default, this is set to value of the start filter + 60 days."
            ),
        )

        def filter_surrounding(
            self, queryset: QuerySet, name: str, value: datetime.datetime
        ):
            """Restrict ``queryset`` to the timeslots nearest to ``value``.

            Up to five timeslots starting at or after ``value`` and up to five
            starting before it are looked up across *all* timeslots (not only
            those in ``queryset``); ``queryset`` is then filtered to these ids.
            """
            nearest_timeslots_in_future = (
                models.TimeSlot.objects.filter(start__gte=value)
                .order_by("start")
                .values_list("id", flat=True)[:5]
            )
            nearest_timeslots_in_past = (
                models.TimeSlot.objects.filter(start__lt=value)
                .order_by("-start")
                .values_list("id", flat=True)[:5]
            )
            relevant_timeslot_ids = list(nearest_timeslots_in_future) + list(
                nearest_timeslots_in_past
            )
            return queryset.filter(id__in=relevant_timeslot_ids)

        def filter_start(self, queryset: QuerySet, name: str, value: datetime.date):
            """Keep timeslots that start at or after the beginning of the day ``value``."""
            start = timezone.make_aware(datetime.datetime.combine(value, datetime.time.min))
            return queryset.filter(start__gte=start)

        def filter_end(self, queryset: QuerySet, name: str, value: datetime.date):
            """Keep timeslots that end at or before the end of the day ``value``."""
            end = timezone.make_aware(datetime.datetime.combine(value, datetime.time.max))
            return queryset.filter(end__lte=end)

        def filter_queryset(self, queryset):
            """Apply all filters, then the legacy value-less ``surrounding`` behaviour."""
            queryset = super().filter_queryset(queryset)
            # This is for backwards compatibility as the surrounding-filter was formerly implemented
            # by just checking for the existence of the query parameter.
            # Guard against a filterset that was instantiated without a request.
            request = self.request
            if request is not None and request.GET.get("surrounding", None) == "":
                queryset = self.filter_surrounding(queryset, "surrounding", timezone.now())
            return queryset

        class Meta:
            model = models.TimeSlot
            fields = [
                "order",
                "start",
                "end",
                "surrounding",
            ]

            class form(forms.Form):
                """Base form that enforces default values for ``start`` and ``end``."""

                def clean_start(self):
                    """Return the submitted start date, defaulting to today's date."""
                    start = self.cleaned_data.get("start", None)
                    return start or timezone.now().date()

                def clean_end(self):
                    """Return the submitted end date, defaulting to start + 60 days.

                    When ``start`` failed validation it is absent from
                    ``cleaned_data``; fall back to today's date in that case so the
                    validation error on ``start`` is reported instead of a ``KeyError``.
                    """
                    end = self.cleaned_data.get("end", None)
                    if end:
                        return end
                    start = self.cleaned_data.get("start") or timezone.now().date()
                    return start + datetime.timedelta(days=60)