Skip to content

Commit

Permalink
Skip syncing when replicated distributions were not updated
Browse files Browse the repository at this point in the history
closes #5493
  • Loading branch information
lubosmj committed Aug 6, 2024
1 parent b618361 commit 613554e
Show file tree
Hide file tree
Showing 10 changed files with 367 additions and 17 deletions.
2 changes: 2 additions & 0 deletions CHANGES/5493.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Optimized replication to skip syncing when upstream distributions are unchanged. This uses the new
`content_last_updated` field on the distribution serializer to track content source timestamps.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Generated by Django 4.2.11 on 2024-08-06 19:27

from django.db import migrations, models
import django.db.models.deletion
import django_lifecycle.mixins
import pulpcore.app.models.base


class Migration(migrations.Migration):
    """
    Add the ``LastUpdatedRecord`` table and expose it on ``UpstreamPulp``.

    ``LastUpdatedRecord`` is created with foreign keys to ``core.distribution`` and
    ``core.upstreampulp`` and is then used as the ``through`` table of the new
    ``UpstreamPulp.last_updated_timestamps`` many-to-many field.
    """

    dependencies = [
        ("core", "0121_add_profile_artifacts_table"),
    ]

    operations = [
        migrations.CreateModel(
            name="LastUpdatedRecord",
            fields=[
                (
                    "pulp_id",
                    models.UUIDField(
                        default=pulpcore.app.models.base.pulp_uuid,
                        editable=False,
                        primary_key=True,
                        serialize=False,
                    ),
                ),
                ("pulp_created", models.DateTimeField(auto_now_add=True)),
                ("pulp_last_updated", models.DateTimeField(auto_now=True, null=True)),
                ("content_last_updated", models.DateTimeField()),
                ("last_replication", models.DateTimeField()),
                (
                    "distribution",
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE,
                        to="core.distribution",
                    ),
                ),
                (
                    "upstream_pulp",
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE,
                        to="core.upstreampulp",
                    ),
                ),
            ],
            options={
                # One record per (distribution, upstream server) pair.
                "unique_together": {("distribution", "upstream_pulp")},
            },
            bases=(django_lifecycle.mixins.LifecycleModelMixin, models.Model),
        ),
        migrations.AddField(
            model_name="upstreampulp",
            name="last_updated_timestamps",
            field=models.ManyToManyField(
                through="core.LastUpdatedRecord",
                to="core.distribution",
            ),
        ),
    ]
2 changes: 1 addition & 1 deletion pulpcore/app/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,4 @@
from .progress import GroupProgressReport, ProgressReport

# Moved here to avoid a circular import with GroupProgressReport
from .replica import UpstreamPulp
from .replica import UpstreamPulp, LastUpdatedRecord
19 changes: 18 additions & 1 deletion pulpcore/app/models/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@
"""

from django.db import models
from pulpcore.plugin.models import BaseModel, EncryptedTextField, AutoAddObjPermsMixin
from pulpcore.app.util import get_domain_pk
from pulpcore.plugin.models import (
AutoAddObjPermsMixin,
BaseModel,
Distribution,
EncryptedTextField,
)


class UpstreamPulp(BaseModel, AutoAddObjPermsMixin):
Expand All @@ -28,9 +33,21 @@ class UpstreamPulp(BaseModel, AutoAddObjPermsMixin):

pulp_label_select = models.TextField(null=True)

last_updated_timestamps = models.ManyToManyField(Distribution, through="LastUpdatedRecord")

class Meta:
unique_together = ("name", "pulp_domain")
permissions = [
("replicate_upstreampulp", "Can start a replication task"),
("manage_roles_upstreampulp", "Can manage roles on upstream pulps"),
]


class LastUpdatedRecord(BaseModel):
    """
    Timestamps tracked for one (distribution, upstream Pulp server) pair.

    This model is the ``through`` table of ``UpstreamPulp.last_updated_timestamps``.

    Fields:
        distribution (models.ForeignKey): The distribution the record belongs to.
        upstream_pulp (models.ForeignKey): The upstream server the record belongs to.
        content_last_updated (models.DateTimeField): Last-updated timestamp of the
            content source. Required (no ``null=True``).
        last_replication (models.DateTimeField): When the pair was last replicated.
            Required (no ``null=True``).
    """

    # Deleting either side of the pair removes the record as well.
    distribution = models.ForeignKey(
        Distribution,
        on_delete=models.CASCADE,
    )
    upstream_pulp = models.ForeignKey(
        UpstreamPulp,
        on_delete=models.CASCADE,
    )

    content_last_updated = models.DateTimeField()
    last_replication = models.DateTimeField()

    class Meta:
        # At most one record per distribution and upstream server.
        unique_together = ("distribution", "upstream_pulp")
104 changes: 98 additions & 6 deletions pulpcore/app/replica.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
from django.db.models import Model
import logging

from datetime import datetime

from django.db.models import Model
from django.utils.dateparse import parse_datetime

from pulp_glue.common.context import PulpContext
from pulpcore.tasking.tasks import dispatch
from pulpcore.app.tasks.base import (
general_update,
general_create,
general_multi_delete,
)
from pulpcore.app.models import (
Distribution,
LastUpdatedRecord,
UpstreamPulp,
)

from pulpcore.plugin.util import get_url, get_domain

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -45,7 +55,7 @@ class Replicator:
app_label = None
sync_task = None

def __init__(self, pulp_ctx, task_group, tls_settings):
def __init__(self, pulp_ctx, task_group, tls_settings, server):
"""
:param pulp_ctx: PulpReplicaContext
:param task_group: TaskGroup
Expand All @@ -54,6 +64,7 @@ def __init__(self, pulp_ctx, task_group, tls_settings):
self.pulp_ctx = pulp_ctx
self.task_group = task_group
self.tls_settings = tls_settings
self.server = server
self.domain = get_domain()
self.distros_uris = [f"pdrn:{self.domain.pulp_id}:distributions"]

Expand Down Expand Up @@ -161,11 +172,12 @@ def distribution_data(self, repository, upstream_distribution):

def create_or_update_distribution(self, repository, upstream_distribution):
distribution_data = self.distribution_data(repository, upstream_distribution)
content_last_updated = self._get_last_updated_timestamp(upstream_distribution)
try:
distro = self.distribution_model_cls.objects.get(
name=upstream_distribution["name"], pulp_domain=self.domain
)
# Check that the distribution has the right repository associated
self._update_or_create_record_timestamps(distro, content_last_updated)
needs_update = self.needs_update(distribution_data, distro)
if needs_update:
# Update the distribution
Expand All @@ -182,20 +194,84 @@ def create_or_update_distribution(self, repository, upstream_distribution):
)
except self.distribution_model_cls.DoesNotExist:
# Dispatch a task to create the distribution
distribution_data["name"] = upstream_distribution["name"]
distribution_name = distribution_data["name"] = upstream_distribution["name"]
dispatch(
general_create,
distribution_create,
task_group=self.task_group,
shared_resources=[repository],
exclusive_resources=self.distros_uris,
args=(self.app_label, self.distribution_serializer_name),
args=(
self.app_label,
self.distribution_serializer_name,
distribution_name,
self.server.pk,
str(content_last_updated),
),
kwargs={"data": distribution_data},
)

@staticmethod
def _get_last_updated_timestamp(upstream_distribution):
if content_last_updated := upstream_distribution.get("content_last_updated"):
return parse_datetime(content_last_updated)

def _update_or_create_record_timestamps(self, distro, content_last_updated):
    """
    Keep the ``LastUpdatedRecord`` of ``distro`` for this upstream server current.

    When no record exists for the (``distro``, ``self.server``) pair, one is created.
    Otherwise the stored ``content_last_updated`` is replaced if it differs from the
    given value, and ``last_replication`` is refreshed if it is older than
    ``self.server.pulp_last_updated``.

    Args:
        distro: The local distribution the record belongs to.
        content_last_updated: The upstream timestamp as returned by
            ``_get_last_updated_timestamp``.

    NOTE(review): ``_get_last_updated_timestamp`` can return ``None`` while the
    ``content_last_updated`` model field is declared without ``null=True``; confirm
    that upstream always reports the value or guard the callers.
    """
    # Local import keeps this method independent of the module's import section.
    from django.utils import timezone

    try:
        record = LastUpdatedRecord.objects.get(distribution=distro, upstream_pulp=self.server)
    except LastUpdatedRecord.DoesNotExist:
        LastUpdatedRecord.objects.create(
            distribution=distro,
            upstream_pulp=self.server,
            # The model field is named ``content_last_updated``; passing ``last_updated``
            # raises TypeError because the model has no such field.
            content_last_updated=content_last_updated,
            # ``timezone.now()`` follows the project's time zone settings, unlike the
            # naive local time returned by ``datetime.now()``.
            last_replication=timezone.now(),
        )
        return

    updated = False
    if content_last_updated != record.content_last_updated:
        record.content_last_updated = content_last_updated
        updated = True
    if record.last_replication < self.server.pulp_last_updated:
        # The server entry was modified after the last replication.
        record.last_replication = timezone.now()
        updated = True

    if updated:
        record.save(update_fields=["content_last_updated", "last_replication"])

def sync_params(self, repository, remote):
    """
    Build the keyword arguments handed to the sync task.

    This base implementation is a placeholder and always raises.

    Raises:
        NotImplementedError: Always.
    """
    message = "Each replicator must supply its own sync params."
    raise NotImplementedError(message)

def requires_syncing(self, distro):
    """
    Tell whether the content behind an upstream distribution must be synced.

    Args:
        distro (dict): Upstream distribution data; ``distro["name"]`` is used to find
            the local counterpart and ``"content_last_updated"`` is compared with the
            stored record.

    Returns:
        bool: ``False`` only when a local distribution and its ``LastUpdatedRecord``
        exist, the record's ``last_replication`` is not older than
        ``self.server.pulp_last_updated``, and the recorded ``content_last_updated``
        equals the upstream value. ``True`` in every other case.
    """
    try:
        local_distro = Distribution.objects.get(name=distro["name"], pulp_domain=self.domain)
        record = LastUpdatedRecord.objects.get(
            distribution=local_distro, upstream_pulp=self.server
        )
    except Distribution.DoesNotExist:
        # a local equivalent of the upstream distribution has not been created yet
        return True
    except LastUpdatedRecord.DoesNotExist:
        # missing data about last updates, perhaps because the local replica does not exist
        return True

    if record.last_replication < self.server.pulp_last_updated:
        # the server configuration has changed since the last time (e.g., the value of base-url)
        return True

    # Syncing is needed exactly when the upstream source has changed.
    return record.content_last_updated != self._get_last_updated_timestamp(distro)

def sync(self, repository, remote):
dispatch(
self.sync_task,
Expand Down Expand Up @@ -246,3 +322,19 @@ def remove_missing(self, names):
exclusive_resources=repositories + remotes,
args=(repository_ids + remote_ids,),
)


def distribution_create(app_label, serializer_name, distro_name, server_pk, last_updated, **kwargs):
    """
    Create a distribution and then record its upstream timestamps.

    Args:
        app_label: Forwarded to ``general_create``.
        serializer_name: Forwarded to ``general_create``.
        distro_name: Name of the distribution, used to look it up after creation.
        server_pk: Primary key of the upstream server the record is created for.
        last_updated: Upstream timestamp, forwarded to ``upstream_timestamp_create``.
        **kwargs: Forwarded to ``general_create`` unchanged.
    """
    # The distribution has to exist before its timestamp record can reference it.
    general_create(app_label, serializer_name, **kwargs)
    upstream_timestamp_create(
        distribution_name=distro_name,
        server_pk=server_pk,
        last_updated=last_updated,
    )


def upstream_timestamp_create(distribution_name, server_pk, last_updated):
    """
    Create the ``LastUpdatedRecord`` for a distribution and an upstream server.

    Args:
        distribution_name: Name of the distribution, looked up in the current domain.
        server_pk: Primary key of the ``UpstreamPulp`` the record belongs to.
        last_updated (str): Upstream ``content_last_updated`` timestamp as a string;
            it is converted with ``parse_datetime``.

    Raises:
        Distribution.DoesNotExist: If no distribution with that name exists in the domain.
        UpstreamPulp.DoesNotExist: If no upstream server has the given primary key.

    NOTE(review): ``create_or_update_distribution`` passes ``str(content_last_updated)``,
    which is the text ``"None"`` when upstream reported no timestamp; confirm how that
    case should be stored, since the model field is declared without ``null=True``.
    """
    # Local import keeps this function independent of the module's import section.
    from django.utils import timezone

    distribution = Distribution.objects.get(name=distribution_name, pulp_domain=get_domain())
    server = UpstreamPulp.objects.get(pk=server_pk)
    LastUpdatedRecord.objects.create(
        distribution=distribution,
        upstream_pulp=server,
        # The model field is named ``content_last_updated``; passing ``last_updated``
        # raises TypeError because the model has no such field.
        content_last_updated=parse_datetime(last_updated),
        # ``timezone.now()`` follows the project's time zone settings, unlike the
        # naive local time returned by ``datetime.now()``.
        last_replication=timezone.now(),
    )
2 changes: 1 addition & 1 deletion pulpcore/app/serializers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,4 @@
UserRoleSerializer,
UserSerializer,
)
from .replica import UpstreamPulpSerializer
from .replica import UpstreamPulpSerializer, LastUpdatedRecordSerializer
47 changes: 46 additions & 1 deletion pulpcore/app/serializers/publication.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from gettext import gettext as _

from django.db.models import Q
from contextlib import suppress

from django.db.models import Q, ObjectDoesNotExist
from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers

Expand Down Expand Up @@ -160,6 +162,42 @@ class Meta(ContentGuardSerializer.Meta):
fields = ContentGuardSerializer.Meta.fields + ("header_name", "header_value", "jq_filter")


class ContentLastUpdatedField(serializers.DateTimeField, serializers.ReadOnlyField):
    """
    A serializer field representing the last updated timestamp for a used content source.
    """

    def get_attribute(self, instance):
        """Hand the whole object to ``to_representation`` instead of a single attribute."""
        return instance

    def to_representation(self, instance):
        """
        Fetch the last_updated timestamp of the content source (e.g., publication) or distribution.

        Returns:
            - ``instance.pulp_last_updated`` when a publication or repository version is set;
            - otherwise, when a repository is set, the later of ``instance.pulp_last_updated``
              and the ``pulp_last_updated`` of the latest complete publication of that
              repository, falling back to the repository's latest version when no such
              publication is found;
            - ``None`` when none of the three is set.
        """
        if instance.publication or instance.repository_version:
            return instance.pulp_last_updated

        repository = instance.repository
        if not repository:
            return None

        try:
            complete_publications = models.Publication.objects.filter(
                repository_version__in=repository.versions.all(), complete=True
            )
            latest_publication = complete_publications.select_related(
                "repository_version"
            ).latest("repository_version", "pulp_created")
            return max(latest_publication.pulp_last_updated, instance.pulp_last_updated)
        except ObjectDoesNotExist:
            # No complete publication found; fall back to the repository version below.
            pass

        latest_version = repository.latest_version()
        return max(latest_version.pulp_last_updated, instance.pulp_last_updated)


class DistributionSerializer(ModelSerializer):
"""
The Serializer for the Distribution model.
Expand Down Expand Up @@ -232,13 +270,20 @@ class DistributionSerializer(ModelSerializer):
hidden = serializers.BooleanField(
default=False, help_text=_("Whether this distribution should be shown in the content app.")
)
content_last_updated = ContentLastUpdatedField(
help_text=_(
"Last updated timestamp of the content source "
"(e.g., repository_version) or distribution."
)
)

class Meta:
model = models.Distribution
fields = ModelSerializer.Meta.fields + (
"base_path",
"base_url",
"content_guard",
"content_last_updated",
"hidden",
"pulp_labels",
"name",
Expand Down
31 changes: 29 additions & 2 deletions pulpcore/app/serializers/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,30 @@
from rest_framework import serializers
from rest_framework.validators import UniqueValidator

from pulpcore.app.serializers import HiddenFieldsMixin
from pulpcore.app.serializers import (
DetailRelatedField,
HiddenFieldsMixin,
IdentityField,
ModelSerializer,
)


from pulpcore.app.models import UpstreamPulp
from pulpcore.app.models import UpstreamPulp, Distribution, LastUpdatedRecord


class LastUpdatedRecordSerializer(ModelSerializer):
    """
    Serializer for recording the last update timestamps from Server's distributions.

    Each serialized record exposes the related distribution and the
    ``content_last_updated`` timestamp stored for it.
    """

    # Related distribution, resolved against any view name matching the pattern below.
    distribution = DetailRelatedField(
        queryset=Distribution.objects.all(),
        view_name_pattern=r"distributions(-.*/.*)-detail",
    )

    class Meta:
        model = LastUpdatedRecord
        fields = (
            "distribution",
            "content_last_updated",
        )


class UpstreamPulpSerializer(ModelSerializer, HiddenFieldsMixin):
Expand Down Expand Up @@ -85,6 +101,16 @@ class UpstreamPulpSerializer(ModelSerializer, HiddenFieldsMixin):
required=False,
)

last_updated_timestamps = LastUpdatedRecordSerializer(
help_text=_(
"A list of pairs of local distributions and last "
"updated timestamps of their upstream equivalents"
),
source="lastupdatedrecord_set",
many=True,
read_only=True,
)

class Meta:
abstract = True
model = UpstreamPulp
Expand All @@ -96,6 +122,7 @@ class Meta:
"ca_cert",
"client_cert",
"client_key",
"last_updated_timestamps",
"tls_validation",
"username",
"password",
Expand Down
Loading

0 comments on commit 613554e

Please sign in to comment.