# # steering, Programme/schedule management for AURA # # Copyright (C) 2017, Ingo Leindecker # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Affero General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) any # later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more # details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # from datetime import datetime, time, timedelta from typing import TypedDict from dateutil.relativedelta import relativedelta from dateutil.rrule import rrule from rest_framework.exceptions import ValidationError from django.core.exceptions import ObjectDoesNotExist from django.db.models import Q, QuerySet from django.utils import timezone from django.utils.translation import gettext_lazy as _ from program.models import Note, RRule, Schedule, ScheduleConflictError, Show, TimeSlot from program.serializers import ScheduleSerializer, TimeSlotSerializer from program.utils import parse_date, parse_datetime, parse_time from steering.settings import ( AUTO_SET_LAST_DATE_TO_DAYS_IN_FUTURE, AUTO_SET_LAST_DATE_TO_END_OF_YEAR, ) class ScheduleData(TypedDict): add_business_days_only: bool | None add_days_no: int | None by_weekday: int | None default_playlist_id: int | None dryrun: bool | None end_time: str first_date: str id: int | None is_repetition: bool | None last_date: str | None rrule_id: int show_id: int | None start_time: str class Collision(TypedDict): end: str timeslot_id: int memo: str note_id: int | None playlist_id: int | None schedule_id: int show_id: int show_name: str start: str class ProjectedEntry(TypedDict): collisions: list[Collision] end: str error: str | None hash: str solution_choices: set[str] start: str class Conflicts(TypedDict): notes: dict playlists: dict projected: list[ProjectedEntry] solutions: dict[str, str] class ScheduleCreateUpdateData(TypedDict): notes: dict playlists: dict schedule: ScheduleData solutions: dict[str, str] def create_timeslot(start: str, end: str, schedule: Schedule) -> TimeSlot: """Creates and returns an unsaved timeslot with the given `start`, `end` and `schedule`.""" return TimeSlot( start=parse_datetime(start), end=parse_datetime(end), schedule=schedule, ) def resolve_conflicts(data: ScheduleCreateUpdateData, schedule_pk: int | None, show_pk: int): """ Resolves conflicts Expects JSON POST/PUT data from /shows/1/schedules/ Returns a list of dicts if errors were found Returns an empty list if resolution was successful """ schedule = data["schedule"] solutions = data.get("solutions", []) # only needed if conflicts exist new_schedule = instantiate_upcoming_schedule(schedule, show_pk, schedule_pk) last_date_is_unknown = new_schedule.last_date is None # we need to keep track of this # FIXME: refactor this to eliminate the duplication if last_date_is_unknown: if AUTO_SET_LAST_DATE_TO_END_OF_YEAR: year = timezone.now().year new_schedule.last_date = timezone.datetime(year, 12, 31).date() else: new_schedule.last_date = new_schedule.first_date + timedelta( days=+AUTO_SET_LAST_DATE_TO_DAYS_IN_FUTURE ) show = new_schedule.show conflicts = make_conflicts(schedule, schedule_pk, show_pk) if new_schedule.rrule.freq > 0 and new_schedule.first_date == new_schedule.last_date: raise ValidationError( _("Start and end dates must not be the same."), code="no-same-day-start-and-end", ) if new_schedule.last_date < new_schedule.first_date: raise ValidationError( _("End date mustn't be before start."), code="no-start-after-end", ) num_conflicts = len([pr for pr in conflicts["projected"] if len(pr["collisions"]) > 0]) if len(solutions) != num_conflicts: raise ScheduleConflictError( _("Numbers of conflicts and solutions don't match."), code="one-solution-per-conflict", conflicts=conflicts, ) to_create: list[TimeSlot] = [] to_update: list[TimeSlot] = [] to_delete: list[TimeSlot] = [] errors = {} for timeslot in conflicts["projected"]: # If no solution necessary: Create the projected timeslot and skip if "solution_choices" not in timeslot or len(timeslot["collisions"]) == 0: to_create.append( create_timeslot(timeslot["start"], timeslot["end"], new_schedule), ) continue # Check hash (if start, end, rrule or by_weekday changed) if not timeslot["hash"] in solutions: errors[timeslot["hash"]] = _("This change on the timeslot is not allowed.") continue # If no resolution given: skip if solutions[timeslot["hash"]] == "": errors[timeslot["hash"]] = _("No solution given.") continue # If resolution is not accepted for this conflict: SKIP if not solutions[timeslot["hash"]] in timeslot["solution_choices"]: errors[timeslot["hash"]] = _("Given solution is not accepted for this conflict.") continue """Conflict resolution""" existing = timeslot["collisions"][0] solution = solutions[timeslot["hash"]] if solution == "theirs": # - Discard the projected timeslot # - Keep the existing collision(s) continue if solution == "ours": # - Create the projected timeslot # - Delete the existing collision(s) to_create.append( create_timeslot(timeslot["start"], timeslot["end"], new_schedule), ) # Delete collision(s) for collision in timeslot["collisions"]: try: to_delete.append(TimeSlot.objects.get(pk=collision["timeslot_id"])) except ObjectDoesNotExist: pass if solution == "theirs-end": # - Keep the existing timeslot # - Create projected with end of existing start to_create.append( create_timeslot(timeslot["start"], existing["start"], new_schedule), ) if solution == "ours-end": # - Create the projected timeslot # - Change the start of the existing collision to projected end to_create.append( create_timeslot(timeslot["start"], timeslot["end"], new_schedule), ) existing_ts = TimeSlot.objects.get(pk=existing["timeslot_id"]) existing_ts.start = parse_datetime(timeslot["end"]) to_update.append(existing_ts) if solution == "theirs-start": # - Keep existing # - Create projected with start time of existing end to_create.append( create_timeslot(existing["end"], timeslot["end"], new_schedule), ) if solution == "ours-start": # - Create the projected timeslot # - Change end of existing to projected start to_create.append( create_timeslot(timeslot["start"], timeslot["end"], new_schedule), ) existing_ts = TimeSlot.objects.get(pk=existing["timeslot_id"]) existing_ts.end = parse_datetime(timeslot["start"]) to_update.append(existing_ts) if solution == "theirs-both": # - Keep existing # - Create two projected timeslots with end of existing start and start of existing end to_create.append( create_timeslot(timeslot["start"], existing["start"], new_schedule), ) to_create.append( create_timeslot(existing["end"], timeslot["end"], new_schedule), ) if solution == "ours-both": # - Create projected # - Split existing into two # - Set existing end time to projected start # - Create another one with start = projected end and end = existing end to_create.append( create_timeslot(timeslot["start"], timeslot["end"], new_schedule), ) existing_ts = TimeSlot.objects.get(pk=existing["timeslot_id"]) existing_ts.end = parse_datetime(timeslot["start"]) to_update.append(existing_ts) to_create.append( create_timeslot(timeslot["end"], existing["end"], new_schedule), ) # If there were any errors, don't make any db changes yet # but add error messages and return already chosen solutions if len(errors) > 0: conflicts = make_conflicts( ScheduleSerializer().to_representation(new_schedule), new_schedule.pk, show.pk ) partly_resolved = conflicts["projected"] saved_solutions = {} # Add already chosen resolutions and error message to conflict for index, projected_entry in enumerate(conflicts["projected"]): # The element should only exist if there was a collision if len(projected_entry["collisions"]) > 0: saved_solutions[projected_entry["hash"]] = "" if ( projected_entry["hash"] in solutions and solutions[projected_entry["hash"]] in projected_entry["solution_choices"] ): saved_solutions[projected_entry["hash"]] = solutions[projected_entry["hash"]] if projected_entry["hash"] in errors: partly_resolved[index]["error"] = errors[projected_entry["hash"]] # Re-insert post data conflicts["projected"] = partly_resolved conflicts["solutions"] = saved_solutions conflicts["notes"] = data.get("notes") conflicts["playlists"] = data.get("playlists") raise ScheduleConflictError( _("Not all conflicts have been resolved."), code="unresolved-conflicts", conflicts=conflicts, ) remaining_timeslots = TimeSlot.objects.filter( schedule=new_schedule, start__gt=timezone.make_aware(datetime.combine(new_schedule.last_date, time(0, 0))), ) for timeslot in remaining_timeslots: to_delete.append(timeslot) # If 'dryrun' is true, just return the projected changes instead of executing them if "dryrun" in schedule and schedule["dryrun"]: return { "create": [TimeSlotSerializer().to_representation(timeslot) for timeslot in to_create], "update": [TimeSlotSerializer().to_representation(timeslot) for timeslot in to_update], "delete": [TimeSlotSerializer().to_representation(timeslot) for timeslot in to_delete], } # Database changes if no errors found if last_date_is_unknown: new_schedule.last_date = None if to_create: new_schedule.save() for timeslot in to_update: timeslot.save(update_fields=["start", "end"]) for timeslot in to_create: timeslot.schedule = new_schedule # Reassign playlists if "playlists" in data and timeslot.hash in data["playlists"]: timeslot.playlist_id = int(data["playlists"][timeslot.hash]) timeslot.save() # Reassign notes if "notes" in data and timeslot.hash in data["notes"]: try: note = Note.objects.get(pk=int(data["notes"][timeslot.hash])) note.timeslot_id = timeslot.id note.save(update_fields=["timeslot_id"]) timeslot = TimeSlot.objects.get(pk=timeslot.id) timeslot.note_id = note.id timeslot.save(update_fields=["note_id"]) except ObjectDoesNotExist: pass for timeslot in to_delete: timeslot.delete() return ScheduleSerializer().to_representation(new_schedule) def instantiate_upcoming_schedule( data: ScheduleData, show_pk: int, pk: int | None = None ) -> Schedule: """ Returns an upcoming schedule instance for conflict resolution. If the data does not contain a last_date, the Schedule instance will not contain a last_date. """ rrule = RRule.objects.get(pk=data["rrule_id"]) show = Show.objects.get(pk=show_pk) is_repetition = data["is_repetition"] if "is_repetition" in data else False # default is `False` add_business_days_only = ( data["add_business_days_only"] if "add_business_days_only" in data else False ) # default is `None` add_days_no = data.get("add_days_no") by_weekday = data.get("by_weekday") default_playlist_id = data.get("default_playlist_id") first_date = parse_date(data["first_date"]) start_time = parse_time(data["start_time"]) end_time = parse_time(data["end_time"]) # last_date may not be present in data last_date = parse_date(data["last_date"]) if data.get("last_date") is not None else None return Schedule( pk=pk, by_weekday=by_weekday, rrule=rrule, first_date=first_date, start_time=start_time, end_time=end_time, last_date=last_date, is_repetition=is_repetition, default_playlist_id=default_playlist_id, show=show, add_days_no=add_days_no, add_business_days_only=add_business_days_only, ) def make_conflicts(data: ScheduleData, schedule_pk: int | None, show_pk: int) -> Conflicts: """ Retrieves POST vars Generates a schedule Generates conflicts: Returns timeslots, collisions, solutions as JSON Returns conflicts dict """ # Generate schedule to be saved new_schedule = instantiate_upcoming_schedule(data, show_pk, schedule_pk) # Copy if first_date changes for generating timeslots schedule_copy = new_schedule # Generate timeslots # If extending: Get last timeslot and start generating from that date on if schedule_pk is not None: existing_schedule = Schedule.objects.get(pk=schedule_pk) if ( new_schedule.last_date and existing_schedule.last_date and new_schedule.last_date > existing_schedule.last_date ): last_timeslot = ( TimeSlot.objects.filter(schedule=existing_schedule).order_by("start").reverse()[0] ) schedule_copy.first_date = last_timeslot.start.date() + timedelta(days=1) timeslots = generate_timeslots(schedule_copy) # Generate conflicts and add schedule conflicts = generate_conflicts(timeslots) # create a new dictionary by adding "schedule" to conflicts return dict(conflicts, schedule=ScheduleSerializer().to_representation(new_schedule)) def generate_timeslots(schedule: Schedule) -> list[TimeSlot]: """ Returns a list of timeslot objects based on a schedule and its rrule Returns past timeslots as well, starting from first_date (not today) """ timeslots = [] if schedule.rrule.freq == 3: # daily: Ignore schedule.by_weekday to set by_weekday by_weekday_start = by_weekday_end = (0, 1, 2, 3, 4, 5, 6) elif ( schedule.rrule.freq == 2 and schedule.rrule.interval == 1 and schedule.rrule.by_weekdays is None ): # weekly: Use schedule.by_weekday for by_weekday by_weekday_start = by_weekday_end = ( int(schedule.by_weekday) if schedule.by_weekday is not None else None ) # adjust by_weekday_end if end_time is after midnight if schedule.end_time < schedule.start_time: by_weekday_end = ( by_weekday_start + 1 if by_weekday_start and by_weekday_start < 6 else 0 ) elif ( schedule.rrule.freq == 2 and schedule.rrule.interval == 1 and schedule.rrule.by_weekdays == "0,1,2,3,4" ): # weekly on business days: Use schedule.rrule.by_weekdays to set by_weekday by_weekday_start = by_weekday_end = [ int(wd) for wd in schedule.rrule.by_weekdays.split(",") ] # adjust by_weekday_end if end_time is after midnight if schedule.end_time < schedule.start_time: by_weekday_end = (1, 2, 3, 4, 5) elif ( schedule.rrule.freq == 2 and schedule.rrule.interval == 1 and schedule.rrule.by_weekdays == "5,6" ): # weekly on weekends: Use schedule.rrule.by_weekdays to set by_weekday by_weekday_start = by_weekday_end = [ int(wd) for wd in schedule.rrule.by_weekdays.split(",") ] # adjust by_weekday_end if end_time is after midnight if schedule.end_time < schedule.start_time: by_weekday_end = (6, 0) elif schedule.rrule.freq == 0: # once: Ignore schedule.by_weekday to set by_weekday by_weekday_start = by_weekday_end = None else: by_weekday_start = by_weekday_end = ( int(schedule.by_weekday) if schedule.by_weekday is not None else None ) # adjust by_weekday_end if end_time is after midnight if schedule.end_time < schedule.start_time: by_weekday_end = ( by_weekday_start + 1 if by_weekday_start and by_weekday_start < 6 else 0 ) # adjust last_date if end_time is after midnight if schedule.end_time < schedule.start_time: last_date = schedule.first_date + timedelta(days=+1) else: last_date = schedule.first_date # FIXME: refactor this to eliminate the duplication if schedule.last_date is None: if AUTO_SET_LAST_DATE_TO_END_OF_YEAR: year = timezone.now().year until = timezone.datetime(year, 12, 31).date() else: until = schedule.first_date + timedelta(days=+AUTO_SET_LAST_DATE_TO_DAYS_IN_FUTURE) else: until = schedule.last_date if schedule.rrule.freq == 0: # once: starts = [datetime.combine(schedule.first_date, schedule.start_time)] ends = [datetime.combine(last_date, schedule.end_time)] else: starts = list( rrule( freq=schedule.rrule.freq, dtstart=datetime.combine(schedule.first_date, schedule.start_time), interval=schedule.rrule.interval, until=until + relativedelta(days=+1), bysetpos=schedule.rrule.by_set_pos, byweekday=by_weekday_start, ) ) ends = list( rrule( freq=schedule.rrule.freq, dtstart=datetime.combine(last_date, schedule.end_time), interval=schedule.rrule.interval, until=until + relativedelta(days=+1), bysetpos=schedule.rrule.by_set_pos, byweekday=by_weekday_end, ) ) for k in range(min(len(starts), len(ends))): # Correct dates for the (relatively seldom) case if: # E.g.: 1st Monday from 23:00:00 to 1st Tuesday 00:00:00 # produces wrong end dates if the 1st Tuesday is before the 1st Monday # In this case we take the next day instead of rrule's calculated end if starts[k] > ends[k]: ends[k] = datetime.combine(starts[k] + relativedelta(days=+1), schedule.end_time) """ Add a number of days to the generated dates? This can be helpful for repetitions: Examples: 1. If RRule is "Every 1st Monday" and we want its repetition always to be on the following day, the repetition's RRule is the same but add_days_no is 1 If we would set the repetition to "Every 1st Tuesday" instead we will get unmeant results if the 1st Tuesday is before the 1st Monday (e.g. 1st Tue = May 1 2018, 1st Mon = May 7 2018) 2. If RRule is "Every 1st Friday" and we want its repetition always to be on the following business day, the repetition's RRule is the same but add_days_no is 1 and add_business_days_only is True (e.g. original date = Fri, March 2 2018; generated date = Mon, March 5 2018) In the UI these can be presets: "On the following day" (add_days_no=1,add_business_days_only=False) or "On the following business day" (add_days_no=1,add_business_days_only=True) """ if schedule.add_days_no is not None and schedule.add_days_no > 0: # If only business days and weekday is Fri, Sat or Sun: add add_days_no beginning # from Sunday weekday = datetime.date(starts[k]).weekday() if schedule.add_business_days_only and weekday > 3: days_until_sunday = 6 - weekday starts[k] = starts[k] + relativedelta( days=+days_until_sunday + schedule.add_days_no ) ends[k] = ends[k] + relativedelta(days=+days_until_sunday + schedule.add_days_no) else: starts[k] = starts[k] + relativedelta(days=+schedule.add_days_no) ends[k] = ends[k] + relativedelta(days=+schedule.add_days_no) if ends[k].date() > schedule.last_date: schedule.last_date = ends[k].date() timeslots.append( TimeSlot( schedule=schedule, start=timezone.make_aware(starts[k], is_dst=True), end=timezone.make_aware(ends[k], is_dst=True), ) ) return timeslots def get_colliding_timeslots(timeslot: TimeSlot) -> QuerySet[TimeSlot]: """Gets a queryset of timeslot objects colliding with the given instance.""" return TimeSlot.objects.filter( (Q(start__lt=timeslot.end) & Q(end__gte=timeslot.end)) | (Q(end__gt=timeslot.start) & Q(end__lte=timeslot.end)) | (Q(start__gte=timeslot.start) & Q(end__lte=timeslot.end)) | (Q(start__lte=timeslot.start) & Q(end__gte=timeslot.end)) ) def generate_conflicts(timeslots: list[TimeSlot]) -> Conflicts: """ Tests a list of timeslot objects for colliding timeslots in the database Returns a list of conflicts containing dicts of projected timeslots, collisions and solutions """ conflicts: Conflicts = {} projected = [] solutions = {} # Cycle each timeslot for ts in timeslots: # Contains collisions collisions = [] # Contains possible solutions solution_choices = set() # Get collisions for each timeslot collision_list = list(get_colliding_timeslots(ts).order_by("start")) # Add the projected timeslot projected_entry = { "hash": ts.hash, "start": str(ts.start), "end": str(ts.end), } for c in collision_list: # Add the collision collision = { "timeslot_id": c.id, "start": str(c.start), "end": str(c.end), "playlist_id": c.playlist_id, "show_id": c.schedule.show.id, "show_name": c.schedule.show.name, "schedule_id": c.schedule_id, "memo": c.memo, } # Get note try: note = Note.objects.get(timeslot=c.id) collision["note_id"] = note.pk except ObjectDoesNotExist: collision["note_id"] = None collisions.append(collision) """Determine acceptable solutions""" if len(collision_list) > 1: # If there is more than one collision: Only these two are supported at the # moment solution_choices.add("theirs") solution_choices.add("ours") else: # These two are always possible: Either keep theirs and remove ours or vice # versa solution_choices.add("theirs") solution_choices.add("ours") # Partly overlapping: projected starts earlier than existing and ends earlier # # ex. pr. # +--+ # | | # +--+ | | # | | +--+ # | | # +--+ # if ts.end > c.start > ts.start <= c.end: solution_choices.add("theirs-end") solution_choices.add("ours-end") # Partly overlapping: projected starts later than existing and ends later # # ex. pr. # +--+ # | | # | | +--+ # +--+ | | # | | # +--+ # if c.start <= ts.start < c.end < ts.end: solution_choices.add("theirs-start") solution_choices.add("ours-start") # Fully overlapping: projected starts earlier and ends later than existing # # ex. pr. # +--+ # +--+ | | # | | | | # +--+ | | # +--+ # if ts.start < c.start and ts.end > c.end: solution_choices.add("theirs-end") solution_choices.add("theirs-start") solution_choices.add("theirs-both") # Fully overlapping: projected starts later and ends earlier than existing # # ex. pr. # +--+ # | | +--+ # | | | | # | | +--+ # +--+ # if ts.start > c.start and ts.end < c.end: solution_choices.add("ours-end") solution_choices.add("ours-start") solution_choices.add("ours-both") if len(collisions) > 0: solutions[ts.hash] = "" projected_entry["collisions"] = collisions projected_entry["solution_choices"] = solution_choices projected_entry["error"] = None projected.append(projected_entry) conflicts["projected"] = projected conflicts["solutions"] = solutions conflicts["notes"] = {} conflicts["playlists"] = {} return conflicts def get_timerange_timeslots(start: datetime, end: datetime) -> QuerySet[TimeSlot]: """Gets a queryset of timeslots between the given `start` and `end` datetime.""" return TimeSlot.objects.filter( # start before `start` and end after `start` Q(start__lt=start, end__gt=start) # start after/at `start`, end before/at `end` | Q(start__gte=start, end__lte=end) # start before `end`, end after/at `end` | Q(start__lt=end, end__gte=end) )