# # steering, Programme/schedule management for AURA # # Copyright (C) 2011-2017, 2020, Ernesto Rico Schmidt # Copyright (C) 2017-2019, Ingo Leindecker # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Affero General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) any # later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more # details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # from datetime import datetime, time, timedelta from textwrap import dedent from dateutil.relativedelta import relativedelta from dateutil.rrule import rrule from rest_framework.exceptions import ValidationError from versatileimagefield.fields import PPOIField, VersatileImageField from django.contrib.auth.models import User from django.core.exceptions import ObjectDoesNotExist from django.db import models from django.db.models import Q from django.forms.models import model_to_dict from django.utils import timezone from django.utils.translation import gettext_lazy as _ from program.utils import parse_date, parse_datetime, parse_time from steering.settings import ( AUTO_SET_UNTIL_DATE_TO_DAYS_IN_FUTURE, AUTO_SET_UNTIL_DATE_TO_END_OF_YEAR, THUMBNAIL_SIZES, ) class Type(models.Model): name = models.CharField(max_length=32) slug = models.SlugField(max_length=32, unique=True) is_active = models.BooleanField(default=True) class Meta: ordering = ("name",) def __str__(self): return self.name class Category(models.Model): name = models.CharField(max_length=32) abbrev = models.CharField(max_length=4, unique=True) slug = models.SlugField(max_length=32, unique=True) is_active = models.BooleanField(default=True) description = models.TextField(blank=True) class Meta: ordering = ("name",) def __str__(self): return self.name class Topic(models.Model): name = models.CharField(max_length=32) abbrev = models.CharField(max_length=4, unique=True) slug = models.SlugField(max_length=32, unique=True) is_active = models.BooleanField(default=True) class Meta: ordering = ("name",) def __str__(self): return self.name class MusicFocus(models.Model): name = models.CharField(max_length=32) abbrev = models.CharField(max_length=4, unique=True) slug = models.SlugField(max_length=32, unique=True) is_active = models.BooleanField(default=True) class Meta: ordering = ("name",) def __str__(self): return self.name class FundingCategory(models.Model): name = models.CharField(max_length=32) abbrev = models.CharField(max_length=4, unique=True) slug = models.SlugField(max_length=32, unique=True) is_active = models.BooleanField(default=True) class Meta: ordering = ("name",) def __str__(self): return self.name class Language(models.Model): name = models.CharField(max_length=32) is_active = models.BooleanField(default=True) class Meta: ordering = ("language",) def __str__(self): return self.name class Host(models.Model): name = models.CharField(max_length=128) is_active = models.BooleanField(default=True) email = models.EmailField(blank=True) website = models.URLField(blank=True) biography = models.TextField(blank=True, null=True) ppoi = PPOIField() height = models.PositiveIntegerField(blank=True, null=True, editable=False) width = models.PositiveIntegerField(blank=True, null=True, editable=False) image = VersatileImageField( blank=True, null=True, upload_to="host_images", width_field="width", height_field="height", ppoi_field="ppoi", ) class Meta: ordering = ("name",) def __str__(self): return self.name def save(self, *args, **kwargs): super(Host, self).save(*args, **kwargs) # Generate thumbnails if self.image.name and THUMBNAIL_SIZES: for size in THUMBNAIL_SIZES: self.image.thumbnail = self.image.crop[size].name class Link(models.Model): description = models.CharField(max_length=16) url = models.URLField() class Meta: abstract = True def __str__(self): return self.url class HostLink(Link): host = models.ForeignKey(Host, on_delete=models.CASCADE, related_name="links") class Show(models.Model): predecessor = models.ForeignKey( "self", blank=True, null=True, on_delete=models.CASCADE, related_name="successors", ) hosts = models.ManyToManyField(Host, blank=True, related_name="shows") owners = models.ManyToManyField(User, blank=True, related_name="shows") language = models.ManyToManyField(Language, blank=True, related_name="language") type = models.ForeignKey(Type, on_delete=models.CASCADE, related_name="shows") category = models.ManyToManyField(Category, blank=True, related_name="shows") funding_category = models.ForeignKey( FundingCategory, null=True, on_delete=models.CASCADE, blank=True, related_name="shows", ) topic = models.ManyToManyField(Topic, blank=True, related_name="shows") music_focus = models.ManyToManyField(MusicFocus, blank=True, related_name="shows") name = models.CharField(max_length=255) slug = models.CharField(max_length=255, unique=True) ppoi = PPOIField() height = models.PositiveIntegerField(blank=True, null=True, editable=False) width = models.PositiveIntegerField(blank=True, null=True, editable=False) image = VersatileImageField( blank=True, null=True, upload_to="show_images", width_field="width", height_field="height", ppoi_field="ppoi", ) logo = models.ImageField(blank=True, null=True, upload_to="show_images") short_description = models.TextField() description = models.TextField(blank=True, null=True) email = models.EmailField(blank=True, null=True) website = models.URLField(blank=True, null=True) cba_series_id = models.IntegerField(blank=True, null=True) default_playlist_id = models.IntegerField(blank=True, null=True) is_active = models.BooleanField(default=True) is_public = models.BooleanField(default=False) class Meta: ordering = ("slug",) def __str__(self): return self.name class RRule(models.Model): name = models.CharField(max_length=32, unique=True) freq = models.IntegerField() interval = models.IntegerField(default=1) by_set_pos = models.IntegerField(blank=True, null=True) count = models.IntegerField(blank=True, null=True) class Meta: ordering = ("pk",) def __str__(self): return self.name class Schedule(models.Model): rrule = models.ForeignKey( RRule, on_delete=models.CASCADE, related_name="schedules", help_text=dedent( """ A recurrence rule. * 1 = once, * 2 = daily, * 3 = business days, * 4 = weekly, * 5 = biweekly, * 6 = every four weeks, * 7 = every even calendar week (ISO 8601), * 8 = every odd calendar week (ISO 8601), * 9 = every 1st week of month, * 10 = every 2nd week of month, * 11 = every 3rd week of month, * 12 = every 4th week of month, * 13 = every 5th week of month """ ), ) show = models.ForeignKey( Show, on_delete=models.CASCADE, related_name="schedules", help_text="Show the schedule belongs to.", ) by_weekday = models.IntegerField( help_text="Number of the Weekday.", choices=[ (0, "Monday"), (1, "Tuesday"), (2, "Wednesday"), (3, "Thursday"), (4, "Friday"), (5, "Saturday"), (6, "Sunday"), ], ) first_date = models.DateField(help_text="Start date of schedule.") start_time = models.TimeField(help_text="Start time of schedule.") end_time = models.TimeField(help_text="End time of schedule.") last_date = models.DateField(help_text="End date of schedule.") is_repetition = models.BooleanField( default=False, help_text="Whether the schedule is a repetition.", ) add_days_no = models.IntegerField( blank=True, null=True, help_text=( "Add a number of days to the generated dates. " "This can be useful for repetitions, like 'On the following day'." ), ) add_business_days_only = models.BooleanField( default=False, help_text=( "Whether to add add_days_no but skipping the weekends. " "E.g. if weekday is Friday, the date returned will be the next Monday." ), ) default_playlist_id = models.IntegerField( blank=True, null=True, help_text="A tank ID in case the timeslot's playlist_id is empty.", ) class Meta: ordering = ("first_date", "start_time") # FIXME: this does not belong here @staticmethod def instantiate_upcoming(sdl, show_pk, pk=None): """Returns an upcoming schedule instance for conflict resolution""" pk = int(pk) if pk is not None else None rrule = RRule.objects.get(pk=int(sdl["rrule"])) show = Show.objects.get(pk=int(show_pk)) is_repetition = True if sdl.get("is_repetition") is True else False default_playlist_id = ( int(sdl["default_playlist_id"]) if sdl.get("default_playlist_id") else None ) add_days_no = int(sdl["add_days_no"]) if sdl.get("add_days_no") else None add_business_days_only = ( True if sdl.get("add_business_days_only") is True else False ) first_date = parse_date(str(sdl["first_date"])) start_time = ( sdl["start_time"] + ":00" if len(str(sdl["start_time"])) == 5 else sdl["start_time"] ) end_time = ( sdl["end_time"] + ":00" if len(str(sdl["end_time"])) == 5 else sdl["end_time"] ) start_time = parse_time(str(start_time)) end_time = parse_time(str(end_time)) if sdl["last_date"]: last_date = parse_date(str(sdl["last_date"])) else: # If last_date was not set, set it to the end of the year or add x days if AUTO_SET_UNTIL_DATE_TO_END_OF_YEAR: year = timezone.now().year last_date = parse_date(f"{year}-12-31") else: last_date = first_date + timedelta( days=+AUTO_SET_UNTIL_DATE_TO_DAYS_IN_FUTURE ) schedule = Schedule( pk=pk, by_weekday=sdl["by_weekday"], rrule=rrule, first_date=first_date, start_time=start_time, end_time=end_time, last_date=last_date, is_repetition=is_repetition, default_playlist_id=default_playlist_id, show=show, add_days_no=add_days_no, add_business_days_only=add_business_days_only, ) return schedule # FIXME: this does not belong here @staticmethod def generate_timeslots(schedule): """ Returns a list of timeslot objects based on a schedule and its rrule Returns past timeslots as well, starting from first_date (not today) """ by_week_no = None by_week_no_end = None by_weekday_end = int(schedule.by_weekday) starts = [] ends = [] timeslots = [] # Handle ending weekday for timeslots over midnight if schedule.end_time < schedule.start_time: if schedule.by_weekday < 6: by_weekday_end = int(schedule.by_weekday + 1) else: by_weekday_end = 0 # Handle ending dates for timeslots over midnight if schedule.end_time < schedule.start_time: last_date = schedule.first_date + timedelta(days=+1) else: last_date = schedule.first_date if schedule.rrule.freq == 0: # Ignore weekdays for one-time timeslots by_weekday_start = None by_weekday_end = None elif schedule.rrule.freq == 3 and schedule.rrule.pk == 2: # Daily timeslots by_weekday_start = (0, 1, 2, 3, 4, 5, 6) by_weekday_end = (0, 1, 2, 3, 4, 5, 6) elif ( schedule.rrule.freq == 3 and schedule.rrule.pk == 3 ): # Business days MO - FR/SA by_weekday_start = (0, 1, 2, 3, 4) if schedule.end_time < schedule.start_time: # End days for over midnight by_weekday_end = (1, 2, 3, 4, 5) else: by_weekday_end = (0, 1, 2, 3, 4) elif schedule.rrule.freq == 2 and schedule.rrule.pk == 7: # Even calendar weeks by_weekday_start = int(schedule.by_weekday) by_week_no = list(range(2, 54, 2)) # Reverse ending weeks if from Sun - Mon if by_weekday_start == 6 and by_weekday_end == 0: by_week_no_end = list(range(1, 54, 2)) else: by_week_no_end = by_week_no elif schedule.rrule.freq == 2 and schedule.rrule.pk == 8: # Odd calendar weeks by_weekday_start = int(schedule.by_weekday) by_week_no = list(range(1, 54, 2)) # Reverse ending weeks if from Sun - Mon if by_weekday_start == 6 and by_weekday_end == 0: by_week_no_end = list(range(2, 54, 2)) else: by_week_no_end = by_week_no else: by_weekday_start = int(schedule.by_weekday) if schedule.rrule.freq == 0: starts.append(datetime.combine(schedule.first_date, schedule.start_time)) ends.append(datetime.combine(last_date, schedule.end_time)) else: starts = list( rrule( freq=schedule.rrule.freq, dtstart=datetime.combine(schedule.first_date, schedule.start_time), interval=schedule.rrule.interval, until=schedule.last_date + relativedelta(days=+1), bysetpos=schedule.rrule.by_set_pos, byweekday=by_weekday_start, byweekno=by_week_no, ) ) ends = list( rrule( freq=schedule.rrule.freq, dtstart=datetime.combine(last_date, schedule.end_time), interval=schedule.rrule.interval, until=schedule.last_date + relativedelta(days=+1), bysetpos=schedule.rrule.by_set_pos, byweekday=by_weekday_end, byweekno=by_week_no_end, ) ) for k in range(min(len(starts), len(ends))): # Correct dates for the (relatively seldom) case if: # E.g.: 1st Monday from 23:00:00 to 1st Tuesday 00:00:00 # produces wrong end dates if the 1st Tuesday is before the 1st Monday # In this case we take the next day instead of rrule's calculated end if starts[k] > ends[k]: ends[k] = datetime.combine( starts[k] + relativedelta(days=+1), schedule.end_time ) """ Add a number of days to the generated dates? This can be helpful for repetitions: Examples: 1. If RRule is "Every 1st Monday" and we want its repetition alyways to be on the following day, the repetition's RRule is the same but add_days_no is 1 If we would set the repetition to "Every 1st Tuesday" instead we will get unmeant results if the 1st Tuesday is before the 1st Monday (e.g. 1st Tue = May 1 2018, 1st Mon = May 7 2018) 2. If RRule is "Every 1st Friday" and we want its repetition always to be on the following business day, the repetition's RRule is the same but add_days_no is 1 and add_business_days_only is True (e.g. original date = Fri, March 2 2018; generated date = Mon, March 5 2018) In the UI these can be presets: "On the following day" (add_days_no=1,add_business_days_only=False) or "On the following business day" (add_days_no=1,add_business_days_only=True) """ if schedule.add_days_no is not None and schedule.add_days_no > 0: # If only business days and weekday is Fri, Sat or Sun: add add_days_no beginning # from Sunday weekday = datetime.date(starts[k]).weekday() if schedule.add_business_days_only and weekday > 3: days_until_sunday = 6 - weekday starts[k] = starts[k] + relativedelta( days=+days_until_sunday + schedule.add_days_no ) ends[k] = ends[k] + relativedelta( days=+days_until_sunday + schedule.add_days_no ) else: starts[k] = starts[k] + relativedelta(days=+schedule.add_days_no) ends[k] = ends[k] + relativedelta(days=+schedule.add_days_no) if ends[k].date() > schedule.last_date: schedule.last_date = ends[k].date() timeslots.append( TimeSlot( schedule=schedule, start=timezone.make_aware(starts[k]), end=timezone.make_aware(ends[k]), ) ) return timeslots # FIXME: this does not belong here @staticmethod def get_collisions(timeslots): """ Tests a list of timeslot objects for colliding timeslots in the database Returns a list of collisions, containing colliding timeslot IDs or None Keeps indices from input list for later comparison """ collisions = [] for ts in timeslots: collision = TimeSlot.objects.get_colliding_timeslots(ts) if collision: collisions.append( collision[0] ) # TODO: Do we really always retrieve one? else: collisions.append(None) return collisions # FIXME: this does not belong here @staticmethod def generate_conflicts(timeslots): """ Tests a list of timeslot objects for colliding timeslots in the database Returns a list of conflicts containing dicts of projected timeslots, collisions and solutions """ conflicts = {} projected = [] solutions = {} # Cycle each timeslot for ts in timeslots: # Contains collisions collisions = [] # Contains possible solutions solution_choices = set() # Get collisions for each timeslot collision_list = list( TimeSlot.objects.get_colliding_timeslots(ts).order_by("start") ) # Add the projected timeslot projected_entry = { "hash": ts.hash, "start": str(ts.start), "end": str(ts.end), } for c in collision_list: # Add the collision collision = { "id": c.id, "start": str(c.start), "end": str(c.end), "playlist_id": c.playlist_id, "show": c.show.id, "show_name": c.show.name, "is_repetition": c.is_repetition, "schedule": c.schedule_id, "memo": c.memo, } # Get note try: note = Note.objects.get(timeslot=c.id) collision["note_id"] = note.pk except ObjectDoesNotExist: collision["note_id"] = None collisions.append(collision) """Determine acceptable solutions""" if len(collision_list) > 1: # If there is more than one collision: Only these two are supported at the # moment solution_choices.add("theirs") solution_choices.add("ours") else: # These two are always possible: Either keep theirs and remove ours or vice # versa solution_choices.add("theirs") solution_choices.add("ours") # Partly overlapping: projected starts earlier than existing and ends earlier # # ex. pr. # +--+ # | | # +--+ | | # | | +--+ # | | # +--+ # if ts.end > c.start > ts.start <= c.end: solution_choices.add("theirs-end") solution_choices.add("ours-end") # Partly overlapping: projected starts later than existing and ends later # # ex. pr. # +--+ # | | # | | +--+ # +--+ | | # | | # +--+ # if c.start <= ts.start < c.end < ts.end: solution_choices.add("theirs-start") solution_choices.add("ours-start") # Fully overlapping: projected starts earlier and ends later than existing # # ex. pr. # +--+ # +--+ | | # | | | | # +--+ | | # +--+ # if ts.start < c.start and ts.end > c.end: solution_choices.add("theirs-end") solution_choices.add("theirs-start") solution_choices.add("theirs-both") # Fully overlapping: projected starts later and ends earlier than existing # # ex. pr. # +--+ # | | +--+ # | | | | # | | +--+ # +--+ # if ts.start > c.start and ts.end < c.end: solution_choices.add("ours-end") solution_choices.add("ours-start") solution_choices.add("ours-both") if len(collisions) > 0: solutions[ts.hash] = "" projected_entry["collisions"] = collisions projected_entry["solution_choices"] = solution_choices projected_entry["error"] = None projected.append(projected_entry) conflicts["projected"] = projected conflicts["solutions"] = solutions conflicts["notes"] = {} conflicts["playlists"] = {} return conflicts # FIXME: this does not belong here @staticmethod def make_conflicts(sdl, schedule_pk, show_pk): """ Retrieves POST vars Generates a schedule Generates conflicts: Returns timeslots, collisions, solutions as JSON Returns conflicts dict """ # Generate schedule to be saved schedule = Schedule.instantiate_upcoming(sdl, show_pk, schedule_pk) # Copy if first_date changes for generating timeslots gen_schedule = schedule # Generate timeslots # If extending: Get last timeslot and start generating from that date on if schedule_pk is not None: existing_schedule = Schedule.objects.get(pk=int(schedule_pk)) if schedule.last_date > existing_schedule.last_date: last_timeslot = ( TimeSlot.objects.filter(schedule=existing_schedule) .order_by("start") .reverse()[0] ) gen_schedule.first_date = last_timeslot.start.date() + timedelta(days=1) timeslots = Schedule.generate_timeslots(gen_schedule) # Generate conflicts and add schedule conflicts = Schedule.generate_conflicts(timeslots) conflicts["schedule"] = model_to_dict(schedule) return conflicts # FIXME: this does not belong here @staticmethod def resolve_conflicts(data, schedule_pk, show_pk): """ Resolves conflicts Expects JSON POST/PUT data from /shows/1/schedules/ Returns a list of dicts if errors were found Returns an empty list if resolution was successful """ sdl = data["schedule"] solutions = data["solutions"] # Regenerate conflicts schedule = Schedule.instantiate_upcoming(sdl, show_pk, schedule_pk) show = schedule.show conflicts = Schedule.make_conflicts(sdl, schedule_pk, show_pk) if schedule.rrule.freq > 0 and schedule.first_date == schedule.last_date: raise ValidationError( _("Start and end dates must not be the same."), code="no-same-day-start-and-end", ) if schedule.last_date < schedule.first_date: raise ValidationError( _("End date mustn't be before start."), code="no-start-after-end", ) num_conflicts = len( [pr for pr in conflicts["projected"] if len(pr["collisions"]) > 0] ) if len(solutions) != num_conflicts: raise ValidationError( _("Numbers of conflicts and solutions don't match."), code="one-solution-per-conflict", ) # Projected timeslots to create create = [] # Existing timeslots to update update = [] # Existing timeslots to delete delete = [] # Error messages errors = {} for ts in conflicts["projected"]: # If no solution necessary # # - Create the projected timeslot and skip # if "solution_choices" not in ts or len(ts["collisions"]) < 1: projected_ts = TimeSlot.objects.instantiate( ts["start"], ts["end"], schedule, show ) create.append(projected_ts) continue # Check hash (if start, end, rrule or byweekday changed) if not ts["hash"] in solutions: errors[ts["hash"]] = _("This change on the timeslot is not allowed.") continue # If no resolution given # # - Skip # if solutions[ts["hash"]] == "": errors[ts["hash"]] = _("No solution given.") continue # If resolution is not accepted for this conflict # # - Skip # if not solutions[ts["hash"]] in ts["solution_choices"]: errors[ts["hash"]] = _( "Given solution is not accepted for this conflict." ) continue """Conflict resolution""" existing = ts["collisions"][0] solution = solutions[ts["hash"]] # theirs # # - Discard the projected timeslot # - Keep the existing collision(s) # if solution == "theirs": continue # ours # # - Create the projected timeslot # - Delete the existing collision(s) # if solution == "ours": projected_ts = TimeSlot.objects.instantiate( ts["start"], ts["end"], schedule, show ) create.append(projected_ts) # Delete collision(s) for ex in ts["collisions"]: try: existing_ts = TimeSlot.objects.get(pk=ex["id"]) delete.append(existing_ts) except ObjectDoesNotExist: pass # theirs-end # # - Keep the existing timeslot # - Create projected with end of existing start # if solution == "theirs-end": projected_ts = TimeSlot.objects.instantiate( ts["start"], existing["start"], schedule, show ) create.append(projected_ts) # ours-end # # - Create the projected timeslot # - Change the start of the existing collision to projected end # if solution == "ours-end": projected_ts = TimeSlot.objects.instantiate( ts["start"], ts["end"], schedule, show ) create.append(projected_ts) existing_ts = TimeSlot.objects.get(pk=existing["id"]) existing_ts.start = parse_datetime(ts["end"]) update.append(existing_ts) # theirs-start # # - Keep existing # - Create projected with start time of existing end # if solution == "theirs-start": projected_ts = TimeSlot.objects.instantiate( existing["end"], ts["end"], schedule, show ) create.append(projected_ts) # ours-start # # - Create the projected timeslot # - Change end of existing to projected start # if solution == "ours-start": projected_ts = TimeSlot.objects.instantiate( ts["start"], ts["end"], schedule, show ) create.append(projected_ts) existing_ts = TimeSlot.objects.get(pk=existing["id"]) existing_ts.end = parse_datetime(ts["start"]) update.append(existing_ts) # theirs-both # # - Keep existing # - Create two projected timeslots with end of existing start and start of existing # end # if solution == "theirs-both": projected_ts = TimeSlot.objects.instantiate( ts["start"], existing["start"], schedule, show ) create.append(projected_ts) projected_ts = TimeSlot.objects.instantiate( existing["end"], ts["end"], schedule, show ) create.append(projected_ts) # ours-both # # - Create projected # - Split existing into two: # - Set existing end time to projected start # - Create another one with start = projected end and end = existing end # if solution == "ours-both": projected_ts = TimeSlot.objects.instantiate( ts["start"], ts["end"], schedule, show ) create.append(projected_ts) existing_ts = TimeSlot.objects.get(pk=existing["id"]) existing_ts.end = parse_datetime(ts["start"]) update.append(existing_ts) projected_ts = TimeSlot.objects.instantiate( ts["end"], existing["end"], schedule, show ) create.append(projected_ts) # If there were any errors, don't make any db changes yet # but add error messages and return already chosen solutions if len(errors) > 0: conflicts = Schedule.make_conflicts( model_to_dict(schedule), schedule.pk, show.pk ) partly_resolved = conflicts["projected"] saved_solutions = {} # Add already chosen resolutions and error message to conflict for index, c in enumerate(conflicts["projected"]): # The element should only exist if there was a collision if len(c["collisions"]) > 0: saved_solutions[c["hash"]] = "" if ( c["hash"] in solutions and solutions[c["hash"]] in c["solution_choices"] ): saved_solutions[c["hash"]] = solutions[c["hash"]] if c["hash"] in errors: partly_resolved[index]["error"] = errors[c["hash"]] # Re-insert post data conflicts["projected"] = partly_resolved conflicts["solutions"] = saved_solutions conflicts["notes"] = data.get("notes") conflicts["playlists"] = data.get("playlists") return conflicts # Collect upcoming timeslots to delete which might still remain del_timeslots = TimeSlot.objects.filter( schedule=schedule, start__gt=schedule.last_date ) for del_ts in del_timeslots: delete.append(del_ts) # If 'dryrun' is true, just return the projected changes instead of executing them if "dryrun" in sdl and sdl["dryrun"]: return { "create": [model_to_dict(ts) for ts in create], "update": [model_to_dict(ts) for ts in update], "delete": [model_to_dict(ts) for ts in delete], } """Database changes if no errors found""" # Only save schedule if timeslots were created if create: # Create or update schedule schedule.save() # Update timeslots for ts in update: ts.save(update_fields=["start", "end"]) # Create timeslots for ts in create: ts.schedule = schedule # Reassign playlists if "playlists" in data and ts.hash in data["playlists"]: ts.playlist_id = int(data["playlists"][ts.hash]) ts.save() # Reassign notes if "notes" in data and ts.hash in data["notes"]: try: note = Note.objects.get(pk=int(data["notes"][ts.hash])) note.timeslot_id = ts.id note.save(update_fields=["timeslot_id"]) timeslot = TimeSlot.objects.get(pk=ts.id) timeslot.note_id = note.id timeslot.save(update_fields=["note_id"]) except ObjectDoesNotExist: pass # Delete manually resolved timeslots and those after until for dl in delete: dl.delete() return model_to_dict(schedule) class TimeSlotManager(models.Manager): @staticmethod def instantiate(start, end, schedule, show): return TimeSlot( start=parse_datetime(start), end=parse_datetime(end), show=show, is_repetition=schedule.is_repetition, schedule=schedule, ) @staticmethod def get_24h_timeslots(start): end = timezone.make_aware(start + timedelta(hours=24)) return TimeSlot.objects.filter( Q(start__lte=start, end__gte=start) | Q(start__gt=start, start__lt=end) ).exclude(end=start) @staticmethod def get_7d_timeslots(start): start = datetime.combine(start, time(0, 0)) end = timezone.make_aware(start + timedelta(days=7)) return TimeSlot.objects.filter( Q(start__lte=start, end__gte=start) | Q(start__gt=start, start__lt=end) ).exclude(end=start) @staticmethod def get_timerange_timeslots(start, end): return TimeSlot.objects.filter( Q(start__lte=start, end__gte=start) | Q(start__gt=start, start__lt=end) ).exclude(end=start) @staticmethod def get_colliding_timeslots(timeslot): return TimeSlot.objects.filter( (Q(start__lt=timeslot.end) & Q(end__gte=timeslot.end)) | (Q(end__gt=timeslot.start) & Q(end__lte=timeslot.end)) | (Q(start__gte=timeslot.start) & Q(end__lte=timeslot.end)) | (Q(start__lte=timeslot.start) & Q(end__gte=timeslot.end)) ) class TimeSlot(models.Model): schedule = models.ForeignKey( Schedule, on_delete=models.CASCADE, related_name="timeslots" ) show = models.ForeignKey( Show, editable=False, on_delete=models.CASCADE, related_name="timeslots" ) start = models.DateTimeField() end = models.DateTimeField() memo = models.TextField(blank=True) is_repetition = models.BooleanField(default=False) playlist_id = models.IntegerField(null=True) note_id = models.IntegerField(null=True, editable=False) objects = TimeSlotManager() class Meta: ordering = ("start", "end") def __str__(self): if self.start.date() == self.end.date(): time_span = "{0}, {1} - {2}".format( self.start.strftime("%x"), self.start.strftime("%X"), self.end.strftime("%X"), ) else: time_span = "{0} - {1}".format( self.start.strftime("%X %x"), self.end.strftime("%X %x"), ) return f"{str(self.show)} ({time_span})" def save(self, *args, **kwargs): self.show = self.schedule.show super(TimeSlot, self).save(*args, **kwargs) return self @property def hash(self): string = ( str(self.start) + str(self.end) + str(self.schedule.rrule.id) + str(self.schedule.by_weekday) ) return str("".join(s for s in string if s.isdigit())) class Note(models.Model): timeslot = models.OneToOneField(TimeSlot, on_delete=models.CASCADE, unique=True) title = models.CharField(max_length=128) slug = models.SlugField(max_length=32, unique=True) summary = models.TextField(blank=True) content = models.TextField() ppoi = PPOIField() height = models.PositiveIntegerField(blank=True, null=True, editable=False) width = models.PositiveIntegerField(blank=True, null=True, editable=False) image = VersatileImageField( blank=True, null=True, upload_to="note_images", width_field="width", height_field="height", ppoi_field="ppoi", ) status = models.IntegerField(default=1) start = models.DateTimeField(editable=False) show = models.ForeignKey( Show, on_delete=models.CASCADE, related_name="notes", editable=True ) cba_id = models.IntegerField(blank=True, null=True) user = models.ForeignKey( User, editable=False, on_delete=models.CASCADE, related_name="users", default=1 ) host = models.ForeignKey( Host, on_delete=models.CASCADE, related_name="hosts", null=True ) class Meta: ordering = ("timeslot",) def __str__(self): return self.title def save(self, *args, **kwargs): self.start = self.timeslot.start self.show = self.timeslot.schedule.show timeslot = TimeSlot.objects.get(pk=self.timeslot.id) timeslot.note_id = self.id timeslot.save() super(Note, self).save(*args, **kwargs) # Generate thumbnails if self.image.name and THUMBNAIL_SIZES: for size in THUMBNAIL_SIZES: self.image.thumbnail = self.image.crop[size].name class NoteLink(Link): note = models.ForeignKey(Note, on_delete=models.CASCADE, related_name="links")