-
Konrad Mohrfeldt authoredKonrad Mohrfeldt authored
services.py 27.56 KiB
#
# steering, Programme/schedule management for AURA
#
# Copyright (C) 2017, Ingo Leindecker
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import copy
from collections.abc import Iterator
from datetime import datetime, time, timedelta
from dateutil.relativedelta import relativedelta
from dateutil.rrule import rrule
from rest_framework.exceptions import ValidationError
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q, QuerySet
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from program.models import (
Note,
ProgramEntry,
RadioSettings,
RRule,
Schedule,
ScheduleConflictError,
Show,
TimeSlot,
)
from program.serializers import ScheduleSerializer, TimeSlotSerializer
from program.typing import Conflicts, ScheduleCreateUpdateData, ScheduleData
from program.utils import parse_date, parse_datetime, parse_time
def create_timeslot(start: str, end: str, schedule: Schedule) -> TimeSlot:
    """Builds an unsaved timeslot for `schedule` from the `start` and `end` strings.

    Both strings are run through `parse_datetime` before they are handed to the model.
    """
    parsed_start = parse_datetime(start)
    parsed_end = parse_datetime(end)
    return TimeSlot(start=parsed_start, end=parsed_end, schedule=schedule)
def resolve_conflicts(data: ScheduleCreateUpdateData, schedule_pk: int | None, show_pk: int):
    """
    Resolves the conflicts between a schedule's projected timeslots and stored timeslots.

    Expects JSON POST/PUT data from /shows/1/schedules/

    `data["schedule"]` holds the schedule fields. `data["solutions"]` optionally maps the hash
    of every colliding projected timeslot to the chosen solution. `data["playlists"]` and
    `data["notes"]` optionally map timeslot hashes to the playlist and note ids that are
    assigned to the newly created timeslots.

    Returns the projected changes (`create`, `update` and `delete` lists of serialized
    timeslots) without applying them if `data["schedule"]["dryrun"]` is truthy.
    Otherwise applies the changes and returns the serialized schedule.

    Raises a ValidationError if first and last date of the schedule don't fit together.
    Raises a ScheduleConflictError carrying the conflicts if the number of solutions does not
    match the number of conflicts or if at least one conflict was not resolved properly.
    """
    schedule = data["schedule"]
    # Only needed if conflicts exist. The solutions are looked up by timeslot hash, so fall
    # back to an empty mapping if the key is missing or null.
    solutions = data.get("solutions") or {}

    new_schedule = instantiate_upcoming_schedule(schedule, show_pk, schedule_pk)

    # We need to keep track of this: a temporary last_date is set for the calculations below
    # and removed again before the schedule is saved.
    last_date_is_unknown = new_schedule.last_date is None

    # FIXME: refactor this to eliminate the duplication
    if last_date_is_unknown:
        if settings.AUTO_SET_LAST_DATE_TO_END_OF_YEAR:
            year = timezone.now().year
            new_schedule.last_date = timezone.datetime(year, 12, 31).date()
        else:
            new_schedule.last_date = new_schedule.first_date + timedelta(
                days=+settings.AUTO_SET_LAST_DATE_TO_DAYS_IN_FUTURE
            )

    show = new_schedule.show
    conflicts = make_conflicts(schedule, schedule_pk, show_pk)

    if new_schedule.rrule.freq > 0 and new_schedule.first_date == new_schedule.last_date:
        raise ValidationError(
            _("Start and end dates must not be the same."),
            code="no-same-day-start-and-end",
        )

    if new_schedule.last_date < new_schedule.first_date:
        raise ValidationError(
            _("End date mustn't be before start."),
            code="no-start-after-end",
        )

    num_conflicts = sum(1 for pr in conflicts["projected"] if pr["collisions"])

    if len(solutions) != num_conflicts:
        raise ScheduleConflictError(
            _("Numbers of conflicts and solutions don't match."),
            code="one-solution-per-conflict",
            conflicts=conflicts,
        )

    to_create: list[TimeSlot] = []
    to_update: list[TimeSlot] = []
    to_delete: list[TimeSlot] = []
    to_instantiate: list[Schedule] = []  # only needed for the "ours-both" solution
    errors = {}

    for timeslot in conflicts["projected"]:
        timeslot_hash = timeslot["hash"]

        # If no solution necessary: Create the projected timeslot and skip
        if "solution_choices" not in timeslot or not timeslot["collisions"]:
            to_create.append(
                create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
            )
            continue

        # Check hash (if start, end, rrule or by_weekday changed)
        if timeslot_hash not in solutions:
            errors[timeslot_hash] = _("This change on the timeslot is not allowed.")
            continue

        solution = solutions[timeslot_hash]

        # If no resolution given: skip
        if solution == "":
            errors[timeslot_hash] = _("No solution given.")
            continue

        # If resolution is not accepted for this conflict: SKIP
        if solution not in timeslot["solution_choices"]:
            errors[timeslot_hash] = _("Given solution is not accepted for this conflict.")
            continue

        # All solutions except "theirs" and "ours" only deal with the first collision
        existing = timeslot["collisions"][0]

        if solution == "theirs":
            # - Discard the projected timeslot
            # - Keep the existing collision(s)
            continue
        elif solution == "ours":
            # - Create the projected timeslot
            # - Delete the existing collision(s)
            to_create.append(
                create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
            )
            for collision in timeslot["collisions"]:
                try:
                    to_delete.append(TimeSlot.objects.get(pk=collision["timeslot_id"]))
                except ObjectDoesNotExist:
                    # The colliding timeslot is gone already, so there is nothing to delete
                    continue
        elif solution == "theirs-end":
            # - Keep the existing timeslot
            # - Create projected with end of existing start
            to_create.append(
                create_timeslot(timeslot["start"], existing["start"], new_schedule),
            )
        elif solution == "ours-end":
            # - Create the projected timeslot
            # - Change the start of the existing collision to projected end
            to_create.append(
                create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
            )
            existing_ts = TimeSlot.objects.get(pk=existing["timeslot_id"])
            existing_ts.start = parse_datetime(timeslot["end"])
            to_update.append(existing_ts)
        elif solution == "theirs-start":
            # - Keep existing
            # - Create projected with start time of existing end
            to_create.append(
                create_timeslot(existing["end"], timeslot["end"], new_schedule),
            )
        elif solution == "ours-start":
            # - Create the projected timeslot
            # - Change end of existing to projected start
            to_create.append(
                create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
            )
            existing_ts = TimeSlot.objects.get(pk=existing["timeslot_id"])
            existing_ts.end = parse_datetime(timeslot["start"])
            to_update.append(existing_ts)
        elif solution == "theirs-both":
            # - Keep existing
            # - Create two projected timeslots with end of existing start and start of
            #   existing end
            to_create.append(
                create_timeslot(timeslot["start"], existing["start"], new_schedule),
            )
            to_create.append(
                create_timeslot(existing["end"], timeslot["end"], new_schedule),
            )
        elif solution == "ours-both":
            # - Create projected
            # - Split existing into two
            #   - Set existing end time to projected start
            #   - Clone the existing with start = projected end and end = existing end
            to_create.append(
                create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
            )
            existing_ts = TimeSlot.objects.get(pk=existing["timeslot_id"])
            existing_ts.end = parse_datetime(timeslot["start"])
            to_update.append(existing_ts)

            # Resetting the pk turns the shallow copy into a new schedule once it is saved
            cloned_schedule = copy.copy(existing_ts.schedule)
            cloned_schedule.pk = None
            cloned_schedule.start_time = parse_datetime(timeslot["end"])
            cloned_schedule.end_time = parse_datetime(existing["end"])

            # keep track of the schedule to instantiate it before creating a timeslot if needed
            to_instantiate.append(cloned_schedule)
            to_create.append(create_timeslot(timeslot["end"], existing["end"], cloned_schedule))

    # If there were any errors, don't make any db changes yet
    # but add error messages and return already chosen solutions
    if errors:
        conflicts = make_conflicts(
            ScheduleSerializer().to_representation(new_schedule), new_schedule.pk, show.pk
        )
        saved_solutions = {}

        # Add already chosen resolutions and error message to conflict
        for projected_entry in conflicts["projected"]:
            entry_hash = projected_entry["hash"]

            # The element should only exist if there was a collision
            if projected_entry["collisions"]:
                saved_solutions[entry_hash] = ""

                if (
                    entry_hash in solutions
                    and solutions[entry_hash] in projected_entry["solution_choices"]
                ):
                    saved_solutions[entry_hash] = solutions[entry_hash]

            if entry_hash in errors:
                projected_entry["error"] = errors[entry_hash]

        # Re-insert post data
        conflicts["solutions"] = saved_solutions
        conflicts["notes"] = data.get("notes")
        conflicts["playlists"] = data.get("playlists")

        raise ScheduleConflictError(
            _("Not all conflicts have been resolved."),
            code="unresolved-conflicts",
            conflicts=conflicts,
        )

    # Timeslots of this schedule that start after the (possibly shortened) last date go away
    remaining_timeslots = TimeSlot.objects.filter(
        schedule=new_schedule,
        start__gt=timezone.make_aware(datetime.combine(new_schedule.last_date, time(0, 0))),
    )
    to_delete.extend(remaining_timeslots)

    # If 'dryrun' is true, just return the projected changes instead of executing them
    if schedule.get("dryrun"):
        serializer = TimeSlotSerializer()
        return {
            "create": [serializer.to_representation(timeslot) for timeslot in to_create],
            "update": [serializer.to_representation(timeslot) for timeslot in to_update],
            "delete": [serializer.to_representation(timeslot) for timeslot in to_delete],
        }

    # Database changes if no errors found
    # NOTE(review): the writes below are not wrapped in a transaction by this function;
    # confirm that the caller takes care of atomicity.

    if last_date_is_unknown:
        new_schedule.last_date = None

    for cloned_schedule in to_instantiate:
        cloned_schedule.save()

    if to_create:
        new_schedule.save()

    for timeslot in to_update:
        timeslot.save(update_fields=["start", "end"])

    # Missing or null mappings simply mean that there is nothing to reassign
    playlists = data.get("playlists") or {}
    notes = data.get("notes") or {}

    for timeslot in to_create:
        # Reassign playlists
        if timeslot.hash in playlists:
            timeslot.playlist_id = int(playlists[timeslot.hash])

        timeslot.save()

        # Reassign notes
        if timeslot.hash in notes:
            try:
                note = Note.objects.get(pk=int(notes[timeslot.hash]))
                note.timeslot_id = timeslot.id
                note.save(update_fields=["timeslot_id"])

                timeslot = TimeSlot.objects.get(pk=timeslot.id)
                timeslot.note_id = note.id
                timeslot.save(update_fields=["note_id"])
            except ObjectDoesNotExist:
                # Best effort: a note that does not exist (anymore) is not reassigned
                pass

    for timeslot in to_delete:
        timeslot.delete()

    return ScheduleSerializer().to_representation(new_schedule)
def instantiate_upcoming_schedule(
    data: ScheduleData, show_pk: int, pk: int | None = None
) -> Schedule:
    """
    Builds the unsaved schedule instance that is used for conflict resolution.

    The recurrence rule (`data["rrule_id"]`) and the show (`show_pk`) are loaded from the
    database. `first_date`, `start_time` and `end_time` are required keys of `data`, all other
    fields are optional.
    If the data does not contain a last_date, the Schedule instance will not contain a last_date.
    """
    recurrence_rule = RRule.objects.get(pk=data["rrule_id"])
    show = Show.objects.get(pk=show_pk)

    optional_fields = {
        "is_repetition": data.get("is_repetition", False),
        "add_business_days_only": data.get("add_business_days_only", False),
        # these default to `None`
        "add_days_no": data.get("add_days_no"),
        "by_weekday": data.get("by_weekday"),
        "default_playlist_id": data.get("default_playlist_id"),
    }
    required_fields = {
        "first_date": parse_date(data["first_date"]),
        "start_time": parse_time(data["start_time"]),
        "end_time": parse_time(data["end_time"]),
    }
    # last_date may not be present in data, parse_date returns `None` in this case
    last_date = parse_date(data.get("last_date"))

    return Schedule(
        pk=pk,
        rrule=recurrence_rule,
        show=show,
        last_date=last_date,
        **required_fields,
        **optional_fields,
    )
def make_conflicts(data: ScheduleData, schedule_pk: int | None, show_pk: int) -> Conflicts:
    """
    Generates the projected timeslots of a schedule and their conflicts.

    Builds a schedule instance from `data`, generates its timeslots and checks them for
    collisions with stored timeslots.

    If an existing schedule (`schedule_pk`) gets extended, i.e. the new last_date is after the
    stored one, timeslots are only generated from the day after its last stored timeslot.

    Returns the conflicts dict extended by the serialized schedule (key `schedule`).
    """
    # Generate schedule to be saved
    new_schedule = instantiate_upcoming_schedule(data, show_pk, schedule_pk)

    # If extending: Get last timeslot and start generating from that date on
    if schedule_pk is not None:
        existing_schedule = Schedule.objects.get(pk=schedule_pk)
        is_extended = (
            new_schedule.last_date
            and existing_schedule.last_date
            and new_schedule.last_date > existing_schedule.last_date
        )

        if is_extended:
            last_timeslot = (
                TimeSlot.objects.filter(schedule=existing_schedule).order_by("-start").first()
            )
            # An existing schedule without any timeslots has nothing to continue from,
            # so the timeslots are generated from the submitted first_date in that case.
            if last_timeslot is not None:
                # NOTE(review): first_date is changed on the instance itself (no copy is
                # made), so the serialized schedule returned below carries the shifted date.
                new_schedule.first_date = last_timeslot.start.date() + timedelta(days=1)

    # Generate timeslots
    timeslots = generate_timeslots(new_schedule)

    # Generate conflicts and add schedule
    conflicts = generate_conflicts(timeslots)

    # create a new dictionary by adding "schedule" to conflicts
    return dict(conflicts, schedule=ScheduleSerializer().to_representation(new_schedule))
def _following_weekday(weekday: int | None) -> int | None:
    """Returns the weekday after `weekday` (0 = Monday, 6 wraps around to 0) or `None`."""
    return None if weekday is None else (weekday + 1) % 7


def _resolve_by_weekdays(schedule: Schedule, crosses_midnight: bool):
    """Returns the `byweekday` values for the start and for the end recurrence of `schedule`."""
    recurrence = schedule.rrule

    if recurrence.freq == 3:  # daily: Ignore schedule.by_weekday to set by_weekday
        every_day = (0, 1, 2, 3, 4, 5, 6)
        return every_day, every_day

    if recurrence.freq == 0:  # once: Ignore schedule.by_weekday to set by_weekday
        return None, None

    if recurrence.freq == 2 and recurrence.interval == 1:
        # weekly on business days: Use schedule.rrule.by_weekdays to set by_weekday
        if recurrence.by_weekdays == "0,1,2,3,4":
            weekdays = [int(wd) for wd in recurrence.by_weekdays.split(",")]
            # ends are on the following days if end_time is after midnight
            return weekdays, (1, 2, 3, 4, 5) if crosses_midnight else weekdays

        # weekly on weekends: Use schedule.rrule.by_weekdays to set by_weekday
        if recurrence.by_weekdays == "5,6":
            weekdays = [int(wd) for wd in recurrence.by_weekdays.split(",")]
            # ends are on the following days if end_time is after midnight
            return weekdays, (6, 0) if crosses_midnight else weekdays

    # everything else: Use schedule.by_weekday for by_weekday
    weekday = int(schedule.by_weekday) if schedule.by_weekday is not None else None
    # The end is on the following weekday if end_time is after midnight. This has to work for
    # Monday (0) too and an unset weekday has to stay unset, so that the end recurrence
    # follows the weekday of its own first date.
    return weekday, _following_weekday(weekday) if crosses_midnight else weekday


def generate_timeslots(schedule: Schedule) -> list[TimeSlot]:
    """
    Returns a list of unsaved timeslot objects based on a schedule and its rrule
    Returns past timeslots as well, starting from first_date (not today)

    If the schedule has no last_date, timeslots are generated until the end of the current year
    or for `settings.AUTO_SET_LAST_DATE_TO_DAYS_IN_FUTURE` days, depending on
    `settings.AUTO_SET_LAST_DATE_TO_END_OF_YEAR`.

    If `add_days_no` moves the end of a timeslot behind the last_date of the schedule, the
    last_date of the passed `schedule` instance is moved accordingly.
    """
    crosses_midnight = schedule.end_time < schedule.start_time
    by_weekday_start, by_weekday_end = _resolve_by_weekdays(schedule, crosses_midnight)

    # the first timeslot ends on the following day if end_time is after midnight
    if crosses_midnight:
        first_end_date = schedule.first_date + timedelta(days=+1)
    else:
        first_end_date = schedule.first_date

    # FIXME: refactor this to eliminate the duplication
    if schedule.last_date is None:
        if settings.AUTO_SET_LAST_DATE_TO_END_OF_YEAR:
            year = timezone.now().year
            until = timezone.datetime(year, 12, 31).date()
        else:
            until = schedule.first_date + timedelta(
                days=+settings.AUTO_SET_LAST_DATE_TO_DAYS_IN_FUTURE
            )
    else:
        until = schedule.last_date

    first_start = datetime.combine(schedule.first_date, schedule.start_time)
    first_end = datetime.combine(first_end_date, schedule.end_time)

    if schedule.rrule.freq == 0:  # once:
        starts = [first_start]
        ends = [first_end]
    else:
        # starts and ends are generated by two separate recurrences sharing these settings
        recurrence_options = {
            "freq": schedule.rrule.freq,
            "interval": schedule.rrule.interval,
            "until": until + relativedelta(days=+1),
            "bysetpos": schedule.rrule.by_set_pos,
        }
        starts = list(rrule(dtstart=first_start, byweekday=by_weekday_start, **recurrence_options))
        ends = list(rrule(dtstart=first_end, byweekday=by_weekday_end, **recurrence_options))

    timeslots = []

    # zip stops at the shorter of both lists
    for start, end in zip(starts, ends):
        # Correct dates for the (relatively seldom) case if:
        # E.g.: 1st Monday from 23:00:00 to 1st Tuesday 00:00:00
        #       produces wrong end dates if the 1st Tuesday is before the 1st Monday
        #       In this case we take the next day instead of rrule's calculated end
        if start > end:
            end = datetime.combine(start + relativedelta(days=+1), schedule.end_time)

        # Add a number of days to the generated dates?
        #
        # This can be helpful for repetitions:
        #
        # Examples:
        #
        # 1. If RRule is "Every 1st Monday" and we want its repetition always to be on the
        #    following day, the repetition's RRule is the same but add_days_no is 1
        #
        #    If we would set the repetition to "Every 1st Tuesday" instead
        #    we will get unmeant results if the 1st Tuesday is before the 1st Monday
        #    (e.g. 1st Tue = May 1 2018, 1st Mon = May 7 2018)
        #
        # 2. If RRule is "Every 1st Friday" and we want its repetition always to be on the
        #    following business day, the repetition's RRule is the same but add_days_no is 1
        #    and add_business_days_only is True (e.g. original date = Fri, March 2 2018;
        #    generated date = Mon, March 5 2018)
        #
        # In the UI these can be presets:
        #     "On the following day" (add_days_no=1,add_business_days_only=False) or
        #     "On the following business day" (add_days_no=1,add_business_days_only=True)
        if schedule.add_days_no is not None and schedule.add_days_no > 0:
            # If only business days and weekday is Fri, Sat or Sun: add add_days_no beginning
            # from Sunday
            weekday = start.date().weekday()
            if schedule.add_business_days_only and weekday > 3:
                days_until_sunday = 6 - weekday
                shift = relativedelta(days=+days_until_sunday + schedule.add_days_no)
            else:
                shift = relativedelta(days=+schedule.add_days_no)

            start = start + shift
            end = end + shift

            # Extend the schedule if the shifted timeslot ends after its last date. A schedule
            # without a last_date has no limit that could be exceeded.
            if schedule.last_date is not None and end.date() > schedule.last_date:
                schedule.last_date = end.date()

        # NOTE(review): `is_dst` is only accepted by older Django releases; confirm against
        # the Django version this project pins.
        timeslots.append(
            TimeSlot(
                schedule=schedule,
                start=timezone.make_aware(start, is_dst=True),
                end=timezone.make_aware(end, is_dst=True),
            )
        )

    return timeslots
def get_colliding_timeslots(timeslot: TimeSlot) -> QuerySet[TimeSlot]:
    """Returns the queryset of stored timeslots that collide with the given `timeslot`."""
    # stored timeslot starts before the given end and ends at or after it
    reaches_over_end = Q(start__lt=timeslot.end) & Q(end__gte=timeslot.end)
    # stored timeslot ends after the given start, but not after the given end
    ends_inside = Q(end__gt=timeslot.start) & Q(end__lte=timeslot.end)
    # stored timeslot lies completely within the given one
    is_enclosed = Q(start__gte=timeslot.start) & Q(end__lte=timeslot.end)
    # stored timeslot spans the whole given one
    is_enclosing = Q(start__lte=timeslot.start) & Q(end__gte=timeslot.end)

    collides = reaches_over_end | ends_inside | is_enclosed | is_enclosing
    return TimeSlot.objects.filter(collides)
def generate_conflicts(timeslots: list[TimeSlot]) -> Conflicts:
    """
    Checks every given timeslot for collisions with timeslots stored in the database.

    Returns a dict with
    - `projected`: one entry per given timeslot containing its hash, start, end, the list of
      collisions and the set of solutions that may be chosen
    - `solutions`: an empty solution for the hash of every timeslot that has collisions
    - `notes` and `playlists`: empty dicts
    """
    projected = []
    solutions = {}

    for timeslot in timeslots:
        colliding_timeslots = list(get_colliding_timeslots(timeslot).order_by("start"))
        has_multiple_collisions = len(colliding_timeslots) > 1
        collisions = []
        solution_choices = set()

        for existing in colliding_timeslots:
            show = existing.schedule.show
            collision = {
                "timeslot_id": existing.id,
                "start": str(existing.start),
                "end": str(existing.end),
                "playlist_id": existing.playlist_id,
                "show_id": show.id,
                "show_name": show.name,
                "schedule_id": existing.schedule_id,
                "memo": existing.memo,
            }
            try:
                collision["note_id"] = Note.objects.get(timeslot=existing.id).pk
            except ObjectDoesNotExist:
                collision["note_id"] = None
            collisions.append(collision)

            # These two are always possible: Either keep theirs and remove ours or vice versa
            solution_choices.update(("theirs", "ours"))

            # If there is more than one collision: Only these two are supported at the moment
            if has_multiple_collisions:
                continue

            starts_before = timeslot.start < existing.start
            starts_after = timeslot.start > existing.start
            ends_before = timeslot.end < existing.end
            ends_after = timeslot.end > existing.end

            # Partly overlapping: projected starts and ends before the existing one does
            if starts_before and ends_before:
                solution_choices.update(("theirs-end", "ours-end"))

            # Partly overlapping: projected starts and ends after the existing one does
            if starts_after and ends_after:
                solution_choices.update(("theirs-start", "ours-start"))

            # Fully overlapping: projected starts before and ends after the existing one
            if starts_before and ends_after:
                solution_choices.update(("theirs-end", "theirs-start", "theirs-both"))

            # Fully overlapping: projected starts after and ends before the existing one
            if starts_after and ends_before:
                solution_choices.update(("ours-end", "ours-start", "ours-both"))

        if collisions:
            solutions[timeslot.hash] = ""

        projected.append(
            {
                "hash": timeslot.hash,
                "start": str(timeslot.start),
                "end": str(timeslot.end),
                "collisions": collisions,
                "solution_choices": solution_choices,
                "error": None,
            }
        )

    conflicts: Conflicts = {
        "projected": projected,
        "solutions": solutions,
        "notes": {},
        "playlists": {},
    }
    return conflicts
def generate_program_entries(
    queryset: QuerySet[TimeSlot],
    *,
    start: datetime | None,
    end: datetime | None,
    include_virtual: bool,
) -> Iterator[ProgramEntry]:
    """Yields the program entries of all timeslots in `queryset` that start between `start`
    (inclusive) and `end` (exclusive).

    `start` defaults to the current time and `end` defaults to one day after `start`.

    If `include_virtual` is true, the gaps before, between and after the timeslots are filled
    with virtual entries for the fallback show of the radio settings. As this is a generator,
    the ValueError for a missing fallback show is raised when the iteration starts."""

    def create_entry(start: datetime, end: datetime, show: Show, timeslot: TimeSlot | None = None):
        return ProgramEntry(
            id=f"{start.isoformat()}...{end.isoformat()}",
            start=start,
            end=end,
            timeslot=timeslot,
            show=show,
        )

    def create_timeslot_entry(timeslot: TimeSlot):
        return create_entry(timeslot.start, timeslot.end, timeslot.schedule.show, timeslot)

    if start is None:
        start = timezone.now()
    if end is None:
        end = start + timedelta(days=1)
    queryset = queryset.filter(start__gte=start, start__lt=end)

    if not include_virtual:
        yield from (create_timeslot_entry(timeslot) for timeslot in queryset)
        return

    radio_settings: RadioSettings | None = RadioSettings.objects.first()
    fallback_show = radio_settings.fallback_show if radio_settings is not None else None
    if fallback_show is None:
        raise ValueError("Radio settings must set fallback show if include_virtual is True.")

    # NOTE(review): the gap detection relies on the timeslots being ordered by start; no
    # ordering is applied here, so confirm that callers pass an ordered queryset.
    entry_start = start
    timeslot: TimeSlot
    for timeslot in queryset:
        # fill the gap between the end of the previous entry and this timeslot
        if timeslot.start > entry_start:
            yield create_entry(entry_start, timeslot.start, fallback_show)
        yield create_timeslot_entry(timeslot)
        entry_start = timeslot.end

    # Only fill up if there really is time left. The last timeslot may end after `end`, which
    # would result in a virtual entry that ends before it starts otherwise.
    if entry_start < end:
        yield create_entry(entry_start, end, fallback_show)