diff --git a/.circleci/config.yml b/.circleci/config.yml index 0ef0b07..4585351 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -126,7 +126,7 @@ jobs: - "run": name: "mypy" command: | - nix develop --command mypy --strict src + nix develop --command mypy src unittests: parameters: diff --git a/flake.lock b/flake.lock index 7f9e5af..4e2f799 100644 --- a/flake.lock +++ b/flake.lock @@ -2,11 +2,11 @@ "nodes": { "flake-utils": { "locked": { - "lastModified": 1656928814, - "narHash": "sha256-RIFfgBuKz6Hp89yRr7+NR5tzIAbn52h8vT6vXkYjZoM=", + "lastModified": 1659877975, + "narHash": "sha256-zllb8aq3YO3h8B/U0/J1WBgAL8EX5yWf5pMj3G0NAmc=", "owner": "numtide", "repo": "flake-utils", - "rev": "7e2a3b3dfd9af950a856d66b0a7d01e3c18aa249", + "rev": "c0e246b9b83f637f4681389ecabcb2681b4f3af0", "type": "github" }, "original": { @@ -28,11 +28,11 @@ ] }, "locked": { - "lastModified": 1659091740, - "narHash": "sha256-aNtnezQfUX1QS/bFno2H5661qIY/Rn7BmHnspuuyI+4=", + "lastModified": 1662635943, + "narHash": "sha256-1OBBlBzZ894or8eHZjyADOMnGH89pPUKYGVVS5rwW/0=", "owner": "DavHau", "repo": "mach-nix", - "rev": "f15ea8677df951cb4fe608945fd98725dcd033b3", + "rev": "65266b5cc867fec2cb6a25409dd7cd12251f6107", "type": "github" }, "original": { @@ -43,11 +43,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1659526864, - "narHash": "sha256-XFzXrc1+6DZb9hBgHfEzfwylPUSqVFJbQPs8eOgYufU=", + "lastModified": 1665132027, + "narHash": "sha256-zoHPqSQSENt96zTk6Mt1AP+dMNqQDshXKQ4I6MfjP80=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "478f3cbc8448b5852539d785fbfe9a53304133be", + "rev": "9ecc270f02b09b2f6a76b98488554dd842797357", "type": "github" }, "original": { @@ -60,11 +60,11 @@ "pypi-deps-db": { "flake": false, "locked": { - "lastModified": 1659688860, - "narHash": "sha256-b9CjUto5pfKaHVWnxJG7gpRqacJIryChXbN5K0Owo5o=", + "lastModified": 1665260206, + "narHash": "sha256-LukZvKMArRE3+5/kF7/YOypJ6XTKtkYKkgxYpg/pxHA=", "owner": "DavHau", "repo": "pypi-deps-db", - "rev": "a9ceaf65546ed01049467b4357f310cb0e340276", + "rev": "207b45139d020d459c8e2f70409668f1559d3e95", "type": "github" }, "original": { diff --git a/flake.nix b/flake.nix index 2a207d6..374b20a 100644 --- a/flake.nix +++ b/flake.nix @@ -65,7 +65,7 @@ devShells = withDefault - (devShellForVersions ["test"] supportedPythonVersions) + (devShellForVersions [] supportedPythonVersions) defaultPythonVersion; }); } diff --git a/pyproject.toml b/pyproject.toml index aa7e766..8b5045b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,11 @@ [tool.isort] profile = "black" multi_line_output = 3 + +[tool.mypy] +python_version = "3.10" +strict = true + +[[tool.mypy.overrides]] +module = "testtools.*" +ignore_missing_imports = true diff --git a/setup.cfg b/setup.cfg index 667ee44..023190d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -17,9 +17,11 @@ package_dir = packages = find: install_requires = attrs + parsec [options.extras_require] test = + testtools twisted hypothesis diff --git a/src/tahoe_capabilities/parser.py b/src/tahoe_capabilities/parser.py index af737e4..2bc318b 100644 --- a/src/tahoe_capabilities/parser.py +++ b/src/tahoe_capabilities/parser.py @@ -1,5 +1,15 @@ + +from __future__ import annotations + +__all__ = [ + "capability_from_string", + "ParseError", +] + +from typing import Callable, Type, TypeVar from base64 import b32decode as _b32decode -from typing import Callable, Dict, List, TypeVar, cast + +from parsec import Parser, string, many1, many, digit, times, one_of, ParseError from .types import ( Capability, @@ -29,9 +39,11 @@ WriteCapability, ) +T = TypeVar("T") +S = TypeVar("S") class NotRecognized(ValueError): - def __init__(self, prefix: List[str]) -> None: + def __init__(self, prefix: list[str]) -> None: super().__init__(f"Unrecognized capability type {prefix}") @@ -48,236 +60,238 @@ def _unb32str(s: str) -> bytes: return _b32decode(s.encode("ascii")) -def _parse_chk_verify(pieces: List[str]) -> CHKVerify: - verifykey = _unb32str(pieces[0]) - uri_extension_hash = _unb32str(pieces[1]) - needed = int(pieces[2]) - total = int(pieces[3]) - size = int(pieces[4]) - return CHKVerify(verifykey, uri_extension_hash, needed, total, size) - - -def _parse_chk_read(pieces: List[str]) -> CHKRead: - readkey = _unb32str(pieces[0]) - uri_extension_hash = _unb32str(pieces[1]) - needed = int(pieces[2]) - total = int(pieces[3]) - size = int(pieces[4]) - return CHKRead.derive(readkey, uri_extension_hash, needed, total, size) - - -def _parse_dir2_chk_verify(pieces: List[str]) -> CHKDirectoryVerify: - return CHKDirectoryVerify(_parse_chk_verify(pieces)) - - -def _parse_dir2_chk_read(pieces: List[str]) -> CHKDirectoryRead: - return CHKDirectoryRead(_parse_chk_read(pieces)) - - -def _parse_literal(pieces: List[str]) -> LiteralRead: - return LiteralRead(_unb32str(pieces[0])) - - -def _parse_dir2_literal_read(pieces: List[str]) -> LiteralDirectoryRead: - return LiteralDirectoryRead(_parse_literal(pieces)) +def writeable_from_string(s: str) -> WriteCapability | DirectoryWriteCapability: + """ + Parse a capability string into any kind of writeable object. + """ + cap = capability_from_string(s) + assert isinstance( + cap, + (SSKWrite, MDMFWrite, SSKDirectoryWrite, MDMFDirectoryWrite), + ) + return cap -def _parse_ssk_verify(pieces: List[str]) -> SSKVerify: - storage_index = _unb32str(pieces[0]) - fingerprint = _unb32str(pieces[1]) - return SSKVerify(storage_index, fingerprint) +def readable_from_string(s: str) -> ReadCapability | DirectoryReadCapability: + """ + Parse a capability string into any kind of readable object. + """ + cap = capability_from_string(s) + assert isinstance( + cap, + (SSKRead, MDMFRead, LiteralDirectoryRead, CHKDirectoryRead, SSKDirectoryRead, MDMFDirectoryRead), + ) + return cap -def _parse_ssk_read(pieces: List[str]) -> SSKRead: - readkey = _unb32str(pieces[0]) - fingerprint = _unb32str(pieces[1]) - return SSKRead.derive(readkey, fingerprint) +def immutable_readonly_from_string(s: str) -> ImmutableReadCapability: + """ + Parse a capability string into any kind of readable, immutable object. + """ + cap = capability_from_string(s) + assert isinstance(cap, (LiteralRead, CHKRead)) + return cap -def _parse_dir2_ssk_verify(pieces: List[str]) -> SSKDirectoryVerify: - return SSKDirectoryVerify(_parse_ssk_verify(pieces)) +def immutable_directory_from_string(s: str) -> ImmutableDirectoryReadCapability: + """ + Parse a capability string into any kind of readable, immutable + directory. + """ + cap = capability_from_string(s) + assert isinstance(cap, (LiteralDirectoryRead, CHKDirectoryRead)) + return cap -def _parse_dir2_ssk_read(pieces: List[str]) -> SSKDirectoryRead: - return SSKDirectoryRead(_parse_ssk_read(pieces)) +def readonly_directory_from_string(s: str) -> DirectoryReadCapability: + """ + Parse a capability string into a read capability for a mutable + directory. + :raise ValueError: If the string represents a capability that is not + read-only, is not for a mutable, or is not for a directory. + """ + cap = capability_from_string(s) + assert isinstance(cap, (SSKDirectoryRead, MDMFDirectoryRead)) + return cap -def _parse_mdmf_verify(pieces: List[str]) -> MDMFVerify: - storage_index = _unb32str(pieces[0]) - fingerprint = _unb32str(pieces[1]) - return MDMFVerify(storage_index, fingerprint) +def writeable_directory_from_string(s: str) -> DirectoryWriteCapability: + """ + Parse a capability string into a write capability for a mutable + directory. -def _parse_mdmf_read(pieces: List[str]) -> MDMFRead: - readkey = _unb32str(pieces[0]) - fingerprint = _unb32str(pieces[1]) - return MDMFRead.derive(readkey, fingerprint) + :raise ValueError: If the string represents a capability that is writeable + or is not for a directory. + """ + cap = capability_from_string(s) + assert isinstance(cap, (SSKDirectoryWrite, MDMFDirectoryWrite)) + return cap -def _parse_dir2_mdmf_read(pieces: List[str]) -> MDMFDirectoryRead: - return MDMFDirectoryRead(_parse_mdmf_read(pieces)) +def _b32string(alphabet: str, exact_bits: None | int = None) -> Parser[list[str]]: + """ + Parse a base32-encoded string. + :param alphabet: The alphabet to use. Must be 32 characters long. -def _parse_dir2_mdmf_verify(pieces: List[str]) -> MDMFDirectoryVerify: - return MDMFDirectoryVerify(_parse_mdmf_verify(pieces)) + :exact_bits: See ``b32string``. + :return: A parser that consumes and returns the matched base32 characters + (still encoded). + """ + assert len(alphabet) == 32 -def _parse_ssk_write(pieces: List[str]) -> SSKWrite: - writekey = _unb32str(pieces[0]) - fingerprint = _unb32str(pieces[1]) - return SSKWrite.derive(writekey, fingerprint) + if exact_bits is None: + return many(one_of(alphabet)) + full, extra = divmod(exact_bits, 5) + stem = times( + one_of(alphabet), + full, + full, + ) + if extra == 0: + return stem + return (stem + one_of("".join(set(_trailing_b32chars(alphabet, extra))))).parsecmap( + lambda xs_x: xs_x[0] + [xs_x[1]] + ) -def _parse_dir2_ssk_write(pieces: List[str]) -> SSKDirectoryWrite: - return SSKDirectoryWrite(_parse_ssk_write(pieces)) +def b32string(exact_bits: None | int = None) -> Parser[bytes]: + """ + Parse a base32-encoded string. -def _parse_mdmf_write(pieces: List[str]) -> MDMFWrite: - writekey = _unb32str(pieces[0]) - fingerprint = _unb32str(pieces[1]) - return MDMFWrite.derive(writekey, fingerprint) + :param alphabet: The alphabet to use. Must be 32 characters long. + :param exact_bits: If ``None`` parse a string of any length. Otherwise, + parse a base32 string that represents an encoded string of exactly + this many bits. -def _parse_dir2_mdmf_write(pieces: List[str]) -> MDMFDirectoryWrite: - return MDMFDirectoryWrite(_parse_mdmf_write(pieces)) + :return: A parser that consumes and results in the decoded string. + """ + # RFC3548 standard used by Gnutella, Content-Addressable Web, THEX, Bitzi, + # Web-Calculus... + rfc3548_alphabet = "abcdefghijklmnopqrstuvwxyz234567" + return _b32string(rfc3548_alphabet, exact_bits).parsecmap( + lambda xs: _unb32str("".join(xs)) + ) -_parsers: Dict[str, Callable[[List[str]], Capability]] = { - "LIT": _parse_literal, - "CHK-Verifier": _parse_chk_verify, - "CHK": _parse_chk_read, - "SSK-Verifier": _parse_ssk_verify, - "SSK-RO": _parse_ssk_read, - "SSK": _parse_ssk_write, - "MDMF-Verifier": _parse_mdmf_verify, - "MDMF-RO": _parse_mdmf_read, - "MDMF": _parse_mdmf_write, - "DIR2-LIT": _parse_dir2_literal_read, - "DIR2-CHK-Verifier": _parse_dir2_chk_verify, - "DIR2-CHK": _parse_dir2_chk_read, - "DIR2-Verifier": _parse_dir2_ssk_verify, - "DIR2-RO": _parse_dir2_ssk_read, - "DIR2": _parse_dir2_ssk_write, - "DIR2-MDMF-Verifier": _parse_dir2_mdmf_verify, - "DIR2-MDMF-RO": _parse_dir2_mdmf_read, - "DIR2-MDMF": _parse_dir2_mdmf_write, -} -_A = TypeVar("_A") +def _trailing_b32chars(alphabet: str, bits: int) -> str: + """ + Find the part of the base32 alphabet that is required and allowed to + express the given number of bits of encoded data. + This is used to match the end of a base32 string where the length of the + encoded data is not a multiple of 5 bits (the base32 word size). + """ + stem = alphabet[:: 1 << bits] + if bits == 0: + return stem + return stem + _trailing_b32chars(alphabet, bits - 1) -def _uri_parser(s: str, parsers: Dict[str, Callable[[List[str]], _A]]) -> _A: - pieces = s.split(":") - if pieces[0] == "URI": - try: - parser = parsers[pieces[1]] - except KeyError: - raise NotRecognized(pieces[:2]) - else: - return parser(pieces[2:]) - raise NotRecognized(pieces[:1]) +# Match the common separator between components of capability strings. +_sep = string(":") -def writeable_from_string(s: str) -> WriteCapability: - return cast( - WriteCapability, - _uri_parser( - s, - { - "SSK": _parse_ssk_write, - "MDMF": _parse_mdmf_write, - "DIR2": _parse_dir2_ssk_write, - "DIR2-MDMF": _parse_dir2_mdmf_write, - }, - ), - ) +# Match a natural number +_natural = many1(digit()).parsecmap("".join).parsecmap(int) +# Match a base32-encoded binary string with the length of a key. +_key = b32string(128) -def readable_from_string(s: str) -> ReadCapability: - return cast( - ReadCapability, - _uri_parser( - s, - { - "LIT": _parse_literal, - "CHK": _parse_chk_read, - "SSK-RO": _parse_ssk_read, - "MDMF-RO": _parse_mdmf_write, - }, - ), - ) +# Match a base32-encoded binary string with the length of a fingerprint or UEB hash. +_uri_extension_hash = b32string(256) +# Match the base32-encoded data portion of a literal capability. This is not +# length limited though in practice literals are only used for data of 55 +# bytes or less. +_lit = b32string() -def immutable_readonly_from_string(s: str) -> ImmutableReadCapability: - return cast( - ImmutableReadCapability, - _uri_parser( - s, - { - "LIT": _parse_literal, - "CHK": _parse_chk_read, - }, - ), - ) +# Match the needed shares, total shares, and data size numbers in a CHK +# capability string. +_chk_params = times(_sep >> _natural, 3, 3) +# Match all of the data components (but not the type prefix) of a CHK +# capability string. +_chk = _key + (_sep >> _uri_extension_hash) + _chk_params -def immutable_directory_from_string(s: str) -> ImmutableDirectoryReadCapability: - return cast( - ImmutableDirectoryReadCapability, - _uri_parser( - s, - { - "DIR2-LIT": _parse_dir2_literal_read, - "DIR2-CHK": _parse_dir2_chk_read, - }, - ), - ) +# Match all of the data components (but not the type prefix) of an SSK (SDMF +# or MDMF) capability string. +# +# Tahoe-LAFS calls the components of SSK "storage_index" and "fingerprint" but +# they are syntactically the same as "key" and "uri_extension_hash" from CHK. +_ssk = _key + (_sep >> _uri_extension_hash) -def readonly_directory_from_string(s: str) -> DirectoryReadCapability: +def _capability_parser() -> Parser[Capability]: """ - Parse a capability string into a read capability for a mutable - directory. - - :raise ValueError: If the string represents a capability that is not - read-only, is not for a mutable, or is not for a directory. + Parse any kind of capability string. """ - return cast( - DirectoryReadCapability, - _uri_parser( - s, - { - "DIR2-RO": _parse_dir2_ssk_read, - "DIR2-MDMF-RO": _parse_dir2_mdmf_read, - }, - ), + lit_read = string("LIT:") >> _lit.parsecmap(LiteralRead) + + def chk_glue(f: Callable[[bytes, bytes, int, int, int], T]) -> Callable[[tuple[tuple[bytes, bytes], list[int]]], T]: + def g(values: tuple[tuple[bytes, bytes], list[int]]) -> T: + ((key, uri_extension_hash), [a, b, c]) = values + return f(key, uri_extension_hash, a, b, c) + + return g + + chk_verify = string("CHK-Verifier:") >> _chk.parsecmap(chk_glue(CHKVerify)) + chk = string("CHK:") >> _chk.parsecmap(chk_glue(CHKRead.derive)) + + ssk_verify = string("SSK-Verifier:") >> _ssk.parsecmap(lambda p: SSKVerify(*p)) + ssk_read = string("SSK-RO:") >> _ssk.parsecmap(lambda p: SSKRead.derive(*p)) + ssk = string("SSK:") >> _ssk.parsecmap(lambda p: SSKWrite.derive(*p)) + + mdmf_verify = string("MDMF-Verifier:") >> _ssk.parsecmap(lambda p: MDMFVerify(*p)) + mdmf_read = string("MDMF-RO:") >> _ssk.parsecmap(lambda p: MDMFRead.derive(*p)) + mdmf = string("MDMF:") >> _ssk.parsecmap(lambda p: MDMFWrite.derive(*p)) + + def dir2(file_parser: Parser[T], dir_type: Callable[[T], S]) -> Parser[S]: + return string("DIR2-") >> file_parser.parsecmap(dir_type) + + return string("URI:") >> ( + lit_read + ^ chk_verify + ^ chk + ^ ssk_verify + ^ ssk_read + ^ ssk + ^ mdmf_verify + ^ mdmf_read + ^ mdmf + # Directory variations, starting with ssk which breaks the pattern by + # leaving "SSK-" out. + ^ ( + string("DIR2-Verifier:") + >> _ssk.parsecmap(lambda p: SSKDirectoryVerify(SSKVerify(*p))) + ) + ^ ( + string("DIR2-RO:") + >> _ssk.parsecmap(lambda p: SSKDirectoryRead(SSKRead.derive(*p))) + ) + ^ ( + string("DIR2:") + >> _ssk.parsecmap(lambda p: SSKDirectoryWrite(SSKWrite.derive(*p))) + ) + # # And then the rest + ^ dir2(lit_read, LiteralDirectoryRead) + ^ dir2(chk_verify, CHKDirectoryVerify) + ^ dir2(chk, CHKDirectoryRead) + ^ dir2(mdmf_verify, MDMFDirectoryVerify) + ^ dir2(mdmf_read, MDMFDirectoryRead) + ^ dir2(mdmf, MDMFDirectoryWrite) ) -def writeable_directory_from_string(s: str) -> DirectoryWriteCapability: - """ - Parse a capability string into a write capability for a mutable - directory. - - :raise ValueError: If the string represents a capability that is writeable - or is not for a directory. - """ - return cast( - DirectoryWriteCapability, - _uri_parser( - s, - { - "DIR2": _parse_dir2_ssk_write, - "DIR2-MDMF": _parse_dir2_mdmf_write, - }, - ), - ) +_capability = _capability_parser() def capability_from_string(s: str) -> Capability: - pieces = s.split(":") - if pieces[0] == "URI": - parser = _parsers[pieces[1]] - return parser(pieces[2:]) - - raise NotRecognized(pieces[:1]) + """ + Parse any known capability string. + """ + return _capability.parse(s) diff --git a/src/tahoe_capabilities/test/test_capabilities.py b/src/tahoe_capabilities/test/test_capabilities.py index e85c235..18e6248 100644 --- a/src/tahoe_capabilities/test/test_capabilities.py +++ b/src/tahoe_capabilities/test/test_capabilities.py @@ -1,7 +1,11 @@ +from base64 import b32encode as _b32encode from operator import attrgetter -from unittest import TestCase +from testtools import TestCase +from testtools.content import text_content +from testtools.matchers import Equals, raises from hypothesis import assume, given +from hypothesis.strategies import integers, binary from tahoe_capabilities import ( Capability, @@ -10,11 +14,103 @@ digested_capability_string, ) from tahoe_capabilities.strategies import capabilities +from tahoe_capabilities.parser import ( + _sep, + _natural, + _key, + _lit, + _chk_params, + _chk, + ParseError, +) + + +def b32encode(bs: bytes) -> str: + return _b32encode(bs).lower().decode("ascii").rstrip("=") -class ParseTests(TestCase): +class ParseTests(TestCase): # type: ignore[misc] maxDiff = None + def test_sep(self) -> None: + """ + ``_sep`` parses only ":". + """ + self.expectThat(_sep.parse(":"), Equals(":")) + self.expectThat(lambda: _sep.parse("x"), raises(ParseError)) + + @given(integers(min_value=0)) + def test_natural(self, n: int) -> None: + """ + ``_natural`` parses non-negative integers. + """ + self.assertThat(_natural.parse(str(n)), Equals(n)) + + def test_natural_fail(self) -> None: + """ + ``_natural`` rejects strings that contain non-digits. + """ + self.assertThat(lambda: _natural.parse("hello"), raises(ParseError)) + self.assertThat(lambda: _natural.parse("-1"), raises(ParseError)) + + @given(binary(min_size=16, max_size=16)) + def test_key(self, bs: bytes) -> None: + """ + ``_key`` parses base32-encoded 128 bit strings. + """ + self.assertThat( + _key.parse(b32encode(bs)), + Equals(bs), + ) + + @given(binary(max_size=15)) + def test_key_fail(self, bs: bytes) -> None: + """ + ``_key`` rejects strings shorter than 128 bits. + """ + self.assertThat( + lambda: _key.parse(b32encode(bs)), + raises(ParseError), + ) + + @given(binary()) + def test_lit(self, bs: bytes) -> None: + """ + ``_lit`` parses base32-encoded strings of any length. + """ + self.assertThat( + _lit.parse(b32encode(bs)), + Equals(bs), + ) + + @given(integers(min_value=1), integers(min_value=1), integers(min_value=1)) + def test_chk_params(self, a: int, b: int, c: int) -> None: + """ + ``_chk_params`` parses strings like:: + + ::: + """ + self.assertThat( + _chk_params.parse(f":{a}:{b}:{c}"), + Equals([a, b, c]), + ) + + @given( + binary(min_size=16, max_size=16), + binary(min_size=32, max_size=32), + integers(min_value=1), + integers(min_value=1), + integers(min_value=1), + ) + def test_chk(self, key: bytes, ueh: bytes, a: int, b: int, c: int) -> None: + """ + ``_chk`` parses a key, sep, uri hash extension, and chk parmaeters. + """ + self.assertThat( + _chk.parse(f"{b32encode(key)}:{b32encode(ueh)}:{a}:{b}:{c}"), + Equals(((key, ueh), [a, b, c])), + ) + @given(capabilities()) def test_from_string_roundtrip(self, cap: Capability) -> None: """ @@ -22,6 +118,7 @@ def test_from_string_roundtrip(self, cap: Capability) -> None: ``danger_real_capability_string``. """ cap_str = danger_real_capability_string(cap) + self.addDetail("cap", text_content(cap_str)) cap_parsed = capability_from_string(cap_str) self.assertEqual(cap_parsed, cap) @@ -64,7 +161,7 @@ def test_digested_capability_string_distinct( reader = attrgetter("reader") -class VectorTests(TestCase): +class VectorTests(TestCase): # type: ignore[misc] """ Test Tahoe-Capabilities behavior on hard-coded values against known-correct test vectors extracted from Tahoe-LAFS. @@ -107,9 +204,17 @@ class VectorTests(TestCase): ) def test_vector(self) -> None: + """ + Certain known-valid capability strings can be parsed, diminished, + and serialized to the correct known-valid diminished capability + strings. + """ for index, (description, start, transform, expected) in self.vector: + parsed = capability_from_string(start) + transformed = transform(parsed) + serialized = danger_real_capability_string(transformed) self.assertEqual( - danger_real_capability_string(transform(capability_from_string(start))), + serialized, expected, f"(#{index}) {description}({start}) != {expected}", ) diff --git a/src/tahoe_capabilities/types.py b/src/tahoe_capabilities/types.py index 7a58ac5..fa791d8 100644 --- a/src/tahoe_capabilities/types.py +++ b/src/tahoe_capabilities/types.py @@ -1,4 +1,6 @@ -from typing import Tuple, Union +from __future__ import annotations + +from typing import Union from attrs import field, frozen @@ -9,10 +11,10 @@ class LiteralRead: data: bytes prefix: str = "LIT" - suffix: Tuple[str, ...] = field(init=False, default=()) + suffix: tuple[str, ...] = field(init=False, default=()) @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return (self.data,) @@ -20,10 +22,10 @@ def secrets(self) -> Tuple[bytes, ...]: class LiteralDirectoryRead: cap_object: LiteralRead prefix: str = "DIR2-LIT" - suffix: Tuple[str, ...] = field(init=False, default=()) + suffix: tuple[str, ...] = field(init=False, default=()) @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return self.cap_object.secrets @@ -37,11 +39,11 @@ class CHKVerify: prefix: str = "CHK-Verifier" @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return (self.storage_index, self.uri_extension_hash) @property - def suffix(self) -> Tuple[str, ...]: + def suffix(self) -> tuple[str, ...]: return (str(self.needed), str(self.total), str(self.size)) @@ -78,11 +80,11 @@ def size(self) -> int: return self.verifier.size @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return (self.readkey, self.verifier.uri_extension_hash) @property - def suffix(self) -> Tuple[str, ...]: + def suffix(self) -> tuple[str, ...]: return self.verifier.suffix @@ -92,11 +94,11 @@ class CHKDirectoryVerify: prefix: str = "DIR2-CHK-Verifier" @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return self.cap_object.secrets @property - def suffix(self) -> Tuple[str, ...]: + def suffix(self) -> tuple[str, ...]: return self.cap_object.suffix @@ -110,11 +112,11 @@ def verifier(self) -> CHKDirectoryVerify: return CHKDirectoryVerify(self.cap_object.verifier) @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return self.cap_object.secrets @property - def suffix(self) -> Tuple[str, ...]: + def suffix(self) -> tuple[str, ...]: return self.cap_object.suffix @@ -123,10 +125,10 @@ class SSKVerify: storage_index: bytes fingerprint: bytes prefix: str = "SSK-Verifier" - suffix: Tuple[str, ...] = field(init=False, default=()) + suffix: tuple[str, ...] = field(init=False, default=()) @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return (self.storage_index, self.fingerprint) @@ -135,7 +137,7 @@ class SSKRead: readkey: bytes = field(repr=False) verifier: SSKVerify prefix: str = "SSK-RO" - suffix: Tuple[str, ...] = field(init=False, default=()) + suffix: tuple[str, ...] = field(init=False, default=()) @classmethod def derive(cls, readkey: bytes, fingerprint: bytes) -> "SSKRead": @@ -143,7 +145,7 @@ def derive(cls, readkey: bytes, fingerprint: bytes) -> "SSKRead": return SSKRead(readkey, SSKVerify(storage_index, fingerprint)) @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return (self.readkey, self.verifier.fingerprint) @@ -152,7 +154,7 @@ class SSKWrite: writekey: bytes = field(repr=False) reader: SSKRead prefix: str = "SSK" - suffix: Tuple[str, ...] = field(init=False, default=()) + suffix: tuple[str, ...] = field(init=False, default=()) @classmethod def derive(cls, writekey: bytes, fingerprint: bytes) -> "SSKWrite": @@ -160,7 +162,7 @@ def derive(cls, writekey: bytes, fingerprint: bytes) -> "SSKWrite": return SSKWrite(writekey, SSKRead.derive(readkey, fingerprint)) @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return (self.writekey, self.reader.verifier.fingerprint) @@ -170,11 +172,11 @@ class SSKDirectoryVerify: prefix: str = "DIR2-Verifier" @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return self.cap_object.secrets @property - def suffix(self) -> Tuple[str, ...]: + def suffix(self) -> tuple[str, ...]: return self.cap_object.suffix @@ -188,11 +190,11 @@ def verifier(self) -> SSKDirectoryVerify: return SSKDirectoryVerify(self.cap_object.verifier) @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return self.cap_object.secrets @property - def suffix(self) -> Tuple[str, ...]: + def suffix(self) -> tuple[str, ...]: return self.cap_object.suffix @@ -206,11 +208,11 @@ def reader(self) -> SSKDirectoryRead: return SSKDirectoryRead(self.cap_object.reader) @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return self.cap_object.secrets @property - def suffix(self) -> Tuple[str, ...]: + def suffix(self) -> tuple[str, ...]: return self.cap_object.suffix @@ -219,10 +221,10 @@ class MDMFVerify: storage_index: bytes fingerprint: bytes prefix: str = "MDMF-Verifier" - suffix: Tuple[str, ...] = field(init=False, default=()) + suffix: tuple[str, ...] = field(init=False, default=()) @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return (self.storage_index, self.fingerprint) @@ -231,7 +233,7 @@ class MDMFRead: readkey: bytes = field(repr=False) verifier: MDMFVerify prefix: str = "MDMF-RO" - suffix: Tuple[str, ...] = field(init=False, default=()) + suffix: tuple[str, ...] = field(init=False, default=()) @classmethod def derive(cls, readkey: bytes, fingerprint: bytes) -> "MDMFRead": @@ -239,7 +241,7 @@ def derive(cls, readkey: bytes, fingerprint: bytes) -> "MDMFRead": return MDMFRead(readkey, MDMFVerify(storage_index, fingerprint)) @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return (self.readkey, self.verifier.fingerprint) @@ -248,7 +250,7 @@ class MDMFWrite: writekey: bytes = field(repr=False) reader: MDMFRead prefix: str = "MDMF" - suffix: Tuple[str, ...] = field(init=False, default=()) + suffix: tuple[str, ...] = field(init=False, default=()) @classmethod def derive(cls, writekey: bytes, fingerprint: bytes) -> "MDMFWrite": @@ -256,7 +258,7 @@ def derive(cls, writekey: bytes, fingerprint: bytes) -> "MDMFWrite": return MDMFWrite(writekey, MDMFRead.derive(readkey, fingerprint)) @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return (self.writekey, self.reader.verifier.fingerprint) @@ -266,11 +268,11 @@ class MDMFDirectoryVerify: prefix: str = "DIR2-MDMF-Verifier" @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return self.cap_object.secrets @property - def suffix(self) -> Tuple[str, ...]: + def suffix(self) -> tuple[str, ...]: return self.cap_object.suffix @@ -284,11 +286,11 @@ def verifier(self) -> MDMFDirectoryVerify: return MDMFDirectoryVerify(self.cap_object.verifier) @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return self.cap_object.secrets @property - def suffix(self) -> Tuple[str, ...]: + def suffix(self) -> tuple[str, ...]: return self.cap_object.suffix @@ -302,11 +304,11 @@ def reader(self) -> MDMFDirectoryRead: return MDMFDirectoryRead(self.cap_object.reader) @property - def secrets(self) -> Tuple[bytes, ...]: + def secrets(self) -> tuple[bytes, ...]: return self.cap_object.secrets @property - def suffix(self) -> Tuple[str, ...]: + def suffix(self) -> tuple[str, ...]: return self.cap_object.suffix