Skip to content
Snippets Groups Projects
services.py 26.89 KiB
#
# steering, Programme/schedule management for AURA
#
# Copyright (C) 2017, Ingo Leindecker
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

from datetime import datetime, time, timedelta
from typing import TypedDict

from dateutil.relativedelta import relativedelta
from dateutil.rrule import rrule
from rest_framework.exceptions import ValidationError

from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q, QuerySet
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from program.models import Note, RRule, Schedule, ScheduleConflictError, Show, TimeSlot
from program.serializers import ScheduleSerializer, TimeSlotSerializer
from program.utils import parse_date, parse_datetime, parse_time


class ScheduleData(TypedDict):
    add_business_days_only: bool | None
    add_days_no: int | None
    by_weekday: int | None
    default_playlist_id: int | None
    dryrun: bool | None
    end_time: str
    first_date: str
    id: int | None
    is_repetition: bool | None
    last_date: str | None
    rrule_id: int
    show_id: int | None
    start_time: str


class Collision(TypedDict):
    end: str
    timeslot_id: int
    memo: str
    note_id: int | None
    playlist_id: int | None
    schedule_id: int
    show_id: int
    show_name: str
    start: str


class ProjectedEntry(TypedDict):
    collisions: list[Collision]
    end: str
    error: str | None
    hash: str
    solution_choices: set[str]
    start: str


class Conflicts(TypedDict):
    notes: dict
    playlists: dict
    projected: list[ProjectedEntry]
    solutions: dict[str, str]


class ScheduleCreateUpdateData(TypedDict):
    notes: dict
    playlists: dict
    schedule: ScheduleData
    solutions: dict[str, str]


def create_timeslot(start: str, end: str, schedule: Schedule) -> TimeSlot:
    """Creates and returns an unsaved timeslot with the given `start`, `end` and `schedule`."""

    return TimeSlot(
        start=parse_datetime(start),
        end=parse_datetime(end),
        schedule=schedule,
    )


def resolve_conflicts(data: ScheduleCreateUpdateData, schedule_pk: int | None, show_pk: int):
    """
    Resolves conflicts
    Expects JSON POST/PUT data from /shows/1/schedules/

    Returns a list of dicts if errors were found
    Returns an empty list if resolution was successful
    """

    schedule = data["schedule"]
    solutions = data.get("solutions", [])  # only needed if conflicts exist

    new_schedule = instantiate_upcoming_schedule(schedule, show_pk, schedule_pk)

    last_date_is_unknown = new_schedule.last_date is None  # we need to keep track of this

    # FIXME: refactor this to eliminate the duplication
    if last_date_is_unknown:
        if settings.AUTO_SET_LAST_DATE_TO_END_OF_YEAR:
            year = timezone.now().year
            new_schedule.last_date = timezone.datetime(year, 12, 31).date()
        else:
            new_schedule.last_date = new_schedule.first_date + timedelta(
                days=+settings.AUTO_SET_LAST_DATE_TO_DAYS_IN_FUTURE
            )

    show = new_schedule.show
    conflicts = make_conflicts(schedule, schedule_pk, show_pk)

    if new_schedule.rrule.freq > 0 and new_schedule.first_date == new_schedule.last_date:
        raise ValidationError(
            _("Start and end dates must not be the same."),
            code="no-same-day-start-and-end",
        )

    if new_schedule.last_date < new_schedule.first_date:
        raise ValidationError(
            _("End date mustn't be before start."),
            code="no-start-after-end",
        )

    num_conflicts = len([pr for pr in conflicts["projected"] if len(pr["collisions"]) > 0])

    if len(solutions) != num_conflicts:
        raise ScheduleConflictError(
            _("Numbers of conflicts and solutions don't match."),
            code="one-solution-per-conflict",
            conflicts=conflicts,
        )

    to_create: list[TimeSlot] = []
    to_update: list[TimeSlot] = []
    to_delete: list[TimeSlot] = []

    errors = {}

    for timeslot in conflicts["projected"]:
        # If no solution necessary: Create the projected timeslot and skip
        if "solution_choices" not in timeslot or len(timeslot["collisions"]) == 0:
            to_create.append(
                create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
            )
            continue

        # Check hash (if start, end, rrule or by_weekday changed)
        if not timeslot["hash"] in solutions:
            errors[timeslot["hash"]] = _("This change on the timeslot is not allowed.")
            continue

        # If no resolution given: skip
        if solutions[timeslot["hash"]] == "":
            errors[timeslot["hash"]] = _("No solution given.")
            continue

        # If resolution is not accepted for this conflict: SKIP
        if not solutions[timeslot["hash"]] in timeslot["solution_choices"]:
            errors[timeslot["hash"]] = _("Given solution is not accepted for this conflict.")
            continue

        """Conflict resolution"""

        existing = timeslot["collisions"][0]
        solution = solutions[timeslot["hash"]]

        if solution == "theirs":
            # - Discard the projected timeslot
            # - Keep the existing collision(s)
            continue

        if solution == "ours":
            # - Create the projected timeslot
            # - Delete the existing collision(s)
            to_create.append(
                create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
            )

            # Delete collision(s)
            for collision in timeslot["collisions"]:
                try:
                    to_delete.append(TimeSlot.objects.get(pk=collision["timeslot_id"]))
                except ObjectDoesNotExist:
                    pass

        if solution == "theirs-end":
            # - Keep the existing timeslot
            # - Create projected with end of existing start
            to_create.append(
                create_timeslot(timeslot["start"], existing["start"], new_schedule),
            )

        if solution == "ours-end":
            # - Create the projected timeslot
            # - Change the start of the existing collision to projected end
            to_create.append(
                create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
            )

            existing_ts = TimeSlot.objects.get(pk=existing["timeslot_id"])
            existing_ts.start = parse_datetime(timeslot["end"])
            to_update.append(existing_ts)

        if solution == "theirs-start":
            # - Keep existing
            # - Create projected with start time of existing end
            to_create.append(
                create_timeslot(existing["end"], timeslot["end"], new_schedule),
            )

        if solution == "ours-start":
            # - Create the projected timeslot
            # - Change end of existing to projected start
            to_create.append(
                create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
            )

            existing_ts = TimeSlot.objects.get(pk=existing["timeslot_id"])
            existing_ts.end = parse_datetime(timeslot["start"])
            to_update.append(existing_ts)

        if solution == "theirs-both":
            # - Keep existing
            # - Create two projected timeslots with end of existing start and start of existing end
            to_create.append(
                create_timeslot(timeslot["start"], existing["start"], new_schedule),
            )

            to_create.append(
                create_timeslot(existing["end"], timeslot["end"], new_schedule),
            )

        if solution == "ours-both":
            # - Create projected
            # - Split existing into two
            #   - Set existing end time to projected start
            #   - Create another one with start = projected end and end = existing end
            to_create.append(
                create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
            )

            existing_ts = TimeSlot.objects.get(pk=existing["timeslot_id"])
            existing_ts.end = parse_datetime(timeslot["start"])
            to_update.append(existing_ts)

            to_create.append(
                create_timeslot(timeslot["end"], existing["end"], new_schedule),
            )

    # If there were any errors, don't make any db changes yet
    # but add error messages and return already chosen solutions
    if len(errors) > 0:
        conflicts = make_conflicts(
            ScheduleSerializer().to_representation(new_schedule), new_schedule.pk, show.pk
        )

        partly_resolved = conflicts["projected"]
        saved_solutions = {}

        # Add already chosen resolutions and error message to conflict
        for index, projected_entry in enumerate(conflicts["projected"]):
            # The element should only exist if there was a collision
            if len(projected_entry["collisions"]) > 0:
                saved_solutions[projected_entry["hash"]] = ""

            if (
                projected_entry["hash"] in solutions
                and solutions[projected_entry["hash"]] in projected_entry["solution_choices"]
            ):
                saved_solutions[projected_entry["hash"]] = solutions[projected_entry["hash"]]

            if projected_entry["hash"] in errors:
                partly_resolved[index]["error"] = errors[projected_entry["hash"]]

        # Re-insert post data
        conflicts["projected"] = partly_resolved
        conflicts["solutions"] = saved_solutions
        conflicts["notes"] = data.get("notes")
        conflicts["playlists"] = data.get("playlists")

        raise ScheduleConflictError(
            _("Not all conflicts have been resolved."),
            code="unresolved-conflicts",
            conflicts=conflicts,
        )

    remaining_timeslots = TimeSlot.objects.filter(
        schedule=new_schedule,
        start__gt=timezone.make_aware(datetime.combine(new_schedule.last_date, time(0, 0))),
    )
    for timeslot in remaining_timeslots:
        to_delete.append(timeslot)

    # If 'dryrun' is true, just return the projected changes instead of executing them
    if "dryrun" in schedule and schedule["dryrun"]:
        return {
            "create": [TimeSlotSerializer().to_representation(timeslot) for timeslot in to_create],
            "update": [TimeSlotSerializer().to_representation(timeslot) for timeslot in to_update],
            "delete": [TimeSlotSerializer().to_representation(timeslot) for timeslot in to_delete],
        }

    # Database changes if no errors found

    if last_date_is_unknown:
        new_schedule.last_date = None

    if to_create:
        new_schedule.save()

    for timeslot in to_update:
        timeslot.save(update_fields=["start", "end"])

    for timeslot in to_create:
        timeslot.schedule = new_schedule

        # Reassign playlists
        if "playlists" in data and timeslot.hash in data["playlists"]:
            timeslot.playlist_id = int(data["playlists"][timeslot.hash])

        timeslot.save()

        # Reassign notes
        if "notes" in data and timeslot.hash in data["notes"]:
            try:
                note = Note.objects.get(pk=int(data["notes"][timeslot.hash]))
                note.timeslot_id = timeslot.id
                note.save(update_fields=["timeslot_id"])

                timeslot = TimeSlot.objects.get(pk=timeslot.id)
                timeslot.note_id = note.id
                timeslot.save(update_fields=["note_id"])
            except ObjectDoesNotExist:
                pass

    for timeslot in to_delete:
        timeslot.delete()

    return ScheduleSerializer().to_representation(new_schedule)


def instantiate_upcoming_schedule(
    data: ScheduleData, show_pk: int, pk: int | None = None
) -> Schedule:
    """
    Returns an upcoming schedule instance for conflict resolution.

    If the data does not contain a last_date, the Schedule instance will not contain a last_date.
    """

    rrule = RRule.objects.get(pk=data["rrule_id"])
    show = Show.objects.get(pk=show_pk)

    is_repetition = data["is_repetition"] if "is_repetition" in data else False

    # default is `False`
    add_business_days_only = (
        data["add_business_days_only"] if "add_business_days_only" in data else False
    )

    # default is `None`
    add_days_no = data.get("add_days_no")
    by_weekday = data.get("by_weekday")
    default_playlist_id = data.get("default_playlist_id")

    first_date = parse_date(data["first_date"])
    start_time = parse_time(data["start_time"])
    end_time = parse_time(data["end_time"])

    # last_date may not be present in data
    last_date = parse_date(data["last_date"]) if data.get("last_date") is not None else None

    return Schedule(
        pk=pk,
        by_weekday=by_weekday,
        rrule=rrule,
        first_date=first_date,
        start_time=start_time,
        end_time=end_time,
        last_date=last_date,
        is_repetition=is_repetition,
        default_playlist_id=default_playlist_id,
        show=show,
        add_days_no=add_days_no,
        add_business_days_only=add_business_days_only,
    )


def make_conflicts(data: ScheduleData, schedule_pk: int | None, show_pk: int) -> Conflicts:
    """
    Retrieves POST vars
    Generates a schedule
    Generates conflicts: Returns timeslots, collisions, solutions as JSON
    Returns conflicts dict
    """

    # Generate schedule to be saved
    new_schedule = instantiate_upcoming_schedule(data, show_pk, schedule_pk)

    # Copy if first_date changes for generating timeslots
    schedule_copy = new_schedule

    # Generate timeslots

    # If extending: Get last timeslot and start generating from that date on
    if schedule_pk is not None:
        existing_schedule = Schedule.objects.get(pk=schedule_pk)

        if (
            new_schedule.last_date
            and existing_schedule.last_date
            and new_schedule.last_date > existing_schedule.last_date
        ):
            last_timeslot = (
                TimeSlot.objects.filter(schedule=existing_schedule).order_by("start").reverse()[0]
            )
            schedule_copy.first_date = last_timeslot.start.date() + timedelta(days=1)

    timeslots = generate_timeslots(schedule_copy)

    # Generate conflicts and add schedule
    conflicts = generate_conflicts(timeslots)

    # create a new dictionary by adding "schedule" to conflicts
    return dict(conflicts, schedule=ScheduleSerializer().to_representation(new_schedule))


def generate_timeslots(schedule: Schedule) -> list[TimeSlot]:
    """
    Returns a list of timeslot objects based on a schedule and its rrule
    Returns past timeslots as well, starting from first_date (not today)
    """
    timeslots = []

    if schedule.rrule.freq == 3:  # daily: Ignore schedule.by_weekday to set by_weekday
        by_weekday_start = by_weekday_end = (0, 1, 2, 3, 4, 5, 6)
    elif (
        schedule.rrule.freq == 2
        and schedule.rrule.interval == 1
        and schedule.rrule.by_weekdays is None
    ):  # weekly: Use schedule.by_weekday for by_weekday
        by_weekday_start = by_weekday_end = (
            int(schedule.by_weekday) if schedule.by_weekday is not None else None
        )

        # adjust by_weekday_end if end_time is after midnight
        if schedule.end_time < schedule.start_time:
            by_weekday_end = (
                by_weekday_start + 1 if by_weekday_start and by_weekday_start < 6 else 0
            )
    elif (
        schedule.rrule.freq == 2
        and schedule.rrule.interval == 1
        and schedule.rrule.by_weekdays == "0,1,2,3,4"
    ):  # weekly on business days: Use schedule.rrule.by_weekdays to set by_weekday
        by_weekday_start = by_weekday_end = [
            int(wd) for wd in schedule.rrule.by_weekdays.split(",")
        ]

        # adjust by_weekday_end if end_time is after midnight
        if schedule.end_time < schedule.start_time:
            by_weekday_end = (1, 2, 3, 4, 5)
    elif (
        schedule.rrule.freq == 2
        and schedule.rrule.interval == 1
        and schedule.rrule.by_weekdays == "5,6"
    ):  # weekly on weekends: Use schedule.rrule.by_weekdays to set by_weekday
        by_weekday_start = by_weekday_end = [
            int(wd) for wd in schedule.rrule.by_weekdays.split(",")
        ]

        # adjust by_weekday_end if end_time is after midnight
        if schedule.end_time < schedule.start_time:
            by_weekday_end = (6, 0)
    elif schedule.rrule.freq == 0:  # once: Ignore schedule.by_weekday to set by_weekday
        by_weekday_start = by_weekday_end = None
    else:
        by_weekday_start = by_weekday_end = (
            int(schedule.by_weekday) if schedule.by_weekday is not None else None
        )

        # adjust by_weekday_end if end_time is after midnight
        if schedule.end_time < schedule.start_time:
            by_weekday_end = (
                by_weekday_start + 1 if by_weekday_start and by_weekday_start < 6 else 0
            )

    # adjust last_date if end_time is after midnight
    if schedule.end_time < schedule.start_time:
        last_date = schedule.first_date + timedelta(days=+1)
    else:
        last_date = schedule.first_date

    # FIXME: refactor this to eliminate the duplication
    if schedule.last_date is None:
        if settings.AUTO_SET_LAST_DATE_TO_END_OF_YEAR:
            year = timezone.now().year
            until = timezone.datetime(year, 12, 31).date()
        else:
            until = schedule.first_date + timedelta(
                days=+settings.AUTO_SET_LAST_DATE_TO_DAYS_IN_FUTURE
            )
    else:
        until = schedule.last_date

    if schedule.rrule.freq == 0:  # once:
        starts = [datetime.combine(schedule.first_date, schedule.start_time)]
        ends = [datetime.combine(last_date, schedule.end_time)]
    else:
        starts = list(
            rrule(
                freq=schedule.rrule.freq,
                dtstart=datetime.combine(schedule.first_date, schedule.start_time),
                interval=schedule.rrule.interval,
                until=until + relativedelta(days=+1),
                bysetpos=schedule.rrule.by_set_pos,
                byweekday=by_weekday_start,
            )
        )
        ends = list(
            rrule(
                freq=schedule.rrule.freq,
                dtstart=datetime.combine(last_date, schedule.end_time),
                interval=schedule.rrule.interval,
                until=until + relativedelta(days=+1),
                bysetpos=schedule.rrule.by_set_pos,
                byweekday=by_weekday_end,
            )
        )

    for k in range(min(len(starts), len(ends))):
        # Correct dates for the (relatively seldom) case if:
        # E.g.: 1st Monday from 23:00:00 to 1st Tuesday 00:00:00
        #       produces wrong end dates if the 1st Tuesday is before the 1st Monday
        #       In this case we take the next day instead of rrule's calculated end
        if starts[k] > ends[k]:
            ends[k] = datetime.combine(starts[k] + relativedelta(days=+1), schedule.end_time)

        """
        Add a number of days to the generated dates?

        This can be helpful for repetitions:

        Examples:

          1. If RRule is "Every 1st Monday" and we want its repetition always to be on the
             following day, the repetition's RRule is the same but add_days_no is 1

             If we would set the repetition to "Every 1st Tuesday" instead
             we will get unmeant results if the 1st Tuesday is before the 1st Monday
             (e.g. 1st Tue = May 1 2018, 1st Mon = May 7 2018)

          2. If RRule is "Every 1st Friday" and we want its repetition always to be on the
             following business day, the repetition's RRule is the same but add_days_no is 1
             and add_business_days_only is True (e.g. original date = Fri, March 2 2018;
             generated date = Mon, March 5 2018)

        In the UI these can be presets:
            "On the following day" (add_days_no=1,add_business_days_only=False) or
            "On the following business day" (add_days_no=1,add_business_days_only=True)

        """
        if schedule.add_days_no is not None and schedule.add_days_no > 0:
            # If only business days and weekday is Fri, Sat or Sun: add add_days_no beginning
            # from Sunday
            weekday = datetime.date(starts[k]).weekday()
            if schedule.add_business_days_only and weekday > 3:
                days_until_sunday = 6 - weekday
                starts[k] = starts[k] + relativedelta(
                    days=+days_until_sunday + schedule.add_days_no
                )
                ends[k] = ends[k] + relativedelta(days=+days_until_sunday + schedule.add_days_no)
            else:
                starts[k] = starts[k] + relativedelta(days=+schedule.add_days_no)
                ends[k] = ends[k] + relativedelta(days=+schedule.add_days_no)

            if ends[k].date() > schedule.last_date:
                schedule.last_date = ends[k].date()
        timeslots.append(
            TimeSlot(
                schedule=schedule,
                start=timezone.make_aware(starts[k], is_dst=True),
                end=timezone.make_aware(ends[k], is_dst=True),
            )
        )

    return timeslots


def get_colliding_timeslots(timeslot: TimeSlot) -> QuerySet[TimeSlot]:
    """Gets a queryset of timeslot objects colliding with the given instance."""

    return TimeSlot.objects.filter(
        (Q(start__lt=timeslot.end) & Q(end__gte=timeslot.end))
        | (Q(end__gt=timeslot.start) & Q(end__lte=timeslot.end))
        | (Q(start__gte=timeslot.start) & Q(end__lte=timeslot.end))
        | (Q(start__lte=timeslot.start) & Q(end__gte=timeslot.end))
    )


def generate_conflicts(timeslots: list[TimeSlot]) -> Conflicts:
    """
    Tests a list of timeslot objects for colliding timeslots in the database
    Returns a list of conflicts containing dicts of projected timeslots, collisions and
    solutions
    """

    conflicts: Conflicts = {}
    projected = []
    solutions = {}

    # Cycle each timeslot
    for ts in timeslots:
        # Contains collisions
        collisions = []

        # Contains possible solutions
        solution_choices = set()

        # Get collisions for each timeslot
        collision_list = list(get_colliding_timeslots(ts).order_by("start"))

        # Add the projected timeslot
        projected_entry = {
            "hash": ts.hash,
            "start": str(ts.start),
            "end": str(ts.end),
        }

        for c in collision_list:
            # Add the collision
            collision = {
                "timeslot_id": c.id,
                "start": str(c.start),
                "end": str(c.end),
                "playlist_id": c.playlist_id,
                "show_id": c.schedule.show.id,
                "show_name": c.schedule.show.name,
                "schedule_id": c.schedule_id,
                "memo": c.memo,
            }

            # Get note
            try:
                note = Note.objects.get(timeslot=c.id)
                collision["note_id"] = note.pk
            except ObjectDoesNotExist:
                collision["note_id"] = None

            collisions.append(collision)

            """Determine acceptable solutions"""

            if len(collision_list) > 1:
                # If there is more than one collision: Only these two are supported at the
                # moment
                solution_choices.add("theirs")
                solution_choices.add("ours")
            else:
                # These two are always possible: Either keep theirs and remove ours or vice
                # versa
                solution_choices.add("theirs")
                solution_choices.add("ours")

                # Partly overlapping: projected starts earlier than existing and ends earlier
                #
                #    ex.  pr.
                #        +--+
                #        |  |
                #   +--+ |  |
                #   |  | +--+
                #   |  |
                #   +--+
                #
                if ts.end > c.start > ts.start <= c.end:
                    solution_choices.add("theirs-end")
                    solution_choices.add("ours-end")

                # Partly overlapping: projected starts later than existing and ends later
                #
                #    ex.  pr.
                #   +--+
                #   |  |
                #   |  | +--+
                #   +--+ |  |
                #        |  |
                #        +--+
                #
                if c.start <= ts.start < c.end < ts.end:
                    solution_choices.add("theirs-start")
                    solution_choices.add("ours-start")

                # Fully overlapping: projected starts earlier and ends later than existing
                #
                #    ex.  pr.
                #        +--+
                #   +--+ |  |
                #   |  | |  |
                #   +--+ |  |
                #        +--+
                #
                if ts.start < c.start and ts.end > c.end:
                    solution_choices.add("theirs-end")
                    solution_choices.add("theirs-start")
                    solution_choices.add("theirs-both")

                # Fully overlapping: projected starts later and ends earlier than existing
                #
                #    ex.  pr.
                #   +--+
                #   |  | +--+
                #   |  | |  |
                #   |  | +--+
                #   +--+
                #
                if ts.start > c.start and ts.end < c.end:
                    solution_choices.add("ours-end")
                    solution_choices.add("ours-start")
                    solution_choices.add("ours-both")

        if len(collisions) > 0:
            solutions[ts.hash] = ""

        projected_entry["collisions"] = collisions
        projected_entry["solution_choices"] = solution_choices
        projected_entry["error"] = None
        projected.append(projected_entry)

    conflicts["projected"] = projected
    conflicts["solutions"] = solutions
    conflicts["notes"] = {}
    conflicts["playlists"] = {}

    return conflicts


def get_timerange_timeslots(start: datetime, end: datetime) -> QuerySet[TimeSlot]:
    """Gets a queryset of timeslots between the given `start` and `end` datetime."""

    return TimeSlot.objects.filter(
        # start before `start` and end after `start`
        Q(start__lt=start, end__gt=start)
        # start after/at `start`, end before/at `end`
        | Q(start__gte=start, end__lte=end)
        # start before `end`, end after/at `end`
        | Q(start__lt=end, end__gte=end)
    )