Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get provenance URL using the simple API #6

Merged
merged 1 commit into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 50 additions & 23 deletions src/pip_plugin_pep740/_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,49 @@
PluginType = Literal["dist-inspector"]


def _get_provenance_url(filename: str, index_host: str) -> str | None:
if filename.endswith(".tar.gz"):
name, _ = parse_sdist_filename(filename)
elif filename.endswith(".whl"):
name, _, _, _ = parse_wheel_filename(filename)
else:
# Unexpected file, ignore
return None

simple_index_package_url = (
builder.URIBuilder()
.add_scheme("https")
.add_host(index_host)
.add_path(f"simple/{name}/")
.geturl()
)
try:
r = requests.get(
url=simple_index_package_url,
headers={"Accept": "application/vnd.pypi.simple.v1+json"},
timeout=5,
)
r.raise_for_status()
except requests.RequestException as e:
msg = f"Error accessing PyPI simple API: {e}"
raise ValueError(msg) from e

try:
package_json = r.json()
except JSONDecodeError as e:
msg = f"Invalid PyPI simple index JSON response: {e}"
raise ValueError(msg) from e

matching_artifacts = [f for f in package_json["files"] if f["filename"] == filename]
if len(matching_artifacts) == 0:
msg = f"Could not find file {filename} using the simple API at {index_host}"
raise ValueError(msg)

artifact_info = matching_artifacts[0]
provenance_url: str | None = artifact_info.get("provenance")
return provenance_url


def _get_provenance(filename: str, url: str) -> Provenance | None:
"""Download the provenance for a given distribution."""
url_authority = rfc3986.api.uri_reference(url).authority
Expand All @@ -33,45 +76,29 @@ def _get_provenance(filename: str, url: str) -> Provenance | None:
else:
return None

if filename.endswith(".tar.gz"):
name, version = parse_sdist_filename(filename)
elif filename.endswith(".whl"):
name, version, _, _ = parse_wheel_filename(filename)
else:
# Unexpected file, ignore
provenance_url = _get_provenance_url(filename=filename, index_host=index_host)
if provenance_url is None:
# Can't verify artifacts uploaded without attestations
return None

provenance_url = (
builder.URIBuilder()
.add_scheme("https")
.add_host(index_host)
.add_path(f"integrity/{name}/{version}/{filename}/provenance")
.geturl()
)
try:
r = requests.get(
url=provenance_url,
params={"Accept": "application/vnd.pypi.integrity.v1+json"},
headers={"Accept": "application/vnd.pypi.integrity.v1+json"},
timeout=5,
)
r.raise_for_status()
except requests.HTTPError as e:
# If there is no provenance available, continue
if e.response.status_code == requests.codes.not_found:
return None
raise ValueError(e) from e
except requests.RequestException as e:
msg = f"Error downloading provenance file: {e}"
raise ValueError(msg) from e

try:
return Provenance.model_validate(r.json())
except ValidationError as e:
msg = f"Invalid provenance: {e}"
raise ValueError(msg) from e
except JSONDecodeError as e:
msg = f"Invalid provenance JSON: {e}"
raise ValueError(msg) from e
except ValidationError as e:
msg = f"Invalid provenance: {e}"
raise ValueError(msg) from e


def plugin_type() -> PluginType:
Expand Down
132 changes: 103 additions & 29 deletions test/test_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,12 @@
import pip_plugin_pep740

PACKAGE_NAME = "abi3info"
PACKAGE_VERSION_1 = "2024.10.8"
DIST_FILE_1 = Path("test/assets/abi3info-2024.10.8-py3-none-any.whl")
PROVENANCE_FILE_1 = Path("test/assets/abi3info-2024.10.8-py3-none-any.whl.provenance")

PACKAGE_VERSION_2 = "2024.10.3"
DIST_FILE_2 = Path("test/assets/abi3info-2024.10.3-py3-none-any.whl")
PROVENANCE_FILE_2 = Path("test/assets/abi3info-2024.10.3-py3-none-any.whl.provenance")

PACKAGE_VERSION_3 = "2024.10.8"
DIST_FILE_3 = Path("test/assets/abi3info-2024.10.8.tar.gz")
PROVENANCE_FILE_3 = Path("test/assets/abi3info-2024.10.8.tar.gz.provenance")

Expand All @@ -38,18 +35,26 @@ def test_plugin_type(self) -> None:
assert pip_plugin_pep740.plugin_type() == "dist-inspector"

@pytest.mark.parametrize(
("version", "filename", "provenance_file", "digest"),
("filename", "provenance_file", "digest"),
[
(PACKAGE_VERSION_1, DIST_FILE_1.name, PROVENANCE_FILE_1, DIST_DIGEST_1),
(PACKAGE_VERSION_3, DIST_FILE_3.name, PROVENANCE_FILE_3, DIST_DIGEST_3),
(DIST_FILE_1.name, PROVENANCE_FILE_1, DIST_DIGEST_1),
(DIST_FILE_3.name, PROVENANCE_FILE_3, DIST_DIGEST_3),
],
)
def test_pre_download_valid_provenance(
self, version: str, filename: str, provenance_file: Path, digest: str
self, filename: str, provenance_file: Path, digest: str
) -> None:
with requests_mock.Mocker(real_http=True) as m:
m.get(
f"https://pypi.org/integrity/{PACKAGE_NAME}/{version}/{filename}/provenance",
f"https://test.pypi.org/simple/{PACKAGE_NAME}/",
text=f'{{"files": [{{"filename": "{filename}", "provenance": "https://provenance_url"}}]}}',
)
m.get(
f"https://pypi.org/simple/{PACKAGE_NAME}/",
text=f'{{"files": [{{"filename": "{filename}", "provenance": "https://provenance_url"}}]}}',
)
m.get(
"https://provenance_url",
text=provenance_file.read_text(),
)
pip_plugin_pep740.pre_download(
Expand Down Expand Up @@ -77,8 +82,8 @@ def test_pre_download_invalid_filename(self) -> None:
def test_pre_download_no_provenance_found(self) -> None:
with requests_mock.Mocker(real_http=True) as m:
m.get(
f"https://pypi.org/integrity/{PACKAGE_NAME}/{PACKAGE_VERSION_1}/{DIST_FILE_1.name}/provenance",
status_code=404,
f"https://pypi.org/simple/{PACKAGE_NAME}/",
text=f'{{"files": [{{"filename": "{DIST_FILE_1.name}"}}]}}',
)
assert (
pip_plugin_pep740.pre_download(
Expand All @@ -89,20 +94,47 @@ def test_pre_download_no_provenance_found(self) -> None:
is None
)

def test_pre_download_index_http_error(self) -> None:
    """A 403 response from the simple index is reported as a ``ValueError``."""
    index_url = f"https://pypi.org/simple/{PACKAGE_NAME}/"
    with requests_mock.Mocker(real_http=True) as mocker:
        mocker.get(index_url, status_code=403)
        with pytest.raises(ValueError, match="403 Client Error"):
            pip_plugin_pep740.pre_download(
                url="https://files.pythonhosted.org/some_path",
                filename=DIST_FILE_1.name,
                digest=DIST_DIGEST_1,
            )

def test_pre_download_index_timeout(self) -> None:
    """A connect timeout on the simple index is reported as a ``ValueError``."""
    index_url = f"https://pypi.org/simple/{PACKAGE_NAME}/"
    with requests_mock.Mocker(real_http=True) as mocker:
        mocker.get(index_url, exc=requests.exceptions.ConnectTimeout)
        with pytest.raises(ValueError, match="Error accessing PyPI simple API"):
            pip_plugin_pep740.pre_download(
                url="https://files.pythonhosted.org/some_path",
                filename=DIST_FILE_1.name,
                digest=DIST_DIGEST_1,
            )

def test_pre_download_provenance_download_error(self) -> None:
with requests_mock.Mocker(real_http=True) as m:
m.get(
f"https://pypi.org/integrity/{PACKAGE_NAME}/{PACKAGE_VERSION_1}/{DIST_FILE_1.name}/provenance",
f"https://pypi.org/simple/{PACKAGE_NAME}/",
text=f'{{"files": [{{"filename": "{DIST_FILE_1.name}", "provenance": "https://provenance_url"}}]}}',
)
m.get(
"https://provenance_url",
status_code=403,
)
with pytest.raises(ValueError, match="403 Client Error"):
assert (
pip_plugin_pep740.pre_download(
url="https://files.pythonhosted.org/some_path",
filename=DIST_FILE_1.name,
digest=DIST_DIGEST_1,
)
is None
pip_plugin_pep740.pre_download(
url="https://files.pythonhosted.org/some_path",
filename=DIST_FILE_1.name,
digest=DIST_DIGEST_1,
)

def test_pre_download_not_pypi_url(self) -> None:
Expand All @@ -118,23 +150,28 @@ def test_pre_download_not_pypi_url(self) -> None:
def test_pre_download_provenance_timeout(self) -> None:
with requests_mock.Mocker(real_http=True) as m:
m.get(
f"https://pypi.org/integrity/{PACKAGE_NAME}/{PACKAGE_VERSION_1}/{DIST_FILE_1.name}/provenance",
f"https://pypi.org/simple/{PACKAGE_NAME}/",
text=f'{{"files": [{{"filename": "{DIST_FILE_1.name}", "provenance": "https://provenance_url"}}]}}',
)
m.get(
"https://provenance_url",
exc=requests.exceptions.ConnectTimeout,
)
with pytest.raises(ValueError, match="Error downloading provenance file"):
assert (
pip_plugin_pep740.pre_download(
url="https://files.pythonhosted.org/some_path",
filename=DIST_FILE_1.name,
digest=DIST_DIGEST_1,
)
is None
pip_plugin_pep740.pre_download(
url="https://files.pythonhosted.org/some_path",
filename=DIST_FILE_1.name,
digest=DIST_DIGEST_1,
)

def test_pre_download_invalid_provenance(self) -> None:
with requests_mock.Mocker(real_http=True) as m:
m.get(
f"https://pypi.org/integrity/{PACKAGE_NAME}/{PACKAGE_VERSION_1}/{DIST_FILE_1.name}/provenance",
f"https://pypi.org/simple/{PACKAGE_NAME}/",
text=f'{{"files": [{{"filename": "{DIST_FILE_1.name}", "provenance": "https://provenance_url"}}]}}',
)
m.get(
"https://provenance_url",
text=PROVENANCE_FILE_2.read_text(),
)
with pytest.raises(
Expand All @@ -147,10 +184,43 @@ def test_pre_download_invalid_provenance(self) -> None:
digest=DIST_DIGEST_1,
)

def test_pre_download_invalid_index_json(self) -> None:
    """A non-JSON body from the simple index is reported as a ``ValueError``."""
    index_url = f"https://pypi.org/simple/{PACKAGE_NAME}/"
    with requests_mock.Mocker(real_http=True) as mocker:
        mocker.get(index_url, text="invalidjson")
        with pytest.raises(
            ValueError,
            match="Invalid PyPI simple index JSON response",
        ):
            pip_plugin_pep740.pre_download(
                url="https://files.pythonhosted.org/some_path",
                filename=DIST_FILE_1.name,
                digest=DIST_DIGEST_1,
            )

def test_pre_download_missing_package_from_index_json(self) -> None:
    """An index listing only a different file makes the lookup raise ``ValueError``."""
    index_url = f"https://pypi.org/simple/{PACKAGE_NAME}/"
    # The mocked index lists DIST_FILE_2 only, while DIST_FILE_1 is requested.
    index_body = f'{{"files": [{{"filename": "{DIST_FILE_2.name}", "provenance": "https://provenance_url"}}]}}'
    with requests_mock.Mocker(real_http=True) as mocker:
        mocker.get(index_url, text=index_body)
        with pytest.raises(
            ValueError,
            match=f"Could not find file {DIST_FILE_1.name} using the simple API at pypi.org",
        ):
            pip_plugin_pep740.pre_download(
                url="https://files.pythonhosted.org/some_path",
                filename=DIST_FILE_1.name,
                digest=DIST_DIGEST_1,
            )

def test_pre_download_invalid_provenance_json(self) -> None:
with requests_mock.Mocker(real_http=True) as m:
m.get(
f"https://pypi.org/integrity/{PACKAGE_NAME}/{PACKAGE_VERSION_1}/{DIST_FILE_1.name}/provenance",
f"https://pypi.org/simple/{PACKAGE_NAME}/",
text=f'{{"files": [{{"filename": "{DIST_FILE_1.name}", "provenance": "https://provenance_url"}}]}}',
)
m.get(
"https://provenance_url",
text="invalidjson",
)
with pytest.raises(
Expand All @@ -168,7 +238,11 @@ def test_pre_download_malformed_provenance_valid_json(self) -> None:
provenance["attestation_bundles"] = "invalid"
with requests_mock.Mocker(real_http=True) as m:
m.get(
f"https://pypi.org/integrity/{PACKAGE_NAME}/{PACKAGE_VERSION_1}/{DIST_FILE_1.name}/provenance",
f"https://pypi.org/simple/{PACKAGE_NAME}/",
text=f'{{"files": [{{"filename": "{DIST_FILE_1.name}", "provenance": "https://provenance_url"}}]}}',
)
m.get(
"https://provenance_url",
text=json.dumps(provenance),
)
with pytest.raises(
Expand Down