From 613554eda79719fddad97390bd1d6c059d13b980 Mon Sep 17 00:00:00 2001 From: Lubos Mjachky Date: Tue, 6 Aug 2024 00:49:46 +0200 Subject: [PATCH] Skip syncing when replicated distributions were not updated closes #5493 --- CHANGES/5493.feature | 2 + ...timestamps_for_replicated_distributions.py | 37 +++++ pulpcore/app/models/__init__.py | 2 +- pulpcore/app/models/replica.py | 19 ++- pulpcore/app/replica.py | 104 +++++++++++++- pulpcore/app/serializers/__init__.py | 2 +- pulpcore/app/serializers/publication.py | 47 ++++++- pulpcore/app/serializers/replica.py | 31 ++++- pulpcore/app/tasks/replica.py | 10 +- .../tests/functional/api/test_replication.py | 130 +++++++++++++++++- 10 files changed, 367 insertions(+), 17 deletions(-) create mode 100644 CHANGES/5493.feature create mode 100644 pulpcore/app/migrations/0122_record_timestamps_for_replicated_distributions.py diff --git a/CHANGES/5493.feature b/CHANGES/5493.feature new file mode 100644 index 00000000000..7bf771d6100 --- /dev/null +++ b/CHANGES/5493.feature @@ -0,0 +1,2 @@ +Optimized replication to skip syncing when upstream distributions are unchanged. This uses the new +`content_last_updated` field on the distribution serializer to track content source timestamps. diff --git a/pulpcore/app/migrations/0122_record_timestamps_for_replicated_distributions.py b/pulpcore/app/migrations/0122_record_timestamps_for_replicated_distributions.py new file mode 100644 index 00000000000..18263e75d35 --- /dev/null +++ b/pulpcore/app/migrations/0122_record_timestamps_for_replicated_distributions.py @@ -0,0 +1,37 @@ +# Generated by Django 4.2.11 on 2024-08-06 19:27 + +from django.db import migrations, models +import django.db.models.deletion +import django_lifecycle.mixins +import pulpcore.app.models.base + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0121_add_profile_artifacts_table'), + ] + + operations = [ + migrations.CreateModel( + name='LastUpdatedRecord', + fields=[ + ('pulp_id', models.UUIDField(default=pulpcore.app.models.base.pulp_uuid, editable=False, primary_key=True, serialize=False)), + ('pulp_created', models.DateTimeField(auto_now_add=True)), + ('pulp_last_updated', models.DateTimeField(auto_now=True, null=True)), + ('content_last_updated', models.DateTimeField()), + ('last_replication', models.DateTimeField()), + ('distribution', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='core.distribution')), + ('upstream_pulp', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='core.upstreampulp')), + ], + options={ + 'unique_together': {('distribution', 'upstream_pulp')}, + }, + bases=(django_lifecycle.mixins.LifecycleModelMixin, models.Model), + ), + migrations.AddField( + model_name='upstreampulp', + name='last_updated_timestamps', + field=models.ManyToManyField(through='core.LastUpdatedRecord', to='core.distribution'), + ), + ] diff --git a/pulpcore/app/models/__init__.py b/pulpcore/app/models/__init__.py index 4eb55919d36..85fa54efca3 100644 --- a/pulpcore/app/models/__init__.py +++ b/pulpcore/app/models/__init__.py @@ -91,4 +91,4 @@ from .progress import GroupProgressReport, ProgressReport # Moved here to avoid a circular import with GroupProgressReport -from .replica import UpstreamPulp +from .replica import UpstreamPulp, LastUpdatedRecord diff --git a/pulpcore/app/models/replica.py b/pulpcore/app/models/replica.py index 348ff7a8ed3..6364bc2dff0 100644 --- a/pulpcore/app/models/replica.py +++ b/pulpcore/app/models/replica.py @@ -6,8 +6,13 @@ """ from django.db import models 
-from pulpcore.plugin.models import BaseModel, EncryptedTextField, AutoAddObjPermsMixin from pulpcore.app.util import get_domain_pk +from pulpcore.plugin.models import ( + AutoAddObjPermsMixin, + BaseModel, + Distribution, + EncryptedTextField, +) class UpstreamPulp(BaseModel, AutoAddObjPermsMixin): @@ -28,9 +33,21 @@ class UpstreamPulp(BaseModel, AutoAddObjPermsMixin): pulp_label_select = models.TextField(null=True) + last_updated_timestamps = models.ManyToManyField(Distribution, through="LastUpdatedRecord") + class Meta: unique_together = ("name", "pulp_domain") permissions = [ ("replicate_upstreampulp", "Can start a replication task"), ("manage_roles_upstreampulp", "Can manage roles on upstream pulps"), ] + + +class LastUpdatedRecord(BaseModel): + distribution = models.ForeignKey(Distribution, on_delete=models.CASCADE) + upstream_pulp = models.ForeignKey(UpstreamPulp, on_delete=models.CASCADE) + content_last_updated = models.DateTimeField() + last_replication = models.DateTimeField() + + class Meta: + unique_together = ("distribution", "upstream_pulp") diff --git a/pulpcore/app/replica.py b/pulpcore/app/replica.py index 2ca7f457571..fd776b9de24 100644 --- a/pulpcore/app/replica.py +++ b/pulpcore/app/replica.py @@ -1,6 +1,10 @@ -from django.db.models import Model import logging +from datetime import datetime + +from django.db.models import Model +from django.utils.dateparse import parse_datetime + from pulp_glue.common.context import PulpContext from pulpcore.tasking.tasks import dispatch from pulpcore.app.tasks.base import ( @@ -8,6 +12,12 @@ general_create, general_multi_delete, ) +from pulpcore.app.models import ( + Distribution, + LastUpdatedRecord, + UpstreamPulp, +) + from pulpcore.plugin.util import get_url, get_domain _logger = logging.getLogger(__name__) @@ -45,7 +55,7 @@ class Replicator: app_label = None sync_task = None - def __init__(self, pulp_ctx, task_group, tls_settings): + def __init__(self, pulp_ctx, task_group, tls_settings, server): """ :param pulp_ctx: PulpReplicaContext :param task_group: TaskGroup @@ -54,6 +64,7 @@ def __init__(self, pulp_ctx, task_group, tls_settings): self.pulp_ctx = pulp_ctx self.task_group = task_group self.tls_settings = tls_settings + self.server = server self.domain = get_domain() self.distros_uris = [f"pdrn:{self.domain.pulp_id}:distributions"] @@ -161,11 +172,12 @@ def distribution_data(self, repository, upstream_distribution): def create_or_update_distribution(self, repository, upstream_distribution): distribution_data = self.distribution_data(repository, upstream_distribution) + content_last_updated = self._get_last_updated_timestamp(upstream_distribution) try: distro = self.distribution_model_cls.objects.get( name=upstream_distribution["name"], pulp_domain=self.domain ) - # Check that the distribution has the right repository associated + self._update_or_create_record_timestamps(distro, content_last_updated) needs_update = self.needs_update(distribution_data, distro) if needs_update: # Update the distribution @@ -182,20 +194,84 @@ def create_or_update_distribution(self, repository, upstream_distribution): ) except self.distribution_model_cls.DoesNotExist: # Dispatch a task to create the distribution - distribution_data["name"] = upstream_distribution["name"] + distribution_name = distribution_data["name"] = upstream_distribution["name"] dispatch( - general_create, + distribution_create, task_group=self.task_group, shared_resources=[repository], exclusive_resources=self.distros_uris, - args=(self.app_label, 
self.distribution_serializer_name),
+            args=(
+                self.app_label,
+                self.distribution_serializer_name,
+                distribution_name,
+                self.server.pk,
+                str(content_last_updated),
+            ),
             kwargs={"data": distribution_data},
         )
 
+    @staticmethod
+    def _get_last_updated_timestamp(upstream_distribution):
+        if content_last_updated := upstream_distribution.get("content_last_updated"):
+            return parse_datetime(content_last_updated)
+
+    def _update_or_create_record_timestamps(self, distro, content_last_updated):
+        try:
+            distribution_timestamp = LastUpdatedRecord.objects.get(
+                distribution=distro, upstream_pulp=self.server
+            )
+        except LastUpdatedRecord.DoesNotExist:
+            LastUpdatedRecord.objects.create(
+                distribution=distro,
+                upstream_pulp=self.server,
+                content_last_updated=content_last_updated,
+                last_replication=datetime.now(),
+            )
+        else:
+            updated = False
+            if content_last_updated != distribution_timestamp.content_last_updated:
+                distribution_timestamp.content_last_updated = content_last_updated
+                updated = True
+            if distribution_timestamp.last_replication < self.server.pulp_last_updated:
+                distribution_timestamp.last_replication = datetime.now()
+                updated = True
+
+            if updated:
+                distribution_timestamp.save(
+                    update_fields=["content_last_updated", "last_replication"]
+                )
+
     def sync_params(self, repository, remote):
         """This method returns a dict that will be passed as kwargs to the sync task."""
         raise NotImplementedError("Each replicator must supply its own sync params.")
 
+    def requires_syncing(self, distro):
+        try:
+            local_distribution = Distribution.objects.get(
+                name=distro["name"], pulp_domain=self.domain
+            )
+        except Distribution.DoesNotExist:
+            # a local equivalent of the upstream distribution has not been created yet
+            return True
+
+        try:
+            updated_timestamp = LastUpdatedRecord.objects.get(
+                distribution=local_distribution, upstream_pulp=self.server
+            )
+        except LastUpdatedRecord.DoesNotExist:
+            # no timestamps were recorded for this distribution yet, so it must be synced
+            return True
+
+        if updated_timestamp.last_replication < self.server.pulp_last_updated:
+            # the server configuration has changed since the last replication (e.g., base_url)
+            return True
+
+        if updated_timestamp.content_last_updated == self._get_last_updated_timestamp(distro):
+            # the upstream content source has not changed
+            return False
+        else:
+            return True
+
     def sync(self, repository, remote):
         dispatch(
             self.sync_task,
@@ -246,3 +322,19 @@ def remove_missing(self, names):
             exclusive_resources=repositories + remotes,
             args=(repository_ids + remote_ids,),
         )
+
+
+def distribution_create(app_label, serializer_name, distro_name, server_pk, last_updated, **kwargs):
+    general_create(app_label, serializer_name, **kwargs)
+    upstream_timestamp_create(distro_name, server_pk, last_updated)
+
+
+def upstream_timestamp_create(distribution_name, server_pk, last_updated):
+    distribution = Distribution.objects.get(name=distribution_name, pulp_domain=get_domain())
+    server = UpstreamPulp.objects.get(pk=server_pk)
+    LastUpdatedRecord.objects.create(
+        distribution=distribution,
+        upstream_pulp=server,
+        content_last_updated=parse_datetime(last_updated),
+        last_replication=datetime.now(),
+    )
diff --git a/pulpcore/app/serializers/__init__.py b/pulpcore/app/serializers/__init__.py
index 0d8be05b3bb..43dd2bf417b 100644
--- a/pulpcore/app/serializers/__init__.py
+++ b/pulpcore/app/serializers/__init__.py
@@ -115,4 +115,4 @@
     UserRoleSerializer,
     UserSerializer,
 )
-from .replica import UpstreamPulpSerializer
+from .replica import UpstreamPulpSerializer, 
LastUpdatedRecordSerializer diff --git a/pulpcore/app/serializers/publication.py b/pulpcore/app/serializers/publication.py index 2d2363e5f03..51d70b47e19 100644 --- a/pulpcore/app/serializers/publication.py +++ b/pulpcore/app/serializers/publication.py @@ -1,6 +1,8 @@ from gettext import gettext as _ -from django.db.models import Q +from contextlib import suppress + +from django.db.models import Q, ObjectDoesNotExist from drf_spectacular.utils import extend_schema_field from rest_framework import serializers @@ -160,6 +162,42 @@ class Meta(ContentGuardSerializer.Meta): fields = ContentGuardSerializer.Meta.fields + ("header_name", "header_value", "jq_filter") +class ContentLastUpdatedField(serializers.DateTimeField, serializers.ReadOnlyField): + """ + A serializer field representing the last updated timestamp for a used content source. + """ + + def get_attribute(self, instance): + return instance + + def to_representation(self, instance): + """ + Fetch the last_updated timestamp of the content source (e.g., publication) or distribution. + """ + repository = instance.repository + publication = instance.publication + repo_version = instance.repository_version + + if publication or repo_version: + return instance.pulp_last_updated + + if repository: + with suppress(ObjectDoesNotExist): + versions = repository.versions.all() + publications = models.Publication.objects.filter( + repository_version__in=versions, complete=True + ) + publication = publications.select_related("repository_version").latest( + "repository_version", "pulp_created" + ) + return max(publication.pulp_last_updated, instance.pulp_last_updated) + + repo_version = repository.latest_version() + return max(repo_version.pulp_last_updated, instance.pulp_last_updated) + + return None + + class DistributionSerializer(ModelSerializer): """ The Serializer for the Distribution model. @@ -232,6 +270,12 @@ class DistributionSerializer(ModelSerializer): hidden = serializers.BooleanField( default=False, help_text=_("Whether this distribution should be shown in the content app.") ) + content_last_updated = ContentLastUpdatedField( + help_text=_( + "Last updated timestamp of the content source " + "(e.g., repository_version) or distribution." + ) + ) class Meta: model = models.Distribution @@ -239,6 +283,7 @@ class Meta: "base_path", "base_url", "content_guard", + "content_last_updated", "hidden", "pulp_labels", "name", diff --git a/pulpcore/app/serializers/replica.py b/pulpcore/app/serializers/replica.py index af3206ce1bf..47fd2fcbf0c 100644 --- a/pulpcore/app/serializers/replica.py +++ b/pulpcore/app/serializers/replica.py @@ -3,14 +3,30 @@ from rest_framework import serializers from rest_framework.validators import UniqueValidator -from pulpcore.app.serializers import HiddenFieldsMixin from pulpcore.app.serializers import ( + DetailRelatedField, + HiddenFieldsMixin, IdentityField, ModelSerializer, ) -from pulpcore.app.models import UpstreamPulp +from pulpcore.app.models import UpstreamPulp, Distribution, LastUpdatedRecord + + +class LastUpdatedRecordSerializer(ModelSerializer): + """ + Serializer for recording the last update timestamps from Server's distributions. 
+    """
+
+    distribution = DetailRelatedField(
+        view_name_pattern=r"distributions(-.*/.*)-detail",
+        queryset=Distribution.objects.all(),
+    )
+
+    class Meta:
+        model = LastUpdatedRecord
+        fields = ("distribution", "content_last_updated")
 
 
 class UpstreamPulpSerializer(ModelSerializer, HiddenFieldsMixin):
@@ -85,6 +101,16 @@ class UpstreamPulpSerializer(ModelSerializer, HiddenFieldsMixin):
         required=False,
     )
 
+    last_updated_timestamps = LastUpdatedRecordSerializer(
+        help_text=_(
+            "A list of pairs of local distributions and last "
+            "updated timestamps of their upstream equivalents"
+        ),
+        source="lastupdatedrecord_set",
+        many=True,
+        read_only=True,
+    )
+
     class Meta:
         abstract = True
         model = UpstreamPulp
@@ -96,6 +122,7 @@ class Meta:
             "ca_cert",
             "client_cert",
             "client_key",
+            "last_updated_timestamps",
             "tls_validation",
             "username",
             "password",
diff --git a/pulpcore/app/tasks/replica.py b/pulpcore/app/tasks/replica.py
index 53a37973bfa..c1d33c46bf9 100644
--- a/pulpcore/app/tasks/replica.py
+++ b/pulpcore/app/tasks/replica.py
@@ -68,7 +68,8 @@ def replicate_distributions(server_pk):
     for config in pulp_plugin_configs():
         if config.replicator_classes:
             for replicator_class in config.replicator_classes:
-                supported_replicators.append(replicator_class(ctx, task_group, tls_settings))
+                replicator = replicator_class(ctx, task_group, tls_settings, server)
+                supported_replicators.append(replicator)
 
     for replicator in supported_replicators:
         distros = replicator.upstream_distributions(labels=server.pulp_label_select)
@@ -78,13 +79,14 @@
             remote = replicator.create_or_update_remote(upstream_distribution=distro)
             if not remote:
                 # The upstream distribution is not serving any content,
-                # let if fall throug the cracks and be cleanup below.
+                # let it fall through the cracks and be cleaned up below.
continue # Check if there is already a repository repository = replicator.create_or_update_repository(remote=remote) - # Dispatch a sync task - replicator.sync(repository, remote) + # Dispatch a sync task if needed + if replicator.requires_syncing(distro): + replicator.sync(repository, remote) # Get or create a distribution replicator.create_or_update_distribution(repository, distro) diff --git a/pulpcore/tests/functional/api/test_replication.py b/pulpcore/tests/functional/api/test_replication.py index cfdcb4e6d3a..d9eca083313 100644 --- a/pulpcore/tests/functional/api/test_replication.py +++ b/pulpcore/tests/functional/api/test_replication.py @@ -3,8 +3,9 @@ from pulpcore.client.pulpcore import ApiException from pulpcore.client.pulpcore import AsyncOperationResponse +from pulpcore.client.pulp_file import RepositorySyncURL -from pulpcore.tests.functional.utils import PulpTaskGroupError +from pulpcore.tests.functional.utils import PulpTaskGroupError, generate_iso @pytest.mark.parallel @@ -227,6 +228,133 @@ def test_replication_with_wrong_ca_cert( assert task.state == "completed" +@pytest.mark.parallel +def test_replication_optimization( + check_replication, + domain_factory, + bindings_cfg, + pulpcore_bindings, + pulp_settings, + file_bindings, + file_repository_factory, + file_remote_factory, + file_distribution_factory, + file_publication_factory, + basic_manifest_path, + monitor_task, + gen_object_with_cleanup, + tmp_path, +): + non_default_domain = domain_factory() + source_domain = domain_factory() + upstream_pulp_body = { + "name": str(uuid.uuid4()), + "base_url": bindings_cfg.host, + "api_root": pulp_settings.API_ROOT, + "domain": source_domain.name, + "username": bindings_cfg.username, + "password": bindings_cfg.password, + } + upstream_pulp = gen_object_with_cleanup( + pulpcore_bindings.UpstreamPulpsApi, upstream_pulp_body, pulp_domain=non_default_domain.name + ) + + # sync a repository on the "remote" Pulp instance + upstream_remote = file_remote_factory( + pulp_domain=source_domain.name, manifest_path=basic_manifest_path, policy="immediate" + ) + upstream_repository = file_repository_factory(pulp_domain=source_domain.name) + + repository_sync_data = RepositorySyncURL(remote=upstream_remote.pulp_href, mirror=True) + response = file_bindings.RepositoriesFileApi.sync( + upstream_repository.pulp_href, repository_sync_data + ) + monitor_task(response.task) + upstream_repository = file_bindings.RepositoriesFileApi.read(upstream_repository.pulp_href) + upstream_publication = file_publication_factory( + pulp_domain=source_domain.name, repository_version=upstream_repository.latest_version_href + ) + upstream_distribution = file_distribution_factory( + pulp_domain=source_domain.name, publication=upstream_publication.pulp_href + ) + + # replicate the "remote" instance + check_replication(upstream_pulp, upstream_distribution, non_default_domain) + + # replicate the "remote" instance again to check if the timestamp was NOT changed + check_replication(upstream_pulp, upstream_distribution, non_default_domain) + + # upload new content to the repository on the "remote" Pulp instance (creating a new version) + filename = tmp_path / str(uuid.uuid4()) + generate_iso(filename) + relative_path = "1.iso" + + response = file_bindings.ContentFilesApi.create( + relative_path, + file=filename, + repository=upstream_repository.pulp_href, + pulp_domain=source_domain.name, + ) + monitor_task(response.task) + upstream_repository = file_bindings.RepositoriesFileApi.read(upstream_repository.pulp_href) + 
upstream_publication = file_publication_factory( + pulp_domain=source_domain.name, repository_version=upstream_repository.latest_version_href + ) + response = file_bindings.DistributionsFileApi.partial_update( + upstream_distribution.pulp_href, + {"publication": upstream_publication.pulp_href}, + ) + monitor_task(response.task) + + # replicate the "remote" instance to check if the timestamp was correctly changed + check_replication(upstream_pulp, upstream_distribution, non_default_domain) + + +@pytest.fixture +def check_replication(pulpcore_bindings, file_bindings, monitor_task_group): + def _check_replication(upstream_pulp, upstream_distribution, local_domain): + response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href) + # check if the replication succeeded + task_group = monitor_task_group(response.task_group) + for task in task_group.tasks: + assert task.state == "completed" + + upstream_distribution = file_bindings.DistributionsFileApi.read( + upstream_distribution.pulp_href + ) + distribution = file_bindings.DistributionsFileApi.list( + name=upstream_distribution.name, + pulp_domain=local_domain.name, + ).results[0] + + upstream_pulp = pulpcore_bindings.UpstreamPulpsApi.read(upstream_pulp.pulp_href) + # check if the timestamps of the updated distribution match + for timestamp in upstream_pulp.last_updated_timestamps: + if timestamp.distribution == distribution.pulp_href: + assert upstream_distribution.content_last_updated == timestamp.content_last_updated + break + else: + assert False, "The replica of the upstream distribution was not found" + + # check if the content was correctly replicated + local_version = file_bindings.RepositoriesFileApi.read( + distribution.repository + ).latest_version_href + local_present = file_bindings.RepositoriesFileVersionsApi.read( + local_version + ).content_summary.present + upstream_version = file_bindings.PublicationsFileApi.read( + upstream_distribution.publication + ).repository_version + upstream_present = file_bindings.RepositoriesFileVersionsApi.read( + upstream_version + ).content_summary.present + + assert upstream_present["file.file"]["count"] == local_present["file.file"]["count"] + + return _check_replication + + @pytest.fixture() def gen_users(gen_user): """Returns a user generator function for the tests."""
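
The skip decision added in Replicator.requires_syncing() reduces to three checks: sync when no
timestamps were recorded yet, sync when the UpstreamPulp configuration changed after the last
replication, and otherwise sync only when the upstream content timestamp differs from the recorded
one. Below is a minimal standalone sketch of that decision, assuming hypothetical stand-ins
(TimestampRecord and the function arguments) rather than the Django models used in this patch.

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class TimestampRecord:
    # stand-in for LastUpdatedRecord: when the upstream content source last changed
    content_last_updated: datetime
    # and when this distribution was last replicated locally
    last_replication: datetime


def requires_syncing(
    record: Optional[TimestampRecord],
    upstream_content_last_updated: Optional[datetime],
    server_config_last_updated: datetime,
) -> bool:
    """Return True when a sync task should be dispatched for a distribution."""
    if record is None:
        # no local replica or no recorded timestamps yet -> always sync
        return True
    if record.last_replication < server_config_last_updated:
        # the upstream server configuration changed after the last replication
        return True
    # otherwise, sync only if the upstream content source reports a different timestamp
    return record.content_last_updated != upstream_content_last_updated


if __name__ == "__main__":
    now = datetime.now()
    record = TimestampRecord(content_last_updated=now, last_replication=now)
    # unchanged upstream content and unchanged server config -> the sync is skipped
    print(requires_syncing(record, now, now - timedelta(hours=1)))  # False
    # upstream published new content -> sync again
    print(requires_syncing(record, now + timedelta(minutes=5), now - timedelta(hours=1)))  # True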