diff --git a/CHANGES/5012.bugfix b/CHANGES/5012.bugfix new file mode 100644 index 0000000000..199235e30e --- /dev/null +++ b/CHANGES/5012.bugfix @@ -0,0 +1,3 @@ +Fixed content-app behavior for the case where the client would get a 200 response for a package +streamed from a Remote which did not match the expected checksum. +Now, the connection is closed before finalizing the response. diff --git a/pulp_file/pytest_plugin.py b/pulp_file/pytest_plugin.py index 92f2c159e1..ee1e8dcefa 100644 --- a/pulp_file/pytest_plugin.py +++ b/pulp_file/pytest_plugin.py @@ -83,8 +83,8 @@ def file_fixtures_root(tmp_path): @pytest.fixture def write_3_iso_file_fixture_data_factory(file_fixtures_root): - def _write_3_iso_file_fixture_data_factory(name): - file_fixtures_root.joinpath(name).mkdir() + def _write_3_iso_file_fixture_data_factory(name, overwrite=False): + file_fixtures_root.joinpath(name).mkdir(exist_ok=overwrite) file1 = generate_iso(file_fixtures_root.joinpath(f"{name}/1.iso")) file2 = generate_iso(file_fixtures_root.joinpath(f"{name}/2.iso")) file3 = generate_iso(file_fixtures_root.joinpath(f"{name}/3.iso")) diff --git a/pulpcore/content/handler.py b/pulpcore/content/handler.py index 3d92b13518..77f292aa93 100644 --- a/pulpcore/content/handler.py +++ b/pulpcore/content/handler.py @@ -4,6 +4,7 @@ import os import re import socket +import struct from gettext import gettext as _ from functools import lru_cache @@ -59,7 +60,10 @@ cache_key, ) -from pulpcore.exceptions import UnsupportedDigestValidationError # noqa: E402 +from pulpcore.exceptions import ( # noqa: E402 + UnsupportedDigestValidationError, + DigestValidationError, +) from jinja2 import Template # noqa: E402: module level not at top of file from pulpcore.cache import AsyncContentCache # noqa: E402 @@ -1147,13 +1151,25 @@ async def finalize(): await original_finalize() downloader = remote.get_downloader( - remote_artifact=remote_artifact, headers_ready_callback=handle_response_headers + remote_artifact=remote_artifact, + headers_ready_callback=handle_response_headers, ) original_handle_data = downloader.handle_data downloader.handle_data = handle_data original_finalize = downloader.finalize downloader.finalize = finalize - download_result = await downloader.run() + try: + download_result = await downloader.run() + except DigestValidationError: + await downloader.session.close() + close_tcp_connection(request.transport._sock) + raise RuntimeError( + f"We tried streaming {remote_artifact.url!r} to the client, but it " + "failed checkusm validation. " + "At this point, we cant recover from wrong data already sent, " + "so we are forcing the connection to close. " + "If this error persists, the remote server might be corrupted." + ) if content_length := response.headers.get("Content-Length"): self._report_served_artifact_size(int(content_length)) @@ -1176,3 +1192,13 @@ def _report_served_artifact_size(self, size): "content_app_name": _get_content_app_name(), } self.artifacts_size_counter.add(size, attributes) + + +def close_tcp_connection(sock): + """Configure socket to close TCP connection immediately.""" + try: + l_onoff = 1 + l_linger = 0 # 0 seconds timeout - immediate close + sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", l_onoff, l_linger)) + except (socket.error, OSError) as e: + log.warning(f"Error configuring socket for force close: {e}") diff --git a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py index 1054afc042..11735ca04d 100644 --- a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py +++ b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py @@ -1,8 +1,9 @@ """Tests related to content delivery.""" -from aiohttp.client_exceptions import ClientResponseError +from aiohttp.client_exceptions import ClientResponseError, ClientPayloadError import hashlib import pytest +import subprocess from urllib.parse import urljoin from pulpcore.client.pulp_file import ( @@ -102,3 +103,43 @@ def test_remote_artifact_url_update( actual_checksum = hashlib.sha256(downloaded_file.body).hexdigest() expected_checksum = expected_file_list[0][1] assert expected_checksum == actual_checksum + + +@pytest.mark.parallel +def test_remote_content_changed_with_on_demand( + write_3_iso_file_fixture_data_factory, + file_repo_with_auto_publish, + file_remote_ssl_factory, + file_bindings, + monitor_task, + file_distribution_factory, +): + """ + GIVEN a remote synced on demand with fileA (e.g, digest=123), + WHEN on the remote server, fileA changed its content (e.g, digest=456), + THEN retrieving fileA from the content app will cause a connection-close/incomplete-response. + """ + # GIVEN + basic_manifest_path = write_3_iso_file_fixture_data_factory("basic") + remote = file_remote_ssl_factory(manifest_path=basic_manifest_path, policy="on_demand") + body = RepositorySyncURL(remote=remote.pulp_href) + monitor_task( + file_bindings.RepositoriesFileApi.sync(file_repo_with_auto_publish.pulp_href, body).task + ) + repo = file_bindings.RepositoriesFileApi.read(file_repo_with_auto_publish.pulp_href) + distribution = file_distribution_factory(repository=repo.pulp_href) + expected_file_list = list(get_files_in_manifest(remote.url)) + + # WHEN + write_3_iso_file_fixture_data_factory("basic", overwrite=True) + + # THEN + get_url = urljoin(distribution.base_url, expected_file_list[0][0]) + with pytest.raises(ClientPayloadError, match="Response payload is not completed"): + download_file(get_url) + + # Assert again with curl just to be sure. + result = subprocess.run(["curl", "-v", get_url], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + assert result.returncode == 18 + assert b"* Closing connection 0" in result.stderr + assert b"curl: (18) transfer closed with outstanding read data remaining" in result.stderr