Skip to content
Snippets Groups Projects
Verified Commit 2b5537b3 authored by Ernesto Rico Schmidt's avatar Ernesto Rico Schmidt
Browse files

Add type annotations

parent 7b30f5e7
No related branches found
No related tags found
No related merge requests found
Pipeline #3102 passed
......@@ -41,6 +41,7 @@ class ScheduleData(TypedDict):
add_days_no: int | None
by_weekday: int | None
default_playlist_id: int | None
dryrun: bool | None
end_time: str
first_date: str
id: int | None
......@@ -79,8 +80,14 @@ class Conflicts(TypedDict):
solutions: dict[str, str]
# TODO: add type annotations
def resolve_conflicts(data, schedule_pk, show_pk):
class ScheduleCreateUpdateData(TypedDict):
notes: dict
playlists: dict
schedule: ScheduleData
solutions: dict[str, str]
def resolve_conflicts(data: ScheduleCreateUpdateData, schedule_pk: int | None, show_pk: int):
"""
Resolves conflicts
Expects JSON POST/PUT data from /shows/1/schedules/
......@@ -89,21 +96,20 @@ def resolve_conflicts(data, schedule_pk, show_pk):
Returns an empty list if resolution was successful
"""
sdl = data["schedule"]
solutions = data.get("solutions", [])
schedule = data["schedule"]
solutions = data.get("solutions", []) # only needed if conflicts exist
# Regenerate conflicts
schedule = instantiate_upcoming_schedule(sdl, show_pk, schedule_pk)
show = schedule.show
conflicts = make_conflicts(sdl, schedule_pk, show_pk)
new_schedule = instantiate_upcoming_schedule(schedule, show_pk, schedule_pk)
show = new_schedule.show
conflicts = make_conflicts(schedule, schedule_pk, show_pk)
if schedule.rrule.freq > 0 and schedule.first_date == schedule.last_date:
if new_schedule.rrule.freq > 0 and new_schedule.first_date == new_schedule.last_date:
raise ValidationError(
_("Start and end dates must not be the same."),
code="no-same-day-start-and-end",
)
if schedule.last_date < schedule.first_date:
if new_schedule.last_date < new_schedule.first_date:
raise ValidationError(
_("End date mustn't be before start."),
code="no-start-after-end",
......@@ -118,177 +124,159 @@ def resolve_conflicts(data, schedule_pk, show_pk):
conflicts=conflicts,
)
# Projected timeslots to create
create = []
to_create: list[TimeSlot] = []
to_update: list[TimeSlot] = []
to_delete: list[TimeSlot] = []
# Existing timeslots to update
update = []
# Existing timeslots to delete
delete = []
# Error messages
errors = {}
for ts in conflicts["projected"]:
# If no solution necessary
#
# - Create the projected timeslot and skip
#
if "solution_choices" not in ts or len(ts["collisions"]) < 1:
projected_ts = TimeSlot.objects.instantiate(ts["start"], ts["end"], schedule, show)
create.append(projected_ts)
for timeslot in conflicts["projected"]:
# If no solution necessary: Create the projected timeslot and skip
if "solution_choices" not in timeslot or len(timeslot["collisions"]) == 0:
to_create.append(
TimeSlot.objects.instantiate(
timeslot["start"], timeslot["end"], new_schedule, show
),
)
continue
# Check hash (if start, end, rrule or by_weekday changed)
if not ts["hash"] in solutions:
errors[ts["hash"]] = _("This change on the timeslot is not allowed.")
if not timeslot["hash"] in solutions:
errors[timeslot["hash"]] = _("This change on the timeslot is not allowed.")
continue
# If no resolution given
#
# - Skip
#
if solutions[ts["hash"]] == "":
errors[ts["hash"]] = _("No solution given.")
# If no resolution given: skip
if solutions[timeslot["hash"]] == "":
errors[timeslot["hash"]] = _("No solution given.")
continue
# If resolution is not accepted for this conflict
#
# - Skip
#
if not solutions[ts["hash"]] in ts["solution_choices"]:
errors[ts["hash"]] = _("Given solution is not accepted for this conflict.")
# If resolution is not accepted for this conflict: SKIP
if not solutions[timeslot["hash"]] in timeslot["solution_choices"]:
errors[timeslot["hash"]] = _("Given solution is not accepted for this conflict.")
continue
"""Conflict resolution"""
existing = ts["collisions"][0]
solution = solutions[ts["hash"]]
existing = timeslot["collisions"][0]
solution = solutions[timeslot["hash"]]
# theirs
#
# - Discard the projected timeslot
# - Keep the existing collision(s)
#
if solution == "theirs":
# - Discard the projected timeslot
# - Keep the existing collision(s)
continue
# ours
#
# - Create the projected timeslot
# - Delete the existing collision(s)
#
if solution == "ours":
projected_ts = TimeSlot.objects.instantiate(ts["start"], ts["end"], schedule, show)
create.append(projected_ts)
# - Create the projected timeslot
# - Delete the existing collision(s)
to_create.append(
TimeSlot.objects.instantiate(
timeslot["start"], timeslot["end"], new_schedule, show
),
)
# Delete collision(s)
for ex in ts["collisions"]:
for collision in timeslot["collisions"]:
try:
existing_ts = TimeSlot.objects.get(pk=ex["id"])
delete.append(existing_ts)
to_delete.append(TimeSlot.objects.get(pk=collision["id"]))
except ObjectDoesNotExist:
pass
# theirs-end
#
# - Keep the existing timeslot
# - Create projected with end of existing start
#
if solution == "theirs-end":
projected_ts = TimeSlot.objects.instantiate(
ts["start"], existing["start"], schedule, show
# - Keep the existing timeslot
# - Create projected with end of existing start
to_create.append(
TimeSlot.objects.instantiate(
timeslot["start"], existing["start"], new_schedule, show
),
)
create.append(projected_ts)
# ours-end
#
# - Create the projected timeslot
# - Change the start of the existing collision to projected end
#
if solution == "ours-end":
projected_ts = TimeSlot.objects.instantiate(ts["start"], ts["end"], schedule, show)
create.append(projected_ts)
# - Create the projected timeslot
# - Change the start of the existing collision to projected end
to_create.append(
TimeSlot.objects.instantiate(
timeslot["start"], timeslot["end"], new_schedule, show
),
)
existing_ts = TimeSlot.objects.get(pk=existing["id"])
existing_ts.start = parse_datetime(ts["end"])
update.append(existing_ts)
# theirs-start
#
# - Keep existing
# - Create projected with start time of existing end
#
existing_ts.start = parse_datetime(timeslot["end"])
to_update.append(existing_ts)
if solution == "theirs-start":
projected_ts = TimeSlot.objects.instantiate(existing["end"], ts["end"], schedule, show)
create.append(projected_ts)
# ours-start
#
# - Create the projected timeslot
# - Change end of existing to projected start
#
# - Keep existing
# - Create projected with start time of existing end
to_create.append(
TimeSlot.objects.instantiate(existing["end"], timeslot["end"], new_schedule, show),
)
if solution == "ours-start":
projected_ts = TimeSlot.objects.instantiate(ts["start"], ts["end"], schedule, show)
create.append(projected_ts)
# - Create the projected timeslot
# - Change end of existing to projected start
to_create.append(
TimeSlot.objects.instantiate(
timeslot["start"], timeslot["end"], new_schedule, show
),
)
existing_ts = TimeSlot.objects.get(pk=existing["id"])
existing_ts.end = parse_datetime(ts["start"])
update.append(existing_ts)
# theirs-both
#
# - Keep existing
# - Create two projected timeslots with end of existing start and start of existing
# end
#
existing_ts.end = parse_datetime(timeslot["start"])
to_update.append(existing_ts)
if solution == "theirs-both":
projected_ts = TimeSlot.objects.instantiate(
ts["start"], existing["start"], schedule, show
# - Keep existing
# - Create two projected timeslots with end of existing start and start of existing end
to_create.append(
TimeSlot.objects.instantiate(
timeslot["start"], existing["start"], new_schedule, show
),
)
create.append(projected_ts)
projected_ts = TimeSlot.objects.instantiate(existing["end"], ts["end"], schedule, show)
create.append(projected_ts)
# ours-both
#
# - Create projected
# - Split existing into two:
# - Set existing end time to projected start
# - Create another one with start = projected end and end = existing end
#
to_create.append(
TimeSlot.objects.instantiate(existing["end"], timeslot["end"], new_schedule, show),
)
if solution == "ours-both":
projected_ts = TimeSlot.objects.instantiate(ts["start"], ts["end"], schedule, show)
create.append(projected_ts)
# - Create projected
# - Split existing into two
# - Set existing end time to projected start
# - Create another one with start = projected end and end = existing end
to_create.append(
TimeSlot.objects.instantiate(
timeslot["start"], timeslot["end"], new_schedule, show
),
)
existing_ts = TimeSlot.objects.get(pk=existing["id"])
existing_ts.end = parse_datetime(ts["start"])
update.append(existing_ts)
existing_ts.end = parse_datetime(timeslot["start"])
to_update.append(existing_ts)
projected_ts = TimeSlot.objects.instantiate(ts["end"], existing["end"], schedule, show)
create.append(projected_ts)
to_create.append(
TimeSlot.objects.instantiate(timeslot["end"], existing["end"], new_schedule, show),
)
# If there were any errors, don't make any db changes yet
# but add error messages and return already chosen solutions
if len(errors) > 0:
conflicts = make_conflicts(model_to_dict(schedule), schedule.pk, show.pk)
conflicts = make_conflicts(model_to_dict(new_schedule), new_schedule.pk, show.pk)
partly_resolved = conflicts["projected"]
saved_solutions = {}
# Add already chosen resolutions and error message to conflict
for index, c in enumerate(conflicts["projected"]):
for index, projected_entry in enumerate(conflicts["projected"]):
# The element should only exist if there was a collision
if len(c["collisions"]) > 0:
saved_solutions[c["hash"]] = ""
if len(projected_entry["collisions"]) > 0:
saved_solutions[projected_entry["hash"]] = ""
if c["hash"] in solutions and solutions[c["hash"]] in c["solution_choices"]:
saved_solutions[c["hash"]] = solutions[c["hash"]]
if (
projected_entry["hash"] in solutions
and solutions[projected_entry["hash"]] in projected_entry["solution_choices"]
):
saved_solutions[projected_entry["hash"]] = solutions[projected_entry["hash"]]
if c["hash"] in errors:
partly_resolved[index]["error"] = errors[c["hash"]]
if projected_entry["hash"] in errors:
partly_resolved[index]["error"] = errors[projected_entry["hash"]]
# Re-insert post data
conflicts["projected"] = partly_resolved
......@@ -302,61 +290,55 @@ def resolve_conflicts(data, schedule_pk, show_pk):
conflicts=conflicts,
)
# Collect upcoming timeslots to delete which might still remain
del_timeslots = TimeSlot.objects.filter(
schedule=schedule,
start__gt=timezone.make_aware(datetime.combine(schedule.last_date, time(0, 0))),
remaining_timeslots = TimeSlot.objects.filter(
schedule=new_schedule,
start__gt=timezone.make_aware(datetime.combine(new_schedule.last_date, time(0, 0))),
)
for del_ts in del_timeslots:
delete.append(del_ts)
for timeslot in remaining_timeslots:
to_delete.append(timeslot)
# If 'dryrun' is true, just return the projected changes instead of executing them
if "dryrun" in sdl and sdl["dryrun"]:
if "dryrun" in schedule and schedule["dryrun"]:
return {
"create": [model_to_dict(ts) for ts in create],
"update": [model_to_dict(ts) for ts in update],
"delete": [model_to_dict(ts) for ts in delete],
"create": [model_to_dict(timeslot) for timeslot in to_create],
"update": [model_to_dict(timeslot) for timeslot in to_update],
"delete": [model_to_dict(timeslot) for timeslot in to_delete],
}
"""Database changes if no errors found"""
# Database changes if no errors found
# Only save schedule if timeslots were created
if create:
# Create or update schedule
schedule.save()
if to_create:
new_schedule.save()
# Update timeslots
for ts in update:
ts.save(update_fields=["start", "end"])
for timeslot in to_update:
timeslot.save(update_fields=["start", "end"])
# Create timeslots
for ts in create:
ts.schedule = schedule
for timeslot in to_create:
timeslot.schedule = new_schedule
# Reassign playlists
if "playlists" in data and ts.hash in data["playlists"]:
ts.playlist_id = int(data["playlists"][ts.hash])
if "playlists" in data and timeslot.hash in data["playlists"]:
timeslot.playlist_id = int(data["playlists"][timeslot.hash])
ts.save()
timeslot.save()
# Reassign notes
if "notes" in data and ts.hash in data["notes"]:
if "notes" in data and timeslot.hash in data["notes"]:
try:
note = Note.objects.get(pk=int(data["notes"][ts.hash]))
note.timeslot_id = ts.id
note = Note.objects.get(pk=int(data["notes"][timeslot.hash]))
note.timeslot_id = timeslot.id
note.save(update_fields=["timeslot_id"])
timeslot = TimeSlot.objects.get(pk=ts.id)
timeslot = TimeSlot.objects.get(pk=timeslot.id)
timeslot.note_id = note.id
timeslot.save(update_fields=["note_id"])
except ObjectDoesNotExist:
pass
# Delete manually resolved timeslots and those after until
for dl in delete:
dl.delete()
for timeslot in to_delete:
timeslot.delete()
return model_to_dict(schedule)
return model_to_dict(new_schedule)
def instantiate_upcoming_schedule(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment