-
Ernesto Rico Schmidt authoredErnesto Rico Schmidt authored
services.py 26.89 KiB
#
# steering, Programme/schedule management for AURA
#
# Copyright (C) 2017, Ingo Leindecker
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from datetime import datetime, time, timedelta
from typing import TypedDict
from dateutil.relativedelta import relativedelta
from dateutil.rrule import rrule
from rest_framework.exceptions import ValidationError
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q, QuerySet
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from program.models import Note, RRule, Schedule, ScheduleConflictError, Show, TimeSlot
from program.serializers import ScheduleSerializer, TimeSlotSerializer
from program.utils import parse_date, parse_datetime, parse_time
class ScheduleData(TypedDict):
add_business_days_only: bool | None
add_days_no: int | None
by_weekday: int | None
default_playlist_id: int | None
dryrun: bool | None
end_time: str
first_date: str
id: int | None
is_repetition: bool | None
last_date: str | None
rrule_id: int
show_id: int | None
start_time: str
class Collision(TypedDict):
end: str
timeslot_id: int
memo: str
note_id: int | None
playlist_id: int | None
schedule_id: int
show_id: int
show_name: str
start: str
class ProjectedEntry(TypedDict):
collisions: list[Collision]
end: str
error: str | None
hash: str
solution_choices: set[str]
start: str
class Conflicts(TypedDict):
notes: dict
playlists: dict
projected: list[ProjectedEntry]
solutions: dict[str, str]
class ScheduleCreateUpdateData(TypedDict):
notes: dict
playlists: dict
schedule: ScheduleData
solutions: dict[str, str]
def create_timeslot(start: str, end: str, schedule: Schedule) -> TimeSlot:
"""Creates and returns an unsaved timeslot with the given `start`, `end` and `schedule`."""
return TimeSlot(
start=parse_datetime(start),
end=parse_datetime(end),
schedule=schedule,
)
def resolve_conflicts(data: ScheduleCreateUpdateData, schedule_pk: int | None, show_pk: int):
"""
Resolves conflicts
Expects JSON POST/PUT data from /shows/1/schedules/
Returns a list of dicts if errors were found
Returns an empty list if resolution was successful
"""
schedule = data["schedule"]
solutions = data.get("solutions", []) # only needed if conflicts exist
new_schedule = instantiate_upcoming_schedule(schedule, show_pk, schedule_pk)
last_date_is_unknown = new_schedule.last_date is None # we need to keep track of this
# FIXME: refactor this to eliminate the duplication
if last_date_is_unknown:
if settings.AUTO_SET_LAST_DATE_TO_END_OF_YEAR:
year = timezone.now().year
new_schedule.last_date = timezone.datetime(year, 12, 31).date()
else:
new_schedule.last_date = new_schedule.first_date + timedelta(
days=+settings.AUTO_SET_LAST_DATE_TO_DAYS_IN_FUTURE
)
show = new_schedule.show
conflicts = make_conflicts(schedule, schedule_pk, show_pk)
if new_schedule.rrule.freq > 0 and new_schedule.first_date == new_schedule.last_date:
raise ValidationError(
_("Start and end dates must not be the same."),
code="no-same-day-start-and-end",
)
if new_schedule.last_date < new_schedule.first_date:
raise ValidationError(
_("End date mustn't be before start."),
code="no-start-after-end",
)
num_conflicts = len([pr for pr in conflicts["projected"] if len(pr["collisions"]) > 0])
if len(solutions) != num_conflicts:
raise ScheduleConflictError(
_("Numbers of conflicts and solutions don't match."),
code="one-solution-per-conflict",
conflicts=conflicts,
)
to_create: list[TimeSlot] = []
to_update: list[TimeSlot] = []
to_delete: list[TimeSlot] = []
errors = {}
for timeslot in conflicts["projected"]:
# If no solution necessary: Create the projected timeslot and skip
if "solution_choices" not in timeslot or len(timeslot["collisions"]) == 0:
to_create.append(
create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
)
continue
# Check hash (if start, end, rrule or by_weekday changed)
if not timeslot["hash"] in solutions:
errors[timeslot["hash"]] = _("This change on the timeslot is not allowed.")
continue
# If no resolution given: skip
if solutions[timeslot["hash"]] == "":
errors[timeslot["hash"]] = _("No solution given.")
continue
# If resolution is not accepted for this conflict: SKIP
if not solutions[timeslot["hash"]] in timeslot["solution_choices"]:
errors[timeslot["hash"]] = _("Given solution is not accepted for this conflict.")
continue
"""Conflict resolution"""
existing = timeslot["collisions"][0]
solution = solutions[timeslot["hash"]]
if solution == "theirs":
# - Discard the projected timeslot
# - Keep the existing collision(s)
continue
if solution == "ours":
# - Create the projected timeslot
# - Delete the existing collision(s)
to_create.append(
create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
)
# Delete collision(s)
for collision in timeslot["collisions"]:
try:
to_delete.append(TimeSlot.objects.get(pk=collision["timeslot_id"]))
except ObjectDoesNotExist:
pass
if solution == "theirs-end":
# - Keep the existing timeslot
# - Create projected with end of existing start
to_create.append(
create_timeslot(timeslot["start"], existing["start"], new_schedule),
)
if solution == "ours-end":
# - Create the projected timeslot
# - Change the start of the existing collision to projected end
to_create.append(
create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
)
existing_ts = TimeSlot.objects.get(pk=existing["timeslot_id"])
existing_ts.start = parse_datetime(timeslot["end"])
to_update.append(existing_ts)
if solution == "theirs-start":
# - Keep existing
# - Create projected with start time of existing end
to_create.append(
create_timeslot(existing["end"], timeslot["end"], new_schedule),
)
if solution == "ours-start":
# - Create the projected timeslot
# - Change end of existing to projected start
to_create.append(
create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
)
existing_ts = TimeSlot.objects.get(pk=existing["timeslot_id"])
existing_ts.end = parse_datetime(timeslot["start"])
to_update.append(existing_ts)
if solution == "theirs-both":
# - Keep existing
# - Create two projected timeslots with end of existing start and start of existing end
to_create.append(
create_timeslot(timeslot["start"], existing["start"], new_schedule),
)
to_create.append(
create_timeslot(existing["end"], timeslot["end"], new_schedule),
)
if solution == "ours-both":
# - Create projected
# - Split existing into two
# - Set existing end time to projected start
# - Create another one with start = projected end and end = existing end
to_create.append(
create_timeslot(timeslot["start"], timeslot["end"], new_schedule),
)
existing_ts = TimeSlot.objects.get(pk=existing["timeslot_id"])
existing_ts.end = parse_datetime(timeslot["start"])
to_update.append(existing_ts)
to_create.append(
create_timeslot(timeslot["end"], existing["end"], new_schedule),
)
# If there were any errors, don't make any db changes yet
# but add error messages and return already chosen solutions
if len(errors) > 0:
conflicts = make_conflicts(
ScheduleSerializer().to_representation(new_schedule), new_schedule.pk, show.pk
)
partly_resolved = conflicts["projected"]
saved_solutions = {}
# Add already chosen resolutions and error message to conflict
for index, projected_entry in enumerate(conflicts["projected"]):
# The element should only exist if there was a collision
if len(projected_entry["collisions"]) > 0:
saved_solutions[projected_entry["hash"]] = ""
if (
projected_entry["hash"] in solutions
and solutions[projected_entry["hash"]] in projected_entry["solution_choices"]
):
saved_solutions[projected_entry["hash"]] = solutions[projected_entry["hash"]]
if projected_entry["hash"] in errors:
partly_resolved[index]["error"] = errors[projected_entry["hash"]]
# Re-insert post data
conflicts["projected"] = partly_resolved
conflicts["solutions"] = saved_solutions
conflicts["notes"] = data.get("notes")
conflicts["playlists"] = data.get("playlists")
raise ScheduleConflictError(
_("Not all conflicts have been resolved."),
code="unresolved-conflicts",
conflicts=conflicts,
)
remaining_timeslots = TimeSlot.objects.filter(
schedule=new_schedule,
start__gt=timezone.make_aware(datetime.combine(new_schedule.last_date, time(0, 0))),
)
for timeslot in remaining_timeslots:
to_delete.append(timeslot)
# If 'dryrun' is true, just return the projected changes instead of executing them
if "dryrun" in schedule and schedule["dryrun"]:
return {
"create": [TimeSlotSerializer().to_representation(timeslot) for timeslot in to_create],
"update": [TimeSlotSerializer().to_representation(timeslot) for timeslot in to_update],
"delete": [TimeSlotSerializer().to_representation(timeslot) for timeslot in to_delete],
}
# Database changes if no errors found
if last_date_is_unknown:
new_schedule.last_date = None
if to_create:
new_schedule.save()
for timeslot in to_update:
timeslot.save(update_fields=["start", "end"])
for timeslot in to_create:
timeslot.schedule = new_schedule
# Reassign playlists
if "playlists" in data and timeslot.hash in data["playlists"]:
timeslot.playlist_id = int(data["playlists"][timeslot.hash])
timeslot.save()
# Reassign notes
if "notes" in data and timeslot.hash in data["notes"]:
try:
note = Note.objects.get(pk=int(data["notes"][timeslot.hash]))
note.timeslot_id = timeslot.id
note.save(update_fields=["timeslot_id"])
timeslot = TimeSlot.objects.get(pk=timeslot.id)
timeslot.note_id = note.id
timeslot.save(update_fields=["note_id"])
except ObjectDoesNotExist:
pass
for timeslot in to_delete:
timeslot.delete()
return ScheduleSerializer().to_representation(new_schedule)
def instantiate_upcoming_schedule(
data: ScheduleData, show_pk: int, pk: int | None = None
) -> Schedule:
"""
Returns an upcoming schedule instance for conflict resolution.
If the data does not contain a last_date, the Schedule instance will not contain a last_date.
"""
rrule = RRule.objects.get(pk=data["rrule_id"])
show = Show.objects.get(pk=show_pk)
is_repetition = data["is_repetition"] if "is_repetition" in data else False
# default is `False`
add_business_days_only = (
data["add_business_days_only"] if "add_business_days_only" in data else False
)
# default is `None`
add_days_no = data.get("add_days_no")
by_weekday = data.get("by_weekday")
default_playlist_id = data.get("default_playlist_id")
first_date = parse_date(data["first_date"])
start_time = parse_time(data["start_time"])
end_time = parse_time(data["end_time"])
# last_date may not be present in data
last_date = parse_date(data["last_date"]) if data.get("last_date") is not None else None
return Schedule(
pk=pk,
by_weekday=by_weekday,
rrule=rrule,
first_date=first_date,
start_time=start_time,
end_time=end_time,
last_date=last_date,
is_repetition=is_repetition,
default_playlist_id=default_playlist_id,
show=show,
add_days_no=add_days_no,
add_business_days_only=add_business_days_only,
)
def make_conflicts(data: ScheduleData, schedule_pk: int | None, show_pk: int) -> Conflicts:
"""
Retrieves POST vars
Generates a schedule
Generates conflicts: Returns timeslots, collisions, solutions as JSON
Returns conflicts dict
"""
# Generate schedule to be saved
new_schedule = instantiate_upcoming_schedule(data, show_pk, schedule_pk)
# Copy if first_date changes for generating timeslots
schedule_copy = new_schedule
# Generate timeslots
# If extending: Get last timeslot and start generating from that date on
if schedule_pk is not None:
existing_schedule = Schedule.objects.get(pk=schedule_pk)
if (
new_schedule.last_date
and existing_schedule.last_date
and new_schedule.last_date > existing_schedule.last_date
):
last_timeslot = (
TimeSlot.objects.filter(schedule=existing_schedule).order_by("start").reverse()[0]
)
schedule_copy.first_date = last_timeslot.start.date() + timedelta(days=1)
timeslots = generate_timeslots(schedule_copy)
# Generate conflicts and add schedule
conflicts = generate_conflicts(timeslots)
# create a new dictionary by adding "schedule" to conflicts
return dict(conflicts, schedule=ScheduleSerializer().to_representation(new_schedule))
def generate_timeslots(schedule: Schedule) -> list[TimeSlot]:
"""
Returns a list of timeslot objects based on a schedule and its rrule
Returns past timeslots as well, starting from first_date (not today)
"""
timeslots = []
if schedule.rrule.freq == 3: # daily: Ignore schedule.by_weekday to set by_weekday
by_weekday_start = by_weekday_end = (0, 1, 2, 3, 4, 5, 6)
elif (
schedule.rrule.freq == 2
and schedule.rrule.interval == 1
and schedule.rrule.by_weekdays is None
): # weekly: Use schedule.by_weekday for by_weekday
by_weekday_start = by_weekday_end = (
int(schedule.by_weekday) if schedule.by_weekday is not None else None
)
# adjust by_weekday_end if end_time is after midnight
if schedule.end_time < schedule.start_time:
by_weekday_end = (
by_weekday_start + 1 if by_weekday_start and by_weekday_start < 6 else 0
)
elif (
schedule.rrule.freq == 2
and schedule.rrule.interval == 1
and schedule.rrule.by_weekdays == "0,1,2,3,4"
): # weekly on business days: Use schedule.rrule.by_weekdays to set by_weekday
by_weekday_start = by_weekday_end = [
int(wd) for wd in schedule.rrule.by_weekdays.split(",")
]
# adjust by_weekday_end if end_time is after midnight
if schedule.end_time < schedule.start_time:
by_weekday_end = (1, 2, 3, 4, 5)
elif (
schedule.rrule.freq == 2
and schedule.rrule.interval == 1
and schedule.rrule.by_weekdays == "5,6"
): # weekly on weekends: Use schedule.rrule.by_weekdays to set by_weekday
by_weekday_start = by_weekday_end = [
int(wd) for wd in schedule.rrule.by_weekdays.split(",")
]
# adjust by_weekday_end if end_time is after midnight
if schedule.end_time < schedule.start_time:
by_weekday_end = (6, 0)
elif schedule.rrule.freq == 0: # once: Ignore schedule.by_weekday to set by_weekday
by_weekday_start = by_weekday_end = None
else:
by_weekday_start = by_weekday_end = (
int(schedule.by_weekday) if schedule.by_weekday is not None else None
)
# adjust by_weekday_end if end_time is after midnight
if schedule.end_time < schedule.start_time:
by_weekday_end = (
by_weekday_start + 1 if by_weekday_start and by_weekday_start < 6 else 0
)
# adjust last_date if end_time is after midnight
if schedule.end_time < schedule.start_time:
last_date = schedule.first_date + timedelta(days=+1)
else:
last_date = schedule.first_date
# FIXME: refactor this to eliminate the duplication
if schedule.last_date is None:
if settings.AUTO_SET_LAST_DATE_TO_END_OF_YEAR:
year = timezone.now().year
until = timezone.datetime(year, 12, 31).date()
else:
until = schedule.first_date + timedelta(
days=+settings.AUTO_SET_LAST_DATE_TO_DAYS_IN_FUTURE
)
else:
until = schedule.last_date
if schedule.rrule.freq == 0: # once:
starts = [datetime.combine(schedule.first_date, schedule.start_time)]
ends = [datetime.combine(last_date, schedule.end_time)]
else:
starts = list(
rrule(
freq=schedule.rrule.freq,
dtstart=datetime.combine(schedule.first_date, schedule.start_time),
interval=schedule.rrule.interval,
until=until + relativedelta(days=+1),
bysetpos=schedule.rrule.by_set_pos,
byweekday=by_weekday_start,
)
)
ends = list(
rrule(
freq=schedule.rrule.freq,
dtstart=datetime.combine(last_date, schedule.end_time),
interval=schedule.rrule.interval,
until=until + relativedelta(days=+1),
bysetpos=schedule.rrule.by_set_pos,
byweekday=by_weekday_end,
)
)
for k in range(min(len(starts), len(ends))):
# Correct dates for the (relatively seldom) case if:
# E.g.: 1st Monday from 23:00:00 to 1st Tuesday 00:00:00
# produces wrong end dates if the 1st Tuesday is before the 1st Monday
# In this case we take the next day instead of rrule's calculated end
if starts[k] > ends[k]:
ends[k] = datetime.combine(starts[k] + relativedelta(days=+1), schedule.end_time)
"""
Add a number of days to the generated dates?
This can be helpful for repetitions:
Examples:
1. If RRule is "Every 1st Monday" and we want its repetition always to be on the
following day, the repetition's RRule is the same but add_days_no is 1
If we would set the repetition to "Every 1st Tuesday" instead
we will get unmeant results if the 1st Tuesday is before the 1st Monday
(e.g. 1st Tue = May 1 2018, 1st Mon = May 7 2018)
2. If RRule is "Every 1st Friday" and we want its repetition always to be on the
following business day, the repetition's RRule is the same but add_days_no is 1
and add_business_days_only is True (e.g. original date = Fri, March 2 2018;
generated date = Mon, March 5 2018)
In the UI these can be presets:
"On the following day" (add_days_no=1,add_business_days_only=False) or
"On the following business day" (add_days_no=1,add_business_days_only=True)
"""
if schedule.add_days_no is not None and schedule.add_days_no > 0:
# If only business days and weekday is Fri, Sat or Sun: add add_days_no beginning
# from Sunday
weekday = datetime.date(starts[k]).weekday()
if schedule.add_business_days_only and weekday > 3:
days_until_sunday = 6 - weekday
starts[k] = starts[k] + relativedelta(
days=+days_until_sunday + schedule.add_days_no
)
ends[k] = ends[k] + relativedelta(days=+days_until_sunday + schedule.add_days_no)
else:
starts[k] = starts[k] + relativedelta(days=+schedule.add_days_no)
ends[k] = ends[k] + relativedelta(days=+schedule.add_days_no)
if ends[k].date() > schedule.last_date:
schedule.last_date = ends[k].date()
timeslots.append(
TimeSlot(
schedule=schedule,
start=timezone.make_aware(starts[k], is_dst=True),
end=timezone.make_aware(ends[k], is_dst=True),
)
)
return timeslots
def get_colliding_timeslots(timeslot: TimeSlot) -> QuerySet[TimeSlot]:
"""Gets a queryset of timeslot objects colliding with the given instance."""
return TimeSlot.objects.filter(
(Q(start__lt=timeslot.end) & Q(end__gte=timeslot.end))
| (Q(end__gt=timeslot.start) & Q(end__lte=timeslot.end))
| (Q(start__gte=timeslot.start) & Q(end__lte=timeslot.end))
| (Q(start__lte=timeslot.start) & Q(end__gte=timeslot.end))
)
def generate_conflicts(timeslots: list[TimeSlot]) -> Conflicts:
"""
Tests a list of timeslot objects for colliding timeslots in the database
Returns a list of conflicts containing dicts of projected timeslots, collisions and
solutions
"""
conflicts: Conflicts = {}
projected = []
solutions = {}
# Cycle each timeslot
for ts in timeslots:
# Contains collisions
collisions = []
# Contains possible solutions
solution_choices = set()
# Get collisions for each timeslot
collision_list = list(get_colliding_timeslots(ts).order_by("start"))
# Add the projected timeslot
projected_entry = {
"hash": ts.hash,
"start": str(ts.start),
"end": str(ts.end),
}
for c in collision_list:
# Add the collision
collision = {
"timeslot_id": c.id,
"start": str(c.start),
"end": str(c.end),
"playlist_id": c.playlist_id,
"show_id": c.schedule.show.id,
"show_name": c.schedule.show.name,
"schedule_id": c.schedule_id,
"memo": c.memo,
}
# Get note
try:
note = Note.objects.get(timeslot=c.id)
collision["note_id"] = note.pk
except ObjectDoesNotExist:
collision["note_id"] = None
collisions.append(collision)
"""Determine acceptable solutions"""
if len(collision_list) > 1:
# If there is more than one collision: Only these two are supported at the
# moment
solution_choices.add("theirs")
solution_choices.add("ours")
else:
# These two are always possible: Either keep theirs and remove ours or vice
# versa
solution_choices.add("theirs")
solution_choices.add("ours")
# Partly overlapping: projected starts earlier than existing and ends earlier
#
# ex. pr.
# +--+
# | |
# +--+ | |
# | | +--+
# | |
# +--+
#
if ts.end > c.start > ts.start <= c.end:
solution_choices.add("theirs-end")
solution_choices.add("ours-end")
# Partly overlapping: projected starts later than existing and ends later
#
# ex. pr.
# +--+
# | |
# | | +--+
# +--+ | |
# | |
# +--+
#
if c.start <= ts.start < c.end < ts.end:
solution_choices.add("theirs-start")
solution_choices.add("ours-start")
# Fully overlapping: projected starts earlier and ends later than existing
#
# ex. pr.
# +--+
# +--+ | |
# | | | |
# +--+ | |
# +--+
#
if ts.start < c.start and ts.end > c.end:
solution_choices.add("theirs-end")
solution_choices.add("theirs-start")
solution_choices.add("theirs-both")
# Fully overlapping: projected starts later and ends earlier than existing
#
# ex. pr.
# +--+
# | | +--+
# | | | |
# | | +--+
# +--+
#
if ts.start > c.start and ts.end < c.end:
solution_choices.add("ours-end")
solution_choices.add("ours-start")
solution_choices.add("ours-both")
if len(collisions) > 0:
solutions[ts.hash] = ""
projected_entry["collisions"] = collisions
projected_entry["solution_choices"] = solution_choices
projected_entry["error"] = None
projected.append(projected_entry)
conflicts["projected"] = projected
conflicts["solutions"] = solutions
conflicts["notes"] = {}
conflicts["playlists"] = {}
return conflicts
def get_timerange_timeslots(start: datetime, end: datetime) -> QuerySet[TimeSlot]:
"""Gets a queryset of timeslots between the given `start` and `end` datetime."""
return TimeSlot.objects.filter(
# start before `start` and end after `start`
Q(start__lt=start, end__gt=start)
# start after/at `start`, end before/at `end`
| Q(start__gte=start, end__lte=end)
# start before `end`, end after/at `end`
| Q(start__lt=end, end__gte=end)
)