#
# steering, Programme/schedule management for AURA
#
# Copyright (C) 2011-2017, 2020, Ernesto Rico Schmidt
# Copyright (C) 2017-2019, Ingo Leindecker
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

from datetime import datetime, time, timedelta

from dateutil.relativedelta import relativedelta
from dateutil.rrule import rrule
from rest_framework.exceptions import ValidationError
from versatileimagefield.fields import PPOIField, VersatileImageField

from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.db.models import Q, QuerySet
from django.forms.models import model_to_dict
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from program.utils import parse_date, parse_datetime, parse_time
from steering.settings import (
    AUTO_SET_LAST_DATE_TO_DAYS_IN_FUTURE,
    AUTO_SET_LAST_DATE_TO_END_OF_YEAR,
    THUMBNAIL_SIZES,
)


class ScheduleConflictError(ValidationError):
    """Validation error that additionally carries the conflicts dict.

    The `conflicts` keyword argument is stored on the instance as
    `self.conflicts`; all other arguments are passed to `ValidationError`.
    """

    def __init__(self, *args, conflicts=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.conflicts = conflicts


class ModelWithCreatedUpdatedFields(models.Model):
    """Abstract model that adds:
    - `created_at`, a `DateTimeField` with `auto_now_add=True`
    - `updated_at`, a `DateTimeField` with `auto_now=True`
    - `created_by`, a `CharField`
    - `updated_by`, a `CharField`
    """

    created_at = models.DateTimeField(auto_now_add=True)
    created_by = models.CharField(max_length=150)
    updated_at = models.DateTimeField(auto_now=True, blank=True, null=True)
    updated_by = models.CharField(blank=True, max_length=150, null=True)

    class Meta:
        abstract = True


class Type(models.Model):
    """Named, sluggable type that a `Show` can reference."""

    name = models.CharField(max_length=32)
    slug = models.SlugField(max_length=32, unique=True)
    is_active = models.BooleanField(default=True)

    class Meta:
        ordering = ("name",)

    def __str__(self):
        return self.name


class Category(models.Model):
    """Named, sluggable category that shows can be assigned to."""

    name = models.CharField(max_length=32)
    subtitle = models.TextField(blank=True, null=True)
    slug = models.SlugField(max_length=32, unique=True)
    is_active = models.BooleanField(default=True)
    description = models.TextField(blank=True)

    class Meta:
        ordering = ("name",)
        verbose_name_plural = "Categories"

    def __str__(self):
        return self.name


class Topic(models.Model):
    """Named, sluggable topic that shows can be assigned to."""

    name = models.CharField(max_length=32)
    slug = models.SlugField(max_length=32, unique=True)
    is_active = models.BooleanField(default=True)

    class Meta:
        ordering = ("name",)

    def __str__(self):
        return self.name


class MusicFocus(models.Model):
    """Named, sluggable music focus that shows can be assigned to."""

    name = models.CharField(max_length=32)
    slug = models.SlugField(max_length=32, unique=True)
    is_active = models.BooleanField(default=True)

    class Meta:
        ordering = ("name",)
        verbose_name_plural = "Music Focus"

    def __str__(self):
        return self.name


class FundingCategory(models.Model):
    """Named, sluggable funding category that a `Show` can reference."""

    name = models.CharField(max_length=32)
    slug = models.SlugField(max_length=32, unique=True)
    is_active = models.BooleanField(default=True)

    class Meta:
        ordering = ("name",)
        verbose_name_plural = "Funding Categories"

    def __str__(self):
        return self.name


class Language(models.Model):
    """Named language that shows can be assigned to."""

    name = models.CharField(max_length=32)
    is_active = models.BooleanField(default=True)

    class Meta:
        ordering = ("name",)

    def __str__(self):
        return self.name


class Image(models.Model):
    """Uploaded image with alt text, credits, owner and point of interest."""

    alt_text = models.TextField(blank=True, null=True)
    credits = models.TextField(blank=True, null=True)
    height = models.PositiveIntegerField(blank=True, null=True, editable=False)
    image = VersatileImageField(
        blank=True,
        height_field="height",
        null=True,
        ppoi_field="ppoi",
        upload_to="images",
        width_field="width",
    )
    owner = models.CharField(max_length=150)
    ppoi = PPOIField()
    width = models.PositiveIntegerField(blank=True, null=True, editable=False)

    def save(self, *args, **kwargs):
        """Save the row, then touch the crop for every size in THUMBNAIL_SIZES."""
        super().save(*args, **kwargs)

        if self.image.name and THUMBNAIL_SIZES:
            for size in THUMBNAIL_SIZES:
                # NOTE(review): presumably accessing `crop[size]` is what makes the
                # image library render the thumbnail - confirm.
                self.image.thumbnail = self.image.crop[size].name

    def delete(self, using=None, keep_parents=False):
        """Remove the generated images and the stored file, then delete the row.

        Returns whatever `Model.delete()` returns.
        """
        self.image.delete_all_created_images()
        self.image.delete(save=False)

        return super().delete(using, keep_parents)


class Host(ModelWithCreatedUpdatedFields):
    """A named host with optional biography, email and image."""

    biography = models.TextField(blank=True, null=True)
    email = models.EmailField(blank=True)
    image = models.ForeignKey(Image, null=True, on_delete=models.CASCADE, related_name="hosts")
    is_active = models.BooleanField(default=True)
    name = models.CharField(max_length=128)

    class Meta:
        ordering = ("name",)

    def __str__(self):
        return self.name


class LinkType(models.Model):
    """Lookup table of link types."""

    name = models.CharField(max_length=16, help_text="Name of the link type")
    type = models.CharField(max_length=64, help_text="Type of the link")

    class Meta:
        ordering = ("name",)

    def __str__(self):
        return self.type


class Link(models.Model):
    """Abstract typed URL; concrete subclasses attach it to an owner model."""

    type = models.CharField(max_length=64)
    url = models.URLField()

    class Meta:
        abstract = True

    def __str__(self):
        return self.url


class HostLink(Link):
    """A link that belongs to a `Host`."""

    host = models.ForeignKey(Host, on_delete=models.CASCADE, related_name="links")


class LicenseType(models.Model):
    """Lookup table of license types."""

    name = models.CharField(max_length=16, help_text="Name of the license type")
    type = models.CharField(max_length=64, help_text="Type of the license")

    class Meta:
        ordering = ("name",)

    def __str__(self):
        return self.type


class Show(ModelWithCreatedUpdatedFields):
    """A show together with its classification, hosts, owners and images."""

    category = models.ManyToManyField(Category, blank=True, related_name="shows")
    cba_series_id = models.IntegerField(blank=True, null=True)
    default_playlist_id = models.IntegerField(blank=True, null=True)
    description = models.TextField(blank=True, null=True)
    email = models.EmailField(blank=True, null=True)
    funding_category = models.ForeignKey(
        FundingCategory,
        null=True,
        on_delete=models.CASCADE,
        blank=True,
        related_name="shows",
    )
    hosts = models.ManyToManyField(Host, blank=True, related_name="shows")
    image = models.ForeignKey(Image, null=True, on_delete=models.CASCADE, related_name="shows")
    internal_note = models.TextField(blank=True, null=True)
    is_active = models.BooleanField(default=True)
    is_public = models.BooleanField(default=False)
    language = models.ManyToManyField(Language, blank=True, related_name="shows")
    # TODO: is this really necessary?
    logo = models.ForeignKey(
        Image,
        blank=True,
        null=True,
        on_delete=models.CASCADE,
        related_name="logo_shows",
    )
    music_focus = models.ManyToManyField(MusicFocus, blank=True, related_name="shows")
    name = models.CharField(max_length=255)
    owners = models.ManyToManyField(User, blank=True, related_name="shows")
    predecessor = models.ForeignKey(
        "self",
        blank=True,
        null=True,
        on_delete=models.CASCADE,
        related_name="successors",
    )
    short_description = models.TextField()
    slug = models.CharField(max_length=255, unique=True)
    topic = models.ManyToManyField(Topic, blank=True, related_name="shows")
    type = models.ForeignKey(
        Type, blank=True, null=True, on_delete=models.CASCADE, related_name="shows"
    )

    class Meta:
        ordering = ("slug",)

    def __str__(self):
        return self.name


class ShowLink(Link):
    """A link that belongs to a `Show`."""

    show = models.ForeignKey(Show, on_delete=models.CASCADE, related_name="links")


class RRule(models.Model):
    """Recurrence rule parameters used to generate the timeslots of a schedule."""

    name = models.CharField(max_length=32, unique=True)
    freq = models.IntegerField(
        choices=[
            (0, "once"),
            (1, "monthly"),
            (2, "weekly"),
            (3, "daily"),
        ]
    )
    interval = models.IntegerField(
        default=1,
        help_text="The interval between each freq iteration.",
    )
    by_set_pos = models.IntegerField(
        blank=True,
        choices=[
            (1, "first"),
            (2, "second"),
            (3, "third"),
            (4, "fourth"),
            (5, "fifth"),
            (-1, "last"),
        ],
        null=True,
    )
    by_weekdays = models.CharField(
        blank=True,
        choices=[
            (None, ""),
            ("0,1,2,3,4", "business days"),
            ("5,6", "weekends"),
        ],
        null=True,
        max_length=9,
    )
    count = models.IntegerField(
        blank=True,
        null=True,
        help_text="How many occurrences should be generated.",
    )

    class Meta:
        ordering = ("pk",)
        unique_together = ("freq", "interval", "by_set_pos", "by_weekdays")
        verbose_name = _("recurrence rule")

    def __str__(self):
        return self.name


class Schedule(models.Model):
    """A show's recurring broadcast window, from which timeslots are generated."""

    rrule = models.ForeignKey(
        RRule,
        on_delete=models.CASCADE,
        related_name="schedules",
        help_text="A recurrence rule.",
    )
    show = models.ForeignKey(
        Show,
        on_delete=models.CASCADE,
        related_name="schedules",
        help_text="Show the schedule belongs to.",
    )
    by_weekday = models.IntegerField(
        help_text="Number of the Weekday.",
        choices=[
            (0, "Monday"),
            (1, "Tuesday"),
            (2, "Wednesday"),
            (3, "Thursday"),
            (4, "Friday"),
            (5, "Saturday"),
            (6, "Sunday"),
        ],
        null=True,
    )
    first_date = models.DateField(help_text="Start date of schedule.")
    start_time = models.TimeField(help_text="Start time of schedule.")
    end_time = models.TimeField(help_text="End time of schedule.")
    last_date = models.DateField(help_text="End date of schedule.")
    is_repetition = models.BooleanField(
        default=False,
        help_text="Whether the schedule is a repetition.",
    )
    add_days_no = models.IntegerField(
        blank=True,
        null=True,
        help_text=(
            "Add a number of days to the generated dates. "
            "This can be useful for repetitions, like 'On the following day'."
        ),
    )
    add_business_days_only = models.BooleanField(
        default=False,
        help_text=(
            "Whether to add add_days_no but skipping the weekends. "
            "E.g. if weekday is Friday, the date returned will be the next Monday."
        ),
    )
    default_playlist_id = models.IntegerField(
        blank=True,
        null=True,
        help_text="A tank ID in case the timeslot's playlist_id is empty.",
    )

    class Meta:
        ordering = ("first_date", "start_time")

    # FIXME: this does not belong here
    @staticmethod
    def instantiate_upcoming(sdl, show_pk, pk=None):
        """Return an unsaved schedule instance for conflict resolution.

        `sdl` is a dict of schedule data. `rrule`, `first_date`, `start_time`,
        `end_time` and `by_weekday` are required keys; the remaining keys are
        optional. `show_pk` and the optional `pk` are coerced with `int()`.

        Raises `ObjectDoesNotExist` (via `.get()`) if the recurrence rule or the
        show cannot be found.
        """
        pk = int(pk) if pk is not None else None
        # Named `recurrence` so the imported dateutil `rrule` is not shadowed.
        recurrence = RRule.objects.get(pk=int(sdl["rrule"]))
        show = Show.objects.get(pk=int(show_pk))

        # Only a literal `True` enables the boolean flags.
        is_repetition = sdl.get("is_repetition") is True
        add_business_days_only = sdl.get("add_business_days_only") is True

        default_playlist_id = (
            int(sdl["default_playlist_id"]) if sdl.get("default_playlist_id") else None
        )
        add_days_no = int(sdl["add_days_no"]) if sdl.get("add_days_no") else None

        first_date = parse_date(str(sdl["first_date"]))

        # A five character time ("HH:MM") gets seconds appended before parsing.
        start_time = sdl["start_time"]
        if len(str(start_time)) == 5:
            start_time = f"{start_time}:00"
        end_time = sdl["end_time"]
        if len(str(end_time)) == 5:
            end_time = f"{end_time}:00"

        start_time = parse_time(str(start_time))
        end_time = parse_time(str(end_time))

        # A missing key is treated like an empty value instead of raising KeyError.
        if sdl.get("last_date"):
            last_date = parse_date(str(sdl["last_date"]))
        elif AUTO_SET_LAST_DATE_TO_END_OF_YEAR:
            # If last_date was not set, set it to the end of the year ...
            year = timezone.now().year
            last_date = parse_date(f"{year}-12-31")
        else:
            # ... or add x days
            last_date = first_date + timedelta(days=+AUTO_SET_LAST_DATE_TO_DAYS_IN_FUTURE)

        return Schedule(
            pk=pk,
            by_weekday=sdl["by_weekday"],
            rrule=recurrence,
            first_date=first_date,
            start_time=start_time,
            end_time=end_time,
            last_date=last_date,
            is_repetition=is_repetition,
            default_playlist_id=default_playlist_id,
            show=show,
            add_days_no=add_days_no,
            add_business_days_only=add_business_days_only,
        )

    # FIXME: this does not belong here
    @staticmethod
    def generate_timeslots(schedule):
        """Return a list of unsaved timeslot objects based on a schedule and its rrule.

        Returns past timeslots as well, starting from first_date (not today).

        Side effect: `schedule.last_date` is moved forward when adding
        `add_days_no` days pushes a generated end beyond it.
        """
        timeslots = []
        crosses_midnight = schedule.end_time < schedule.start_time

        # adjust last_date if end_time is after midnight
        if crosses_midnight:
            last_date = schedule.first_date + timedelta(days=+1)
        else:
            last_date = schedule.first_date

        if schedule.rrule.freq == 3:
            # daily: Ignore schedule.by_weekday to set by_weekday
            by_weekday_start = by_weekday_end = (0, 1, 2, 3, 4, 5, 6)
        elif (
            schedule.rrule.freq == 2
            and schedule.rrule.interval == 1
            and schedule.rrule.by_weekdays is None
        ):
            # weekly: Use schedule.by_weekday for by_weekday
            by_weekday_start = by_weekday_end = int(schedule.by_weekday)

            # adjust by_weekday_end if end_time is after midnight
            if crosses_midnight:
                by_weekday_end = by_weekday_start + 1 if by_weekday_start < 6 else 0
        elif (
            schedule.rrule.freq == 2
            and schedule.rrule.interval == 1
            and schedule.rrule.by_weekdays == "0,1,2,3,4"
        ):
            # weekly on business days: Use schedule.rrule.by_weekdays to set by_weekday
            by_weekday_start = by_weekday_end = [
                int(wd) for wd in schedule.rrule.by_weekdays.split(",")
            ]

            # adjust by_weekday_end if end_time is after midnight
            if crosses_midnight:
                by_weekday_end = (1, 2, 3, 4, 5)
        elif (
            schedule.rrule.freq == 2
            and schedule.rrule.interval == 1
            and schedule.rrule.by_weekdays == "5,6"
        ):
            # weekly on weekends: Use schedule.rrule.by_weekdays to set by_weekday
            by_weekday_start = by_weekday_end = [
                int(wd) for wd in schedule.rrule.by_weekdays.split(",")
            ]

            # adjust by_weekday_end if end_time is after midnight
            if crosses_midnight:
                by_weekday_end = (6, 0)
        elif schedule.rrule.freq == 0:
            # once: Ignore schedule.by_weekday to set by_weekday
            by_weekday_start = by_weekday_end = None
        else:
            by_weekday_start = by_weekday_end = (
                int(schedule.by_weekday) if schedule.by_weekday is not None else None
            )

            # adjust by_weekday_end if end_time is after midnight; without a weekday
            # there is nothing to shift (comparing None with 6 would raise TypeError)
            if crosses_midnight and by_weekday_start is not None:
                by_weekday_end = by_weekday_start + 1 if by_weekday_start < 6 else 0

        if schedule.rrule.freq == 0:
            # once:
            starts = [datetime.combine(schedule.first_date, schedule.start_time)]
            ends = [datetime.combine(last_date, schedule.end_time)]
        else:
            until = schedule.last_date + relativedelta(days=+1)
            starts = list(
                rrule(
                    freq=schedule.rrule.freq,
                    dtstart=datetime.combine(schedule.first_date, schedule.start_time),
                    interval=schedule.rrule.interval,
                    until=until,
                    bysetpos=schedule.rrule.by_set_pos,
                    byweekday=by_weekday_start,
                )
            )
            ends = list(
                rrule(
                    freq=schedule.rrule.freq,
                    dtstart=datetime.combine(last_date, schedule.end_time),
                    interval=schedule.rrule.interval,
                    until=until,
                    bysetpos=schedule.rrule.by_set_pos,
                    byweekday=by_weekday_end,
                )
            )

        # zip() stops at the shorter list, so only complete start/end pairs are used.
        for start, end in zip(starts, ends):
            # Correct dates for the (relatively seldom) case if:
            # E.g.: 1st Monday from 23:00:00 to 1st Tuesday 00:00:00
            #       produces wrong end dates if the 1st Tuesday is before the 1st Monday
            #       In this case we take the next day instead of rrule's calculated end
            if start > end:
                end = datetime.combine(start + relativedelta(days=+1), schedule.end_time)

            # Add a number of days to the generated dates?
            #
            # This can be helpful for repetitions:
            #
            # Examples:
            #
            # 1. If RRule is "Every 1st Monday" and we want its repetition always to be on
            #    the following day, the repetition's RRule is the same but add_days_no is 1
            #
            #    If we would set the repetition to "Every 1st Tuesday" instead
            #    we will get unmeant results if the 1st Tuesday is before the 1st Monday
            #    (e.g. 1st Tue = May 1 2018, 1st Mon = May 7 2018)
            #
            # 2. If RRule is "Every 1st Friday" and we want its repetition always to be on
            #    the following business day, the repetition's RRule is the same but
            #    add_days_no is 1 and add_business_days_only is True
            #    (e.g. original date = Fri, March 2 2018; generated date = Mon, March 5 2018)
            #
            # In the UI these can be presets:
            #   "On the following day" (add_days_no=1,add_business_days_only=False) or
            #   "On the following business day" (add_days_no=1,add_business_days_only=True)
            if schedule.add_days_no is not None and schedule.add_days_no > 0:
                # If only business days and weekday is Fri, Sat or Sun: add add_days_no
                # beginning from Sunday
                weekday = start.weekday()
                if schedule.add_business_days_only and weekday > 3:
                    days_until_sunday = 6 - weekday
                    shift = relativedelta(days=+days_until_sunday + schedule.add_days_no)
                else:
                    shift = relativedelta(days=+schedule.add_days_no)

                start = start + shift
                end = end + shift

                if end.date() > schedule.last_date:
                    schedule.last_date = end.date()

            timeslots.append(
                TimeSlot(
                    schedule=schedule,
                    start=timezone.make_aware(start, is_dst=True),
                    end=timezone.make_aware(end, is_dst=True),
                )
            )

        return timeslots

    # FIXME: this does not belong here
    @staticmethod
    def get_collisions(timeslots):
        """
        Tests a list of timeslot objects for colliding timeslots in the database
        Returns a list of collisions, containing colliding timeslot IDs or None
        Keeps indices from input list for later comparison
        """
        collisions = []

        for ts in timeslots:
            collision = TimeSlot.objects.get_colliding_timeslots(ts)

            # TODO: Do we really always retrieve one?
            collisions.append(collision[0] if collision else None)

        return collisions

    # FIXME: this does not belong here
    @staticmethod
    def generate_conflicts(timeslots):
        """
        Tests a list of timeslot objects for colliding timeslots in the database
        Returns a dict with the keys "projected", "solutions", "notes" and "playlists";
        every projected entry lists its collisions and the acceptable solution choices
        """
        conflicts = {}
        projected = []
        solutions = {}

        # Cycle each timeslot
        for ts in timeslots:
            # Contains collisions
            collisions = []

            # Contains possible solutions
            solution_choices = set()

            # Get collisions for each timeslot
            collision_list = list(TimeSlot.objects.get_colliding_timeslots(ts).order_by("start"))

            # Add the projected timeslot
            projected_entry = {
                "hash": ts.hash,
                "start": str(ts.start),
                "end": str(ts.end),
            }

            for c in collision_list:
                # Add the collision
                collision = {
                    "id": c.id,
                    "start": str(c.start),
                    "end": str(c.end),
                    "playlist_id": c.playlist_id,
                    "show": c.show.id,
                    "show_name": c.show.name,
                    "schedule": c.schedule_id,
                    "memo": c.memo,
                }

                # Get note
                try:
                    note = Note.objects.get(timeslot=c.id)
                    collision["note_id"] = note.pk
                except ObjectDoesNotExist:
                    collision["note_id"] = None

                collisions.append(collision)

                # Determine acceptable solutions

                # These two are always possible: Either keep theirs and remove ours or
                # vice versa
                solution_choices.add("theirs")
                solution_choices.add("ours")

                # If there is more than one collision: Only these two are supported at
                # the moment
                if len(collision_list) > 1:
                    continue

                # Partly overlapping: projected starts earlier than existing and ends
                # earlier
                if ts.end > c.start > ts.start <= c.end:
                    solution_choices.add("theirs-end")
                    solution_choices.add("ours-end")

                # Partly overlapping: projected starts later than existing and ends later
                if c.start <= ts.start < c.end < ts.end:
                    solution_choices.add("theirs-start")
                    solution_choices.add("ours-start")

                # Fully overlapping: projected starts earlier and ends later than existing
                if ts.start < c.start and ts.end > c.end:
                    solution_choices.add("theirs-end")
                    solution_choices.add("theirs-start")
                    solution_choices.add("theirs-both")

                # Fully overlapping: projected starts later and ends earlier than existing
                if ts.start > c.start and ts.end < c.end:
                    solution_choices.add("ours-end")
                    solution_choices.add("ours-start")
                    solution_choices.add("ours-both")

            if collisions:
                solutions[ts.hash] = ""

            projected_entry["collisions"] = collisions
            projected_entry["solution_choices"] = solution_choices
            projected_entry["error"] = None
            projected.append(projected_entry)

        conflicts["projected"] = projected
        conflicts["solutions"] = solutions
        conflicts["notes"] = {}
        conflicts["playlists"] = {}

        return conflicts

    # FIXME: this does not belong here
    @staticmethod
    def make_conflicts(sdl, schedule_pk, show_pk):
        """
        Retrieves POST vars
        Generates a schedule
        Generates conflicts: Returns timeslots, collisions, solutions as JSON
        Returns conflicts dict
        """
        # Generate schedule to be saved
        schedule = Schedule.instantiate_upcoming(sdl, show_pk, schedule_pk)

        # NOTE(review): this is the same object, not a copy - the first_date change
        # below (and any last_date change made while generating timeslots) is visible
        # in the schedule dict returned with the conflicts.
        gen_schedule = schedule

        # Generate timeslots

        # If extending: Get last timeslot and start generating from that date on
        if schedule_pk is not None:
            existing_schedule = Schedule.objects.get(pk=int(schedule_pk))

            if schedule.last_date > existing_schedule.last_date:
                # NOTE(review): raises IndexError if the existing schedule has no
                # timeslots.
                last_timeslot = (
                    TimeSlot.objects.filter(schedule=existing_schedule)
                    .order_by("start")
                    .reverse()[0]
                )
                gen_schedule.first_date = last_timeslot.start.date() + timedelta(days=1)

        timeslots = Schedule.generate_timeslots(gen_schedule)

        # Generate conflicts and add schedule
        conflicts = Schedule.generate_conflicts(timeslots)
        conflicts["schedule"] = model_to_dict(schedule)

        return conflicts

    # FIXME: this does not belong here
    @staticmethod
    def resolve_conflicts(data, schedule_pk, show_pk):
        """
        Resolves conflicts
        Expects JSON POST/PUT data from /shows/1/schedules/

        Returns the saved schedule as a dict, or - if `data["schedule"]["dryrun"]` is
        truthy - a dict with the timeslots that would be created, updated and deleted

        Raises ValidationError if the schedule dates are invalid
        Raises ScheduleConflictError (carrying the conflicts) if the number of
        solutions does not match the number of conflicts or a conflict stays unresolved
        """
        sdl = data["schedule"]
        solutions = data.get("solutions", [])

        # Regenerate conflicts
        schedule = Schedule.instantiate_upcoming(sdl, show_pk, schedule_pk)
        show = schedule.show
        conflicts = Schedule.make_conflicts(sdl, schedule_pk, show_pk)

        if schedule.rrule.freq > 0 and schedule.first_date == schedule.last_date:
            raise ValidationError(
                _("Start and end dates must not be the same."),
                code="no-same-day-start-and-end",
            )

        if schedule.last_date < schedule.first_date:
            raise ValidationError(
                _("End date mustn't be before start."),
                code="no-start-after-end",
            )

        num_conflicts = sum(1 for pr in conflicts["projected"] if pr["collisions"])

        if len(solutions) != num_conflicts:
            raise ScheduleConflictError(
                _("Numbers of conflicts and solutions don't match."),
                code="one-solution-per-conflict",
                conflicts=conflicts,
            )

        # Projected timeslots to create
        create = []

        # Existing timeslots to update
        update = []

        # Existing timeslots to delete
        delete = []

        # Error messages
        errors = {}

        for ts in conflicts["projected"]:
            # If no solution necessary
            #
            #     - Create the projected timeslot and skip
            #
            if "solution_choices" not in ts or len(ts["collisions"]) < 1:
                projected_ts = TimeSlot.objects.instantiate(ts["start"], ts["end"], schedule, show)
                create.append(projected_ts)
                continue

            # Check hash (if start, end, rrule or by_weekday changed)
            if ts["hash"] not in solutions:
                errors[ts["hash"]] = _("This change on the timeslot is not allowed.")
                continue

            # If no resolution given
            #
            #     - Skip
            #
            if solutions[ts["hash"]] == "":
                errors[ts["hash"]] = _("No solution given.")
                continue

            # If resolution is not accepted for this conflict
            #
            #     - Skip
            #
            if solutions[ts["hash"]] not in ts["solution_choices"]:
                errors[ts["hash"]] = _("Given solution is not accepted for this conflict.")
                continue

            # Conflict resolution

            existing = ts["collisions"][0]
            solution = solutions[ts["hash"]]

            if solution == "theirs":
                # theirs
                #
                #     - Discard the projected timeslot
                #     - Keep the existing collision(s)
                #
                continue

            elif solution == "ours":
                # ours
                #
                #     - Create the projected timeslot
                #     - Delete the existing collision(s)
                #
                projected_ts = TimeSlot.objects.instantiate(ts["start"], ts["end"], schedule, show)
                create.append(projected_ts)

                # Delete collision(s); one that has vanished in the meantime needs no
                # deletion, so a missing row is deliberately ignored
                for ex in ts["collisions"]:
                    try:
                        existing_ts = TimeSlot.objects.get(pk=ex["id"])
                        delete.append(existing_ts)
                    except ObjectDoesNotExist:
                        pass

            elif solution == "theirs-end":
                # theirs-end
                #
                #     - Keep the existing timeslot
                #     - Create projected with end of existing start
                #
                projected_ts = TimeSlot.objects.instantiate(
                    ts["start"], existing["start"], schedule, show
                )
                create.append(projected_ts)

            elif solution == "ours-end":
                # ours-end
                #
                #     - Create the projected timeslot
                #     - Change the start of the existing collision to projected end
                #
                projected_ts = TimeSlot.objects.instantiate(ts["start"], ts["end"], schedule, show)
                create.append(projected_ts)

                existing_ts = TimeSlot.objects.get(pk=existing["id"])
                existing_ts.start = parse_datetime(ts["end"])
                update.append(existing_ts)

            elif solution == "theirs-start":
                # theirs-start
                #
                #     - Keep existing
                #     - Create projected with start time of existing end
                #
                projected_ts = TimeSlot.objects.instantiate(
                    existing["end"], ts["end"], schedule, show
                )
                create.append(projected_ts)

            elif solution == "ours-start":
                # ours-start
                #
                #     - Create the projected timeslot
                #     - Change end of existing to projected start
                #
                projected_ts = TimeSlot.objects.instantiate(ts["start"], ts["end"], schedule, show)
                create.append(projected_ts)

                existing_ts = TimeSlot.objects.get(pk=existing["id"])
                existing_ts.end = parse_datetime(ts["start"])
                update.append(existing_ts)

            elif solution == "theirs-both":
                # theirs-both
                #
                #     - Keep existing
                #     - Create two projected timeslots with end of existing start and
                #       start of existing end
                #
                projected_ts = TimeSlot.objects.instantiate(
                    ts["start"], existing["start"], schedule, show
                )
                create.append(projected_ts)

                projected_ts = TimeSlot.objects.instantiate(
                    existing["end"], ts["end"], schedule, show
                )
                create.append(projected_ts)

            elif solution == "ours-both":
                # ours-both
                #
                #     - Create projected
                #     - Split existing into two:
                #       - Set existing end time to projected start
                #       - Create another one with start = projected end and
                #         end = existing end
                #
                projected_ts = TimeSlot.objects.instantiate(ts["start"], ts["end"], schedule, show)
                create.append(projected_ts)

                existing_ts = TimeSlot.objects.get(pk=existing["id"])
                existing_ts.end = parse_datetime(ts["start"])
                update.append(existing_ts)

                projected_ts = TimeSlot.objects.instantiate(
                    ts["end"], existing["end"], schedule, show
                )
                create.append(projected_ts)

        # If there were any errors, don't make any db changes yet
        # but add error messages and return already chosen solutions
        if errors:
            conflicts = Schedule.make_conflicts(model_to_dict(schedule), schedule.pk, show.pk)

            partly_resolved = conflicts["projected"]
            saved_solutions = {}

            # Add already chosen resolutions and error message to conflict
            for index, c in enumerate(conflicts["projected"]):
                # The element should only exist if there was a collision
                if c["collisions"]:
                    saved_solutions[c["hash"]] = ""

                    if c["hash"] in solutions and solutions[c["hash"]] in c["solution_choices"]:
                        saved_solutions[c["hash"]] = solutions[c["hash"]]

                if c["hash"] in errors:
                    partly_resolved[index]["error"] = errors[c["hash"]]

            # Re-insert post data
            conflicts["projected"] = partly_resolved
            conflicts["solutions"] = saved_solutions
            conflicts["notes"] = data.get("notes")
            conflicts["playlists"] = data.get("playlists")

            raise ScheduleConflictError(
                _("Not all conflicts have been resolved."),
                code="unresolved-conflicts",
                conflicts=conflicts,
            )

        # Collect upcoming timeslots to delete which might still remain
        del_timeslots = TimeSlot.objects.filter(
            schedule=schedule,
            start__gt=timezone.make_aware(datetime.combine(schedule.last_date, time(0, 0))),
        )
        delete.extend(del_timeslots)

        # If 'dryrun' is true, just return the projected changes instead of executing them
        if "dryrun" in sdl and sdl["dryrun"]:
            return {
                "create": [model_to_dict(ts) for ts in create],
                "update": [model_to_dict(ts) for ts in update],
                "delete": [model_to_dict(ts) for ts in delete],
            }

        # Database changes if no errors found

        # Only save schedule if timeslots were created
        if create:
            # Create or update schedule
            schedule.save()

        # Update timeslots
        for ts in update:
            ts.save(update_fields=["start", "end"])

        # Create timeslots
        for ts in create:
            ts.schedule = schedule

            # Reassign playlists
            if "playlists" in data and ts.hash in data["playlists"]:
                ts.playlist_id = int(data["playlists"][ts.hash])

            ts.save()

            # Reassign notes
            if "notes" in data and ts.hash in data["notes"]:
                try:
                    note = Note.objects.get(pk=int(data["notes"][ts.hash]))
                    note.timeslot_id = ts.id
                    note.save(update_fields=["timeslot_id"])

                    timeslot = TimeSlot.objects.get(pk=ts.id)
                    timeslot.note_id = note.id
                    timeslot.save(update_fields=["note_id"])
                except ObjectDoesNotExist:
                    # A referenced note that no longer exists is skipped on purpose.
                    pass

        # Delete manually resolved timeslots and those after until
        for dl in delete:
            dl.delete()

        return model_to_dict(schedule)


class TimeSlotManager(models.Manager):
    """Manager with helpers to build timeslots and to query overlapping ones."""

    @staticmethod
    def instantiate(start, end, schedule, show):
        """Return an unsaved `TimeSlot`; `start` and `end` go through `parse_datetime`."""
        return TimeSlot(
            start=parse_datetime(start),
            end=parse_datetime(end),
            show=show,
            schedule=schedule,
        )

    @staticmethod
    def get_timerange_timeslots(start_timerange: datetime, end_timerange: datetime) -> QuerySet:
        """get the timeslots between start_timerange and end_timerange"""
        return TimeSlot.objects.filter(
            # start before start_timerange, end after start_timerange
            Q(start__lt=start_timerange, end__gt=start_timerange)
            # start after/at start_timerange, end before/at end_timerange
            | Q(start__gte=start_timerange, end__lte=end_timerange)
            # start before end_timerange, end after/at end_timerange
            | Q(start__lt=end_timerange, end__gte=end_timerange)
        )

    @staticmethod
    def get_colliding_timeslots(timeslot):
        """Return the queryset of stored timeslots that overlap `timeslot`."""
        return TimeSlot.objects.filter(
            # starts before our end and ends at/after our end
            (Q(start__lt=timeslot.end) & Q(end__gte=timeslot.end))
            # ends after our start and at/before our end
            | (Q(end__gt=timeslot.start) & Q(end__lte=timeslot.end))
            # lies completely within ours
            | (Q(start__gte=timeslot.start) & Q(end__lte=timeslot.end))
            # completely covers ours
            | (Q(start__lte=timeslot.start) & Q(end__gte=timeslot.end))
        )


class TimeSlot(models.Model):
    """A concrete broadcast slot of a show, generated from a schedule."""

    schedule = models.ForeignKey(Schedule, on_delete=models.CASCADE, related_name="timeslots")
    show = models.ForeignKey(
        Show, editable=False, on_delete=models.CASCADE, related_name="timeslots"
    )
    start = models.DateTimeField()
    end = models.DateTimeField()
    memo = models.TextField(blank=True)
    repetition_of = models.ForeignKey(
        "self",
        blank=True,
        null=True,
        on_delete=models.CASCADE,
        related_name="repetitions",
    )
    playlist_id = models.IntegerField(null=True)
    note_id = models.IntegerField(null=True, editable=False)

    objects = TimeSlotManager()

    class Meta:
        ordering = ("start", "end")

    def __str__(self):
        if self.start.date() == self.end.date():
            # Same day: print the date once, followed by both times.
            day = self.start.strftime("%x")
            begin = self.start.strftime("%X")
            finish = self.end.strftime("%X")
            time_span = f"{day}, {begin} - {finish}"
        else:
            begin = self.start.strftime("%X %x")
            finish = self.end.strftime("%X %x")
            time_span = f"{begin} - {finish}"

        return f"{str(self.show)} ({time_span})"

    def save(self, *args, **kwargs):
        """Copy the show from the schedule, save, and return the instance."""
        self.show = self.schedule.show
        super(TimeSlot, self).save(*args, **kwargs)
        return self

    @property
    def hash(self):
        """Digits of start, end, the schedule's rrule id and by_weekday, joined."""
        raw = f"{self.start}{self.end}{self.schedule.rrule.id}{self.schedule.by_weekday}"
        return "".join(ch for ch in raw if ch.isdigit())


class Note(ModelWithCreatedUpdatedFields):
    """Editorial note attached one-to-one to a timeslot."""

    contributors = models.ManyToManyField(Host, related_name="contributions")
    cba_id = models.IntegerField(blank=True, null=True)
    content = models.TextField()
    image = models.ForeignKey(Image, null=True, on_delete=models.CASCADE, related_name="notes")
    owner = models.ForeignKey(
        User, editable=False, on_delete=models.CASCADE, related_name="notes", default=1
    )
    playlist = models.TextField(blank=True, null=True)
    slug = models.SlugField(max_length=32, unique=True)
    summary = models.TextField(blank=True)
    tags = models.TextField(blank=True, null=True)
    timeslot = models.OneToOneField(TimeSlot, on_delete=models.CASCADE, unique=True)
    title = models.CharField(max_length=128)

    class Meta:
        ordering = ("timeslot",)

    def __str__(self):
        return self.title

    def save(self, *args, **kwargs):
        """Save the note, then store its id in the timeslot's `note_id`.

        The note is saved first so that a newly created note already has its
        primary key when it is written to the timeslot.
        """
        super(Note, self).save(*args, **kwargs)

        timeslot = TimeSlot.objects.get(pk=self.timeslot.id)
        timeslot.note_id = self.id
        timeslot.save()


class NoteLink(Link):
    """A link that belongs to a `Note`."""

    note = models.ForeignKey(Note, on_delete=models.CASCADE, related_name="links")


class UserProfile(ModelWithCreatedUpdatedFields, models.Model):
    """Per-user profile holding the CBA username and token."""

    user = models.OneToOneField(
        User, on_delete=models.CASCADE, related_name="profile", editable=False
    )
    cba_username = models.CharField("CBA Username", blank=True, max_length=60)
    cba_user_token = models.CharField("CBA Token", blank=True, max_length=255)

    def __str__(self):
        return self.user.username