diff --git a/.github/workflows/scripts/post_before_script.sh b/.github/workflows/scripts/post_before_script.sh
new file mode 100644
index 0000000000..f4b65ba545
--- /dev/null
+++ b/.github/workflows/scripts/post_before_script.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+set -euv
+
+# See pulpcore.app.util.ENABLE_6064_BACKPORT_WORKAROUND for context.
+# This needs to be set here because it relies on service init.
+# It is enabled in only one scenario so that both cases stay covered.
+if [[ "$TEST" == "s3" ]]; then
+    cmd_prefix pulpcore-manager backport-patch-6064
+fi
+
diff --git a/CHANGES/5725.bugfix b/CHANGES/5725.bugfix
new file mode 100644
index 0000000000..d935a47d8c
--- /dev/null
+++ b/CHANGES/5725.bugfix
@@ -0,0 +1,8 @@
+On a request for on-demand content in the content app, a corrupted Remote that
+contains the wrong binary (for that content) prevented other Remotes from being
+attempted on future requests. Now the last failed Remotes are temporarily ignored
+and others may be picked.
+
+Because the [original fix](https://github.com/pulp/pulpcore/pull/6064) contains a migration,
+it is backported here as an optional patch which can be enabled by running the
+pulpcore-manager command: `backport-patch-6064`.
diff --git a/pulp_file/pytest_plugin.py b/pulp_file/pytest_plugin.py
index ee1e8dcefa..1183e61a84 100644
--- a/pulp_file/pytest_plugin.py
+++ b/pulp_file/pytest_plugin.py
@@ -83,11 +83,11 @@ def file_fixtures_root(tmp_path):
 
 @pytest.fixture
 def write_3_iso_file_fixture_data_factory(file_fixtures_root):
-    def _write_3_iso_file_fixture_data_factory(name, overwrite=False):
+    def _write_3_iso_file_fixture_data_factory(name, overwrite=False, seed=None):
         file_fixtures_root.joinpath(name).mkdir(exist_ok=overwrite)
-        file1 = generate_iso(file_fixtures_root.joinpath(f"{name}/1.iso"))
-        file2 = generate_iso(file_fixtures_root.joinpath(f"{name}/2.iso"))
-        file3 = generate_iso(file_fixtures_root.joinpath(f"{name}/3.iso"))
+        file1 = generate_iso(file_fixtures_root.joinpath(f"{name}/1.iso"), seed=seed)
+        file2 = generate_iso(file_fixtures_root.joinpath(f"{name}/2.iso"), seed=seed)
+        file3 = generate_iso(file_fixtures_root.joinpath(f"{name}/3.iso"), seed=seed)
         generate_manifest(
             file_fixtures_root.joinpath(f"{name}/PULP_MANIFEST"), [file1, file2, file3]
         )
@@ -362,3 +362,19 @@ def _wget_recursive_download_on_host(url, destination):
     )
 
     return _wget_recursive_download_on_host
+
+
+@pytest.fixture
+def generate_server_and_remote(
+    file_bindings, gen_fixture_server, file_fixtures_root, gen_object_with_cleanup
+):
+    def _generate_server_and_remote(*, manifest_path, policy):
+        server = gen_fixture_server(file_fixtures_root, None)
+        url = server.make_url(manifest_path)
+        remote = gen_object_with_cleanup(
+            file_bindings.RemotesFileApi,
+            {"name": str(uuid.uuid4()), "url": str(url), "policy": policy},
+        )
+        return server, remote
+
+    yield _generate_server_and_remote
diff --git a/pulp_file/tests/functional/api/test_acs.py b/pulp_file/tests/functional/api/test_acs.py
index 873a607402..b4150b3112 100644
--- a/pulp_file/tests/functional/api/test_acs.py
+++ b/pulp_file/tests/functional/api/test_acs.py
@@ -12,22 +12,6 @@
 )
 
 
-@pytest.fixture
-def generate_server_and_remote(
-    file_bindings, gen_fixture_server, file_fixtures_root, gen_object_with_cleanup
-):
-    def _generate_server_and_remote(*, manifest_path, policy):
-        server = gen_fixture_server(file_fixtures_root, None)
-        url = server.make_url(manifest_path)
-        remote = gen_object_with_cleanup(
-            file_bindings.RemotesFileApi,
-            {"name": str(uuid.uuid4()), "url": str(url), "policy": policy},
-        )
-        return server, remote
-
-    yield _generate_server_and_remote
-
-
 @pytest.mark.parallel
 def test_acs_validation_and_update(
     file_bindings,
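
The new `seed` parameter is what later lets two different fixture trees ("basic" and "acs") carry byte-identical files. A minimal sketch of the mechanism it relies on, independent of the fixtures (names here are illustrative):

```python
# Seeding the global `random` state makes randbytes (Python 3.9+) reproducible,
# so two fixture factories called with the same seed write identical content.
import random

def deterministic_bytes(size: int, seed: int) -> bytes:
    random.seed(seed)  # same seed -> same byte stream
    return random.randbytes(size)

assert deterministic_bytes(1024, 123) == deterministic_bytes(1024, 123)
```
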
"policy": policy}, - ) - return server, remote - - yield _generate_server_and_remote - - @pytest.mark.parallel def test_acs_validation_and_update( file_bindings, diff --git a/pulpcore/app/apps.py b/pulpcore/app/apps.py index 74eed9dcfc..fce116d361 100644 --- a/pulpcore/app/apps.py +++ b/pulpcore/app/apps.py @@ -261,6 +261,11 @@ def ready(self): sender=self, dispatch_uid="populate_artifact_serving_distribution_identifier", ) + post_migrate.connect( + _conditional_6064_backport_workaround, + sender=self, + dispatch_uid="conditional_6064_backport_workaround", + ) def _populate_access_policies(sender, apps, verbosity, **kwargs): @@ -416,3 +421,14 @@ def _populate_artifact_serving_distribution(sender, apps, verbosity, **kwargs): pulp_type="core.artifact", defaults={"base_path": name, "content_guard": content_guard}, ) + + +def _conditional_6064_backport_workaround(sender, apps, verbosity, **kwargs): + # See context in pulpcore.app.util.ENABLE_6064_BACKPORT_WORKAROUND + from pulpcore.app.models import RemoteArtifact + from django.db import models + import pulpcore.app.util + + if pulpcore.app.util.failed_at_exists(connection, RemoteArtifact): + pulpcore.app.util.ENABLE_6064_BACKPORT_WORKAROUND = True + RemoteArtifact.add_to_class("failed_at", models.DateTimeField(null=True)) diff --git a/pulpcore/app/management/commands/backport-patch-6064.py b/pulpcore/app/management/commands/backport-patch-6064.py new file mode 100644 index 0000000000..75d63ebc08 --- /dev/null +++ b/pulpcore/app/management/commands/backport-patch-6064.py @@ -0,0 +1,77 @@ +from django.core.management.base import BaseCommand +from gettext import gettext as _ +from django.db import connection +from pulpcore.app.models import RemoteArtifact + + +CHECK_COL_QUERY = """ +SELECT COUNT(*) +FROM information_schema.columns +WHERE table_name = %s +AND column_name = %s; +""" + +MODIFY_QUERY_TMPL = """ +ALTER TABLE {} +ADD COLUMN {} TIMESTAMPTZ DEFAULT NULL; +""" + +HELP = _( + """ +Enables patch backport of #6064 (https://github.com/pulp/pulpcore/pull/6064). + +The fix prevents corrupted remotes from making content unreacahble by adding +a cooldown time, which is tracked by a new field, 'RemoteArtifact.failed_at'. +This command adds the field to the appropriate table. 
+"""
+)
diff --git a/pulpcore/app/models/content.py b/pulpcore/app/models/content.py
index a0ba46f627..0534dd42bd 100644
--- a/pulpcore/app/models/content.py
+++ b/pulpcore/app/models/content.py
@@ -703,6 +703,7 @@ class RemoteArtifact(BaseModel, QueryMixin):
         sha256 (models.CharField): The expected SHA-256 checksum of the file.
         sha384 (models.CharField): The expected SHA-384 checksum of the file.
         sha512 (models.CharField): The expected SHA-512 checksum of the file.
+        failed_at (models.DateTimeField): The datetime of the last download attempt failure.
 
     Relations:
 
diff --git a/pulpcore/app/settings.py b/pulpcore/app/settings.py
index 32c121b499..af1766e175 100644
--- a/pulpcore/app/settings.py
+++ b/pulpcore/app/settings.py
@@ -290,6 +290,11 @@
     "EXPIRES_TTL": 600,  # 10 minutes
 }
 
+# The time (in seconds) a RemoteArtifact is ignored after a failure.
+# In on-demand, if fetching content from a remote fails due to corrupt data,
+# the corresponding RemoteArtifact is ignored for this long.
+REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN = 5 * 60  # 5 minutes
+
 SPECTACULAR_SETTINGS = {
     "SERVE_URLCONF": ROOT_URLCONF,
     "DEFAULT_GENERATOR_CLASS": "pulpcore.openapi.PulpSchemaGenerator",
diff --git a/pulpcore/app/util.py b/pulpcore/app/util.py
index 5d558b95c0..8ca36c6902 100644
--- a/pulpcore/app/util.py
+++ b/pulpcore/app/util.py
@@ -30,6 +30,28 @@
 from pulpcore.exceptions.validation import InvalidSignatureError
 
 
+# Backport workaround for https://github.com/pulp/pulpcore/pull/6064
+# If 'pulpcore-manager backport-patch-6064' was run, the field
+# RemoteArtifact.failed_at will be set and the backport will take effect.
+ENABLE_6064_BACKPORT_WORKAROUND = False
+
+
+def failed_at_exists(connection, ra_class) -> bool:
+    """Whether 'failed_at' exists in the database."""
+    table_name = ra_class._meta.db_table
+    field_name = "failed_at"
+    CHECK_COL_QUERY = """
+    SELECT COUNT(*)
+    FROM information_schema.columns
+    WHERE table_name = %s
+    AND column_name = %s;
+    """
+    with connection.cursor() as cursor:
+        cursor.execute(CHECK_COL_QUERY, [table_name, field_name])
+        field_exists = cursor.fetchone()[0] > 0
+    return field_exists
+
+
 # a little cache so viewset_for_model doesn't have to iterate over every app every time
 _model_viewset_cache = {}
 STRIPPED_API_ROOT = settings.API_ROOT.strip("/")
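
To check the resulting state, something along these lines can be run in `pulpcore-manager shell`; a sketch using only names introduced in this diff:

```python
# The column check is authoritative; the module flag is per-process and is only
# flipped by the post_migrate hook, so a fresh shell may still report False.
from django.db import connection
from pulpcore.app import util
from pulpcore.app.models import RemoteArtifact

print("failed_at column present:", util.failed_at_exists(connection, RemoteArtifact))
print("workaround active here:  ", util.ENABLE_6064_BACKPORT_WORKAROUND)
```
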
diff --git a/pulpcore/content/handler.py b/pulpcore/content/handler.py
index 77f292aa93..93686f0bf6 100644
--- a/pulpcore/content/handler.py
+++ b/pulpcore/content/handler.py
@@ -7,6 +7,7 @@
 import struct
 from gettext import gettext as _
 from functools import lru_cache
+from datetime import timedelta
 
 from aiohttp.client_exceptions import ClientResponseError, ClientConnectionError
 from aiohttp.web import FileResponse, StreamResponse, HTTPOk
@@ -23,6 +24,7 @@
 from asgiref.sync import sync_to_async
 
 import django
+from django.utils import timezone
 
 from opentelemetry import metrics
 
@@ -58,6 +60,7 @@
     MetricsEmitter,
     get_domain,
     cache_key,
+    ENABLE_6064_BACKPORT_WORKAROUND,
 )
 
 from pulpcore.exceptions import (  # noqa: E402
@@ -842,9 +845,9 @@ async def _stream_content_artifact(self, request, response, content_artifact):
         [pulpcore.plugin.models.ContentArtifact][] returned the binary data needed for
         the client.
         """
-        # We should only retry with exceptions that happen before we receive any data
+        # We should only skip exceptions that happen before we receive any data
         # and start streaming, as we can't rollback data if something happens after that.
-        RETRYABLE_EXCEPTIONS = (
+        SKIPPABLE_EXCEPTIONS = (
             ClientResponseError,
             UnsupportedDigestValidationError,
             ClientConnectionError,
@@ -853,11 +856,17 @@ async def _stream_content_artifact(self, request, response, content_artifact):
         remote_artifacts = content_artifact.remoteartifact_set.select_related(
             "remote"
         ).order_by_acs()
+
+        if ENABLE_6064_BACKPORT_WORKAROUND:
+            protection_time = settings.REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN
+            remote_artifacts = remote_artifacts.exclude(
+                failed_at__gte=timezone.now() - timedelta(seconds=protection_time)
+            )
         async for remote_artifact in remote_artifacts:
             try:
                 response = await self._stream_remote_artifact(request, response, remote_artifact)
                 return response
-            except RETRYABLE_EXCEPTIONS as e:
+            except SKIPPABLE_EXCEPTIONS as e:
                 log.warning(
                     "Could not download remote artifact at '{}': {}".format(
                         remote_artifact.url, str(e)
@@ -1161,14 +1170,29 @@ async def finalize():
             try:
                 download_result = await downloader.run()
             except DigestValidationError:
+                COOLDOWN_MSG = ""
+                if ENABLE_6064_BACKPORT_WORKAROUND:
+                    remote_artifact.failed_at = timezone.now()
+                    await remote_artifact.asave()
+                    REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN = (
+                        settings.REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN
+                    )
+                    COOLDOWN_MSG = (
+                        "- Marking this Remote to be ignored for "
+                        f"{REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN=}s.\n\n"
+                    )
                 await downloader.session.close()
                 close_tcp_connection(request.transport._sock)
                 raise RuntimeError(
-                    f"We tried streaming {remote_artifact.url!r} to the client, but it "
-                    "failed checkusm validation. "
-                    "At this point, we cant recover from wrong data already sent, "
-                    "so we are forcing the connection to close. "
-                    "If this error persists, the remote server might be corrupted."
+                    f"Pulp tried streaming {remote_artifact.url!r} to "
+                    "the client, but it failed checksum validation.\n\n"
+                    "We can't recover from wrong data already sent, so we are:\n"
+                    "- Forcing the connection to close.\n"
+                    f"{COOLDOWN_MSG}"
+                    "If the Remote is known to be fixed, try resyncing the associated repository.\n"
+                    "If the Remote is known to be permanently corrupted, try removing the "
+                    "affected Pulp Remote, adding a good one, and resyncing.\n"
+                    "If the problem persists, please contact the Pulp team."
                 )
 
             if content_length := response.headers.get("Content-Length"):
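
The `exclude()` above is the core of the cooldown. A standalone sketch of its semantics, with the helper name and default being illustrative stand-ins for the setting described earlier:

```python
# RemoteArtifacts that failed within the window are skipped; rows with
# failed_at = NULL never match an __gte lookup, so never-failed (or unpatched)
# rows always remain eligible.
from datetime import timedelta
from django.utils import timezone

def eligible(remote_artifacts_qs, cooldown_seconds=300):  # illustrative helper
    cutoff = timezone.now() - timedelta(seconds=cooldown_seconds)
    return remote_artifacts_qs.exclude(failed_at__gte=cutoff)
```
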
" - "If this error persists, the remote server might be corrupted." + f"Pulp tried streaming {remote_artifact.url!r} to " + "the client, but it failed checksum validation.\n\n" + "We can't recover from wrong data already sent so we are:\n" + "- Forcing the connection to close.\n" + f"{COOLDOWN_MSG}" + "If the Remote is known to be fixed, try resyncing the associated repository.\n" + "If the Remote is known to be permanently corrupted, try removing " + "affected Pulp Remote, adding a good one and resyncing.\n" + "If the problem persists, please contact the Pulp team." ) if content_length := response.headers.get("Content-Length"): diff --git a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py index 11735ca04d..8dfa433820 100644 --- a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py +++ b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py @@ -1,17 +1,21 @@ """Tests related to content delivery.""" -from aiohttp.client_exceptions import ClientResponseError, ClientPayloadError import hashlib -import pytest import subprocess +import uuid from urllib.parse import urljoin -from pulpcore.client.pulp_file import ( - RepositorySyncURL, -) +import pytest +from aiohttp.client_exceptions import ClientPayloadError, ClientResponseError +from pulpcore.client.pulp_file import RepositorySyncURL from pulpcore.tests.functional.utils import download_file, get_files_in_manifest +import django + +django.setup() +from pulpcore.app.util import ENABLE_6064_BACKPORT_WORKAROUND # noqa: E402 + @pytest.mark.parallel def test_delete_remote_on_demand( @@ -116,8 +120,13 @@ def test_remote_content_changed_with_on_demand( ): """ GIVEN a remote synced on demand with fileA (e.g, digest=123), - WHEN on the remote server, fileA changed its content (e.g, digest=456), - THEN retrieving fileA from the content app will cause a connection-close/incomplete-response. + AND in the remote server, fileA changed its content (e.g, digest=456), + + WHEN the client first requests that content + THEN the content app will start a response but close the connection before finishing + + WHEN the client requests that content again (within the RA cooldown interval) + THEN the content app will return a 404 """ # GIVEN basic_manifest_path = write_3_iso_file_fixture_data_factory("basic") @@ -129,17 +138,113 @@ def test_remote_content_changed_with_on_demand( repo = file_bindings.RepositoriesFileApi.read(file_repo_with_auto_publish.pulp_href) distribution = file_distribution_factory(repository=repo.pulp_href) expected_file_list = list(get_files_in_manifest(remote.url)) - - # WHEN write_3_iso_file_fixture_data_factory("basic", overwrite=True) - # THEN get_url = urljoin(distribution.base_url, expected_file_list[0][0]) - with pytest.raises(ClientPayloadError, match="Response payload is not completed"): - download_file(get_url) - # Assert again with curl just to be sure. 
+    # WHEN (first request)
     result = subprocess.run(["curl", "-v", get_url], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+    # THEN
     assert result.returncode == 18
     assert b"* Closing connection 0" in result.stderr
     assert b"curl: (18) transfer closed with outstanding read data remaining" in result.stderr
+
+    # WHEN (second request)
+    result = subprocess.run(["curl", "-v", get_url], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+    # THEN
+    if ENABLE_6064_BACKPORT_WORKAROUND:
+        assert result.returncode == 0
+        assert b"< HTTP/1.1 404 Not Found" in result.stderr
+
+
+@pytest.mark.parallel
+def test_handling_remote_artifact_on_demand_streaming_failure(
+    write_3_iso_file_fixture_data_factory,
+    file_repo_with_auto_publish,
+    file_remote_factory,
+    file_bindings,
+    monitor_task,
+    monitor_task_group,
+    file_distribution_factory,
+    gen_object_with_cleanup,
+    generate_server_and_remote,
+):
+    """
+    GIVEN a content unit synced on-demand which has 2 RemoteArtifacts (Remote + ACS),
+    AND only the ACS RemoteArtifact (which has priority in the content app) is corrupted,
+
+    WHEN a client requests the content for the first time
+    THEN the client doesn't get any content
+
+    WHEN a client requests the content for the second time
+    THEN the client gets the right content (with the backport workaround enabled)
+    """
+
+    # Plumbing
+    def create_simple_remote(manifest_path):
+        remote = file_remote_factory(manifest_path=manifest_path, policy="on_demand")
+        body = RepositorySyncURL(remote=remote.pulp_href)
+        monitor_task(
+            file_bindings.RepositoriesFileApi.sync(file_repo_with_auto_publish.pulp_href, body).task
+        )
+        return remote
+
+    def create_acs_remote(manifest_path):
+        acs_server, acs_remote = generate_server_and_remote(
+            manifest_path=manifest_path, policy="on_demand"
+        )
+        acs = gen_object_with_cleanup(
+            file_bindings.AcsFileApi,
+            {"remote": acs_remote.pulp_href, "paths": [], "name": str(uuid.uuid4())},
+        )
+        monitor_task_group(file_bindings.AcsFileApi.refresh(acs.pulp_href).task_group)
+        return acs
+
+    def sync_publish_and_distribute(remote):
+        body = RepositorySyncURL(remote=remote.pulp_href)
+        monitor_task(
+            file_bindings.RepositoriesFileApi.sync(file_repo_with_auto_publish.pulp_href, body).task
+        )
+        repo = file_bindings.RepositoriesFileApi.read(file_repo_with_auto_publish.pulp_href)
+        distribution = file_distribution_factory(repository=repo.pulp_href)
+        return distribution
+
+    def refresh_acs(acs):
+        monitor_task_group(file_bindings.AcsFileApi.refresh(acs.pulp_href).task_group)
+        return acs
+
+    def get_original_content_info(remote):
+        expected_files = get_files_in_manifest(remote.url)
+        content_unit = list(expected_files)[0]
+        return content_unit[0], content_unit[1]
+
+    def download_from_distribution(content, distribution):
+        content_unit_url = urljoin(distribution.base_url, content)
+        downloaded_file = download_file(content_unit_url)
+        actual_checksum = hashlib.sha256(downloaded_file.body).hexdigest()
+        return actual_checksum
+
+    # GIVEN
+    basic_manifest_path = write_3_iso_file_fixture_data_factory("basic", seed=123)
+    acs_manifest_path = write_3_iso_file_fixture_data_factory("acs", seed=123)
+    remote = create_simple_remote(basic_manifest_path)
+    distribution = sync_publish_and_distribute(remote)
+    acs = create_acs_remote(acs_manifest_path)
+    refresh_acs(acs)
+    write_3_iso_file_fixture_data_factory("acs", overwrite=True)  # corrupt
+
+    # WHEN/THEN (first request)
+    content_name, expected_checksum = get_original_content_info(remote)
+
+    with pytest.raises(ClientPayloadError, match="Response payload is not completed"):
+        download_from_distribution(content_name, distribution)
+
+    # WHEN/THEN (second request)
+    if ENABLE_6064_BACKPORT_WORKAROUND:
+        actual_checksum = download_from_distribution(content_name, distribution)
+        assert actual_checksum == expected_checksum
+    else:
+        with pytest.raises(ClientPayloadError, match="Response payload is not completed"):
+            download_from_distribution(content_name, distribution)
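
A follow-up test for cooldown expiry could back-date `failed_at` rather than sleep through the window; the helper below is hypothetical, not part of this diff, and assumes the backported field is present:

```python
# Hypothetical helper: push failed_at past the cooldown window so the
# RemoteArtifact becomes eligible again without waiting in real time.
from datetime import timedelta
from django.utils import timezone
from pulpcore.app.models import RemoteArtifact

def expire_cooldown(ra: RemoteArtifact, cooldown_seconds=300):
    ra.failed_at = timezone.now() - timedelta(seconds=cooldown_seconds + 1)
    ra.save(update_fields=["failed_at"])
```
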
diff --git a/pulpcore/tests/functional/utils.py b/pulpcore/tests/functional/utils.py
index 4248cd6ee0..d5651099fc 100644
--- a/pulpcore/tests/functional/utils.py
+++ b/pulpcore/tests/functional/utils.py
@@ -4,6 +4,7 @@
 import asyncio
 import hashlib
 import os
+import random
 
 from aiohttp import web
 from dataclasses import dataclass
@@ -103,10 +104,14 @@ async def _download_file(url, auth=None, headers=None):
         return MockDownload(body=await response.read(), response_obj=response)
 
 
-def generate_iso(full_path, size=1024, relative_path=None):
+def generate_iso(full_path, size=1024, relative_path=None, seed=None):
     """Generate a random file."""
     with open(full_path, "wb") as fout:
-        contents = os.urandom(size)
+        if seed is not None:
+            random.seed(seed)
+            contents = random.randbytes(size)
+        else:
+            contents = os.urandom(size)
         fout.write(contents)
         fout.flush()
     digest = hashlib.sha256(contents).hexdigest()
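
A usage sketch for the seeded generator, with illustrative paths; it assumes only what the hunk above shows (the file is written, and the global `random` state is reseeded on every seeded call):

```python
# Same seed -> same bytes on disk; unseeded calls keep using os.urandom.
from pathlib import Path
from pulpcore.tests.functional.utils import generate_iso

a = generate_iso(Path("/tmp/a.iso"), seed=123)
b = generate_iso(Path("/tmp/b.iso"), seed=123)  # reseeded -> identical content to a.iso
c = generate_iso(Path("/tmp/c.iso"))            # os.urandom -> unrelated random content
```
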