From 1e1e3cf8cd2cf4fe02edb06bc66f2064ca7bcba2 Mon Sep 17 00:00:00 2001 From: Mario Vega Date: Tue, 23 Jul 2024 12:34:24 -0600 Subject: [PATCH] new(tests): EIP-7702 - Set EOA account code for one transaction (Devnet-1) (#621) * feat(fw): Add type-4 transaction and authorization tuple type * new(tests): Add EIP-7702 folder, first test * tests: create contract * new(tests): more 7702 tests * new(tests): EIP-7702 * new(tests): EIP-7702 * typo * fix(fw): minor refactor * new(tests): parametrize sanity test with and without balance * fix(tests): fixes * fix(fw): nonce field in the auth tuple * fix(tests): fix some of the tests * fix(tests): test fixes and skips * fix(test): skip * fix(specs): output the authorization list to the fixture * nit * more tests * fix(fw): State test type-4 fix * fix(fw): Transaction type checks * fix(fixtures): Fixture transaciton type-4 test * fix(base_types): Storage methods * fix(tests): storage method usage * changelog --- docs/CHANGELOG.md | 1 + .../composite_types.py | 29 +- src/ethereum_test_fixtures/blockchain.py | 20 + src/ethereum_test_fixtures/state.py | 19 + .../tests/test_blockchain.py | 48 + src/ethereum_test_tools/__init__.py | 2 + src/ethereum_test_types/__init__.py | 2 + src/ethereum_test_types/types.py | 169 +++- tests/prague/eip7702_eoa_code_tx/__init__.py | 3 + tests/prague/eip7702_eoa_code_tx/spec.py | 29 + .../eip7702_eoa_code_tx/test_eoa_code_txs.py | 916 ++++++++++++++++++ 11 files changed, 1216 insertions(+), 22 deletions(-) create mode 100644 tests/prague/eip7702_eoa_code_tx/__init__.py create mode 100644 tests/prague/eip7702_eoa_code_tx/spec.py create mode 100644 tests/prague/eip7702_eoa_code_tx/test_eoa_code_txs.py diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index c92f28f60b..ca3d40f1d7 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -29,6 +29,7 @@ Test fixtures for use by clients are available for each release on the [Github r - ✨ Add tests for subcontainer kind validation from [EIP-7620: EOF Contract Creation](https://eips.ethereum.org/EIPS/eip-7620) for the cases with deeply nested containers and non-first code sections ([#676](https://github.com/ethereum/execution-spec-tests/pull/676)). - ✨ Add tests for runtime stack overflow at CALLF instruction from [EIP-4750: EOF - Functions](https://eips.ethereum.org/EIPS/eip-4750) ([#678](https://github.com/ethereum/execution-spec-tests/pull/678)). - ✨ Add tests for runtime stack overflow at JUMPF instruction from [EIP-6206: EOF - JUMPF and non-returning functions](https://eips.ethereum.org/EIPS/eip-6206) ([#690](https://github.com/ethereum/execution-spec-tests/pull/690)). +- ✨ Add tests for Devnet-1 version of [EIP-7702: Set EOA account code](https://eips.ethereum.org/EIPS/eip-7702) ([#621](https://github.com/ethereum/execution-spec-tests/pull/621)) ### 🛠️ Framework diff --git a/src/ethereum_test_base_types/composite_types.py b/src/ethereum_test_base_types/composite_types.py index 208e8e3575..7cd17f8736 100644 --- a/src/ethereum_test_base_types/composite_types.py +++ b/src/ethereum_test_base_types/composite_types.py @@ -2,10 +2,9 @@ Base composite types for Ethereum test cases. """ from dataclasses import dataclass -from itertools import count -from typing import Any, ClassVar, Dict, Iterator, SupportsBytes, Type, TypeAlias +from typing import Any, ClassVar, Dict, SupportsBytes, Type, TypeAlias -from pydantic import Field, RootModel, TypeAdapter +from pydantic import Field, PrivateAttr, RootModel, TypeAdapter from .base_types import Address, Bytes, Hash, HashInt, HexNumber, ZeroPaddedHexNumber from .conversions import BytesConvertible, NumberConvertible @@ -24,7 +23,7 @@ class Storage(RootModel[Dict[StorageKeyValueType, StorageKeyValueType]]): root: Dict[StorageKeyValueType, StorageKeyValueType] = Field(default_factory=dict) - _current_slot: Iterator[int] = count(0) + _current_slot: int = PrivateAttr(0) StorageDictType: ClassVar[TypeAlias] = Dict[ str | int | bytes | SupportsBytes, str | int | bytes | SupportsBytes @@ -161,10 +160,23 @@ def __bool__(self) -> bool: """Returns True if the storage is not empty""" return any(v for v in self.root.values()) + def __add__(self, other: "Storage") -> "Storage": + """ + Returns a new storage that is the sum of two storages. + """ + return Storage({**self.root, **other.root}) + def keys(self) -> set[StorageKeyValueType]: """Returns the keys of the storage""" return set(self.root.keys()) + def set_next_slot(self, slot: int) -> "Storage": + """ + Sets the next slot to be used by `store_next`. + """ + self._current_slot = slot + return self + def store_next( self, value: StorageKeyValueTypeConvertible | StorageKeyValueType | bool ) -> StorageKeyValueType: @@ -174,10 +186,17 @@ def store_next( Increments the key counter so the next time this function is called, the next key is used. """ - slot = StorageKeyValueTypeAdapter.validate_python(next(self._current_slot)) + slot = StorageKeyValueTypeAdapter.validate_python(self._current_slot) + self._current_slot += 1 self[slot] = StorageKeyValueTypeAdapter.validate_python(value) return slot + def peek_slot(self) -> int: + """ + Peeks the next slot that will be used by `store_next`. + """ + return self._current_slot + def contains(self, other: "Storage") -> bool: """ Returns True if self contains all keys with equal value as diff --git a/src/ethereum_test_fixtures/blockchain.py b/src/ethereum_test_fixtures/blockchain.py index d430d37bf3..2133cbecf5 100644 --- a/src/ethereum_test_fixtures/blockchain.py +++ b/src/ethereum_test_fixtures/blockchain.py @@ -26,6 +26,7 @@ from ethereum_test_exceptions import EngineAPIError, ExceptionInstanceOrList from ethereum_test_forks import Fork from ethereum_test_types.types import ( + AuthorizationTupleGeneric, ConsolidationRequest, ConsolidationRequestGeneric, DepositRequest, @@ -321,11 +322,30 @@ def from_fixture_header( return new_payload +class FixtureAuthorizationTuple(AuthorizationTupleGeneric[ZeroPaddedHexNumber]): + """ + Authorization tuple for fixture transactions. + """ + + signer: Address | None = None + + @classmethod + def from_authorization_tuple( + cls, auth_tuple: AuthorizationTupleGeneric + ) -> "FixtureAuthorizationTuple": + """ + Returns a FixtureAuthorizationTuple from an AuthorizationTuple. + """ + return cls(**auth_tuple.model_dump()) + + class FixtureTransaction(TransactionFixtureConverter, TransactionGeneric[ZeroPaddedHexNumber]): """ Representation of an Ethereum transaction within a test Fixture. """ + authorization_list: List[FixtureAuthorizationTuple] | None = None + @classmethod def from_transaction(cls, tx: Transaction) -> "FixtureTransaction": """ diff --git a/src/ethereum_test_fixtures/state.py b/src/ethereum_test_fixtures/state.py index 514af9c8c7..7cf26fde8f 100644 --- a/src/ethereum_test_fixtures/state.py +++ b/src/ethereum_test_fixtures/state.py @@ -10,6 +10,7 @@ from ethereum_test_exceptions import TransactionExceptionInstanceOrList from ethereum_test_types.types import ( AccessList, + AuthorizationTupleGeneric, CamelModel, EnvironmentGeneric, Transaction, @@ -28,6 +29,23 @@ class FixtureEnvironment(EnvironmentGeneric[ZeroPaddedHexNumber]): prev_randao: Hash | None = Field(None, alias="currentRandom") # type: ignore +class FixtureAuthorizationTuple(AuthorizationTupleGeneric[ZeroPaddedHexNumber]): + """ + Authorization tuple for fixture transactions. + """ + + signer: Address | None = None + + @classmethod + def from_authorization_tuple( + cls, auth_tuple: AuthorizationTupleGeneric + ) -> "FixtureAuthorizationTuple": + """ + Returns a FixtureAuthorizationTuple from an AuthorizationTuple. + """ + return cls(**auth_tuple.model_dump()) + + class FixtureTransaction(TransactionFixtureConverter): """ Type used to describe a transaction in a state test. @@ -42,6 +60,7 @@ class FixtureTransaction(TransactionFixtureConverter): value: List[ZeroPaddedHexNumber] data: List[Bytes] access_lists: List[List[AccessList]] | None = None + authorization_list: List[FixtureAuthorizationTuple] | None = None max_fee_per_blob_gas: ZeroPaddedHexNumber | None = None blob_versioned_hashes: Sequence[Hash] | None = None sender: Address | None = None diff --git a/src/ethereum_test_fixtures/tests/test_blockchain.py b/src/ethereum_test_fixtures/tests/test_blockchain.py index c1772d3fbe..a5fa642b48 100644 --- a/src/ethereum_test_fixtures/tests/test_blockchain.py +++ b/src/ethereum_test_fixtures/tests/test_blockchain.py @@ -15,13 +15,16 @@ Bytes, Hash, HeaderNonce, + TestPrivateKey, ZeroPaddedHexNumber, to_json, ) from ethereum_test_exceptions import BlockException, EngineAPIError, TransactionException from ethereum_test_forks import Prague from ethereum_test_types import ( + EOA, AccessList, + AuthorizationTuple, ConsolidationRequest, DepositRequest, Requests, @@ -178,6 +181,51 @@ }, id="fixture_transaction_type_3_default_values", ), + pytest.param( + True, + FixtureTransaction.from_transaction( + Transaction( + ty=4, + max_fee_per_gas=7, + authorization_list=[ + AuthorizationTuple( + chain_id=1, + address=2, + nonce=[3], + signer=EOA(key=TestPrivateKey), + ) + ], + ).with_signature_and_sender() + ), + { + "type": "0x04", + "chainId": "0x01", + "nonce": "0x00", + "to": "0x00000000000000000000000000000000000000aa", + "value": "0x00", + "data": "0x", + "gasLimit": "0x5208", + "maxPriorityFeePerGas": "0x00", + "maxFeePerGas": "0x07", + "accessList": [], + "authorizationList": [ + { + "chainId": "0x01", + "address": Address(2).hex(), + "nonce": ["0x03"], + "v": "0x00", + "r": "0x796b0a59fe796b5aab79259988f4b18bb7966dc9aa0a01d226859057f539d8f6", + "s": "0x7456ad9b8b4e157d8a150ae7d568bb93e668bf1d5970756f7fe7b7f2472235fe", + "signer": "0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b", + } + ], + "v": "0x01", + "r": "0xb9f3ad929ffdb846cbe357fa25e6ab93cc6e10e76da170a12baf03f8a34ba141", + "s": "0x04992060cfa252f5ac18ac1ccb340a821497d50812a225646094d2ad08b8eeaa", + "sender": "0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b", + }, + id="fixture_transaction_type_4", + ), pytest.param( True, FixtureTransaction.from_transaction( diff --git a/src/ethereum_test_tools/__init__.py b/src/ethereum_test_tools/__init__.py index 2d6d16c9a8..5d5750b7c8 100644 --- a/src/ethereum_test_tools/__init__.py +++ b/src/ethereum_test_tools/__init__.py @@ -37,6 +37,7 @@ EOA, AccessList, Alloc, + AuthorizationTuple, ConsolidationRequest, DepositRequest, Environment, @@ -82,6 +83,7 @@ "Account", "Address", "Alloc", + "AuthorizationTuple", "BaseFixture", "BaseTest", "Block", diff --git a/src/ethereum_test_types/__init__.py b/src/ethereum_test_types/__init__.py index 16714bf4f4..b8b0cf5aba 100644 --- a/src/ethereum_test_types/__init__.py +++ b/src/ethereum_test_types/__init__.py @@ -18,6 +18,7 @@ AccessList, Account, Alloc, + AuthorizationTuple, CamelModel, ConsolidationRequest, DepositRequest, @@ -34,6 +35,7 @@ "AccessList", "Account", "Alloc", + "AuthorizationTuple", "CamelModel", "ConsolidationRequest", "DepositRequest", diff --git a/src/ethereum_test_types/types.py b/src/ethereum_test_types/types.py index 269f7073ab..5474ee4ac2 100644 --- a/src/ethereum_test_types/types.py +++ b/src/ethereum_test_types/types.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from functools import cached_property -from typing import Any, Dict, Generic, List, Sequence +from typing import Any, ClassVar, Dict, Generic, List, Sequence, Tuple from coincurve.keys import PrivateKey, PublicKey from ethereum import rlp as eth_rlp @@ -457,6 +457,102 @@ def to_list(self) -> List[Address | List[Hash]]: return [self.address, self.storage_keys] +class AuthorizationTupleGeneric(CamelModel, Generic[NumberBoundTypeVar]): + """ + Authorization tuple for transactions. + """ + + chain_id: NumberBoundTypeVar = Field(1) # type: ignore + address: Address + nonce: List[NumberBoundTypeVar] = Field(default_factory=list) + + v: NumberBoundTypeVar = Field(0) # type: ignore + r: NumberBoundTypeVar = Field(0) # type: ignore + s: NumberBoundTypeVar = Field(0) # type: ignore + + magic: ClassVar[int] = 0x05 + + @model_validator(mode="before") + @classmethod + def convert_nonce_information(cls, data: Any) -> Any: + """ + Automatically converts the nonce to a list if it is not already. + """ + if "nonce" in data and not isinstance(data["nonce"], list): + data["nonce"] = [data["nonce"]] + return data + + def to_list(self) -> List[Any]: + """ + Returns the authorization tuple as a list of serializable elements. + """ + return [ + Uint(self.chain_id), + self.address, + [Uint(n) for n in self.nonce], + Uint(self.v), + Uint(self.r), + Uint(self.s), + ] + + @cached_property + def signing_bytes(self) -> bytes: + """ + Returns the data to be signed. + """ + return int.to_bytes(self.magic, length=1, byteorder="big") + eth_rlp.encode( + [ + Uint(self.chain_id), + self.address, + [Uint(n) for n in self.nonce], + ] + ) + + def signature(self, private_key: Hash) -> Tuple[int, int, int]: + """ + Returns the signature of the authorization tuple. + """ + signature_bytes = PrivateKey(secret=private_key).sign_recoverable( + self.signing_bytes, hasher=keccak256 + ) + return ( + signature_bytes[64], + int.from_bytes(signature_bytes[0:32], byteorder="big"), + int.from_bytes(signature_bytes[32:64], byteorder="big"), + ) + + +class AuthorizationTuple(AuthorizationTupleGeneric[HexNumber]): + """ + Authorization tuple for transactions. + """ + + signer: EOA | None = None + secret_key: Hash | None = None + + def model_post_init(self, __context: Any) -> None: + """ + Automatically signs the authorization tuple if a secret key or sender are provided. + """ + super().model_post_init(__context) + + if self.secret_key is not None: + self.sign(self.secret_key) + elif self.signer is not None: + assert self.signer.key is not None, "signer must have a key" + self.sign(self.signer.key) + + def sign(self, private_key: Hash) -> None: + """ + Signs the authorization tuple with a private key. + """ + signature = self.signature(private_key) + + self.v = HexNumber(signature[0]) + self.r = HexNumber(signature[1]) + self.s = HexNumber(signature[2]) + + class TransactionGeneric(BaseModel, Generic[NumberBoundTypeVar]): """ Generic transaction type used as a parent for Transaction and FixtureTransaction (blockchain). @@ -547,6 +643,8 @@ class Transaction(TransactionGeneric[HexNumber], TransactionTransitionToolConver to: Address | None = Field(Address(0xAA)) data: Bytes = Field(Bytes(b""), alias="input") + authorization_list: List[AuthorizationTuple] | None = None + secret_key: Hash | None = None error: List[TransactionException] | TransactionException | None = Field(None, exclude=True) @@ -594,7 +692,9 @@ def model_post_init(self, __context): if "ty" not in self.model_fields_set: # Try to deduce transaction type from included fields - if self.max_fee_per_blob_gas is not None or self.blob_kzg_commitments is not None: + if self.authorization_list is not None: + self.ty = 4 + elif self.max_fee_per_blob_gas is not None or self.blob_kzg_commitments is not None: self.ty = 3 elif self.max_fee_per_gas is not None or self.max_priority_fee_per_gas is not None: self.ty = 2 @@ -618,14 +718,27 @@ def model_post_init(self, __context): self.gas_price = 10 if self.ty >= 1 and self.access_list is None: self.access_list = [] + if self.ty < 1: + assert self.access_list is None, "access_list must be None" if self.ty >= 2 and self.max_fee_per_gas is None: self.max_fee_per_gas = 7 if self.ty >= 2 and self.max_priority_fee_per_gas is None: self.max_priority_fee_per_gas = 0 + if self.ty < 2: + assert self.max_fee_per_gas is None, "max_fee_per_gas must be None" + assert self.max_priority_fee_per_gas is None, "max_priority_fee_per_gas must be None" if self.ty == 3 and self.max_fee_per_blob_gas is None: self.max_fee_per_blob_gas = 1 + if self.ty != 3: + assert self.blob_versioned_hashes is None, "blob_versioned_hashes must be None" + assert self.max_fee_per_blob_gas is None, "max_fee_per_blob_gas must be None" + + if self.ty == 4 and self.authorization_list is None: + self.authorization_list = [] + if self.ty != 4: + assert self.authorization_list is None, "authorization_list must be None" if "nonce" not in self.model_fields_set and self.sender is not None: self.nonce = HexNumber(self.sender.get_nonce()) @@ -710,18 +823,40 @@ def signing_envelope(self) -> List[Any]: Returns the list of values included in the envelope used for signing. """ to = self.to if self.to else bytes() - if self.ty == 3: + if self.ty == 4: + # EIP-7702: https://eips.ethereum.org/EIPS/eip-7702 + if self.max_priority_fee_per_gas is None: + raise ValueError(f"max_priority_fee_per_gas must be set for type {self.ty} tx") + if self.max_fee_per_gas is None: + raise ValueError(f"max_fee_per_gas must be set for type {self.ty} tx") + if self.access_list is None: + raise ValueError(f"access_list must be set for type {self.ty} tx") + if self.authorization_list is None: + raise ValueError(f"authorization_tuples must be set for type {self.ty} tx") + return [ + Uint(self.chain_id), + Uint(self.nonce), + Uint(self.max_priority_fee_per_gas), + Uint(self.max_fee_per_gas), + Uint(self.gas_limit), + to, + Uint(self.value), + self.data, + [a.to_list() for a in self.access_list], + [a.to_list() for a in self.authorization_list], + ] + elif self.ty == 3: # EIP-4844: https://eips.ethereum.org/EIPS/eip-4844 if self.max_priority_fee_per_gas is None: - raise ValueError("max_priority_fee_per_gas must be set for type 3 tx") + raise ValueError(f"max_priority_fee_per_gas must be set for type {self.ty} tx") if self.max_fee_per_gas is None: - raise ValueError("max_fee_per_gas must be set for type 3 tx") + raise ValueError(f"max_fee_per_gas must be set for type {self.ty} tx") if self.max_fee_per_blob_gas is None: - raise ValueError("max_fee_per_blob_gas must be set for type 3 tx") + raise ValueError(f"max_fee_per_blob_gas must be set for type {self.ty} tx") if self.blob_versioned_hashes is None: - raise ValueError("blob_versioned_hashes must be set for type 3 tx") + raise ValueError(f"blob_versioned_hashes must be set for type {self.ty} tx") if self.access_list is None: - raise ValueError("access_list must be set for type 3 tx") + raise ValueError(f"access_list must be set for type {self.ty} tx") return [ Uint(self.chain_id), Uint(self.nonce), @@ -738,11 +873,11 @@ def signing_envelope(self) -> List[Any]: elif self.ty == 2: # EIP-1559: https://eips.ethereum.org/EIPS/eip-1559 if self.max_priority_fee_per_gas is None: - raise ValueError("max_priority_fee_per_gas must be set for type 2 tx") + raise ValueError(f"max_priority_fee_per_gas must be set for type {self.ty} tx") if self.max_fee_per_gas is None: - raise ValueError("max_fee_per_gas must be set for type 2 tx") + raise ValueError(f"max_fee_per_gas must be set for type {self.ty} tx") if self.access_list is None: - raise ValueError("access_list must be set for type 2 tx") + raise ValueError(f"access_list must be set for type {self.ty} tx") return [ Uint(self.chain_id), Uint(self.nonce), @@ -757,9 +892,9 @@ def signing_envelope(self) -> List[Any]: elif self.ty == 1: # EIP-2930: https://eips.ethereum.org/EIPS/eip-2930 if self.gas_price is None: - raise ValueError("gas_price must be set for type 1 tx") + raise ValueError(f"gas_price must be set for type {self.ty} tx") if self.access_list is None: - raise ValueError("access_list must be set for type 1 tx") + raise ValueError(f"access_list must be set for type {self.ty} tx") return [ Uint(self.chain_id), @@ -773,7 +908,7 @@ def signing_envelope(self) -> List[Any]: ] elif self.ty == 0: if self.gas_price is None: - raise ValueError("gas_price must be set for type 0 tx") + raise ValueError(f"gas_price must be set for type {self.ty} tx") if self.protected: # EIP-155: https://eips.ethereum.org/EIPS/eip-155 @@ -815,11 +950,11 @@ def payload_body(self) -> List[Any]: elif self.ty == 3 and self.wrapped_blob_transaction: # EIP-4844: https://eips.ethereum.org/EIPS/eip-4844 if self.blobs is None: - raise ValueError("blobs must be set for type 3 tx") + raise ValueError(f"blobs must be set for type {self.ty} tx") if self.blob_kzg_commitments is None: - raise ValueError("blob_kzg_commitments must be set for type 3 tx") + raise ValueError(f"blob_kzg_commitments must be set for type {self.ty} tx") if self.blob_kzg_proofs is None: - raise ValueError("blob_kzg_proofs must be set for type 3 tx") + raise ValueError(f"blob_kzg_proofs must be set for type {self.ty} tx") return [ signing_envelope + [Uint(self.v), Uint(self.r), Uint(self.s)], list(self.blobs), diff --git a/tests/prague/eip7702_eoa_code_tx/__init__.py b/tests/prague/eip7702_eoa_code_tx/__init__.py new file mode 100644 index 0000000000..1400f0640a --- /dev/null +++ b/tests/prague/eip7702_eoa_code_tx/__init__.py @@ -0,0 +1,3 @@ +""" +Cross-client EIP-7702 Tests +""" diff --git a/tests/prague/eip7702_eoa_code_tx/spec.py b/tests/prague/eip7702_eoa_code_tx/spec.py new file mode 100644 index 0000000000..f0da9cbdb8 --- /dev/null +++ b/tests/prague/eip7702_eoa_code_tx/spec.py @@ -0,0 +1,29 @@ +""" +Defines EIP-7702 specification constants and functions. +""" +from dataclasses import dataclass + + +@dataclass(frozen=True) +class ReferenceSpec: + """ + Defines the reference spec version and git path. + """ + + git_path: str + version: str + + +ref_spec_7702 = ReferenceSpec("EIPS/eip-7702.md", "7357ff1f3f176aada6d350d6e42a292a3dec27f4") + + +@dataclass(frozen=True) +class Spec: + """ + Parameters from the EIP-7702 specifications as defined at + https://eips.ethereum.org/EIPS/eip-7702 + """ + + SET_CODE_TX_TYPE = 0x04 + MAGIC = 0x05 + PER_AUTH_BASE_COST = 2500 diff --git a/tests/prague/eip7702_eoa_code_tx/test_eoa_code_txs.py b/tests/prague/eip7702_eoa_code_tx/test_eoa_code_txs.py new file mode 100644 index 0000000000..3264fe90ca --- /dev/null +++ b/tests/prague/eip7702_eoa_code_tx/test_eoa_code_txs.py @@ -0,0 +1,916 @@ +""" +abstract: Tests use of set-code transactions from [EIP-7702: Set EOA account code for one transaction](https://eips.ethereum.org/EIPS/eip-7702) + Tests use of set-code transactions from [EIP-7702: Set EOA account code for one transaction](https://eips.ethereum.org/EIPS/eip-7702). +""" # noqa: E501 + +from enum import Enum +from itertools import count + +import pytest + +from ethereum_test_tools import ( + Account, + Alloc, + AuthorizationTuple, + Block, + BlockchainTestFiller, + Bytecode, + Conditional, + Environment, + Initcode, +) +from ethereum_test_tools import Opcodes as Op +from ethereum_test_tools import ( + StateTestFiller, + Storage, + Transaction, + compute_create2_address, + compute_create_address, +) + +from .spec import ref_spec_7702 + +REFERENCE_SPEC_GIT_PATH = ref_spec_7702.git_path +REFERENCE_SPEC_VERSION = ref_spec_7702.version + +pytestmark = pytest.mark.valid_from("Prague") + +# FIXME: Temporary workaround, should be zero +auth_account_start_balance = 1 + + +class InvalidityReason(Enum): + """ + Reasons for invalidity. + """ + + NONCE = "nonce" + MULTIPLE_NONCE = "multiple_nonce" + CHAIN_ID = "chain_id" + + +@pytest.mark.parametrize( + "eoa_balance", + [ + pytest.param(0, marks=pytest.mark.xfail(reason="evm fails on zero balance")), + pytest.param(1), + ], +) +@pytest.mark.parametrize( + "suffix,succeeds", + [ + pytest.param(Op.STOP, True, id="stop"), + pytest.param(Op.RETURN(0, 0), True, id="return"), + pytest.param(Op.REVERT, False, id="revert"), + pytest.param(Op.INVALID, False, id="invalid"), + ], +) +def test_set_code_to_sstore( + state_test: StateTestFiller, + pre: Alloc, + suffix: Bytecode, + succeeds: bool, + eoa_balance: int, +): + """ + Test the executing a simple SSTORE in a set-code transaction. + """ + storage = Storage() + auth_signer = pre.fund_eoa(eoa_balance) + + set_code = ( + Op.SSTORE(storage.store_next(1), 1) + + Op.SSTORE(storage.store_next(2), 2) + + Op.SSTORE(storage.store_next(3), 3) + + suffix + ) + set_code_to_address = pre.deploy_contract( + set_code, + ) + + tx = Transaction( + gas_limit=10_000_000, + to=auth_signer, + value=0, + authorization_list=[ + AuthorizationTuple( + address=set_code_to_address, + nonce=0, + signer=auth_signer, + ), + ], + sender=pre.fund_eoa(), + ) + + state_test( + env=Environment(), + pre=pre, + tx=tx, + post={ + set_code_to_address: Account(storage={k: 0 for k in storage}), + auth_signer: Account(nonce=0, code=b"", storage=storage if succeeds else {}), + }, + ) + + +def test_set_code_to_sstore_then_sload( + blockchain_test: BlockchainTestFiller, + pre: Alloc, +): + """ + Test the executing a simple SSTORE then SLOAD in two separate set-code transactions. + """ + auth_signer = pre.fund_eoa(auth_account_start_balance) + sender = pre.fund_eoa() + + storage_key_1 = 0x1 + storage_key_2 = 0x2 + storage_value = 0x1234 + + set_code_1 = Op.SSTORE(storage_key_1, storage_value) + Op.STOP + set_code_1_address = pre.deploy_contract(set_code_1) + + set_code_2 = Op.SSTORE(storage_key_2, Op.ADD(Op.SLOAD(storage_key_1), 1)) + Op.STOP + set_code_2_address = pre.deploy_contract(set_code_2) + + tx_1 = Transaction( + gas_limit=50_000, + to=auth_signer, + value=0, + authorization_list=[ + AuthorizationTuple( + address=set_code_1_address, + nonce=0, + signer=auth_signer, + ), + ], + sender=sender, + ) + + tx_2 = Transaction( + gas_limit=50_000, + to=auth_signer, + value=0, + authorization_list=[ + AuthorizationTuple( + address=set_code_2_address, + nonce=0, + signer=auth_signer, + ), + ], + sender=sender, + ) + + block = Block( + txs=[tx_1, tx_2], + ) + + blockchain_test( + pre=pre, + post={ + auth_signer: Account( + nonce=0, + code=b"", + storage={ + storage_key_1: storage_value, + storage_key_2: storage_value + 1, + }, + ), + }, + blocks=[block], + ) + + +@pytest.mark.parametrize( + "call_opcode", + [ + Op.CALL, + Op.DELEGATECALL, + Op.STATICCALL, + Op.CALLCODE, + ], +) +@pytest.mark.parametrize( + "return_opcode", + [ + Op.RETURN, + Op.REVERT, + ], +) +def test_set_code_to_tstore_reentry( + state_test: StateTestFiller, + pre: Alloc, + call_opcode: Op, + return_opcode: Op, +): + """ + Test the executing a simple TSTORE in a set-code transaction, which also performs a + re-entry to TLOAD the value. + """ + auth_signer = pre.fund_eoa(auth_account_start_balance) + + tload_value = 0x1234 + set_code = Conditional( + condition=Op.ISZERO(Op.TLOAD(1)), + if_true=Op.TSTORE(1, tload_value) + + call_opcode(address=Op.ADDRESS) + + Op.RETURNDATACOPY(0, 0, 32) + + Op.SSTORE(2, Op.MLOAD(0)), + if_false=Op.MSTORE(0, Op.TLOAD(1)) + return_opcode(size=32), + ) + set_code_to_address = pre.deploy_contract(set_code) + + tx = Transaction( + gas_limit=100_000, + to=auth_signer, + value=0, + authorization_list=[ + AuthorizationTuple( + address=set_code_to_address, + nonce=0, + signer=auth_signer, + ), + ], + sender=pre.fund_eoa(), + ) + + state_test( + env=Environment(), + pre=pre, + tx=tx, + post={ + auth_signer: Account(nonce=0, code=b"", storage={2: tload_value}), + }, + ) + + +def test_set_code_to_self_destruct( + state_test: StateTestFiller, + pre: Alloc, +): + """ + Test the executing self-destruct opcode in a set-code transaction. + """ + auth_signer = pre.fund_eoa(auth_account_start_balance) + + set_code_to_address = pre.deploy_contract(Op.SELFDESTRUCT(Op.ADDRESS)) + + tx = Transaction( + gas_limit=10_000_000, + to=auth_signer, + value=0, + authorization_list=[ + AuthorizationTuple( + address=set_code_to_address, + nonce=0, + signer=auth_signer, + ), + ], + sender=pre.fund_eoa(), + ) + + state_test( + env=Environment(), + pre=pre, + tx=tx, + post={}, + ) + + +@pytest.mark.parametrize( + "op", + [ + Op.CREATE, + Op.CREATE2, + ], +) +def test_set_code_to_contract_creator( + state_test: StateTestFiller, + pre: Alloc, + op: Op, +): + """ + Test the executing a contract-creating opcode in a set-code transaction. + """ + storage = Storage() + auth_signer = pre.fund_eoa(auth_account_start_balance) + + deployed_code = Op.STOP + initcode = Initcode(deploy_code=deployed_code) + + if op == Op.CREATE: + deployed_contract_address = compute_create_address(auth_signer) + elif op == Op.CREATE2: + deployed_contract_address = compute_create2_address( + address=auth_signer, + salt=0, + initcode=initcode, + ) + + set_code = Op.CALLDATACOPY(0, 0, Op.CALLDATASIZE) + Op.SSTORE( + storage.store_next(deployed_contract_address), + op(value=0, offset=0, size=Op.CALLDATASIZE), + ) + set_code_to_address = pre.deploy_contract(set_code) + + tx = Transaction( + gas_limit=10_000_000, + to=auth_signer, + value=0, + data=initcode, + authorization_list=[ + AuthorizationTuple( + address=set_code_to_address, + nonce=0, + signer=auth_signer, + ), + ], + sender=pre.fund_eoa(), + ) + + state_test( + env=Environment(), + pre=pre, + tx=tx, + post={ + set_code_to_address: Account(storage={}), + auth_signer: Account(nonce=1, code=b"", storage=storage), + deployed_contract_address: Account( + code=deployed_code, + storage={}, + ), + }, + ) + + +@pytest.mark.parametrize( + "op", + [ + Op.CALL, + Op.DELEGATECALL, + Op.STATICCALL, + Op.CALLCODE, + ], +) +@pytest.mark.parametrize( + "value", + [ + 0, + 10**18, + ], +) +def test_set_code_to_self_caller( + state_test: StateTestFiller, + pre: Alloc, + op: Op, + value: int, +): + """ + Test the executing a self-call in a set-code transaction. + """ + storage = Storage() + auth_signer = pre.fund_eoa(auth_account_start_balance) + + first_entry_slot = storage.store_next(True) + re_entry_success_slot = storage.store_next(op != Op.STATICCALL) + re_entry_call_return_code_slot = storage.store_next(op != Op.STATICCALL) + set_code = Conditional( + condition=Op.ISZERO(Op.SLOAD(first_entry_slot)), + if_true=Op.SSTORE(first_entry_slot, 1) + + Op.SSTORE(re_entry_call_return_code_slot, op(address=auth_signer, value=value)) + + Op.STOP, + if_false=Op.SSTORE(re_entry_success_slot, 1) + Op.STOP, + ) + set_code_to_address = pre.deploy_contract(set_code) + + tx = Transaction( + gas_limit=10_000_000, + to=auth_signer, + value=value, + authorization_list=[ + AuthorizationTuple( + address=set_code_to_address, + nonce=0, + signer=auth_signer, + ), + ], + sender=pre.fund_eoa(10**21), + ) + + state_test( + env=Environment(), + pre=pre, + tx=tx, + post={ + set_code_to_address: Account(storage={}), + auth_signer: Account( + nonce=0, + code=b"", + storage=storage, + balance=auth_account_start_balance + value, + ), + }, + ) + + +@pytest.mark.parametrize( + "op", + [ + Op.CALL, + Op.DELEGATECALL, + Op.STATICCALL, + Op.CALLCODE, + ], +) +@pytest.mark.parametrize( + "value", + [ + 0, + 10**18, + ], +) +def test_set_code_call_set_code( + state_test: StateTestFiller, + pre: Alloc, + op: Op, + value: int, +): + """ + Test the calling a set-code account from another set-code account. + """ + auth_signer_1 = pre.fund_eoa(auth_account_start_balance) + storage_1 = Storage() + + set_code_1_call_result_slot = storage_1.store_next(op != Op.STATICCALL) + set_code_1_success = storage_1.store_next(True) + + auth_signer_2 = pre.fund_eoa(auth_account_start_balance) + storage_2 = Storage().set_next_slot(storage_1.peek_slot()) + set_code_2_success = storage_2.store_next(op != Op.STATICCALL) + + set_code_1 = ( + Op.SSTORE(set_code_1_call_result_slot, op(address=auth_signer_2, value=value)) + + Op.SSTORE(set_code_1_success, 1) + + Op.STOP + ) + set_code_to_address_1 = pre.deploy_contract(set_code_1) + + set_code_2 = Op.SSTORE(set_code_2_success, 1) + Op.STOP + set_code_to_address_2 = pre.deploy_contract(set_code_2) + + tx = Transaction( + gas_limit=10_000_000, + to=auth_signer_1, + value=value, + authorization_list=[ + AuthorizationTuple( + address=set_code_to_address_1, + nonce=0, + signer=auth_signer_1, + ), + AuthorizationTuple( + address=set_code_to_address_2, + nonce=0, + signer=auth_signer_2, + ), + ], + sender=pre.fund_eoa(10**21), + ) + + state_test( + env=Environment(), + pre=pre, + tx=tx, + post={ + set_code_to_address_1: Account(storage={k: 0 for k in storage_1}), + set_code_to_address_2: Account(storage={k: 0 for k in storage_2}), + auth_signer_1: Account( + nonce=0, + storage=storage_1 if op in [Op.CALL, Op.STATICCALL] else storage_1 + storage_2, + balance=(0 if op == Op.CALL else value) + auth_account_start_balance, + ), + auth_signer_2: Account( + nonce=0, + storage=storage_2 if op == Op.CALL else {}, + balance=(value if op == Op.CALL else 0) + auth_account_start_balance, + ), + }, + ) + + +def test_address_from_set_code( + state_test: StateTestFiller, + pre: Alloc, +): + """ + Test the address opcode in a set-code transaction. + """ + storage = Storage() + auth_signer = pre.fund_eoa(auth_account_start_balance) + + set_code = Op.SSTORE(storage.store_next(auth_signer), Op.ADDRESS) + Op.STOP + set_code_to_address = pre.deploy_contract(set_code) + + tx = Transaction( + gas_limit=10_000_000, + to=auth_signer, + value=0, + authorization_list=[ + AuthorizationTuple( + address=set_code_to_address, + nonce=0, + signer=auth_signer, + ), + ], + sender=pre.fund_eoa(), + ) + + state_test( + env=Environment(), + pre=pre, + tx=tx, + post={ + set_code_to_address: Account(storage={}), + auth_signer: Account(nonce=0, code=b"", storage=storage), + }, + ) + + +@pytest.mark.parametrize( + "balance", + [ + pytest.param(0, marks=pytest.mark.xfail(reason="evm fails on zero balance")), + pytest.param(10**18), + ], +) +def test_ext_code_on_set_code( + state_test: StateTestFiller, + pre: Alloc, + balance: int, +): + """ + Test different ext*code operations on a set-code address. + """ + auth_signer = pre.fund_eoa(balance) + + slot = count(1) + slot_call_success = next(slot) + slot_caller = next(slot) + slot_ext_code_size_result = next(slot) + slot_ext_code_hash_result = next(slot) + slot_ext_code_copy_result = next(slot) + slot_ext_balance_result = next(slot) + + callee_code = ( + Op.SSTORE(slot_caller, Op.CALLER) + + Op.SSTORE(slot_ext_code_size_result, Op.EXTCODESIZE(Op.CALLER)) + + Op.SSTORE(slot_ext_code_hash_result, Op.EXTCODEHASH(Op.CALLER)) + + Op.EXTCODECOPY(Op.CALLER, 0, 0, Op.EXTCODESIZE(Op.CALLER)) + + Op.SSTORE(slot_ext_code_copy_result, Op.MLOAD(0)) + + Op.SSTORE(slot_ext_balance_result, Op.BALANCE(Op.CALLER)) + + Op.STOP + ) + callee_address = pre.deploy_contract(callee_code) + callee_storage = Storage() + + auth_signer_storage = Storage() + set_code = Op.SSTORE(slot_call_success, Op.CALL(address=callee_address)) + Op.STOP + auth_signer_storage[slot_call_success] = True + set_code_to_address = pre.deploy_contract(set_code) + + callee_storage[slot_caller] = auth_signer + callee_storage[slot_ext_code_size_result] = len(set_code) + callee_storage[slot_ext_code_hash_result] = set_code.keccak256() + callee_storage[slot_ext_code_copy_result] = bytes(set_code).ljust(32, b"\x00")[:32] + callee_storage[slot_ext_balance_result] = balance + + tx = Transaction( + gas_limit=10_000_000, + to=auth_signer, + authorization_list=[ + AuthorizationTuple( + address=set_code_to_address, + nonce=0, + signer=auth_signer, + ), + ], + sender=pre.fund_eoa(), + ) + + state_test( + env=Environment(), + pre=pre, + tx=tx, + post={ + set_code_to_address: Account(storage={}), + auth_signer: Account(nonce=0, code=b"", storage=auth_signer_storage, balance=balance), + callee_address: Account(storage=callee_storage), + }, + ) + + +@pytest.mark.parametrize( + "balance", + [ + pytest.param(0, marks=pytest.mark.xfail(reason="evm fails on zero balance")), + pytest.param(10**18), + ], +) +def test_self_code_on_set_code( + state_test: StateTestFiller, + pre: Alloc, + balance: int, +): + """ + Test codesize and codecopy operations on a set-code address. + """ + auth_signer = pre.fund_eoa(balance) + + slot = count(1) + slot_code_size_result = next(slot) + slot_code_copy_result = next(slot) + slot_self_balance_result = next(slot) + + set_code = ( + Op.SSTORE(slot_code_size_result, Op.CODESIZE) + + Op.CODECOPY(0, 0, Op.CODESIZE) + + Op.SSTORE(slot_code_copy_result, Op.MLOAD(0)) + + Op.SSTORE(slot_self_balance_result, Op.SELFBALANCE) + + Op.STOP + ) + set_code_to_address = pre.deploy_contract(set_code) + + storage = Storage() + storage[slot_code_size_result] = len(set_code) + storage[slot_code_copy_result] = bytes(set_code).ljust(32, b"\x00")[:32] + storage[slot_self_balance_result] = balance + + tx = Transaction( + gas_limit=10_000_000, + to=auth_signer, + authorization_list=[ + AuthorizationTuple( + address=set_code_to_address, + nonce=0, + signer=auth_signer, + ), + ], + sender=pre.fund_eoa(), + ) + + state_test( + env=Environment(), + pre=pre, + tx=tx, + post={ + set_code_to_address: Account(storage={}), + auth_signer: Account(nonce=0, code=b"", storage=storage, balance=balance), + }, + ) + + +@pytest.mark.parametrize( + "create_op", + [ + Op.CREATE, + Op.CREATE2, + ], +) +def test_set_code_to_account_deployed_in_same_tx( + state_test: StateTestFiller, + pre: Alloc, + create_op: Op, +): + """ + Test setting the code of an account to an address that is deployed in the same transaction, + and test calling the set-code address and the deployed contract. + """ + auth_signer = pre.fund_eoa(auth_account_start_balance) + + success_slot = 1 + + deployed_code = Op.SSTORE(success_slot, 1) + Op.STOP + initcode = Initcode(deploy_code=deployed_code) + + deployed_contract_address_slot = 1 + signer_call_return_code_slot = 2 + deployed_contract_call_return_code_slot = 3 + + contract_creator_code = ( + Op.CALLDATACOPY(0, 0, Op.CALLDATASIZE) + + Op.SSTORE(deployed_contract_address_slot, create_op(offset=0, size=Op.CALLDATASIZE)) + + Op.SSTORE(signer_call_return_code_slot, Op.CALL(address=auth_signer)) + + Op.SSTORE( + deployed_contract_call_return_code_slot, + Op.CALL(address=Op.SLOAD(deployed_contract_address_slot)), + ) + + Op.STOP() + ) + contract_creator_address = pre.deploy_contract(contract_creator_code) + + if create_op == Op.CREATE: + deployed_contract_address = compute_create_address( + address=contract_creator_address, + nonce=1, + ) + else: + deployed_contract_address = compute_create2_address( + address=contract_creator_address, + salt=0, + initcode=initcode, + ) + + tx = Transaction( + gas_limit=10_000_000, + to=contract_creator_address, + value=0, + data=initcode, + authorization_list=[ + AuthorizationTuple( + address=deployed_contract_address, + nonce=0, + signer=auth_signer, + ), + ], + sender=pre.fund_eoa(), + ) + + state_test( + env=Environment(), + pre=pre, + tx=tx, + post={ + deployed_contract_address: Account( + storage={success_slot: 1}, + ), + auth_signer: Account( + nonce=0, + code=b"", + storage={}, + ), + contract_creator_address: Account( + storage={ + deployed_contract_address_slot: deployed_contract_address, + signer_call_return_code_slot: 1, + deployed_contract_call_return_code_slot: 1, + } + ), + }, + ) + + +def test_set_code_multiple_valid_authorization_tuples_same_signer( + state_test: StateTestFiller, + pre: Alloc, +): + """ + Test setting the code of an account with multiple authorization tuples from the same signer. + """ + auth_signer = pre.fund_eoa(auth_account_start_balance) + + success_slot = 1 + + tuple_count = 10 + + addresses = [ + pre.deploy_contract(Op.SSTORE(success_slot, i + 1) + Op.STOP) for i in range(tuple_count) + ] + + tx = Transaction( + gas_limit=10_000_000, + to=auth_signer, + value=0, + authorization_list=[ + AuthorizationTuple( + address=address, + nonce=0, + signer=auth_signer, + ) + for address in addresses + ], + sender=pre.fund_eoa(), + ) + + state_test( + env=Environment(), + pre=pre, + tx=tx, + post={ + auth_signer: Account( + nonce=0, + code=b"", + storage={ + success_slot: 1, + }, + ), + }, + ) + + +def test_set_code_multiple_valid_authorization_tuples_first_invalid_same_signer( + state_test: StateTestFiller, + pre: Alloc, +): + """ + Test setting the code of an account with multiple authorization tuples from the same signer + but the first tuple is invalid. + """ + auth_signer = pre.fund_eoa(auth_account_start_balance) + + success_slot = 1 + + tuple_count = 10 + + addresses = [ + pre.deploy_contract(Op.SSTORE(success_slot, i + 1) + Op.STOP) for i in range(tuple_count) + ] + + tx = Transaction( + gas_limit=10_000_000, + to=auth_signer, + value=0, + authorization_list=[ + AuthorizationTuple( + address=address, + nonce=1 if i == 0 else 0, + signer=auth_signer, + ) + for i, address in enumerate(addresses) + ], + sender=pre.fund_eoa(), + ) + + state_test( + env=Environment(), + pre=pre, + tx=tx, + post={ + auth_signer: Account( + nonce=0, + code=b"", + storage={ + success_slot: 2, + }, + ), + }, + ) + + +@pytest.mark.parametrize( + "invalidity_reason", + [ + InvalidityReason.NONCE, + pytest.param( + InvalidityReason.MULTIPLE_NONCE, marks=pytest.mark.xfail(reason="test issue") + ), + pytest.param(InvalidityReason.CHAIN_ID, marks=pytest.mark.xfail(reason="evm issue")), + ], +) +def test_set_code_invalid_authorization_tuple( + state_test: StateTestFiller, + pre: Alloc, + invalidity_reason: InvalidityReason, +): + """ + Test attempting to set the code of an account with invalid authorization tuple. + """ + auth_signer = pre.fund_eoa(auth_account_start_balance) + + success_slot = 1 + + set_code = Op.SSTORE(success_slot, 1) + Op.STOP + set_code_to_address = pre.deploy_contract(set_code) + + tx = Transaction( + gas_limit=10_000_000, + to=auth_signer, + value=0, + authorization_list=[ + AuthorizationTuple( + address=set_code_to_address, + nonce=1 + if invalidity_reason == InvalidityReason.NONCE + else [0, 1] + if invalidity_reason == InvalidityReason.MULTIPLE_NONCE + else 0, + chain_id=2 if invalidity_reason == InvalidityReason.CHAIN_ID else 0, + signer=auth_signer, + ) + ], + sender=pre.fund_eoa(), + ) + + state_test( + env=Environment(), + pre=pre, + tx=tx, + post={ + auth_signer: Account( + nonce=0, + code=b"", + storage={ + success_slot: 0, + }, + ), + }, + )