#
# steering, Programme/schedule management for AURA
#
# Copyright (C) 2017, Ingo Leindecker
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import copy
from collections.abc import Iterator
from datetime import date, datetime, time, timedelta

from dateutil.relativedelta import relativedelta
from dateutil.rrule import rrule
from rest_framework.exceptions import ValidationError

from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q, QuerySet
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from program.models import (
    Note,
    ProgramEntry,
    RadioSettings,
    RRule,
    Schedule,
    ScheduleConflictError,
    Show,
    TimeSlot,
)
from program.serializers import ScheduleSerializer, TimeSlotSerializer
from program.typing import Conflicts, ScheduleCreateUpdateData, ScheduleData
from program.utils import parse_date, parse_datetime, parse_time


def _default_last_date(first_date: date) -> date:
    """Return the last date to assume for a schedule that has none.

    Depending on `settings.AUTO_SET_LAST_DATE_TO_END_OF_YEAR` this is either December 31 of
    the current year or `first_date` plus `settings.AUTO_SET_LAST_DATE_TO_DAYS_IN_FUTURE` days.
    """

    if settings.AUTO_SET_LAST_DATE_TO_END_OF_YEAR:
        return date(timezone.now().year, 12, 31)

    return first_date + timedelta(days=settings.AUTO_SET_LAST_DATE_TO_DAYS_IN_FUTURE)


def _following_weekday(weekday: int | None) -> int:
    """Return the weekday after `weekday` (0 = Monday ... 6 = Sunday, wrapping to 0).

    `None` maps to 0, which is what the previous inline expression produced for a schedule
    without `by_weekday`.
    """

    # An explicit `is None` check is required: Monday is 0 and therefore falsy.
    if weekday is None:
        return 0

    return weekday + 1 if weekday < 6 else 0


def create_timeslot(start: str, end: str, schedule: Schedule) -> TimeSlot:
    """Creates and returns an unsaved timeslot with the given `start`, `end` and `schedule`."""

    return TimeSlot(
        start=parse_datetime(start),
        end=parse_datetime(end),
        schedule=schedule,
    )


def resolve_conflicts(data: ScheduleCreateUpdateData, schedule_pk: int | None, show_pk: int):
    """
    Resolves the conflicts of a schedule that is about to be created or updated.

    Expects JSON POST/PUT data from /shows/1/schedules/: `data["schedule"]` describes the
    schedule, the optional `data["solutions"]` maps the hash of each colliding projected
    timeslot to the chosen solution, and the optional `data["playlists"]` / `data["notes"]`
    map timeslot hashes to the playlist / note to reassign.

    Returns the serialized schedule after the changes were written to the database, or - if
    `data["schedule"]["dryrun"]` is truthy - a dict with the serialized timeslots that would be
    created, updated and deleted.

    Raises `ValidationError` if the first and last dates are inconsistent and
    `ScheduleConflictError` if the number of solutions does not match the number of conflicts
    or if any given solution is missing or not acceptable.
    """

    schedule = data["schedule"]
    # maps timeslot hash -> chosen solution; only needed if conflicts exist
    solutions = data.get("solutions") or {}

    new_schedule = instantiate_upcoming_schedule(schedule, show_pk, schedule_pk)

    # we need to keep track of this, the placeholder is removed again before saving
    last_date_is_unknown = new_schedule.last_date is None

    if last_date_is_unknown:
        new_schedule.last_date = _default_last_date(new_schedule.first_date)

    show = new_schedule.show
    conflicts = make_conflicts(schedule, schedule_pk, show_pk)

    if new_schedule.rrule.freq > 0 and new_schedule.first_date == new_schedule.last_date:
        raise ValidationError(
            _("Start and end dates must not be the same."),
            code="no-same-day-start-and-end",
        )

    if new_schedule.last_date < new_schedule.first_date:
        raise ValidationError(
            _("End date mustn't be before start."),
            code="no-start-after-end",
        )

    num_conflicts = sum(1 for pr in conflicts["projected"] if len(pr["collisions"]) > 0)

    if len(solutions) != num_conflicts:
        raise ScheduleConflictError(
            _("Numbers of conflicts and solutions don't match."),
            code="one-solution-per-conflict",
            conflicts=conflicts,
        )

    to_create: list[TimeSlot] = []
    to_update: list[TimeSlot] = []
    to_delete: list[TimeSlot] = []
    to_instantiate: list[Schedule] = []  # only needed for the "ours-both" solution
    errors = {}

    for timeslot in conflicts["projected"]:
        # If no solution necessary: Create the projected timeslot and skip
        if "solution_choices" not in timeslot or len(timeslot["collisions"]) == 0:
            to_create.append(
                create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
            )
            continue

        timeslot_hash = timeslot["hash"]

        # Check hash (if start, end, rrule or by_weekday changed)
        if timeslot_hash not in solutions:
            errors[timeslot_hash] = _("This change on the timeslot is not allowed.")
            continue

        solution = solutions[timeslot_hash]

        # If no resolution given: skip
        if solution == "":
            errors[timeslot_hash] = _("No solution given.")
            continue

        # If resolution is not accepted for this conflict: SKIP
        if solution not in timeslot["solution_choices"]:
            errors[timeslot_hash] = _("Given solution is not accepted for this conflict.")
            continue

        # every solution except "theirs"/"ours" is only offered for a single collision
        existing = timeslot["collisions"][0]

        if solution == "theirs":
            # - Discard the projected timeslot
            # - Keep the existing collision(s)
            continue

        elif solution == "ours":
            # - Create the projected timeslot
            # - Delete the existing collision(s)
            to_create.append(
                create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
            )

            for collision in timeslot["collisions"]:
                try:
                    to_delete.append(TimeSlot.objects.get(pk=collision["timeslot_id"]))
                except ObjectDoesNotExist:
                    # the collision is already gone, there is nothing left to delete
                    pass

        elif solution == "theirs-end":
            # - Keep the existing timeslot
            # - Create projected with end of existing start
            to_create.append(
                create_timeslot(timeslot["start"], existing["start"], new_schedule),
            )

        elif solution == "ours-end":
            # - Create the projected timeslot
            # - Change the start of the existing collision to projected end
            to_create.append(
                create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
            )

            existing_ts = TimeSlot.objects.get(pk=existing["timeslot_id"])
            existing_ts.start = parse_datetime(timeslot["end"])
            to_update.append(existing_ts)

        elif solution == "theirs-start":
            # - Keep existing
            # - Create projected with start time of existing end
            to_create.append(
                create_timeslot(existing["end"], timeslot["end"], new_schedule),
            )

        elif solution == "ours-start":
            # - Create the projected timeslot
            # - Change end of existing to projected start
            to_create.append(
                create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
            )

            existing_ts = TimeSlot.objects.get(pk=existing["timeslot_id"])
            existing_ts.end = parse_datetime(timeslot["start"])
            to_update.append(existing_ts)

        elif solution == "theirs-both":
            # - Keep existing
            # - Create two projected timeslots with end of existing start and start of
            #   existing end
            to_create.append(
                create_timeslot(timeslot["start"], existing["start"], new_schedule),
            )
            to_create.append(
                create_timeslot(existing["end"], timeslot["end"], new_schedule),
            )

        elif solution == "ours-both":
            # - Create projected
            # - Split existing into two
            #   - Set existing end time to projected start
            #   - Clone the existing with start = projected end and end = existing end
            to_create.append(
                create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
            )

            existing_ts = TimeSlot.objects.get(pk=existing["timeslot_id"])
            existing_ts.end = parse_datetime(timeslot["start"])
            to_update.append(existing_ts)

            cloned_schedule = copy.copy(existing_ts.schedule)
            cloned_schedule.pk = None
            cloned_schedule.start_time = parse_datetime(timeslot["end"])
            cloned_schedule.end_time = parse_datetime(existing["end"])

            # keep track of the schedule to instantiate it before creating a timeslot if needed
            to_instantiate.append(cloned_schedule)

            to_create.append(create_timeslot(timeslot["end"], existing["end"], cloned_schedule))

    # If there were any errors, don't make any db changes yet
    # but add error messages and return already chosen solutions
    if len(errors) > 0:
        conflicts = make_conflicts(
            ScheduleSerializer().to_representation(new_schedule), new_schedule.pk, show.pk
        )

        saved_solutions = {}

        # Add already chosen resolutions and error message to conflict
        for projected_entry in conflicts["projected"]:
            entry_hash = projected_entry["hash"]

            # The element should only exist if there was a collision
            if len(projected_entry["collisions"]) > 0:
                saved_solutions[entry_hash] = ""

                if (
                    entry_hash in solutions
                    and solutions[entry_hash] in projected_entry["solution_choices"]
                ):
                    saved_solutions[entry_hash] = solutions[entry_hash]

            if entry_hash in errors:
                projected_entry["error"] = errors[entry_hash]

        # Re-insert post data
        conflicts["solutions"] = saved_solutions
        conflicts["notes"] = data.get("notes")
        conflicts["playlists"] = data.get("playlists")

        raise ScheduleConflictError(
            _("Not all conflicts have been resolved."),
            code="unresolved-conflicts",
            conflicts=conflicts,
        )

    # timeslots of this schedule that start after its last date are removed
    remaining_timeslots = TimeSlot.objects.filter(
        schedule=new_schedule,
        start__gt=timezone.make_aware(datetime.combine(new_schedule.last_date, time(0, 0))),
    )
    to_delete.extend(remaining_timeslots)

    # If 'dryrun' is true, just return the projected changes instead of executing them
    if schedule.get("dryrun"):
        serializer = TimeSlotSerializer()

        return {
            "create": [serializer.to_representation(timeslot) for timeslot in to_create],
            "update": [serializer.to_representation(timeslot) for timeslot in to_update],
            "delete": [serializer.to_representation(timeslot) for timeslot in to_delete],
        }

    # Database changes if no errors found

    if last_date_is_unknown:
        # drop the placeholder again so that the schedule is stored as open-ended
        new_schedule.last_date = None

    for cloned_schedule in to_instantiate:
        cloned_schedule.save()

    if to_create:
        new_schedule.save()

    for timeslot in to_update:
        timeslot.save(update_fields=["start", "end"])

    # tolerate a missing key as well as an explicit `null` in the payload
    playlists = data.get("playlists") or {}
    notes = data.get("notes") or {}

    for timeslot in to_create:
        # Reassign playlists
        if timeslot.hash in playlists:
            timeslot.playlist_id = int(playlists[timeslot.hash])

        timeslot.save()

        # Reassign notes
        if timeslot.hash in notes:
            try:
                note = Note.objects.get(pk=int(notes[timeslot.hash]))
                note.timeslot_id = timeslot.id
                note.save(update_fields=["timeslot_id"])

                timeslot = TimeSlot.objects.get(pk=timeslot.id)
                timeslot.note_id = note.id
                timeslot.save(update_fields=["note_id"])
            except ObjectDoesNotExist:
                # the referenced note no longer exists, keep the timeslot without a note
                pass

    for timeslot in to_delete:
        timeslot.delete()

    return ScheduleSerializer().to_representation(new_schedule)


def instantiate_upcoming_schedule(
    data: ScheduleData, show_pk: int, pk: int | None = None
) -> Schedule:
    """
    Returns an unsaved upcoming schedule instance for conflict resolution.

    `rrule_id`, `first_date`, `start_time` and `end_time` are required in `data`, every other
    field is optional. If the data does not contain a last_date, the Schedule instance will not
    contain a last_date.

    The referenced RRule and Show are fetched with `objects.get()`, so a missing row surfaces
    as the model's `DoesNotExist` exception.
    """

    # named `rrule_obj` so that it does not shadow the imported dateutil `rrule`
    rrule_obj = RRule.objects.get(pk=data["rrule_id"])
    show = Show.objects.get(pk=show_pk)

    return Schedule(
        pk=pk,
        by_weekday=data.get("by_weekday"),
        rrule=rrule_obj,
        # required
        first_date=parse_date(data["first_date"]),
        start_time=parse_time(data["start_time"]),
        end_time=parse_time(data["end_time"]),
        # last_date may not be present in data, parse_date returns `None` in this case
        last_date=parse_date(data.get("last_date")),
        is_repetition=data.get("is_repetition", False),
        default_playlist_id=data.get("default_playlist_id"),
        show=show,
        # default is `None`
        add_days_no=data.get("add_days_no"),
        add_business_days_only=data.get("add_business_days_only", False),
    )


def make_conflicts(data: ScheduleData, schedule_pk: int | None, show_pk: int) -> Conflicts:
    """
    Generates the projected timeslots of the schedule described by `data` and their conflicts.

    Returns the conflicts dict produced by `generate_conflicts` (projected timeslots with their
    collisions and solution choices) extended by a `schedule` key holding the serialized
    schedule.
    """

    # Generate schedule to be saved
    new_schedule = instantiate_upcoming_schedule(data, show_pk, schedule_pk)

    # NOTE(review): despite its name this is an alias, not a copy - changing `first_date`
    # below also changes the schedule that is serialized into the result. Kept as is because
    # callers may rely on it; confirm whether a real copy was intended.
    schedule_copy = new_schedule

    # Generate timeslots

    # If extending: Get last timeslot and start generating from that date on
    if schedule_pk is not None:
        existing_schedule = Schedule.objects.get(pk=schedule_pk)

        if (
            new_schedule.last_date
            and existing_schedule.last_date
            and new_schedule.last_date > existing_schedule.last_date
        ):
            last_timeslot = (
                TimeSlot.objects.filter(schedule=existing_schedule).order_by("-start").first()
            )

            # a schedule without any timeslot has nothing to continue from
            if last_timeslot is not None:
                schedule_copy.first_date = last_timeslot.start.date() + timedelta(days=1)

    timeslots = generate_timeslots(schedule_copy)

    # Generate conflicts and add schedule
    conflicts = generate_conflicts(timeslots)

    # create a new dictionary by adding "schedule" to conflicts
    return dict(conflicts, schedule=ScheduleSerializer().to_representation(new_schedule))


def generate_timeslots(schedule: Schedule) -> list[TimeSlot]:
    """
    Returns a list of unsaved timeslot objects based on a schedule and its rrule.

    Returns past timeslots as well, starting from first_date (not today). If the schedule has
    no last_date, the default from the AUTO_SET_LAST_DATE_* settings is used as upper bound.

    Side effect: if `add_days_no` pushes a timeslot past a known `schedule.last_date`, that
    last date is moved forward to the end date of the timeslot.
    """

    ends_after_midnight = schedule.end_time < schedule.start_time

    if schedule.rrule.freq == 3:
        # daily: Ignore schedule.by_weekday to set by_weekday
        by_weekday_start = by_weekday_end = (0, 1, 2, 3, 4, 5, 6)
    elif (
        schedule.rrule.freq == 2
        and schedule.rrule.interval == 1
        and schedule.rrule.by_weekdays == "0,1,2,3,4"
    ):
        # weekly on business days: Use schedule.rrule.by_weekdays to set by_weekday
        by_weekday_start = by_weekday_end = [
            int(wd) for wd in schedule.rrule.by_weekdays.split(",")
        ]
        # adjust by_weekday_end if end_time is after midnight
        if ends_after_midnight:
            by_weekday_end = (1, 2, 3, 4, 5)
    elif (
        schedule.rrule.freq == 2
        and schedule.rrule.interval == 1
        and schedule.rrule.by_weekdays == "5,6"
    ):
        # weekly on weekends: Use schedule.rrule.by_weekdays to set by_weekday
        by_weekday_start = by_weekday_end = [
            int(wd) for wd in schedule.rrule.by_weekdays.split(",")
        ]
        # adjust by_weekday_end if end_time is after midnight
        if ends_after_midnight:
            by_weekday_end = (6, 0)
    elif schedule.rrule.freq == 0:
        # once: Ignore schedule.by_weekday to set by_weekday
        by_weekday_start = by_weekday_end = None
    else:
        # plain weekly and every other rule: Use schedule.by_weekday for by_weekday
        by_weekday_start = by_weekday_end = (
            int(schedule.by_weekday) if schedule.by_weekday is not None else None
        )
        # adjust by_weekday_end if end_time is after midnight
        if ends_after_midnight:
            by_weekday_end = _following_weekday(by_weekday_start)

    # the first occurrence ends on the following day if end_time is after midnight
    if ends_after_midnight:
        first_end_date = schedule.first_date + timedelta(days=1)
    else:
        first_end_date = schedule.first_date

    if schedule.last_date is None:
        until = _default_last_date(schedule.first_date)
    else:
        until = schedule.last_date

    if schedule.rrule.freq == 0:
        # once:
        starts = [datetime.combine(schedule.first_date, schedule.start_time)]
        ends = [datetime.combine(first_end_date, schedule.end_time)]
    else:
        starts = list(
            rrule(
                freq=schedule.rrule.freq,
                dtstart=datetime.combine(schedule.first_date, schedule.start_time),
                interval=schedule.rrule.interval,
                until=until + relativedelta(days=+1),
                bysetpos=schedule.rrule.by_set_pos,
                byweekday=by_weekday_start,
            )
        )
        ends = list(
            rrule(
                freq=schedule.rrule.freq,
                dtstart=datetime.combine(first_end_date, schedule.end_time),
                interval=schedule.rrule.interval,
                until=until + relativedelta(days=+1),
                bysetpos=schedule.rrule.by_set_pos,
                byweekday=by_weekday_end,
            )
        )

    timeslots = []

    # zip() stops at the shorter list, surplus starts or ends are ignored
    for start, end in zip(starts, ends):
        # Correct dates for the (relatively seldom) case if:
        # E.g.: 1st Monday from 23:00:00 to 1st Tuesday 00:00:00
        #       produces wrong end dates if the 1st Tuesday is before the 1st Monday
        #       In this case we take the next day instead of rrule's calculated end
        if start > end:
            end = datetime.combine(start + relativedelta(days=+1), schedule.end_time)

        # Add a number of days to the generated dates?
        #
        # This can be helpful for repetitions:
        #
        # Examples:
        #
        # 1. If RRule is "Every 1st Monday" and we want its repetition always to be on the
        #    following day, the repetition's RRule is the same but add_days_no is 1
        #
        #    If we would set the repetition to "Every 1st Tuesday" instead we will get unmeant
        #    results if the 1st Tuesday is before the 1st Monday
        #    (e.g. 1st Tue = May 1 2018, 1st Mon = May 7 2018)
        #
        # 2. If RRule is "Every 1st Friday" and we want its repetition always to be on the
        #    following business day, the repetition's RRule is the same but add_days_no is 1
        #    and add_business_days_only is True
        #    (e.g. original date = Fri, March 2 2018; generated date = Mon, March 5 2018)
        #
        # In the UI these can be presets:
        #     "On the following day" (add_days_no=1,add_business_days_only=False) or
        #     "On the following business day" (add_days_no=1,add_business_days_only=True)
        if schedule.add_days_no is not None and schedule.add_days_no > 0:
            days_to_add = schedule.add_days_no
            weekday = start.weekday()

            # If only business days and weekday is Fri, Sat or Sun: add add_days_no beginning
            # from Sunday
            if schedule.add_business_days_only and weekday > 3:
                days_to_add += 6 - weekday

            start = start + relativedelta(days=+days_to_add)
            end = end + relativedelta(days=+days_to_add)

            # Only a known last date can be extended, an open-ended schedule stays open-ended
            # (comparing a date with `None` would raise a TypeError).
            if schedule.last_date is not None and end.date() > schedule.last_date:
                schedule.last_date = end.date()

        timeslots.append(
            TimeSlot(
                schedule=schedule,
                # NOTE(review): `is_dst` is deprecated in newer Django releases - confirm the
                # Django version in use before touching this.
                start=timezone.make_aware(start, is_dst=True),
                end=timezone.make_aware(end, is_dst=True),
            )
        )

    return timeslots


def get_colliding_timeslots(timeslot: TimeSlot) -> QuerySet[TimeSlot]:
    """Gets a queryset of timeslot objects colliding with the given instance."""

    # existing timeslot starts before and reaches up to or beyond the end of the given one
    overlaps_end = Q(start__lt=timeslot.end) & Q(end__gte=timeslot.end)
    # existing timeslot ends within the given one
    ends_within = Q(end__gt=timeslot.start) & Q(end__lte=timeslot.end)
    # existing timeslot lies completely within the given one
    is_contained = Q(start__gte=timeslot.start) & Q(end__lte=timeslot.end)
    # existing timeslot completely covers the given one
    is_covering = Q(start__lte=timeslot.start) & Q(end__gte=timeslot.end)

    return TimeSlot.objects.filter(overlaps_end | ends_within | is_contained | is_covering)


def generate_conflicts(timeslots: list[TimeSlot]) -> Conflicts:
    """
    Tests a list of timeslot objects for colliding timeslots in the database.

    Returns a conflicts dict with these keys:
      - `projected`: one entry per given timeslot with its hash, start, end, collisions,
        solution choices and an (empty) error
      - `solutions`: the hash of every colliding timeslot mapped to an empty solution
      - `notes` and `playlists`: empty dicts
    """

    projected = []
    solutions = {}

    for timeslot in timeslots:
        colliding_timeslots = list(get_colliding_timeslots(timeslot).order_by("start"))
        collisions = []

        for existing in colliding_timeslots:
            collision = {
                "timeslot_id": existing.id,
                "start": str(existing.start),
                "end": str(existing.end),
                "playlist_id": existing.playlist_id,
                "show_id": existing.schedule.show.id,
                "show_name": existing.schedule.show.name,
                "schedule_id": existing.schedule_id,
                "memo": existing.memo,
            }

            try:
                collision["note_id"] = Note.objects.get(timeslot=existing.id).pk
            except ObjectDoesNotExist:
                collision["note_id"] = None

            collisions.append(collision)

        solution_choices = set()

        if collisions:
            # These two are always possible: Either keep theirs and remove ours or vice versa
            # If there is more than one collision: Only these two are supported at the moment
            solution_choices.update(("theirs", "ours"))
            solutions[timeslot.hash] = ""

        if len(colliding_timeslots) == 1:
            existing = colliding_timeslots[0]

            # Partly overlapping: timeslot start before existing start, end before existing end
            #
            #  ex.  pr.
            #       +--+
            #       |  |
            # +--+  |  |
            # |  |  +--+
            # |  |
            # +--+
            #
            if timeslot.start < existing.start and timeslot.end < existing.end:
                solution_choices.update(("theirs-end", "ours-end"))

            # Partly overlapping: timeslot start after existing start, end after existing ends
            #
            #  ex.  pr.
            # +--+
            # |  |
            # |  |  +--+
            # +--+  |  |
            #       |  |
            #       +--+
            #
            if timeslot.start > existing.start and timeslot.end > existing.end:
                solution_choices.update(("theirs-start", "ours-start"))

            # Fully overlapping: timeslot start before existing start, end after existing end
            #
            #  ex.  pr.
            #       +--+
            # +--+  |  |
            # |  |  |  |
            # +--+  |  |
            #       +--+
            #
            if timeslot.start < existing.start and timeslot.end > existing.end:
                solution_choices.update(("theirs-end", "theirs-start", "theirs-both"))

            # Fully overlapping: timeslot start after existing start, end before existing end
            #
            #  ex.  pr.
            # +--+
            # |  |  +--+
            # |  |  |  |
            # |  |  +--+
            # +--+
            #
            if timeslot.start > existing.start and timeslot.end < existing.end:
                solution_choices.update(("ours-end", "ours-start", "ours-both"))

        projected.append(
            {
                "hash": timeslot.hash,
                "start": str(timeslot.start),
                "end": str(timeslot.end),
                "collisions": collisions,
                "solution_choices": solution_choices,
                "error": None,
            }
        )

    conflicts: Conflicts = {
        "projected": projected,
        "solutions": solutions,
        "notes": {},
        "playlists": {},
    }

    return conflicts


def generate_program_entries(
    queryset: QuerySet[TimeSlot],
    *,
    start: datetime | None,
    end: datetime | None,
    include_virtual: bool,
) -> Iterator[ProgramEntry]:
    """Yields the program entries of the timeslots starting between the given `start` and `end`.

    `start` defaults to now and `end` to one day after `start`. If `include_virtual` is true,
    the gaps between `start`, the timeslots and `end` are filled with virtual entries of the
    fallback show from the radio settings; a `ValueError` is raised if no fallback show is set.

    NOTE(review): the gap filling walks `queryset` in its own order - presumably it is ordered
    by start; verify against the callers.
    """

    def create_entry(start: datetime, end: datetime, show: Show, timeslot: TimeSlot | None = None):
        return ProgramEntry(
            id=f"{start.isoformat()}...{end.isoformat()}",
            start=start,
            end=end,
            timeslot=timeslot,
            show=show,
        )

    def create_timeslot_entry(timeslot: TimeSlot):
        return create_entry(timeslot.start, timeslot.end, timeslot.schedule.show, timeslot)

    if start is None:
        start = timezone.now()
    if end is None:
        end = start + timedelta(days=1)

    queryset = queryset.filter(start__gte=start, start__lt=end)

    if not include_virtual:
        yield from (create_timeslot_entry(timeslot) for timeslot in queryset)
        return

    radio_settings: RadioSettings | None = RadioSettings.objects.first()
    fallback_show = radio_settings.fallback_show if radio_settings is not None else None
    if fallback_show is None:
        raise ValueError("Radio settings must set fallback show if include_virtual is True.")

    entry_start = start
    timeslot: TimeSlot
    for timeslot in queryset:
        if timeslot.start > entry_start:
            yield create_entry(entry_start, timeslot.start, fallback_show)
        yield create_timeslot_entry(timeslot)
        entry_start = timeslot.end

    # Only fill up to `end` if there is time left: the last timeslot may end after `end`, in
    # which case a filler entry would have a negative duration.
    if entry_start < end:
        yield create_entry(entry_start, end, fallback_show)