diff --git a/boa/contracts/vyper/vyper_contract.py b/boa/contracts/vyper/vyper_contract.py index 4bf5e9db..90d46524 100644 --- a/boa/contracts/vyper/vyper_contract.py +++ b/boa/contracts/vyper/vyper_contract.py @@ -573,11 +573,11 @@ def __init__( self.env.register_contract(self._address, self) def _run_init(self, *args, value=0, override_address=None, gas=None): - encoded_args = b"" + self.ctor_calldata = b"" if self._ctor: - encoded_args = self._ctor.prepare_calldata(*args) + self.ctor_calldata = self._ctor.prepare_calldata(*args) - initcode = self.compiler_data.bytecode + encoded_args + initcode = self.compiler_data.bytecode + self.ctor_calldata with self._anchor_source_map(self._deployment_source_map): address, computation = self.env.deploy( bytecode=initcode, diff --git a/boa/explorer.py b/boa/explorer.py index c44f67fe..57be5c3c 100644 --- a/boa/explorer.py +++ b/boa/explorer.py @@ -1,15 +1,22 @@ +import re import time from dataclasses import dataclass +from datetime import timedelta from typing import Optional from boa.rpc import json +from boa.util.abi import Address +from boa.verifiers import ContractVerifier, VerificationResult try: from requests_cache import CachedSession + def filter_fn(response): + return response.ok and _is_success_response(response.json()) + SESSION = CachedSession( "~/.cache/titanoboa/explorer_cache", - filter_fn=lambda response: _is_success_response(response.json()), + filter_fn=filter_fn, allowable_codes=[200], cache_control=True, expire_after=3600 * 6, @@ -22,15 +29,120 @@ SESSION = Session() DEFAULT_ETHERSCAN_URI = "https://api.etherscan.io/api" +VERSION_RE = re.compile(r"v(\d+\.\d+\.\d+)(\+commit.*)?") @dataclass -class Etherscan: +class Etherscan(ContractVerifier[str]): uri: Optional[str] = DEFAULT_ETHERSCAN_URI api_key: Optional[str] = None num_retries: int = 10 backoff_ms: int | float = 400.0 backoff_factor: float = 1.1 # 1.1**10 ~= 2.59 + timeout = timedelta(minutes=2) + + def verify( + self, + address: Address, + contract_name: str, + solc_json: dict, + constructor_calldata: bytes, + chain_id: int, + license_type: str = "1", + wait: bool = False, + ) -> Optional["VerificationResult[str]"]: + """ + Verify the Vyper contract on Etherscan. + :param address: The address of the contract. + :param contract_name: The name of the contract. + :param solc_json: The solc_json output of the Vyper compiler. + :param constructor_calldata: The calldata for the contract constructor. + :param chain_id: The ID of the chain where the contract is deployed. + :param license_type: The license to use for the contract. Defaults to "none". + :param wait: Whether to return a VerificationResult immediately + or wait for verification to complete. Defaults to False + """ + api_key = self.api_key or "" + output_selection = solc_json["settings"]["outputSelection"] + contract_file = next(k for k, v in output_selection.items() if "*" in v) + compiler_version = solc_json["compiler_version"] + version_match = re.match(VERSION_RE, compiler_version) + if not version_match: + raise ValueError(f"Failed to extract Vyper version from {compiler_version}") + + data = { + "module": "contract", + "action": "verifysourcecode", + "apikey": api_key, + "chainId": chain_id, + "codeformat": "vyper-json", + "sourceCode": json.dumps(solc_json), + "constructorArguments": constructor_calldata.hex(), + "contractaddress": address, + "contractname": f"{contract_file}:{contract_name}", + "compilerversion": f"vyper:{version_match.group(1)}", + "licenseType": license_type, + "optimizationUsed": "1", + } + + def verification_created(): + # we need to retry until the contract is found by Etherscan + response = SESSION.post(self.uri, data=data) + response.raise_for_status() + response_json = response.json() + if response_json.get("status") == "1": + return response_json["result"] + if ( + response_json.get("message") == "NOTOK" + and "Unable to locate ContractCode" not in response_json["result"] + ): + raise ValueError(f"Failed to verify: {response_json['result']}") + print( + f'Verification could not be created yet: {response_json["result"]}. Retrying...' + ) + return None + + etherscan_guid = self._wait_until( + verification_created, timedelta(minutes=2), timedelta(seconds=5), 1.1 + ) + print(f"Verification started with etherscan_guid {etherscan_guid}") + if not wait: + return VerificationResult(etherscan_guid, self) + + self.wait_for_verification(etherscan_guid) + return None + + def wait_for_verification(self, etherscan_guid: str) -> None: + """ + Waits for the contract to be verified on Etherscan. + :param etherscan_guid: The unique ID of the contract verification. + """ + self._wait_until( + lambda: self.is_verified(etherscan_guid), + self.timeout, + self.backoff, + self.backoff_factor, + ) + print("Contract verified!") + + @property + def backoff(self): + return timedelta(milliseconds=self.backoff_ms) + + def is_verified(self, etherscan_guid: str) -> bool: + api_key = self.api_key or "" + url = f"{self.uri}?module=contract&action=checkverifystatus" + url += f"&guid={etherscan_guid}&apikey={api_key}" + + response = SESSION.get(url) + response.raise_for_status() + response_json = response.json() + if ( + response_json.get("message") == "NOTOK" + and "Pending in queue" not in response_json["result"] + ): + raise ValueError(f"Failed to verify: {response_json['result']}") + return response_json.get("status") == "1" def __post_init__(self): if self.uri is None: diff --git a/boa/verifiers.py b/boa/verifiers.py index 0d27d6dc..05e9ea1b 100644 --- a/boa/verifiers.py +++ b/boa/verifiers.py @@ -3,18 +3,58 @@ from dataclasses import dataclass from datetime import datetime, timedelta from http import HTTPStatus -from typing import Optional +from typing import Callable, Generic, Optional, TypeVar import requests +from boa.environment import Env from boa.util.abi import Address from boa.util.open_ctx import Open DEFAULT_BLOCKSCOUT_URI = "https://eth.blockscout.com" +T = TypeVar("T") +P = TypeVar("P") + + +class ContractVerifier(Generic[T]): + def verify( + self, + address: Address, + contract_name: str, + solc_json: dict, + constructor_calldata: bytes, + chain_id: int, + license_type: str = "1", + wait: bool = False, + ) -> Optional["VerificationResult[T]"]: + raise NotImplementedError + + def wait_for_verification(self, identifier: T) -> None: + raise NotImplementedError + + def is_verified(self, identifier: T) -> bool: + raise NotImplementedError + + @staticmethod + def _wait_until( + predicate: Callable[[], P], + wait_for: timedelta, + backoff: timedelta, + backoff_factor: float, + ) -> P: + timeout = datetime.now() + wait_for + wait_time = backoff + while datetime.now() < timeout: + if result := predicate(): + return result + time.sleep(wait_time.total_seconds()) + wait_time *= backoff_factor + + raise TimeoutError("Timeout waiting for verification to complete") @dataclass -class Blockscout: +class Blockscout(ContractVerifier[Address]): """ Allows users to verify contracts on Blockscout. This is independent of Vyper contracts, and can be used to verify any smart contract. @@ -37,14 +77,18 @@ def verify( address: Address, contract_name: str, solc_json: dict, - license_type: str = None, + constructor_calldata: bytes, + chain_id: int, + license_type: str = "1", wait: bool = False, - ) -> Optional["VerificationResult"]: + ) -> Optional["VerificationResult[Address]"]: """ Verify the Vyper contract on Blockscout. :param address: The address of the contract. :param contract_name: The name of the contract. :param solc_json: The solc_json output of the Vyper compiler. + :param constructor_calldata: The calldata for the constructor. + :param chain_id: The ID of the chain where the contract is deployed. :param license_type: The license to use for the contract. Defaults to "none". :param wait: Whether to return a VerificationResult immediately or wait for verification to complete. Defaults to False @@ -83,18 +127,15 @@ def wait_for_verification(self, address: Address) -> None: Waits for the contract to be verified on Blockscout. :param address: The address of the contract. """ - timeout = datetime.now() + self.timeout - wait_time = self.backoff - while datetime.now() < timeout: - if self.is_verified(address): - msg = "Contract verified!" - msg += f" {self.uri}/address/{address}?tab=contract_code" - print(msg) - return - time.sleep(wait_time.total_seconds()) - wait_time *= self.backoff_factor - - raise TimeoutError("Timeout waiting for verification to complete") + self._wait_until( + lambda: self.is_verified(address), + self.timeout, + self.backoff, + self.backoff_factor, + ) + msg = "Contract verified!" + msg += f" {self.uri}/address/{address}?tab=contract_code" + print(msg) def is_verified(self, address: Address) -> bool: api_key = self.api_key or "" @@ -107,19 +148,19 @@ def is_verified(self, address: Address) -> bool: return response.json().get("is_verified", False) -_verifier = Blockscout() +_verifier: ContractVerifier = Blockscout() @dataclass -class VerificationResult: - address: Address - verifier: Blockscout +class VerificationResult(Generic[T]): + identifier: T + verifier: ContractVerifier def wait_for_verification(self): - self.verifier.wait_for_verification(self.address) + self.verifier.wait_for_verification(self.identifier) def is_verified(self): - return self.verifier.is_verified(self.address) + return self.verifier.is_verified(self.identifier) def _set_verifier(verifier): @@ -133,7 +174,7 @@ def get_verifier(): # TODO: maybe allow like `set_verifier("blockscout", *args, **kwargs)` -def set_verifier(verifier): +def set_verifier(verifier: ContractVerifier): return Open(get_verifier, _set_verifier, verifier) @@ -147,14 +188,14 @@ def get_verification_bundle(contract_like): # should we also add a `verify_deployment` function? def verify( - contract, verifier=None, license_type: str = None, wait=False -) -> VerificationResult: + contract, verifier: ContractVerifier = None, wait=False, **kwargs +) -> VerificationResult | None: """ Verifies the contract on a block explorer. :param contract: The contract to verify. :param verifier: The block explorer verifier to use. Defaults to get_verifier(). - :param license_type: Optional license to use for the contract. + :param wait: Whether to wait for verification to complete. """ if verifier is None: verifier = get_verifier() @@ -166,6 +207,8 @@ def verify( address=contract.address, solc_json=bundle, contract_name=contract.contract_name, - license_type=license_type, + constructor_calldata=contract.ctor_calldata, wait=wait, + chain_id=Env.get_singleton().get_chain_id(), + **kwargs, ) diff --git a/tests/integration/network/sepolia/module_lib.vy b/tests/integration/network/sepolia/module_lib.vy new file mode 100644 index 00000000..1dcfa043 --- /dev/null +++ b/tests/integration/network/sepolia/module_lib.vy @@ -0,0 +1,8 @@ +# pragma version ~=0.4.0 + +@view +def throw(): + raise "Error with message" + +def throw_dev_reason(): + raise # dev: some dev reason diff --git a/tests/integration/network/sepolia/test_sepolia_env.py b/tests/integration/network/sepolia/test_sepolia_env.py index e0e9de12..5ea5b243 100644 --- a/tests/integration/network/sepolia/test_sepolia_env.py +++ b/tests/integration/network/sepolia/test_sepolia_env.py @@ -1,8 +1,11 @@ import os +from random import randint, sample +from string import ascii_lowercase import pytest import boa +from boa import Etherscan from boa.deployments import DeploymentsDB, set_deployments_db from boa.network import NetworkEnv from boa.rpc import to_bytes @@ -38,13 +41,40 @@ def simple_contract(): return boa.loads(code, STARTING_SUPPLY) -def test_verify(simple_contract): - api_key = os.getenv("BLOCKSCOUT_API_KEY") - blockscout = Blockscout("https://eth-sepolia.blockscout.com", api_key) - with boa.set_verifier(blockscout): - result = boa.verify(simple_contract) - result.wait_for_verification() - assert result.is_verified() +@pytest.fixture(scope="module", params=[Etherscan, Blockscout]) +def verifier(request): + if request.param == Blockscout: + api_key = os.getenv("BLOCKSCOUT_API_KEY") + return Blockscout("https://eth-sepolia.blockscout.com", api_key) + elif request.param == Etherscan: + api_key = os.environ["ETHERSCAN_API_KEY"] + return Etherscan("https://api-sepolia.etherscan.io/api", api_key) + raise ValueError(f"Unknown verifier: {request.param}") + + +def test_verify(verifier): + # generate a random contract so the verification will actually be done again + name = "".join(sample(ascii_lowercase, 10)) + value = randint(0, 2**256 - 1) + contract = boa.loads( + f""" + import module_lib + + @deploy + def __init__(t: uint256): + if t == 0: + module_lib.throw() + + @external + def {name}() -> uint256: + return {value} + """, + value, + name=name, + ) + result = boa.verify(contract, verifier) + result.wait_for_verification() + assert result.is_verified() def test_env_type():