Newer
Older
#
# steering, Programme/schedule management for AURA
#
# Copyright (C) 2011-2017, 2020, Ernesto Rico Schmidt
# Copyright (C) 2017-2019, Ingo Leindecker
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from collections.abc import Iterable
from datetime import datetime
from functools import cached_property
from zoneinfo import ZoneInfo
from drf_jsonschema_serializer import JSONSchemaField
from drf_spectacular.utils import extend_schema_field
from rest_framework.permissions import exceptions
from django.conf import settings
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist

Ernesto Rico Schmidt
committed
from django.db import IntegrityError, transaction
from django.db.models import Q
from django.utils import text, timezone
Playlist,
ProgramEntry,
from program.typing import (
Logo,
MicroProgram,
ProgramFallback,
RadioCBASettings,
RadioImageRequirementsSettings,
RadioPlayoutSettings,
RadioProgramSettings,
RadioStationSettings,
)
from program.utils import update_links
# Maps each solution key to a human-readable description of what happens to the
# "projected" timeslot and to the "existing" timeslot(s) when that solution is chosen.
# Keys prefixed with "theirs" keep the existing timeslot(s); keys prefixed with "ours"
# create the projected timeslot (see the individual descriptions below).
SOLUTION_CHOICES = {
"theirs": "Discard projected timeslot. Keep existing timeslot(s).",
"ours": "Create projected timeslot. Delete existing timeslot(s).",
"theirs-start": (
"Keep existing timeslot. Create projected timeslot with start time of existing end."
),
"ours-start": (
"Create projected timeslot. Change end of existing timeslot to projected start time."
),
"theirs-end": (
"Keep existing timeslot. Create projected timeslot with end of existing start time."
),
"ours-end": (
"Create projected timeslot. Change start of existing timeslot to projected end time."
),
"theirs-both": (
"Keep existing timeslot. "
"Create two projected timeslots with end of existing start and start of existing end."
),
"ours-both": (
"Create projected timeslot. Split existing timeslot into two: \n\n"
"* set existing end time to projected start,\n"
"* create another timeslot with start = projected end and end = existing end."
),
}
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
class UpdateFieldPermissionValidatorMixin:
    """
    Serializer mixin that rejects writes to fields the requesting user may not set.

    The permissions required for a field are resolved as follows:

    1. If ``Serializer.Meta.custom_field_write_permission_map[field_name]`` is set
       (a single permission string or an iterable of them), those are used.
    2. Otherwise, if the model declares a custom permission named
       ``edit__{model_name}__{field_name}``, that permission (prefixed with the
       app label) is used. For fields ending in ``_id`` / ``_ids`` the name without
       the suffix (respectively with a plural ``s``) is tried as well.
    3. Otherwise the field is not permission-checked.

    The check is installed as a serializer-level validator. When the user lacks the
    permissions for at least one submitted field, ``PermissionDenied`` is raised with a
    mapping of the offending (client-facing) field names to an error detail.
    """

    def get_validators(self):
        """Return the inherited validators followed by the field-permission validator."""

        def validate_field_permissions(data, *args, **kwargs):
            request = self.context.get("request")
            self._check_field_write_permissions(request.user, data, self.instance)

        validators = list(super().get_validators())
        validators.append(validate_field_permissions)
        return validators

    def _generate_field_permission_name_candidates(self, model_name: str, field_name: str):
        """Yield the permission codenames that may guard ``field_name`` on ``model_name``."""
        base = f"edit__{model_name}__"
        yield f"{base}{field_name}"
        if field_name.endswith("_id"):
            # e.g. "image_id" is also covered by a permission on "image"
            yield base + field_name.removesuffix("_id")
        if field_name.endswith("_ids"):
            # e.g. "host_ids" is also covered by a permission on "hosts"
            yield base + field_name.removesuffix("_ids") + "s"

    def _get_field_write_permission_map(self, field_names: set[str]):
        """Map each permission-guarded field name to the set of permissions it requires.

        Fields without a custom mapping and without a matching model permission are
        left out of the result.
        """
        meta = self.Meta
        overrides: dict[str, str | Iterable[str]] = getattr(
            meta, "custom_field_write_permission_map", {}
        )
        model_meta = meta.model._meta
        declared_codenames = set(dict(model_meta.permissions))
        app_label = model_meta.app_label
        model_name = model_meta.model_name

        permission_map = {}
        for field_name in field_names:
            override = overrides.get(field_name)
            if override:
                # An explicit (truthy) mapping always wins over derived permission names.
                permission_map[field_name] = (
                    {override} if isinstance(override, str) else set(override)
                )
                continue
            candidates = self._generate_field_permission_name_candidates(model_name, field_name)
            for candidate in candidates:
                if candidate in declared_codenames:
                    # No early exit: a later matching candidate replaces an earlier one.
                    permission_map[field_name] = {f"{app_label}.{candidate}"}
        return permission_map

    def _check_field_write_permissions(self, user, validated_data, instance=None):
        """Raise ``PermissionDenied`` if ``user`` may not write any field in ``validated_data``.

        ``instance`` is accepted but not used by this implementation.
        """
        required_by_field = self._get_field_write_permission_map(validated_data.keys())
        # validated_data is keyed by field source, which may differ from the name the
        # client sent; errors are reported under the client-facing name.
        source_to_client_name = {
            field.source: field.field_name for field in self._writable_fields
        }

        denied = {}
        for field_name, required_permissions in required_by_field.items():
            if user.has_perms(required_permissions):
                continue
            client_field_name = source_to_client_name.get(field_name, field_name)
            denied[client_field_name] = exceptions.ErrorDetail(
                "You do not have permission to set this field.",
                code="missing-field-permission",
            )

        if denied:
            raise exceptions.PermissionDenied(denied)
class SimpleAuditUpdaterMixin:
    """Serializer mixin that stamps the requesting user's username onto audit fields.

    After the inherited ``create``/``update`` has run, ``created_by`` and/or
    ``updated_by`` are set on the instance and saved with ``update_fields`` limited
    to those audit fields.
    """

    def _update_audit_data(self, instance, *username_fields):
        """Write the request user's username into each named field and save them."""
        request = self.context.get("request")
        acting_username = request.user.username
        for field_name in username_fields:
            setattr(instance, field_name, acting_username)
        instance.save(update_fields=username_fields)

    def update(self, instance, validated_data):
        """Run the inherited update, then record who updated the instance."""
        updated = super().update(instance, validated_data)
        self._update_audit_data(updated, "updated_by")
        return updated

    def create(self, validated_data):
        """Run the inherited create, then record who created (and last updated) it."""
        created = super().create(validated_data)
        self._update_audit_data(created, "created_by", "updated_by")
        return created
# Plain (non-model) serializer describing an error payload: a required `message`
# string and a `code` string that may be null.
class ErrorSerializer(serializers.Serializer):
message = serializers.CharField()
code = serializers.CharField(allow_null=True)
# Model serializer exposing `username` and `user_token` plus read-only audit fields.
# NOTE(review): in this extract the `class Meta:` header and its `model = ...` line
# appear to be missing above `read_only_fields`; confirm against the real file.
class CBASerializer(serializers.ModelSerializer):
read_only_fields = (
"created_at",
"created_by",
"updated_at",
"updated_by",
)
fields = (
"username",
"user_token",
) + read_only_fields
# Serializes to None instead of the field data when the user on the parent
# serializer's request is not authenticated.
def to_representation(self, instance):
if not self.parent.context.get("request").user.is_authenticated:
return None
return super().to_representation(instance)
# Model serializer for Django `User` objects with two computed fields
# (`is_privileged`, `permissions`), an optional nested `cba` object and an optional
# list of `Profile` primary keys written to the `profiles` source.
# NOTE(review): this extract is incomplete — the closing parenthesis of `profile_ids`
# and the `read_only_fields` / `fields` tuples inside `Meta` are missing.
class UserSerializer(serializers.ModelSerializer):
is_privileged = serializers.SerializerMethodField()
permissions = serializers.SerializerMethodField()
cba = CBASerializer(required=False)
profile_ids = serializers.PrimaryKeyRelatedField(
many=True, queryset=Profile.objects.all(), required=False, source="profiles"
class Meta:
extra_kwargs = {
# the password is accepted on write but never serialized back
"password": {"write_only": True},
}
model = User
) + read_only_fields
@staticmethod
def get_permissions(obj: User) -> list[str]:
return sorted(
[p for p in obj.get_all_permissions() if p.split(".", 1)[0] in ["auth", "program"]]
)
@staticmethod
def get_is_privileged(obj: User) -> bool:
return obj.is_superuser
# NOTE(review): this method is incomplete in this extract — the `CBA.objects.create(`
# call is never closed and no `save()` / `return` is visible. Confirm against the file.
def create(self, validated_data):
"""
Create and return a new User instance, given the validated data.
"""
# The nested CBA payload is removed before the parent serializer creates the user.
cba_data = validated_data.pop("cba") if "cba" in validated_data else None
user = super(UserSerializer, self).create(validated_data)
user.date_joined = timezone.now()
# set_password() is given the password from validated_data (not assigned directly).
user.set_password(validated_data["password"])
if cba_data:
CBA.objects.create(
username=cba_data.get("username").strip(),
user_token=cba_data.get("user_token").strip(),
created_by=self.context.get("request").user.username,
user=user,
# NOTE(review): this method is incomplete in this extract — the `try:` that pairs with
# `except ObjectDoesNotExist:`, the assignment of `cba`, the closing parentheses of both
# `PermissionDenied(` calls and the `CBA.objects.create(` call are not visible.
def update(self, instance, validated_data):
"""
Update and return an existing User instance, given the validated data.
"""
# Only fields present in validated_data are copied, so partial updates leave the
# remaining attributes untouched.
if "first_name" in validated_data:
instance.first_name = validated_data.get("first_name")
if "last_name" in validated_data:
instance.last_name = validated_data.get("last_name")
if "email" in validated_data:
instance.email = validated_data.get("email")
if "is_active" in validated_data:
instance.is_active = validated_data.get("is_active")
if "is_staff" in validated_data:
instance.is_staff = validated_data.get("is_staff")
if "is_superuser" in validated_data:
instance.is_superuser = validated_data.get("is_superuser")
cba_data = validated_data.pop("cba") if "cba" in validated_data else None
except ObjectDoesNotExist:
user = self.context.get("request").user
if cba:
# having the update_cba permission overrides being the user
if not (user.has_perm("program.update_cba") or user.id == instance.id):
raise exceptions.PermissionDenied(
detail="You do not have permission to update this user CBA profile."
if "username" in cba_data:
cba.username = cba_data.get("username")
if "user_token" in cba_data:
cba.user_token = cba_data.get("user_token")
cba.updated_by = self.context.get("request").user.username
cba.save()
else:
# having the create_cba permission overrides being the user
if not (user.has_perm("program.create_cba") or user.id == instance.id):
raise exceptions.PermissionDenied(
detail="You do not have permission to create this user CBA profile."
created_by=self.context.get("request").user.username,
user=instance,
instance.save()
return instance
# Model serializer for `Category`, exposing exactly the fields listed in `Meta.fields`.
class CategorySerializer(serializers.ModelSerializer):
class Meta:
model = Category
fields = ("description", "id", "is_active", "name", "slug", "subtitle")
# Model serializer for `LinkType`, exposing `id`, `is_active` and `name`.
class LinkTypeSerializer(serializers.ModelSerializer):
class Meta:
model = LinkType
fields = ("id", "is_active", "name")
# Model serializer exposing the license fields listed below.
# NOTE(review): the `class Meta:` header and `model = ...` line are not visible in
# this extract; presumably the model is `License` — confirm against the file.
class LicenseSerializer(serializers.ModelSerializer):
fields = (
"id",
"identifier",
"name",
"needs_author",
"requires_express_permission_for_publication",
"url",
)
# Serializer for a link attached to a profile: a `LinkType` primary key (written to the
# `type` source) and a `url`.
# NOTE(review): `Meta.model` is not visible in this extract — confirm against the file.
class ProfileLinkSerializer(serializers.ModelSerializer):
type_id = serializers.PrimaryKeyRelatedField(queryset=LinkType.objects.all(), source="type")
class Meta:
fields = ("type_id", "url")
class PPOIField(serializers.CharField):
    """Character field for a PPOI value written as ``{float}x{float}``.

    Incoming strings are limited to 20 characters and must match the PPOI format in
    full. Outgoing values are expected to be a ``(left, top)`` pair and are rendered
    as ``"{left}x{top}"``.
    """

    def validate_format(self, value: str):
        """Raise ``ValidationError`` unless the whole of ``value`` is in PPOI format."""
        # fullmatch (not match) so that trailing characters after a valid prefix,
        # e.g. "0.5x0.5abc", are rejected instead of silently accepted.
        if not re.fullmatch(r"\d(?:\.\d+)?x\d(?:\.\d+)?", value):
            raise serializers.ValidationError("PPOI must match format: ${float}x${float}")

    def __init__(self, **kwargs):
        """Initialise the field, forcing ``max_length=20`` and adding the format validator."""
        kwargs["max_length"] = 20
        # Build a new list rather than appending in place, so a validators list passed
        # in by the caller (and possibly shared between fields) is not mutated.
        kwargs["validators"] = [*(kwargs.get("validators") or []), self.validate_format]
        super().__init__(**kwargs)

    def to_representation(self, value: tuple[float, float]):
        """Render a ``(left, top)`` pair as ``"{left}x{top}"``."""
        [left, top] = value
        return f"{left}x{top}"
# Model serializer for images: an optional, nullable `License` primary key, an optional
# `ppoi` string and a computed `url`.
# NOTE(review): this extract is incomplete — the closing parenthesis of `license_id` is
# missing, and the author/"committed" lines below are residue from a web blame view,
# not Python source.
class ImageSerializer(serializers.ModelSerializer):
license_id = serializers.PrimaryKeyRelatedField(
allow_null=True,
queryset=License.objects.all(),
required=False,
source="license",
help_text="`License` ID of this image.",
ppoi = PPOIField(required=False, help_text="PPOI specifies the crop centerpoint of the image.")

Ernesto Rico Schmidt
committed
url = serializers.SerializerMethodField()

Ernesto Rico Schmidt
committed
@staticmethod
def get_url(instance: Image) -> str:
    """Return the image URL prefixed with ``settings.SITE_URL``.

    Prefixing with the site URL includes the protocol, which avoids mixed media
    warnings.
    """
    site_url = settings.SITE_URL
    image_path = instance.image.url
    return f"{site_url}{image_path}"
# Serializer options: `image` is write-only; `height`, `id`, `url` and `width` are
# read-only and appended to the writable field list.
# NOTE(review): the `class Meta:` header is not visible in this extract, some tuple
# entries may be missing, and the author/"committed" lines are web-view residue.
extra_kwargs = {"image": {"write_only": True}}
model = Image
read_only_fields = (
"height",
"id",

Ernesto Rico Schmidt
committed
"url",
"width",
)
fields = (
"alt_text",
"credits",
"image",

Konrad Mohrfeldt
committed
"is_use_explicitly_granted_by_author",
) + read_only_fields
def create(self, validated_data):
    """Create and return a new Image owned by the requesting user.

    The owner is the username of the user on the request in the serializer context;
    all validated fields are passed through to ``Image.objects.create``.
    """
    manager = Image.objects
    owner_name = self.context.get("request").user.username
    new_image = manager.create(owner=owner_name, **validated_data)
    new_image.save()
    return new_image
def update(self, instance, validated_data):
    """Update and return an existing Image instance, given the validated data.

    Only ``alt_text``, ``credits``, ``license`` and ``ppoi`` are applied, and only when
    present in ``validated_data``. ``ppoi`` is stored on ``instance.image`` rather than
    on the instance itself.
    """
    for attribute in ("alt_text", "credits", "license"):
        if attribute in validated_data:
            setattr(instance, attribute, validated_data[attribute])

    if "ppoi" in validated_data:
        instance.image.ppoi = validated_data["ppoi"]

    instance.save()
    return instance
# Plain serializer with two optional integer parameters, `width` and `height`,
# each of which must be at least 1 when given.
class ImageRenderSerializer(serializers.Serializer):
width = serializers.IntegerField(required=False, min_value=1)
height = serializers.IntegerField(required=False, min_value=1)
# Model serializer for profiles: an optional nullable `Image` key, optional nested
# links and a list of owning `User` keys.
# NOTE(review): this extract is incomplete — the closing parentheses of `image_id`,
# `links` and `owner_ids`, the `Meta.model` line and the remainder of `Meta.fields`
# are not visible.
class ProfileSerializer(serializers.ModelSerializer):
image_id = serializers.PrimaryKeyRelatedField(
allow_null=True,
queryset=Image.objects.all(),
required=False,
source="image",
help_text="`Image` id of the profile.",
links = ProfileLinkSerializer(
many=True, required=False, help_text="Array of `Link` objects. Can be empty."
owner_ids = serializers.PrimaryKeyRelatedField(
many=True,
queryset=User.objects.all(),
source="owners",
help_text="User ID(s) that own this profile.",
class Meta:
read_only_fields = (
"created_at",
"created_by",
"updated_at",
"updated_by",
)
fields = (
"biography",
"email",
"id",
"image_id",
def to_representation(self, instance):
    """Serialize the profile, dropping ``email`` for unauthenticated requests."""
    data = super().to_representation(instance)
    requester = self.context.get("request").user
    if requester.is_authenticated:
        return data
    # Anonymous clients must not see the profile's e-mail address.
    del data["email"]
    return data
# NOTE(review): this method is incomplete in this extract — the docstring's triple
# quotes are missing, `owners` is used without a visible assignment, and there is no
# visible `return`. Confirm against the file.
def create(self, validated_data):
Create and return a new Profile instance, given the validated data.
# Nested links are removed first; they are created separately once the profile exists.
links_data = validated_data.pop("links", [])
# Guarantees an "image" key is present (None when the client did not send one).
validated_data["image"] = validated_data.pop("image", None)
profile = Profile.objects.create(
created_by=self.context.get("request").user.username, **validated_data
)
for link_data in links_data:
ProfileLink.objects.create(profile=profile, **link_data)
profile.owners.set(owners)
def update(self, instance, validated_data):
    """Update and return an existing Profile instance, given the validated data.

    A user holding ``program.update_profile`` may update any field. Any other user
    must own the profile and hold at least one ``program.edit__profile…`` permission,
    and may only update fields named by the last ``__`` segment of those permissions.

    Raises:
        PermissionDenied: if the user may not update the profile at all, or submitted
            fields they hold no edit permission for.
    """
    user = self.context.get("request").user
    user_is_owner = user in instance.owners.all()
    user_permissions = {
        permission.split("__")[-1]
        for permission in user.get_all_permissions()
        if permission.startswith("program.edit__profile")
    }
    update_fields = set(validated_data.keys())

    # having the update_profile permission overrides the ownership
    may_update_any_profile = user.has_perm("program.update_profile")
    if not may_update_any_profile:
        if not (user_is_owner and user_permissions):
            raise exceptions.PermissionDenied(detail="You are not allowed to update this host.")
        # without the update_profile permission, fields without edit permission are not allowed
        not_allowed = update_fields - user_permissions
        if not_allowed:
            detail = {field: "You are not allowed to edit this field" for field in not_allowed}
            raise exceptions.PermissionDenied(detail=detail)

    # plain attributes: only those present in the payload are touched
    for attribute in ("biography", "name", "email", "image", "is_active"):
        if attribute in validated_data:
            setattr(instance, attribute, validated_data[attribute])

    # optional many-to-many
    if "owners" in validated_data:
        instance.owners.set(validated_data["owners"])

    # optional nested objects
    if "links" in validated_data:
        instance = update_links(instance, validated_data.get("links"))

    instance.updated_by = self.context.get("request").user.username
    instance.save()
    return instance
# Model serializer for `Language`.
# NOTE(review): no `fields` declaration is visible in this extract — confirm it exists.
class LanguageSerializer(serializers.ModelSerializer):
class Meta:
model = Language
# Model serializer for `Topic`.
# NOTE(review): no `fields` declaration is visible in this extract — confirm it exists.
class TopicSerializer(serializers.ModelSerializer):
class Meta:
model = Topic
# Model serializer for `MusicFocus`.
# NOTE(review): no `fields` declaration is visible in this extract — confirm it exists.
class MusicFocusSerializer(serializers.ModelSerializer):
class Meta:
model = MusicFocus
# Model serializer for `Type`.
# NOTE(review): no `fields` declaration is visible in this extract, and the
# author/"committed" lines below are web-view residue rather than Python source.
class TypeSerializer(serializers.ModelSerializer):

Ingo Leindecker
committed
class Meta:
model = Type

Ingo Leindecker
committed
# Model serializer, presumably for `FundingCategory` (judging by the class name only).
# NOTE(review): the body of `Meta` (model and fields) is not visible in this extract,
# and the author/"committed" lines below are web-view residue.
class FundingCategorySerializer(serializers.ModelSerializer):

Ingo Leindecker
committed
class Meta:

Ingo Leindecker
committed
# Model serializer for `ShowLink`: a `LinkType` primary key (written to the `type`
# source) and a `url`.
class ShowLinkSerializer(serializers.ModelSerializer):
type_id = serializers.PrimaryKeyRelatedField(queryset=LinkType.objects.all(), source="type")
class Meta:
model = ShowLink
fields = ("type_id", "url")
# Hyperlinked model serializer for `Show`. Related objects are exposed as primary keys
# through `*_id` / `*_ids` fields whose `source` points at the model relation.
# NOTE(review): this extract is incomplete — several field declarations are missing
# their closing parenthesis (`language_ids`, `music_focus_ids`, `owner_ids`, `type_id`),
# `host_ids` shows neither `many=True` nor `source`, and the `fields = (` opener plus
# part of its entries in `Meta` are not visible.
class ShowSerializer(serializers.HyperlinkedModelSerializer):
category_ids = serializers.PrimaryKeyRelatedField(
many=True,
queryset=Category.objects.all(),
source="category",
help_text="Array of `Category` IDs.",
)
cba_series_id = serializers.IntegerField(
allow_null=True, required=False, help_text="CBA series ID."
)
default_playlist_id = serializers.PrimaryKeyRelatedField(
allow_null=True,
help_text="Default `Playlist` ID for this show.",
queryset=Playlist.objects.all(),
required=False,
source="default_playlist",
)
funding_category_id = serializers.PrimaryKeyRelatedField(
queryset=FundingCategory.objects.all(),
source="funding_category",
help_text="`FundingCategory` ID.",
)
host_ids = serializers.PrimaryKeyRelatedField(
queryset=Profile.objects.all(),
help_text="`Profile` IDs that host this show.",
)
image_id = serializers.PrimaryKeyRelatedField(
allow_null=True,
queryset=Image.objects.all(),
required=False,
source="image",
help_text="`Image` ID of this show.",
)
language_ids = serializers.PrimaryKeyRelatedField(
many=True,
queryset=Language.objects.all(),
source="language",
help_text="`Language` IDs of this show.",
# NOTE(review): links reuse ProfileLinkSerializer although a ShowLinkSerializer with the
# same fields exists in this file — confirm this is intentional.
links = ProfileLinkSerializer(many=True, required=False, help_text="Array of `Link` objects.")
logo_id = serializers.PrimaryKeyRelatedField(
allow_null=True,
queryset=Image.objects.all(),
required=False,
source="logo",
help_text="`Image` ID of the logo of this show.",
)
music_focus_ids = serializers.PrimaryKeyRelatedField(
many=True,
queryset=MusicFocus.objects.all(),
source="music_focus",
help_text="Array of `MusicFocus` IDs.",
owner_ids = serializers.PrimaryKeyRelatedField(
many=True,
queryset=User.objects.all(),
source="owners",
help_text="Array of `User` IDs owning this Show.",
predecessor_id = serializers.PrimaryKeyRelatedField(
allow_null=True,
queryset=Show.objects.all(),
required=False,
source="predecessor",
help_text="`Show` ID that predeceeded this one.",
)
topic_ids = serializers.PrimaryKeyRelatedField(
many=True, queryset=Topic.objects.all(), source="topic", help_text="Array of `Topic` IDs."
)
# NOTE(review): `type_id` is a single related key (no `many=True` visible) although its
# help text says "Array" — confirm which is intended.
type_id = serializers.PrimaryKeyRelatedField(
queryset=Type.objects.all(), source="type", help_text="Array of `Type` IDs."
class Meta:
model = Show
read_only_fields = (
"created_at",
"created_by",
"updated_at",
"updated_by",
)
"funding_category_id",
"host_ids",
"logo_id",
"music_focus_ids",
"owner_ids",
"predecessor_id",
"topic_ids",
"type_id",
def to_internal_value(self, data):
    """Normalise empty-string IDs to ``None`` before the inherited conversion runs.

    ``cba_series_id`` and ``default_playlist_id`` sent as ``""`` are rewritten to
    ``None`` in ``data`` (in place), then ``data`` is handed to the parent class.
    """
    for nullable_key in ("cba_series_id", "default_playlist_id"):
        if data.get(nullable_key) == "":
            data[nullable_key] = None
    return super().to_internal_value(data)
def to_representation(self, instance):
    """Serialize the show, hiding fields the requesting user may not see.

    Unauthenticated requests lose ``email``; authenticated users without the
    ``display__show__internal_note`` permission lose ``internal_note``.
    """
    representation = super().to_representation(instance)
    requester = self.context.get("request").user
    if not requester.is_authenticated:
        # NOTE(review): only `email` is removed for anonymous users in this branch;
        # confirm `internal_note` is hidden from them elsewhere.
        del representation["email"]
    elif not requester.has_perm("display__show__internal_note"):
        # NOTE(review): other permission checks in this file use the "program."
        # app-label prefix; confirm the unprefixed codename here is intentional.
        del representation["internal_note"]
    return representation
# NOTE(review): this method is incomplete in this extract — the `Show.objects.create(`
# call is never closed and no `return` is visible; the author/"committed" lines in the
# middle are web-view residue rather than Python source.
def create(self, validated_data):
"""
Create and return a new Show instance, given the validated data.
"""
# Many-to-many values are removed from validated_data here and attached after the
# show has been created (see the `.set(...)` calls below).
category = validated_data.pop("category")
hosts = validated_data.pop("hosts")
language = validated_data.pop("language")
music_focus = validated_data.pop("music_focus")
owners = validated_data.pop("owners")
topic = validated_data.pop("topic")
links_data = validated_data.pop("links", [])
# Required relations: pop() without a default raises KeyError when the key is absent.
validated_data["funding_category"] = validated_data.pop("funding_category")
validated_data["type"] = validated_data.pop("type")
# Optional relations default to None when the client did not send them.
validated_data["default_playlist"] = validated_data.pop("default_playlist", None)
validated_data["image"] = validated_data.pop("image", None)
validated_data["logo"] = validated_data.pop("logo", None)
validated_data["predecessor"] = validated_data.pop("predecessor", None)
# Falls back to a slug derived from the name when no slug was provided.
validated_data["slug"] = validated_data.get(
"slug", text.slugify(validated_data.get("name"))
)
show = Show.objects.create(
created_by=self.context.get("request").user.username,
**validated_data,
# Save many-to-many relationships

jackie / Andrea Ida Malkah Klaura
committed
show.category.set(category)
show.hosts.set(hosts)
show.language.set(language)
show.music_focus.set(music_focus)
show.owners.set(owners)
show.topic.set(topic)
for link_data in links_data:
ShowLink.objects.create(show=show, **link_data)
def update(self, instance, validated_data):
    """Update and return an existing Show instance, given the validated data.

    A user holding ``program.update_show`` may update any field. Any other user must
    own the show and hold at least one ``program.edit__show…`` permission, and may
    only update fields named by the last ``__`` segment of those permissions.

    Only keys present in ``validated_data`` are applied, so partial updates leave the
    remaining attributes and relations untouched.

    Raises:
        PermissionDenied: if the user may not update the show at all, or submitted
            fields they hold no edit permission for.
    """
    user = self.context.get("request").user
    user_is_owner = user in instance.owners.all()
    user_permissions = {
        permission.split("__")[-1]
        for permission in user.get_all_permissions()
        if permission.startswith("program.edit__show")
    }
    update_fields = set(validated_data.keys())

    # having update_show permission overrides the ownership
    may_update_any_show = user.has_perm("program.update_show")
    if not may_update_any_show:
        if not (user_is_owner and user_permissions):
            raise exceptions.PermissionDenied(detail="You are not allowed to update this show.")
        # without the update_show permission, fields without edit permission are not allowed
        not_allowed = update_fields.difference(user_permissions)
        if not_allowed:
            detail = {field: "You are not allowed to edit this field" for field in not_allowed}
            raise exceptions.PermissionDenied(detail=detail)

    # plain attributes and foreign keys
    scalar_fields = (
        "description",
        "name",
        "short_description",
        "cba_series_id",
        "default_playlist",
        "email",
        "funding_category",
        "image",
        "internal_note",
        "is_active",
        "is_public",
        "logo",
        "predecessor",
        "slug",
        "type",
    )
    for field_name in scalar_fields:
        if field_name in validated_data:
            setattr(instance, field_name, validated_data[field_name])

    # optional many-to-many
    many_to_many_fields = ("category", "hosts", "language", "music_focus", "owners", "topic")
    for field_name in many_to_many_fields:
        if field_name in validated_data:
            getattr(instance, field_name).set(validated_data[field_name])

    # optional nested objects
    if "links" in validated_data:
        instance = update_links(instance, validated_data.get("links"))

    instance.updated_by = user.username
    instance.save()
    return instance
# Model serializer for `RRule`. Every exposed field is also listed as read-only
# (`read_only_fields = fields`).
# NOTE(review): further entries of `fields` may be missing from this extract.
class RRuleSerializer(serializers.ModelSerializer):
class Meta:
model = RRule
fields = (
"by_set_pos",
"by_weekdays",
"count",
"freq",
"name",
)
read_only_fields = fields
# Model serializer for `Schedule`.
# NOTE(review): this extract is incomplete — the `fields = (` opener and the leading
# entries of the tuple are not visible; only its last three entries remain.
class ScheduleSerializer(serializers.ModelSerializer):
class Meta:
model = Schedule
"rrule_id",
"show_id",
"start_time",
)
# Variant of ScheduleSerializer whose `id` is declared as an integer that may be null.
class UnsavedScheduleSerializer(ScheduleSerializer):
id = serializers.IntegerField(allow_null=True)
# Variant of ScheduleSerializer used for incoming requests; adds the optional,
# write-only `dryrun` flag described in its help text.
# NOTE(review): this extract is incomplete — the `class Meta:` header and the
# `fields = (` opener (with its leading entries) are not visible.
class ScheduleInRequestSerializer(ScheduleSerializer):
dryrun = serializers.BooleanField(
write_only=True,
required=False,
help_text=(
"Whether to simulate the database changes. If true, no database changes will occur. "
"Instead a list of objects that would be created, updated and deleted if dryrun was "
"false will be returned."
),
)
"default_playlist_id",
"dryrun",
"end_time",
"rrule_id",
"show_id",
"start_time",
)
def create(self, validated_data):
    """Create and return a new Schedule instance, given the validated data.

    The ``default_playlist_id``, ``rrule_id`` and ``show_id`` entries are moved to the
    keys ``default_playlist``, ``rrule`` and ``show`` before the schedule is created.
    All three ``*_id`` keys must be present; a missing one raises ``KeyError``.
    """
    for relation in ("default_playlist", "rrule", "show"):
        validated_data[relation] = validated_data.pop(f"{relation}_id")

    new_schedule = Schedule.objects.create(**validated_data)
    new_schedule.save()
    return new_schedule
def update(self, instance, validated_data):
"""Update and return an existing Schedule instance, given the validated data."""
instance.by_weekday = validated_data.get("by_weekday", instance.by_weekday)
instance.first_date = validated_data.get("first_date", instance.first_date)
instance.start_time = validated_data.get("start_time", instance.start_time)
instance.end_time = validated_data.get("end_time", instance.end_time)
instance.last_date = validated_data.get("last_date", instance.last_date)
instance.is_repetition = validated_data.get("is_repetition", instance.is_repetition)
instance.default_playlist = validated_data.get(
"default_playlist_id", instance.default_playlist
instance.rrule = validated_data.get("rrule_id", instance.rrule)
instance.show = validated_data.get("show_id", instance.show)
instance.add_days_no = validated_data.get("add_days_no", instance.add_days_no)
instance.add_business_days_only = validated_data.get(