Skip to content

Commit

Permalink
fix(zetaclient): add zetaclient evm outbound tx index by nonce to s…
Browse files Browse the repository at this point in the history
…upplement outtx tracker (#2735)

* zetaclient: cache outbound tx while scanning blocks

* PoC: no outbound tracker to validate the local indexing

* Update zetaclient/chains/evm/observer/inbound.go

Co-authored-by: Lucas Bertrand <[email protected]>

* fixed rename

* fixed rename

* revert the manual test in localnet e2e

* fix unit test

* fix some logs/comments due to rename

* Update zetaclient/chains/evm/observer/inbound.go

Co-authored-by: Charlie Chen <[email protected]>

* check cache before calling RPC

* use a separate function FilterTSSOutbound to scan TSS outbounds and supplement to outbound trackers

* replace manual evm rpc mock with mockery generated mock

---------

Co-authored-by: Charlie Chen <[email protected]>
Co-authored-by: Lucas Bertrand <[email protected]>
Co-authored-by: Charlie Chen <[email protected]>
  • Loading branch information
4 people authored Sep 11, 2024
1 parent a37fb58 commit 1929ff6
Show file tree
Hide file tree
Showing 16 changed files with 870 additions and 216 deletions.
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* [2654](https://github.com/zeta-chain/node/pull/2654) - add validation for authorization list in when validating genesis state for authorization module
* [2674](https://github.com/zeta-chain/node/pull/2674) - allow operators to vote on ballots associated with discarded keygen without affecting the status of the current keygen.
* [2672](https://github.com/zeta-chain/node/pull/2672) - check observer set for duplicates when adding a new observer or updating an existing one
* [2735](https://github.com/zeta-chain/node/pull/2735) - fix the outbound tracker blocking confirmation and outbound processing on EVM chains by locally index outbound txs in zetaclient
* [2787](https://github.com/zeta-chain/node/pull/2787) - ask for 3 accounts (signer, pda, system_program) on solana gateway deposit
* [2842](https://github.com/zeta-chain/node/pull/2842) - fix: move interval assignment out of cctx loop in EVM outbound tx scheduler
* [2853](https://github.com/zeta-chain/node/pull/2853) - calling precompile through sc with sc state update
Expand Down
5 changes: 5 additions & 0 deletions zetaclient/chains/evm/observer/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ func (ob *Observer) ObserveInbound(ctx context.Context, sampledLogger zerolog.Lo
return errors.Wrap(err, "unable to observe TSSReceive")
}

// task 4: filter the outbounds from TSS address to supplement outbound trackers
// TODO: make this a separate go routine in outbound.go after switching to smart contract V2
//
ob.FilterTSSOutbound(ctx, startBlock, toBlock)

// query the gateway logs
// TODO: refactor in a more declarative design. Example: storing the list of contract and events to listen in an array
// https://github.com/zeta-chain/node/issues/2493
Expand Down
85 changes: 58 additions & 27 deletions zetaclient/chains/evm/observer/inbound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package observer_test
import (
"context"
"encoding/hex"
"errors"
"testing"

ethcommon "github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/onrik/ethrpc"
"github.com/rs/zerolog"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
zctx "github.com/zeta-chain/node/zetaclient/context"
"github.com/zeta-chain/node/zetaclient/keys"
Expand All @@ -17,6 +19,7 @@ import (
"github.com/zeta-chain/node/pkg/coin"
"github.com/zeta-chain/node/pkg/constant"
"github.com/zeta-chain/node/zetaclient/chains/evm"
"github.com/zeta-chain/node/zetaclient/chains/interfaces"
"github.com/zeta-chain/node/zetaclient/config"
"github.com/zeta-chain/node/zetaclient/testutils"
"github.com/zeta-chain/node/zetaclient/testutils/mocks"
Expand Down Expand Up @@ -461,42 +464,70 @@ func Test_ObserveTSSReceiveInBlock(t *testing.T) {
blockNumber := receipt.BlockNumber.Uint64()
block := testutils.LoadEVMBlock(t, TestDataDir, chainID, blockNumber, true)

// create mock client
evmClient := mocks.NewMockEvmClient()
evmJSONRPC := mocks.NewMockJSONRPCClient()
// create mock zetacore client
tss := mocks.NewTSSMainnet()
lastBlock := receipt.BlockNumber.Uint64() + confirmation

zetacoreClient := mocks.NewZetacoreClient(t).
WithKeys(&keys.Keys{}).
WithZetaChain().
WithPostVoteInbound("", "").
WithPostVoteInbound("", "")

ctx := context.Background()

t.Run("should observe TSS receive in block", func(t *testing.T) {
ob, _ := MockEVMObserver(t, chain, evmClient, evmJSONRPC, zetacoreClient, tss, lastBlock, chainParam)
// test cases
tests := []struct {
name string
evmClient interfaces.EVMRPCClient
jsonClient interfaces.EVMJSONRPCClient
errMsg string
}{
{
name: "should observe TSS receive in block",
evmClient: func() interfaces.EVMRPCClient {
// feed block number and receipt to mock client
evmClient := mocks.NewEVMRPCClient(t)
evmClient.On("BlockNumber", mock.Anything).Return(uint64(1000), nil)
evmClient.On("TransactionReceipt", mock.Anything, mock.Anything).Return(receipt, nil)
return evmClient
}(),
jsonClient: mocks.NewMockJSONRPCClient().WithBlock(block),
errMsg: "",
},
{
name: "should not observe on error getting block",
evmClient: func() interfaces.EVMRPCClient {
// feed block number to allow construction of observer
evmClient := mocks.NewEVMRPCClient(t)
evmClient.On("BlockNumber", mock.Anything).Return(uint64(1000), nil)
return evmClient
}(),
jsonClient: mocks.NewMockJSONRPCClient(), // no block
errMsg: "error getting block",
},
{
name: "should not observe on error getting receipt",
evmClient: func() interfaces.EVMRPCClient {
// feed block number but RPC error on getting receipt
evmClient := mocks.NewEVMRPCClient(t)
evmClient.On("BlockNumber", mock.Anything).Return(uint64(1000), nil)
evmClient.On("TransactionReceipt", mock.Anything, mock.Anything).Return(nil, errors.New("RPC error"))
return evmClient
}(),
jsonClient: mocks.NewMockJSONRPCClient().WithBlock(block),
errMsg: "error getting receipt",
},
}

// feed archived block and receipt
evmJSONRPC.WithBlock(block)
evmClient.WithReceipt(receipt)
err := ob.ObserveTSSReceiveInBlock(ctx, blockNumber)
require.NoError(t, err)
})
t.Run("should not observe on error getting block", func(t *testing.T) {
ob, _ := MockEVMObserver(t, chain, evmClient, evmJSONRPC, zetacoreClient, tss, lastBlock, chainParam)
err := ob.ObserveTSSReceiveInBlock(ctx, blockNumber)
// error getting block is expected because the mock JSONRPC contains no block
require.ErrorContains(t, err, "error getting block")
})
t.Run("should not observe on error getting receipt", func(t *testing.T) {
ob, _ := MockEVMObserver(t, chain, evmClient, evmJSONRPC, zetacoreClient, tss, lastBlock, chainParam)
evmJSONRPC.WithBlock(block)
err := ob.ObserveTSSReceiveInBlock(ctx, blockNumber)
// error getting block is expected because the mock evmClient contains no receipt
require.ErrorContains(t, err, "error getting receipt")
})
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ob, _ := MockEVMObserver(t, chain, tt.evmClient, tt.jsonClient, zetacoreClient, tss, lastBlock, chainParam)
err := ob.ObserveTSSReceiveInBlock(context.Background(), blockNumber)
if tt.errMsg != "" {
require.ErrorContains(t, err, tt.errMsg)
} else {
require.NoError(t, err)
}
})
}
}

func makeAppContext(t *testing.T) (context.Context, *zctx.AppContext) {
Expand Down
20 changes: 11 additions & 9 deletions zetaclient/chains/evm/observer/observer_gas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ func TestPostGasPrice(t *testing.T) {
t.Run("Pre EIP-1559 doesn't support priorityFee", func(t *testing.T) {
// ARRANGE
// Given ETH rpc mock
ethRPC := mocks.NewMockEvmClient().WithBlockNumber(blockNumber)
ethRPC := mocks.NewEVMRPCClient(t)
ethRPC.On("BlockNumber", mock.Anything).Return(uint64(blockNumber), nil)

// Given zetacore client mock
zetacoreClient := mocks.NewZetacoreClient(t).WithZetaChain()
Expand All @@ -37,10 +38,11 @@ func TestPostGasPrice(t *testing.T) {
observer, _ := MockEVMObserver(t, chain, ethRPC, nil, zetacoreClient, nil, blockNumber, chainParam)

// Given empty baseFee from RPC
ethRPC.WithHeader(&ethtypes.Header{BaseFee: nil})
ethRPC.On("HeaderByNumber", anything, anything).Return(&ethtypes.Header{BaseFee: nil}, nil)

// Given gas price from RPC
ethRPC.WithSuggestGasPrice(big.NewInt(3 * gwei))
// Given gasPrice and priorityFee from RPC
ethRPC.On("SuggestGasPrice", anything).Return(big.NewInt(3*gwei), nil)
ethRPC.On("SuggestGasTipCap", anything).Return(big.NewInt(0), nil)

// Given mock collector for zetacore call
// PostVoteGasPrice(ctx, chain, gasPrice, priorityFee, blockNum)
Expand Down Expand Up @@ -69,7 +71,8 @@ func TestPostGasPrice(t *testing.T) {
t.Run("Post EIP-1559 supports priorityFee", func(t *testing.T) {
// ARRANGE
// Given ETH rpc mock
ethRPC := mocks.NewMockEvmClient().WithBlockNumber(blockNumber)
ethRPC := mocks.NewEVMRPCClient(t)
ethRPC.On("BlockNumber", mock.Anything).Return(uint64(blockNumber), nil)

// Given zetacore client mock
zetacoreClient := mocks.NewZetacoreClient(t).WithZetaChain()
Expand All @@ -82,12 +85,11 @@ func TestPostGasPrice(t *testing.T) {
observer, _ := MockEVMObserver(t, chain, ethRPC, nil, zetacoreClient, nil, blockNumber, chainParam)

// Given 1 gwei baseFee from RPC
ethRPC.WithHeader(&ethtypes.Header{BaseFee: big.NewInt(gwei)})
ethRPC.On("HeaderByNumber", anything, anything).Return(&ethtypes.Header{BaseFee: big.NewInt(gwei)}, nil)

// Given gasPrice and priorityFee from RPC
ethRPC.
WithSuggestGasPrice(big.NewInt(3 * gwei)).
WithSuggestGasTipCap(big.NewInt(2 * gwei))
ethRPC.On("SuggestGasPrice", anything).Return(big.NewInt(3*gwei), nil)
ethRPC.On("SuggestGasTipCap", anything).Return(big.NewInt(2*gwei), nil)

// Given mock collector for zetacore call
// PostVoteGasPrice(ctx, chain, gasPrice, priorityFee, blockNum)
Expand Down
50 changes: 33 additions & 17 deletions zetaclient/chains/evm/observer/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
lru "github.com/hashicorp/golang-lru"
"github.com/onrik/ethrpc"
"github.com/rs/zerolog"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/zeta-chain/node/pkg/ptr"
zctx "github.com/zeta-chain/node/zetaclient/context"
Expand Down Expand Up @@ -97,7 +98,9 @@ func MockEVMObserver(

// use default mock evm client if not provided
if evmClient == nil {
evmClient = mocks.NewMockEvmClient().WithBlockNumber(1000)
evmClientDefault := mocks.NewEVMRPCClient(t)
evmClientDefault.On("BlockNumber", mock.Anything).Return(uint64(1000), nil)
evmClient = evmClientDefault
}

// use default mock evm client if not provided
Expand Down Expand Up @@ -152,6 +155,10 @@ func Test_NewObserver(t *testing.T) {
chain := chains.Ethereum
params := mocks.MockChainParams(chain.ChainId, 10)

// create evm client with mocked block number 1000
evmClient := mocks.NewEVMRPCClient(t)
evmClient.On("BlockNumber", mock.Anything).Return(uint64(1000), nil)

// test cases
tests := []struct {
name string
Expand All @@ -174,7 +181,7 @@ func Test_NewObserver(t *testing.T) {
Endpoint: "http://localhost:8545",
},
chainParams: params,
evmClient: mocks.NewMockEvmClient().WithBlockNumber(1000),
evmClient: evmClient,
evmJSONRPC: mocks.NewMockJSONRPCClient(),
tss: mocks.NewTSSMainnet(),
logger: base.Logger{},
Expand All @@ -188,13 +195,18 @@ func Test_NewObserver(t *testing.T) {
Endpoint: "http://localhost:8545",
},
chainParams: params,
evmClient: mocks.NewMockEvmClient().WithError(fmt.Errorf("error RPC")),
evmJSONRPC: mocks.NewMockJSONRPCClient(),
tss: mocks.NewTSSMainnet(),
logger: base.Logger{},
ts: nil,
fail: true,
message: "error RPC",
evmClient: func() interfaces.EVMRPCClient {
// create mock evm client with RPC error
evmClient := mocks.NewEVMRPCClient(t)
evmClient.On("BlockNumber", mock.Anything).Return(uint64(0), fmt.Errorf("error RPC"))
return evmClient
}(),
evmJSONRPC: mocks.NewMockJSONRPCClient(),
tss: mocks.NewTSSMainnet(),
logger: base.Logger{},
ts: nil,
fail: true,
message: "error RPC",
},
{
name: "should fail on invalid ENV var",
Expand All @@ -203,7 +215,7 @@ func Test_NewObserver(t *testing.T) {
Endpoint: "http://localhost:8545",
},
chainParams: params,
evmClient: mocks.NewMockEvmClient().WithBlockNumber(1000),
evmClient: evmClient,
evmJSONRPC: mocks.NewMockJSONRPCClient(),
tss: mocks.NewTSSMainnet(),
before: func() {
Expand All @@ -224,8 +236,7 @@ func Test_NewObserver(t *testing.T) {
// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// create AppContext, client and tss
//zetacoreCtx, _ := getAppContext(tt.evmCfg.Chain, tt.evmCfg.Endpoint, &params)
// create mock zetacore client
zetacoreClient := mocks.NewZetacoreClient(t)

database, err := db.NewFromSqliteInMemory(true)
Expand Down Expand Up @@ -272,7 +283,8 @@ func Test_LoadLastBlockScanned(t *testing.T) {
params := mocks.MockChainParams(chain.ChainId, 10)

// create observer using mock evm client
evmClient := mocks.NewMockEvmClient().WithBlockNumber(100)
evmClient := mocks.NewEVMRPCClient(t)
evmClient.On("BlockNumber", mock.Anything).Return(uint64(100), nil)
ob, _ := MockEVMObserver(t, chain, evmClient, nil, nil, nil, 1, params)

t.Run("should load last block scanned", func(t *testing.T) {
Expand Down Expand Up @@ -301,8 +313,12 @@ func Test_LoadLastBlockScanned(t *testing.T) {
// reset last block scanned to 0 so that it will be loaded from RPC
obOther.WithLastBlockScanned(0)

// set RPC error
evmClient.WithError(fmt.Errorf("error RPC"))
// create mock evm client with RPC error
evmClient := mocks.NewEVMRPCClient(t)
evmClient.On("BlockNumber", mock.Anything).Return(uint64(0), fmt.Errorf("error RPC"))

// attach mock evm client to observer
obOther.WithEvmClient(evmClient)

// load last block scanned
err := obOther.LoadLastBlockScanned(ctx)
Expand Down Expand Up @@ -389,12 +405,12 @@ func Test_HeaderCache(t *testing.T) {
ob.WithHeaderCache(headerCache)

// create mock evm client
evmClient := mocks.NewMockEvmClient()
evmClient := mocks.NewEVMRPCClient(t)
ob.WithEvmClient(evmClient)

// feed block header to evm client
header := &ethtypes.Header{Number: big.NewInt(100)}
evmClient.WithHeader(header)
evmClient.On("HeaderByNumber", mock.Anything, mock.Anything).Return(header, nil)

// get block header from observer
resHeader, err := ob.GetBlockHeaderCached(ctx, uint64(100))
Expand Down
36 changes: 36 additions & 0 deletions zetaclient/chains/evm/observer/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,42 @@ func ParseAndCheckWithdrawnEvent(
return nil, errors.New("no ERC20 Withdrawn event found")
}

// FilterTSSOutbound filters the outbounds from TSS address to supplement outbound trackers
func (ob *Observer) FilterTSSOutbound(ctx context.Context, startBlock, toBlock uint64) {
// filters the outbounds from TSS address block by block
for bn := startBlock; bn <= toBlock; bn++ {
ob.FilterTSSOutboundInBlock(ctx, bn)
}
}

// FilterTSSOutboundInBlock filters the outbounds in a single block to supplement outbound trackers
func (ob *Observer) FilterTSSOutboundInBlock(ctx context.Context, blockNumber uint64) {
// query block and ignore error (we don't rescan as we are only supplementing outbound trackers)
block, err := ob.GetBlockByNumberCached(blockNumber)
if err != nil {
ob.Logger().
Outbound.Error().
Err(err).
Msgf("error getting block %d for chain %d", blockNumber, ob.Chain().ChainId)
return
}

for i := range block.Transactions {
tx := block.Transactions[i]
if ethcommon.HexToAddress(tx.From) == ob.TSS().EVMAddress() {
nonce := uint64(tx.Nonce)
if !ob.IsTxConfirmed(nonce) {
if receipt, txx, ok := ob.checkConfirmedTx(ctx, tx.Hash, nonce); ok {
ob.SetTxNReceipt(nonce, receipt, txx)
ob.Logger().
Outbound.Info().
Msgf("TSS outbound detected on chain %d nonce %d tx %s", ob.Chain().ChainId, nonce, tx.Hash)
}
}
}
}
}

// checkConfirmedTx checks if a txHash is confirmed
// returns (receipt, transaction, true) if confirmed or (nil, nil, false) otherwise
func (ob *Observer) checkConfirmedTx(
Expand Down
Loading

0 comments on commit 1929ff6

Please sign in to comment.