From 99af29ba7fafcbd0449438fd79141955ca66cdaa Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Mon, 19 Feb 2024 22:14:19 +0100 Subject: [PATCH 1/6] fix: test race --- .../evm/functions/logpoller_wrapper_test.go | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/core/services/relay/evm/functions/logpoller_wrapper_test.go b/core/services/relay/evm/functions/logpoller_wrapper_test.go index c68739f45a4..60dc0e03ffe 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper_test.go +++ b/core/services/relay/evm/functions/logpoller_wrapper_test.go @@ -60,10 +60,12 @@ func newSubscriber(expectedCalls int) *subscriber { return sub } -func addr(t *testing.T, lastByte string) []byte { +func addr(lastByte string) ([]byte, error) { contractAddr, err := hex.DecodeString("00000000000000000000000000000000000000000000000000000000000000" + lastByte) - require.NoError(t, err) - return contractAddr + if err != nil { + return []byte{}, err + } + return contractAddr, nil } func setUp(t *testing.T, updateFrequencySec uint32) (*lpmocks.LogPoller, types.LogPollerWrapper, *evmclimocks.Client) { @@ -74,9 +76,11 @@ func setUp(t *testing.T, updateFrequencySec uint32) (*lpmocks.LogPoller, types.L ContractUpdateCheckFrequencySec: updateFrequencySec, ContractVersion: 1, } - routerAddressBytes = addr(t, "01") + routerAddressBytes, err := addr("01") + require.NoError(t, err) routerAddressHex = common.BytesToAddress(routerAddressBytes) - coordinatorAddressBytes = addr(t, "02") + coordinatorAddressBytes, err = addr("02") + require.NoError(t, err) coordinatorAddressHex = common.BytesToAddress(coordinatorAddressBytes) lpWrapper, err := NewLogPollerWrapper(routerAddressHex, config, client, lp, lggr) require.NoError(t, err) @@ -132,7 +136,7 @@ func TestLogPollerWrapper_SingleSubscriberEmptyEvents_CoordinatorV1(t *testing.T client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById To: &routerAddressHex, Data: []uint8{0x6a, 0x22, 0x15, 0xde, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(addr(t, "00"), nil) + }, mock.Anything).Return(addr("00")) lp.On("RegisterFilter", mock.Anything).Return(nil) typeAndVersionResponse, err := encodeTypeAndVersion(CoordinatorContractV100) require.NoError(t, err) @@ -163,7 +167,7 @@ func TestLogPollerWrapper_SingleSubscriberEmptyEvents_CoordinatorV2(t *testing.T client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById To: &routerAddressHex, Data: []uint8{0x6a, 0x22, 0x15, 0xde, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(addr(t, "00"), nil) + }, mock.Anything).Return(addr("00")) lp.On("RegisterFilter", mock.Anything).Return(nil) typeAndVersionResponse, err := encodeTypeAndVersion(CoordinatorContractV200) require.NoError(t, err) @@ -186,7 +190,7 @@ func TestLogPollerWrapper_ErrorOnZeroAddresses(t *testing.T) { lp, lpWrapper, client := setUp(t, 100_000) // check only once lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) - client.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Return(addr(t, "00"), nil) + client.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Return(addr("00")) servicetest.Run(t, lpWrapper) _, _, err := lpWrapper.LatestEvents() @@ -203,7 +207,7 @@ func TestLogPollerWrapper_LatestEvents_ReorgHandlingV1(t *testing.T) { client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById To: &routerAddressHex, Data: []uint8{0x6a, 0x22, 0x15, 0xde, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(addr(t, "00"), nil) + }, mock.Anything).Return(addr("00")) typeAndVersionResponse, err := encodeTypeAndVersion(CoordinatorContractV100) require.NoError(t, err) client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion @@ -248,7 +252,7 @@ func TestLogPollerWrapper_LatestEvents_ReorgHandlingV2(t *testing.T) { client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById To: &routerAddressHex, Data: []uint8{0x6a, 0x22, 0x15, 0xde, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(addr(t, "00"), nil) + }, mock.Anything).Return(addr("00")) typeAndVersionResponse, err := encodeTypeAndVersion(CoordinatorContractV200) require.NoError(t, err) client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion From 53888467f1248f61130f2c6c7fd64d23bd3512d7 Mon Sep 17 00:00:00 2001 From: Justin Kaseman Date: Mon, 19 Feb 2024 13:20:30 -0800 Subject: [PATCH 2/6] Revert "(test): Run LogPollerWrapperTest v1/v2 tests sequentially" This reverts commit 1ba2c06d5be136c30f00b4a97a6b349852bbbb93. --- .../relay/evm/functions/logpoller_wrapper_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/core/services/relay/evm/functions/logpoller_wrapper_test.go b/core/services/relay/evm/functions/logpoller_wrapper_test.go index 60dc0e03ffe..ef3cba72a09 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper_test.go +++ b/core/services/relay/evm/functions/logpoller_wrapper_test.go @@ -125,6 +125,7 @@ func getMockedRequestLogV2(t *testing.T) logpoller.Log { } func TestLogPollerWrapper_SingleSubscriberEmptyEvents_CoordinatorV1(t *testing.T) { + t.Parallel() lp, lpWrapper, client := setUp(t, 100_000) // check only once lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) @@ -156,6 +157,7 @@ func TestLogPollerWrapper_SingleSubscriberEmptyEvents_CoordinatorV1(t *testing.T } func TestLogPollerWrapper_SingleSubscriberEmptyEvents_CoordinatorV2(t *testing.T) { + t.Parallel() lp, lpWrapper, client := setUp(t, 100_000) // check only once lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) @@ -187,6 +189,7 @@ func TestLogPollerWrapper_SingleSubscriberEmptyEvents_CoordinatorV2(t *testing.T } func TestLogPollerWrapper_ErrorOnZeroAddresses(t *testing.T) { + t.Parallel() lp, lpWrapper, client := setUp(t, 100_000) // check only once lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) @@ -198,6 +201,7 @@ func TestLogPollerWrapper_ErrorOnZeroAddresses(t *testing.T) { } func TestLogPollerWrapper_LatestEvents_ReorgHandlingV1(t *testing.T) { + t.Parallel() lp, lpWrapper, client := setUp(t, 100_000) lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById @@ -243,6 +247,7 @@ func TestLogPollerWrapper_LatestEvents_ReorgHandlingV1(t *testing.T) { } func TestLogPollerWrapper_LatestEvents_ReorgHandlingV2(t *testing.T) { + t.Parallel() lp, lpWrapper, client := setUp(t, 100_000) lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById @@ -288,6 +293,7 @@ func TestLogPollerWrapper_LatestEvents_ReorgHandlingV2(t *testing.T) { } func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_TruncatesLogs(t *testing.T) { + t.Parallel() _, lpWrapper, _ := setUp(t, 100_000) inputLogs := make([]logpoller.Log, maxLogsToProcess+100) @@ -305,6 +311,7 @@ func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_TruncatesLogs(t *testin } func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_SkipsInvalidLog(t *testing.T) { + t.Parallel() _, lpWrapper, _ := setUp(t, 100_000) inputLogs := []logpoller.Log{getMockedRequestLogV1(t)} inputLogs[0].Topics = [][]byte{[]byte("invalid topic")} @@ -319,6 +326,7 @@ func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_SkipsInvalidLog(t *test } func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_FiltersPreviouslyDetectedEvent(t *testing.T) { + t.Parallel() _, lpWrapper, _ := setUp(t, 100_000) mockedRequestLog := getMockedRequestLogV1(t) inputLogs := []logpoller.Log{mockedRequestLog} From 561b8fb7236e00680aa04040b87467cefa1e8102 Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Fri, 16 Feb 2024 10:00:39 -0300 Subject: [PATCH 3/6] chore: refactor logpoller_wrapper by injecting a coordinator dep --- .../relay/evm/functions/coordinator_v1.go | 159 +++++++++ .../relay/evm/functions/coordinator_v2.go | 158 +++++++++ .../relay/evm/functions/logpoller_wrapper.go | 329 +++--------------- 3 files changed, 361 insertions(+), 285 deletions(-) create mode 100644 core/services/relay/evm/functions/coordinator_v1.go create mode 100644 core/services/relay/evm/functions/coordinator_v2.go diff --git a/core/services/relay/evm/functions/coordinator_v1.go b/core/services/relay/evm/functions/coordinator_v1.go new file mode 100644 index 00000000000..506d14c9504 --- /dev/null +++ b/core/services/relay/evm/functions/coordinator_v1.go @@ -0,0 +1,159 @@ +package functions + +import ( + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_coordinator_1_1_0" + "github.com/smartcontractkit/chainlink/v2/core/logger" + evmRelayTypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" +) + +type CoordinatorV1 struct { + address common.Address + + client client.Client + logPoller logpoller.LogPoller + lggr logger.Logger +} + +func NewCoordinatorV1(address common.Address, client client.Client, logPoller logpoller.LogPoller, lggr logger.Logger) *CoordinatorV1 { + return &CoordinatorV1{ + address: address, + client: client, + logPoller: logPoller, + lggr: lggr, + } +} + +func (c *CoordinatorV1) Address() common.Address { + return c.address +} + +func (c *CoordinatorV1) RegisterFilters() error { + if (c.address == common.Address{}) { + return nil + } + + return c.logPoller.RegisterFilter( + logpoller.Filter{ + Name: logpoller.FilterName("FunctionsLogPollerWrapper", c.address.String(), "-v", "1"), + EventSigs: []common.Hash{ + functions_coordinator_1_1_0.FunctionsCoordinator110OracleRequest{}.Topic(), + functions_coordinator_1_1_0.FunctionsCoordinator110OracleResponse{}.Topic(), + }, + Addresses: []common.Address{c.address}, + }) +} + +func (c *CoordinatorV1) OracleRequestLogTopic() (common.Hash, error) { + return functions_coordinator_1_1_0.FunctionsCoordinator110OracleRequest{}.Topic(), nil +} + +func (c *CoordinatorV1) OracleResponseLogTopic() (common.Hash, error) { + return functions_coordinator_1_1_0.FunctionsCoordinator110OracleResponse{}.Topic(), nil +} + +func (c *CoordinatorV1) LogsToRequests(requestLogs []logpoller.Log) ([]evmRelayTypes.OracleRequest, error) { + var requests []evmRelayTypes.OracleRequest + + uint32Type, errType1 := abi.NewType("uint32", "uint32", nil) + uint40Type, errType2 := abi.NewType("uint40", "uint40", nil) + uint64Type, errType3 := abi.NewType("uint64", "uint64", nil) + uint72Type, errType4 := abi.NewType("uint72", "uint72", nil) + uint96Type, errType5 := abi.NewType("uint96", "uint96", nil) + addressType, errType6 := abi.NewType("address", "address", nil) + bytes32Type, errType7 := abi.NewType("bytes32", "bytes32", nil) + + if errType1 != nil || errType2 != nil || errType3 != nil || errType4 != nil || errType5 != nil || errType6 != nil || errType7 != nil { + c.lggr.Errorw("LogsToRequests: failed to initialize types", "errType1", errType1, + "errType2", errType2, "errType3", errType3, "errType4", errType4, "errType5", errType5, "errType6", errType6, "errType7", errType7, + ) + } + + parsingContract, err := functions_coordinator_1_1_0.NewFunctionsCoordinator110(c.address, c.client) + if err != nil { + return nil, errors.Errorf("LogsToRequests: creating a contract instance for NewFunctionsCoordinator110 parsing failed") + } + + for _, log := range requestLogs { + gethLog := log.ToGethLog() + oracleRequest, err := parsingContract.ParseOracleRequest(gethLog) + if err != nil { + c.lggr.Errorw("LogsToRequests: failed to parse a request log, skipping", "err", err) + continue + } + + commitmentABIV1 := abi.Arguments{ + {Type: bytes32Type}, // RequestId + {Type: addressType}, // Coordinator + {Type: uint96Type}, // EstimatedTotalCostJuels + {Type: addressType}, // Client + {Type: uint64Type}, // SubscriptionId + {Type: uint32Type}, // CallbackGasLimit + {Type: uint72Type}, // AdminFee + {Type: uint72Type}, // DonFee + {Type: uint40Type}, // GasOverheadBeforeCallback + {Type: uint40Type}, // GasOverheadAfterCallback + {Type: uint32Type}, // TimeoutTimestamp + } + + commitmentBytesV1, err := commitmentABIV1.Pack( + oracleRequest.Commitment.RequestId, + oracleRequest.Commitment.Coordinator, + oracleRequest.Commitment.EstimatedTotalCostJuels, + oracleRequest.Commitment.Client, + oracleRequest.Commitment.SubscriptionId, + oracleRequest.Commitment.CallbackGasLimit, + oracleRequest.Commitment.AdminFee, + oracleRequest.Commitment.DonFee, + oracleRequest.Commitment.GasOverheadBeforeCallback, + oracleRequest.Commitment.GasOverheadAfterCallback, + oracleRequest.Commitment.TimeoutTimestamp, + ) + if err != nil { + c.lggr.Errorw("LogsToRequests: failed to pack Coordinator v1 commitment bytes, skipping", err) + } + + OracleRequestV1 := evmRelayTypes.OracleRequest{ + RequestId: oracleRequest.RequestId, + RequestingContract: oracleRequest.RequestingContract, + RequestInitiator: oracleRequest.RequestInitiator, + SubscriptionId: oracleRequest.SubscriptionId, + SubscriptionOwner: oracleRequest.SubscriptionOwner, + Data: oracleRequest.Data, + DataVersion: oracleRequest.DataVersion, + Flags: oracleRequest.Flags, + CallbackGasLimit: oracleRequest.CallbackGasLimit, + TxHash: oracleRequest.Raw.TxHash, + OnchainMetadata: commitmentBytesV1, + CoordinatorContract: c.address, + } + + requests = append(requests, OracleRequestV1) + } + return requests, nil +} + +func (c *CoordinatorV1) LogsToResponses(responseLogs []logpoller.Log) ([]evmRelayTypes.OracleResponse, error) { + var responses []evmRelayTypes.OracleResponse + + parsingContract, err := functions_coordinator_1_1_0.NewFunctionsCoordinator110(c.address, c.client) + if err != nil { + return nil, errors.Errorf("LogsToResponses: creating a contract instance for parsing failed") + } + for _, log := range responseLogs { + gethLog := log.ToGethLog() + oracleResponse, err := parsingContract.ParseOracleResponse(gethLog) + if err != nil { + c.lggr.Errorw("LogsToResponses: failed to parse a response log, skipping") + continue + } + responses = append(responses, evmRelayTypes.OracleResponse{ + RequestId: oracleResponse.RequestId, + }) + } + return responses, nil +} diff --git a/core/services/relay/evm/functions/coordinator_v2.go b/core/services/relay/evm/functions/coordinator_v2.go new file mode 100644 index 00000000000..513344c9052 --- /dev/null +++ b/core/services/relay/evm/functions/coordinator_v2.go @@ -0,0 +1,158 @@ +package functions + +import ( + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_coordinator" + "github.com/smartcontractkit/chainlink/v2/core/logger" + evmRelayTypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" +) + +type CoordinatorV2 struct { + address common.Address + + client client.Client + logPoller logpoller.LogPoller + lggr logger.Logger +} + +func NewCoordinatorV2(address common.Address, client client.Client, logPoller logpoller.LogPoller, lggr logger.Logger) *CoordinatorV2 { + return &CoordinatorV2{ + address: address, + client: client, + logPoller: logPoller, + lggr: lggr, + } +} + +func (c *CoordinatorV2) Address() common.Address { + return c.address +} + +func (c *CoordinatorV2) RegisterFilters() error { + if (c.address == common.Address{}) { + return nil + } + + return c.logPoller.RegisterFilter( + logpoller.Filter{ + Name: logpoller.FilterName("FunctionsLogPollerWrapper", c.address.String(), "-v", "2"), + EventSigs: []common.Hash{ + functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), + functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), + }, + Addresses: []common.Address{c.address}, + }) +} +func (c *CoordinatorV2) OracleRequestLogTopic() (common.Hash, error) { + return functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), nil +} +func (c *CoordinatorV2) OracleResponseLogTopic() (common.Hash, error) { + return functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), nil +} +func (c *CoordinatorV2) LogsToRequests(requestLogs []logpoller.Log) ([]evmRelayTypes.OracleRequest, error) { + var requests []evmRelayTypes.OracleRequest + + uint32Type, errType1 := abi.NewType("uint32", "uint32", nil) + uint40Type, errType2 := abi.NewType("uint40", "uint40", nil) + uint64Type, errType3 := abi.NewType("uint64", "uint64", nil) + uint72Type, errType4 := abi.NewType("uint72", "uint72", nil) + uint96Type, errType5 := abi.NewType("uint96", "uint96", nil) + addressType, errType6 := abi.NewType("address", "address", nil) + bytes32Type, errType7 := abi.NewType("bytes32", "bytes32", nil) + + if errType1 != nil || errType2 != nil || errType3 != nil || errType4 != nil || errType5 != nil || errType6 != nil || errType7 != nil { + c.lggr.Errorw("LogsToRequests: failed to initialize types", "errType1", errType1, + "errType2", errType2, "errType3", errType3, "errType4", errType4, "errType5", errType5, "errType6", errType6, "errType7", errType7, + ) + } + + parsingContract, err := functions_coordinator.NewFunctionsCoordinator(c.address, c.client) + if err != nil { + return nil, errors.Errorf("LogsToRequests: creating a contract instance for NewFunctionsCoordinator parsing failed") + } + + for _, log := range requestLogs { + gethLog := log.ToGethLog() + oracleRequest, err := parsingContract.ParseOracleRequest(gethLog) + if err != nil { + c.lggr.Errorw("LogsToRequests: failed to parse a request log, skipping", "err", err) + continue + } + + commitmentABIV2 := abi.Arguments{ + {Type: bytes32Type}, // RequestId + {Type: addressType}, // Coordinator + {Type: uint96Type}, // EstimatedTotalCostJuels + {Type: addressType}, // Client + {Type: uint64Type}, // SubscriptionId + {Type: uint32Type}, // CallbackGasLimit + {Type: uint72Type}, // AdminFee + {Type: uint72Type}, // DonFee + {Type: uint40Type}, // GasOverheadBeforeCallback + {Type: uint40Type}, // GasOverheadAfterCallback + {Type: uint32Type}, // TimeoutTimestamp + {Type: uint72Type}, // OperationFee + } + + commitmentBytesV2, err := commitmentABIV2.Pack( + oracleRequest.Commitment.RequestId, + oracleRequest.Commitment.Coordinator, + oracleRequest.Commitment.EstimatedTotalCostJuels, + oracleRequest.Commitment.Client, + oracleRequest.Commitment.SubscriptionId, + oracleRequest.Commitment.CallbackGasLimit, + oracleRequest.Commitment.AdminFee, + oracleRequest.Commitment.DonFee, + oracleRequest.Commitment.GasOverheadBeforeCallback, + oracleRequest.Commitment.GasOverheadAfterCallback, + oracleRequest.Commitment.TimeoutTimestamp, + oracleRequest.Commitment.OperationFee, + ) + if err != nil { + c.lggr.Errorw("LogsToRequests: failed to pack Coordinator v2 commitment bytes, skipping", err) + } + + OracleRequestV2 := evmRelayTypes.OracleRequest{ + RequestId: oracleRequest.RequestId, + RequestingContract: oracleRequest.RequestingContract, + RequestInitiator: oracleRequest.RequestInitiator, + SubscriptionId: oracleRequest.SubscriptionId, + SubscriptionOwner: oracleRequest.SubscriptionOwner, + Data: oracleRequest.Data, + DataVersion: oracleRequest.DataVersion, + Flags: oracleRequest.Flags, + CallbackGasLimit: oracleRequest.CallbackGasLimit, + TxHash: oracleRequest.Raw.TxHash, + OnchainMetadata: commitmentBytesV2, + CoordinatorContract: c.address, + } + + requests = append(requests, OracleRequestV2) + } + return requests, nil +} + +func (c *CoordinatorV2) LogsToResponses(responseLogs []logpoller.Log) ([]evmRelayTypes.OracleResponse, error) { + var responses []evmRelayTypes.OracleResponse + + parsingContract, err := functions_coordinator.NewFunctionsCoordinator(c.address, c.client) + if err != nil { + return nil, errors.Errorf("LogsToResponses: creating a contract instance for parsing failed") + } + for _, log := range responseLogs { + gethLog := log.ToGethLog() + oracleResponse, err := parsingContract.ParseOracleResponse(gethLog) + if err != nil { + c.lggr.Errorw("LogsToResponses: failed to parse a response log, skipping") + continue + } + responses = append(responses, evmRelayTypes.OracleResponse{ + RequestId: oracleResponse.RequestId, + }) + } + return responses, nil +} diff --git a/core/services/relay/evm/functions/logpoller_wrapper.go b/core/services/relay/evm/functions/logpoller_wrapper.go index 73c32be7db8..87d758e09dc 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper.go +++ b/core/services/relay/evm/functions/logpoller_wrapper.go @@ -6,7 +6,6 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" @@ -15,8 +14,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" - "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_coordinator" - "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_coordinator_1_1_0" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" type_and_version "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/type_and_version_interface_wrapper" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -33,8 +30,8 @@ type logPollerWrapper struct { client client.Client logPoller logpoller.LogPoller subscribers map[string]evmRelayTypes.RouteUpdateSubscriber - activeCoordinator coordinator - proposedCoordinator coordinator + activeCoordinator Coordinator + proposedCoordinator Coordinator requestBlockOffset int64 responseBlockOffset int64 pastBlocksToPoll int64 @@ -57,18 +54,18 @@ type detectedEvents struct { detectedEventsOrdered []detectedEvent } -type coordinator struct { - address common.Address - typeAndVersion string +type Coordinator interface { + Address() common.Address + RegisterFilters() error + OracleRequestLogTopic() (common.Hash, error) + OracleResponseLogTopic() (common.Hash, error) + LogsToRequests(requestLogs []logpoller.Log) ([]evmRelayTypes.OracleRequest, error) + LogsToResponses(responseLogs []logpoller.Log) ([]evmRelayTypes.OracleResponse, error) } const FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING = "Functions Coordinator v1" const FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING = "Functions Coordinator v2" -type Functions_Coordinator interface { - *functions_coordinator_1_1_0.FunctionsCoordinator110 | *functions_coordinator.FunctionsCoordinator -} - const logPollerCacheDurationSecDefault = 300 const pastBlocksToPollDefault = 50 const maxLogsToProcess = 1000 @@ -159,11 +156,11 @@ func (l *logPollerWrapper) Name() string { return l.lggr.Name() } // methods of LogPollerWrapper func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmRelayTypes.OracleResponse, error) { l.mu.Lock() - coordinators := []coordinator{} - if l.activeCoordinator.address != (common.Address{}) { + coordinators := []Coordinator{} + if l.activeCoordinator.Address() != (common.Address{}) { coordinators = append(coordinators, l.activeCoordinator) } - if l.proposedCoordinator.address != (common.Address{}) && l.activeCoordinator != l.proposedCoordinator { + if l.proposedCoordinator.Address() != (common.Address{}) && l.activeCoordinator != l.proposedCoordinator { coordinators = append(coordinators, l.proposedCoordinator) } latest, err := l.logPoller.LatestBlock() @@ -188,12 +185,12 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR for _, coordinator := range coordinators { requestEndBlock := latestBlockNum - l.requestBlockOffset - requestLogTopic, err := oracleRequestLogTopic(coordinator) + requestLogTopic, err := coordinator.OracleRequestLogTopic() if err != nil { l.lggr.Errorw("LatestEvents: ", err) return nil, nil, err } - requestLogs, err := l.logPoller.Logs(startBlockNum, requestEndBlock, requestLogTopic, coordinator.address) + requestLogs, err := l.logPoller.Logs(startBlockNum, requestEndBlock, requestLogTopic, coordinator.Address()) if err != nil { l.lggr.Errorw("LatestEvents: fetching request logs from LogPoller failed", "startBlock", startBlockNum, "endBlock", requestEndBlock) return nil, nil, err @@ -201,12 +198,12 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR l.lggr.Debugw("LatestEvents: fetched request logs", "nRequestLogs", len(requestLogs), "latestBlock", latest, "startBlock", startBlockNum, "endBlock", requestEndBlock) requestLogs = l.filterPreviouslyDetectedEvents(requestLogs, &l.detectedRequests, "requests") responseEndBlock := latestBlockNum - l.responseBlockOffset - responseLogTopic, err := oracleResponseLogTopic(coordinator) + responseLogTopic, err := coordinator.OracleResponseLogTopic() if err != nil { l.lggr.Errorw("LatestEvents: ", err) return nil, nil, err } - responseLogs, err := l.logPoller.Logs(startBlockNum, responseEndBlock, responseLogTopic, coordinator.address) + responseLogs, err := l.logPoller.Logs(startBlockNum, responseEndBlock, responseLogTopic, coordinator.Address()) if err != nil { l.lggr.Errorw("LatestEvents: fetching response logs from LogPoller failed", "startBlock", startBlockNum, "endBlock", responseEndBlock) return nil, nil, err @@ -214,13 +211,13 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR l.lggr.Debugw("LatestEvents: fetched request logs", "nResponseLogs", len(responseLogs), "latestBlock", latest, "startBlock", startBlockNum, "endBlock", responseEndBlock) responseLogs = l.filterPreviouslyDetectedEvents(responseLogs, &l.detectedResponses, "responses") - l.lggr.Debugw("LatestEvents: parsing logs", "nRequestLogs", len(requestLogs), "nResponseLogs", len(responseLogs), "coordinatorAddress", coordinator.address.Hex()) - requests, err := l.logsToRequests(coordinator, requestLogs) + l.lggr.Debugw("LatestEvents: parsing logs", "nRequestLogs", len(requestLogs), "nResponseLogs", len(responseLogs), "coordinatorAddress", coordinator.Address().Hex()) + requests, err := coordinator.LogsToRequests(requestLogs) if err != nil { return nil, nil, err } resultsReq = append(resultsReq, requests...) - responses, err := l.logsToResponses(coordinator, responseLogs) + responses, err := coordinator.LogsToResponses(responseLogs) if err != nil { return nil, nil, err } @@ -333,7 +330,7 @@ func (l *logPollerWrapper) getCurrentCoordinators(ctx context.Context) (common.A Context: ctx, }, donId) if err != nil { - return activeCoordinatorAddress, l.proposedCoordinator.address, nil + return activeCoordinatorAddress, l.proposedCoordinator.Address(), nil } return activeCoordinatorAddress, proposedCoordinator, nil @@ -369,7 +366,7 @@ func (l *logPollerWrapper) handleRouteUpdate(activeCoordinatorAddress common.Add return } - if activeCoordinatorAddress == l.activeCoordinator.address && proposedCoordinatorAddress == l.proposedCoordinator.address { + if activeCoordinatorAddress == l.activeCoordinator.Address() && proposedCoordinatorAddress == l.proposedCoordinator.Address() { l.lggr.Debug("LogPollerWrapper: no changes to routes") return } @@ -378,282 +375,44 @@ func (l *logPollerWrapper) handleRouteUpdate(activeCoordinatorAddress common.Add if err != nil { return } - activeCoordinator := coordinator{address: activeCoordinatorAddress, typeAndVersion: activeCoordinatorTypeAndVersion} + var activeCoordinator Coordinator + if strings.Contains(activeCoordinatorTypeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING) { + activeCoordinator = NewCoordinatorV1(activeCoordinatorAddress, l.client, l.logPoller, l.lggr) + } else if strings.Contains(activeCoordinatorTypeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING) { + activeCoordinator = NewCoordinatorV2(activeCoordinatorAddress, l.client, l.logPoller, l.lggr) + } + + err = activeCoordinator.RegisterFilters() + if err != nil { + l.lggr.Errorw("LogPollerWrapper: Failed to register active coordinator filters", err) + return + } proposedCoordinatorTypeAndVersion, err := l.getTypeAndVersion(proposedCoordinatorAddress) if err != nil { return } - proposedCoordinator := coordinator{address: proposedCoordinatorAddress, typeAndVersion: proposedCoordinatorTypeAndVersion} + var proposedCoordinator Coordinator + if strings.Contains(proposedCoordinatorTypeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING) { + proposedCoordinator = NewCoordinatorV1(proposedCoordinatorAddress, l.client, l.logPoller, l.lggr) + } else if strings.Contains(proposedCoordinatorTypeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING) { + proposedCoordinator = NewCoordinatorV2(proposedCoordinatorAddress, l.client, l.logPoller, l.lggr) + } - errActive := l.registerFilters(activeCoordinator) - errProposed := l.registerFilters(proposedCoordinator) - if errActive != nil || errProposed != nil { - l.lggr.Errorw("LogPollerWrapper: Failed to register filters", "errorActive", errActive, "errorProposed", errProposed) + err = proposedCoordinator.RegisterFilters() + if err != nil { + l.lggr.Errorw("LogPollerWrapper: Failed to register proposed coordinator filters", err) return } - l.lggr.Debugw("LogPollerWrapper: new routes", "activeCoordinator", activeCoordinator.address.Hex(), "proposedCoordinator", proposedCoordinator.address.Hex()) + l.lggr.Debugw("LogPollerWrapper: new routes", "activeCoordinator", activeCoordinator.Address().Hex(), "proposedCoordinator", proposedCoordinator.Address().Hex()) l.activeCoordinator = activeCoordinator l.proposedCoordinator = proposedCoordinator for _, subscriber := range l.subscribers { - err := subscriber.UpdateRoutes(activeCoordinator.address, proposedCoordinator.address) + err := subscriber.UpdateRoutes(activeCoordinator.Address(), proposedCoordinator.Address()) if err != nil { l.lggr.Errorw("LogPollerWrapper: Failed to update routes", "err", err) } } } - -func filterName(addr common.Address, version string) string { - return logpoller.FilterName("FunctionsLogPollerWrapper", addr.String(), "-v", version) -} - -func (l *logPollerWrapper) registerFilters(coordinator coordinator) error { - if (coordinator.address == common.Address{}) { - return nil - } - - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING) { - return l.logPoller.RegisterFilter( - logpoller.Filter{ - Name: filterName(coordinator.address, "1"), - EventSigs: []common.Hash{ - functions_coordinator_1_1_0.FunctionsCoordinator110OracleRequest{}.Topic(), - functions_coordinator_1_1_0.FunctionsCoordinator110OracleResponse{}.Topic(), - }, - Addresses: []common.Address{coordinator.address}, - }) - } - - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING) { - return l.logPoller.RegisterFilter( - logpoller.Filter{ - Name: filterName(coordinator.address, "2"), - EventSigs: []common.Hash{ - functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), - functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), - }, - Addresses: []common.Address{coordinator.address}, - }) - } - - l.lggr.Errorw("RegisterFilters: Unsupported Coordinator version ", coordinator.typeAndVersion) - return errors.Errorf("RegisterFilters: Unsupported Coordinator version") -} - -func oracleRequestLogTopic(coordinator coordinator) (common.Hash, error) { - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING) { - return functions_coordinator_1_1_0.FunctionsCoordinator110OracleRequest{}.Topic(), nil - } - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING) { - return functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), nil - } - return common.Hash{}, errors.Errorf("OracleRequestLogTopic: Unsupported Coordinator version %s", coordinator.typeAndVersion) -} - -func oracleResponseLogTopic(coordinator coordinator) (common.Hash, error) { - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING) { - return functions_coordinator_1_1_0.FunctionsCoordinator110OracleResponse{}.Topic(), nil - } - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING) { - return functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), nil - } - return common.Hash{}, errors.Errorf("OracleResponseLogTopic: Unsupported Coordinator version %s", coordinator.typeAndVersion) -} - -func (l *logPollerWrapper) logsToRequests(coordinator coordinator, requestLogs []logpoller.Log) ([]evmRelayTypes.OracleRequest, error) { - var requests []evmRelayTypes.OracleRequest - - uint32Type, errType1 := abi.NewType("uint32", "uint32", nil) - uint40Type, errType2 := abi.NewType("uint40", "uint40", nil) - uint64Type, errType3 := abi.NewType("uint64", "uint64", nil) - uint72Type, errType4 := abi.NewType("uint72", "uint72", nil) - uint96Type, errType5 := abi.NewType("uint96", "uint96", nil) - addressType, errType6 := abi.NewType("address", "address", nil) - bytes32Type, errType7 := abi.NewType("bytes32", "bytes32", nil) - - if errType1 != nil || errType2 != nil || errType3 != nil || errType4 != nil || errType5 != nil || errType6 != nil || errType7 != nil { - l.lggr.Errorw("LogsToRequests: failed to initialize types", "errType1", errType1, - "errType2", errType2, "errType3", errType3, "errType4", errType4, "errType5", errType5, "errType6", errType6, "errType7", errType7, - ) - } - - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING) { - parsingContract, err := functions_coordinator_1_1_0.NewFunctionsCoordinator110(coordinator.address, l.client) - if err != nil { - return nil, errors.Errorf("LogsToRequests: creating a contract instance for NewFunctionsCoordinator110 parsing failed") - } - - for _, log := range requestLogs { - gethLog := log.ToGethLog() - oracleRequest, err := parsingContract.ParseOracleRequest(gethLog) - if err != nil { - l.lggr.Errorw("LogsToRequests: failed to parse a request log, skipping", "err", err) - continue - } - - commitmentABIV1 := abi.Arguments{ - {Type: bytes32Type}, // RequestId - {Type: addressType}, // Coordinator - {Type: uint96Type}, // EstimatedTotalCostJuels - {Type: addressType}, // Client - {Type: uint64Type}, // SubscriptionId - {Type: uint32Type}, // CallbackGasLimit - {Type: uint72Type}, // AdminFee - {Type: uint72Type}, // DonFee - {Type: uint40Type}, // GasOverheadBeforeCallback - {Type: uint40Type}, // GasOverheadAfterCallback - {Type: uint32Type}, // TimeoutTimestamp - } - - commitmentBytesV1, err := commitmentABIV1.Pack( - oracleRequest.Commitment.RequestId, - oracleRequest.Commitment.Coordinator, - oracleRequest.Commitment.EstimatedTotalCostJuels, - oracleRequest.Commitment.Client, - oracleRequest.Commitment.SubscriptionId, - oracleRequest.Commitment.CallbackGasLimit, - oracleRequest.Commitment.AdminFee, - oracleRequest.Commitment.DonFee, - oracleRequest.Commitment.GasOverheadBeforeCallback, - oracleRequest.Commitment.GasOverheadAfterCallback, - oracleRequest.Commitment.TimeoutTimestamp, - ) - if err != nil { - l.lggr.Errorw("LogsToRequests: failed to pack Coordinator v1 commitment bytes, skipping", err) - } - - OracleRequestV1 := evmRelayTypes.OracleRequest{ - RequestId: oracleRequest.RequestId, - RequestingContract: oracleRequest.RequestingContract, - RequestInitiator: oracleRequest.RequestInitiator, - SubscriptionId: oracleRequest.SubscriptionId, - SubscriptionOwner: oracleRequest.SubscriptionOwner, - Data: oracleRequest.Data, - DataVersion: oracleRequest.DataVersion, - Flags: oracleRequest.Flags, - CallbackGasLimit: oracleRequest.CallbackGasLimit, - TxHash: oracleRequest.Raw.TxHash, - OnchainMetadata: commitmentBytesV1, - CoordinatorContract: coordinator.address, - } - - requests = append(requests, OracleRequestV1) - } - return requests, nil - } - - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING) { - parsingContract, err := functions_coordinator.NewFunctionsCoordinator(coordinator.address, l.client) - if err != nil { - return nil, errors.Errorf("LogsToRequests: creating a contract instance for NewFunctionsCoordinator parsing failed") - } - - for _, log := range requestLogs { - gethLog := log.ToGethLog() - oracleRequest, err := parsingContract.ParseOracleRequest(gethLog) - if err != nil { - l.lggr.Errorw("LogsToRequests: failed to parse a request log, skipping", "err", err) - continue - } - - commitmentABIV2 := abi.Arguments{ - {Type: bytes32Type}, // RequestId - {Type: addressType}, // Coordinator - {Type: uint96Type}, // EstimatedTotalCostJuels - {Type: addressType}, // Client - {Type: uint64Type}, // SubscriptionId - {Type: uint32Type}, // CallbackGasLimit - {Type: uint72Type}, // AdminFee - {Type: uint72Type}, // DonFee - {Type: uint40Type}, // GasOverheadBeforeCallback - {Type: uint40Type}, // GasOverheadAfterCallback - {Type: uint32Type}, // TimeoutTimestamp - {Type: uint72Type}, // OperationFee - } - - commitmentBytesV2, err := commitmentABIV2.Pack( - oracleRequest.Commitment.RequestId, - oracleRequest.Commitment.Coordinator, - oracleRequest.Commitment.EstimatedTotalCostJuels, - oracleRequest.Commitment.Client, - oracleRequest.Commitment.SubscriptionId, - oracleRequest.Commitment.CallbackGasLimit, - oracleRequest.Commitment.AdminFeeJuels, - oracleRequest.Commitment.DonFeeJuels, - oracleRequest.Commitment.GasOverheadBeforeCallback, - oracleRequest.Commitment.GasOverheadAfterCallback, - oracleRequest.Commitment.TimeoutTimestamp, - oracleRequest.Commitment.OperationFeeJuels, - ) - if err != nil { - l.lggr.Errorw("LogsToRequests: failed to pack Coordinator v2 commitment bytes, skipping", err) - } - - OracleRequestV2 := evmRelayTypes.OracleRequest{ - RequestId: oracleRequest.RequestId, - RequestingContract: oracleRequest.RequestingContract, - RequestInitiator: oracleRequest.RequestInitiator, - SubscriptionId: oracleRequest.SubscriptionId, - SubscriptionOwner: oracleRequest.SubscriptionOwner, - Data: oracleRequest.Data, - DataVersion: oracleRequest.DataVersion, - Flags: oracleRequest.Flags, - CallbackGasLimit: oracleRequest.CallbackGasLimit, - TxHash: oracleRequest.Raw.TxHash, - OnchainMetadata: commitmentBytesV2, - CoordinatorContract: coordinator.address, - } - - requests = append(requests, OracleRequestV2) - } - return requests, nil - } - - return nil, errors.Errorf("LogsToRequests: Unsupported Coordinator version %s", coordinator.typeAndVersion) -} - -func (l *logPollerWrapper) logsToResponses(coordinator coordinator, responseLogs []logpoller.Log) ([]evmRelayTypes.OracleResponse, error) { - var responses []evmRelayTypes.OracleResponse - - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING) { - parsingContract, err := functions_coordinator_1_1_0.NewFunctionsCoordinator110(coordinator.address, l.client) - if err != nil { - return nil, errors.Errorf("LogsToResponses: creating a contract instance for parsing failed") - } - for _, log := range responseLogs { - gethLog := log.ToGethLog() - oracleResponse, err := parsingContract.ParseOracleResponse(gethLog) - if err != nil { - l.lggr.Errorw("LogsToResponses: failed to parse a response log, skipping") - continue - } - responses = append(responses, evmRelayTypes.OracleResponse{ - RequestId: oracleResponse.RequestId, - }) - } - return responses, nil - } - - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING) { - parsingContract, err := functions_coordinator.NewFunctionsCoordinator(coordinator.address, l.client) - if err != nil { - return nil, errors.Errorf("LogsToResponses: creating a contract instance for parsing failed") - } - for _, log := range responseLogs { - gethLog := log.ToGethLog() - oracleResponse, err := parsingContract.ParseOracleResponse(gethLog) - if err != nil { - l.lggr.Errorw("LogsToResponses: failed to parse a response log, skipping") - continue - } - responses = append(responses, evmRelayTypes.OracleResponse{ - RequestId: oracleResponse.RequestId, - }) - } - return responses, nil - } - - return nil, errors.Errorf("LogsToResponses: Unsupported Coordinator version %s", coordinator.typeAndVersion) -} From fad8945b7803ec3e79928fdcfaa344a96f016dd5 Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Mon, 19 Feb 2024 13:23:23 +0100 Subject: [PATCH 4/6] fix: remove duplicates abitype definitions --- .../relay/evm/functions/coordinator_v1.go | 14 ----- .../relay/evm/functions/coordinator_v2.go | 14 ----- .../relay/evm/functions/logpoller_wrapper.go | 53 +++++++++++++++++++ 3 files changed, 53 insertions(+), 28 deletions(-) diff --git a/core/services/relay/evm/functions/coordinator_v1.go b/core/services/relay/evm/functions/coordinator_v1.go index 506d14c9504..c8450e642c8 100644 --- a/core/services/relay/evm/functions/coordinator_v1.go +++ b/core/services/relay/evm/functions/coordinator_v1.go @@ -59,20 +59,6 @@ func (c *CoordinatorV1) OracleResponseLogTopic() (common.Hash, error) { func (c *CoordinatorV1) LogsToRequests(requestLogs []logpoller.Log) ([]evmRelayTypes.OracleRequest, error) { var requests []evmRelayTypes.OracleRequest - uint32Type, errType1 := abi.NewType("uint32", "uint32", nil) - uint40Type, errType2 := abi.NewType("uint40", "uint40", nil) - uint64Type, errType3 := abi.NewType("uint64", "uint64", nil) - uint72Type, errType4 := abi.NewType("uint72", "uint72", nil) - uint96Type, errType5 := abi.NewType("uint96", "uint96", nil) - addressType, errType6 := abi.NewType("address", "address", nil) - bytes32Type, errType7 := abi.NewType("bytes32", "bytes32", nil) - - if errType1 != nil || errType2 != nil || errType3 != nil || errType4 != nil || errType5 != nil || errType6 != nil || errType7 != nil { - c.lggr.Errorw("LogsToRequests: failed to initialize types", "errType1", errType1, - "errType2", errType2, "errType3", errType3, "errType4", errType4, "errType5", errType5, "errType6", errType6, "errType7", errType7, - ) - } - parsingContract, err := functions_coordinator_1_1_0.NewFunctionsCoordinator110(c.address, c.client) if err != nil { return nil, errors.Errorf("LogsToRequests: creating a contract instance for NewFunctionsCoordinator110 parsing failed") diff --git a/core/services/relay/evm/functions/coordinator_v2.go b/core/services/relay/evm/functions/coordinator_v2.go index 513344c9052..7e360db609c 100644 --- a/core/services/relay/evm/functions/coordinator_v2.go +++ b/core/services/relay/evm/functions/coordinator_v2.go @@ -56,20 +56,6 @@ func (c *CoordinatorV2) OracleResponseLogTopic() (common.Hash, error) { func (c *CoordinatorV2) LogsToRequests(requestLogs []logpoller.Log) ([]evmRelayTypes.OracleRequest, error) { var requests []evmRelayTypes.OracleRequest - uint32Type, errType1 := abi.NewType("uint32", "uint32", nil) - uint40Type, errType2 := abi.NewType("uint40", "uint40", nil) - uint64Type, errType3 := abi.NewType("uint64", "uint64", nil) - uint72Type, errType4 := abi.NewType("uint72", "uint72", nil) - uint96Type, errType5 := abi.NewType("uint96", "uint96", nil) - addressType, errType6 := abi.NewType("address", "address", nil) - bytes32Type, errType7 := abi.NewType("bytes32", "bytes32", nil) - - if errType1 != nil || errType2 != nil || errType3 != nil || errType4 != nil || errType5 != nil || errType6 != nil || errType7 != nil { - c.lggr.Errorw("LogsToRequests: failed to initialize types", "errType1", errType1, - "errType2", errType2, "errType3", errType3, "errType4", errType4, "errType5", errType5, "errType6", errType6, "errType7", errType7, - ) - } - parsingContract, err := functions_coordinator.NewFunctionsCoordinator(c.address, c.client) if err != nil { return nil, errors.Errorf("LogsToRequests: creating a contract instance for NewFunctionsCoordinator parsing failed") diff --git a/core/services/relay/evm/functions/logpoller_wrapper.go b/core/services/relay/evm/functions/logpoller_wrapper.go index 87d758e09dc..4cb3d8da019 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper.go +++ b/core/services/relay/evm/functions/logpoller_wrapper.go @@ -2,10 +2,12 @@ package functions import ( "context" + "fmt" "strings" "sync" "time" + "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" @@ -70,6 +72,23 @@ const logPollerCacheDurationSecDefault = 300 const pastBlocksToPollDefault = 50 const maxLogsToProcess = 1000 +var ( + uint32Type abi.Type + uint40Type abi.Type + uint64Type abi.Type + uint72Type abi.Type + uint96Type abi.Type + addressType abi.Type + bytes32Type abi.Type +) + +func init() { + err := initAbiTypes() + if err != nil { + panic(err) + } +} + var _ evmRelayTypes.LogPollerWrapper = &logPollerWrapper{} func NewLogPollerWrapper(routerContractAddress common.Address, pluginConfig config.PluginConfig, client client.Client, logPoller logpoller.LogPoller, lggr logger.Logger) (evmRelayTypes.LogPollerWrapper, error) { @@ -416,3 +435,37 @@ func (l *logPollerWrapper) handleRouteUpdate(activeCoordinatorAddress common.Add } } } + +func initAbiTypes() error { + var err error + uint32Type, err = abi.NewType("uint32", "uint32", nil) + if err != nil { + return fmt.Errorf("LogsToRequests: failed to initialize uint32Type type: %w", err) + } + uint40Type, err = abi.NewType("uint40", "uint40", nil) + if err != nil { + return fmt.Errorf("LogsToRequests: failed to initialize uint40Type type: %w", err) + } + uint64Type, err = abi.NewType("uint64", "uint64", nil) + if err != nil { + return fmt.Errorf("LogsToRequests: failed to initialize uint64Type type: %w", err) + } + uint72Type, err = abi.NewType("uint72", "uint72", nil) + if err != nil { + return fmt.Errorf("LogsToRequests: failed to initialize uint72Type type: %w", err) + } + uint96Type, err = abi.NewType("uint96", "uint96", nil) + if err != nil { + return fmt.Errorf("LogsToRequests: failed to initialize uint96Type type: %w", err) + } + addressType, err = abi.NewType("address", "address", nil) + if err != nil { + return fmt.Errorf("LogsToRequests: failed to initialize addressType type: %w", err) + } + bytes32Type, err = abi.NewType("bytes32", "bytes32", nil) + if err != nil { + return fmt.Errorf("LogsToRequests: failed to initialize bytes32Type type: %w", err) + } + + return nil +} From ff44e0ddd90dee754d680e5af9873b14b7a4c2f1 Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Mon, 19 Feb 2024 15:06:32 +0100 Subject: [PATCH 5/6] fix: lint sonarqube reliability issue --- .../relay/evm/functions/coordinator_v1.go | 8 ++- .../relay/evm/functions/coordinator_v2.go | 8 ++- .../relay/evm/functions/logpoller_wrapper.go | 55 ++++++++++++------- .../evm/functions/logpoller_wrapper_test.go | 15 ++++- 4 files changed, 60 insertions(+), 26 deletions(-) diff --git a/core/services/relay/evm/functions/coordinator_v1.go b/core/services/relay/evm/functions/coordinator_v1.go index c8450e642c8..683d39ddd1a 100644 --- a/core/services/relay/evm/functions/coordinator_v1.go +++ b/core/services/relay/evm/functions/coordinator_v1.go @@ -1,9 +1,11 @@ package functions import ( + "fmt" + "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" - "github.com/pkg/errors" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_coordinator_1_1_0" @@ -61,7 +63,7 @@ func (c *CoordinatorV1) LogsToRequests(requestLogs []logpoller.Log) ([]evmRelayT parsingContract, err := functions_coordinator_1_1_0.NewFunctionsCoordinator110(c.address, c.client) if err != nil { - return nil, errors.Errorf("LogsToRequests: creating a contract instance for NewFunctionsCoordinator110 parsing failed") + return nil, fmt.Errorf("LogsToRequests: creating a contract instance for NewFunctionsCoordinator110 parsing failed: %w", err) } for _, log := range requestLogs { @@ -128,7 +130,7 @@ func (c *CoordinatorV1) LogsToResponses(responseLogs []logpoller.Log) ([]evmRela parsingContract, err := functions_coordinator_1_1_0.NewFunctionsCoordinator110(c.address, c.client) if err != nil { - return nil, errors.Errorf("LogsToResponses: creating a contract instance for parsing failed") + return nil, fmt.Errorf("LogsToResponses: creating a contract instance for parsing failed: %w", err) } for _, log := range responseLogs { gethLog := log.ToGethLog() diff --git a/core/services/relay/evm/functions/coordinator_v2.go b/core/services/relay/evm/functions/coordinator_v2.go index 7e360db609c..04c02570dc2 100644 --- a/core/services/relay/evm/functions/coordinator_v2.go +++ b/core/services/relay/evm/functions/coordinator_v2.go @@ -1,9 +1,11 @@ package functions import ( + "fmt" + "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" - "github.com/pkg/errors" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_coordinator" @@ -58,7 +60,7 @@ func (c *CoordinatorV2) LogsToRequests(requestLogs []logpoller.Log) ([]evmRelayT parsingContract, err := functions_coordinator.NewFunctionsCoordinator(c.address, c.client) if err != nil { - return nil, errors.Errorf("LogsToRequests: creating a contract instance for NewFunctionsCoordinator parsing failed") + return nil, fmt.Errorf("LogsToRequests: creating a contract instance for NewFunctionsCoordinator parsing failed: %w", err) } for _, log := range requestLogs { @@ -127,7 +129,7 @@ func (c *CoordinatorV2) LogsToResponses(responseLogs []logpoller.Log) ([]evmRela parsingContract, err := functions_coordinator.NewFunctionsCoordinator(c.address, c.client) if err != nil { - return nil, errors.Errorf("LogsToResponses: creating a contract instance for parsing failed") + return nil, fmt.Errorf("LogsToResponses: creating a contract instance for parsing failed: %w", err) } for _, log := range responseLogs { gethLog := log.ToGethLog() diff --git a/core/services/relay/evm/functions/logpoller_wrapper.go b/core/services/relay/evm/functions/logpoller_wrapper.go index 4cb3d8da019..a374f01fa9e 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper.go +++ b/core/services/relay/evm/functions/logpoller_wrapper.go @@ -176,10 +176,10 @@ func (l *logPollerWrapper) Name() string { return l.lggr.Name() } func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmRelayTypes.OracleResponse, error) { l.mu.Lock() coordinators := []Coordinator{} - if l.activeCoordinator.Address() != (common.Address{}) { + if l.activeCoordinator != nil && l.activeCoordinator.Address() != (common.Address{}) { coordinators = append(coordinators, l.activeCoordinator) } - if l.proposedCoordinator.Address() != (common.Address{}) && l.activeCoordinator != l.proposedCoordinator { + if l.proposedCoordinator != nil && l.proposedCoordinator.Address() != (common.Address{}) && l.activeCoordinator != l.proposedCoordinator { coordinators = append(coordinators, l.proposedCoordinator) } latest, err := l.logPoller.LatestBlock() @@ -385,48 +385,65 @@ func (l *logPollerWrapper) handleRouteUpdate(activeCoordinatorAddress common.Add return } - if activeCoordinatorAddress == l.activeCoordinator.Address() && proposedCoordinatorAddress == l.proposedCoordinator.Address() { + if (l.activeCoordinator != nil && l.activeCoordinator.Address() == activeCoordinatorAddress) && + (l.proposedCoordinator != nil && l.proposedCoordinator.Address() == proposedCoordinatorAddress) { l.lggr.Debug("LogPollerWrapper: no changes to routes") return } activeCoordinatorTypeAndVersion, err := l.getTypeAndVersion(activeCoordinatorAddress) if err != nil { + l.lggr.Errorf("LogPollerWrapper: failed to get active coordinatorTypeAndVersion: %w", err) return } var activeCoordinator Coordinator - if strings.Contains(activeCoordinatorTypeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING) { + switch { + case strings.Contains(activeCoordinatorTypeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING): activeCoordinator = NewCoordinatorV1(activeCoordinatorAddress, l.client, l.logPoller, l.lggr) - } else if strings.Contains(activeCoordinatorTypeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING) { + case strings.Contains(activeCoordinatorTypeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING): activeCoordinator = NewCoordinatorV2(activeCoordinatorAddress, l.client, l.logPoller, l.lggr) + default: + l.lggr.Errorf("LogPollerWrapper: Invalid active coordinator type and version: %q", activeCoordinatorTypeAndVersion) + return } - err = activeCoordinator.RegisterFilters() - if err != nil { - l.lggr.Errorw("LogPollerWrapper: Failed to register active coordinator filters", err) - return + if activeCoordinator != nil { + err = activeCoordinator.RegisterFilters() + if err != nil { + l.lggr.Errorw("LogPollerWrapper: Failed to register active coordinator filters", err) + return + } + l.activeCoordinator = activeCoordinator + l.lggr.Debugw("LogPollerWrapper: new routes", "activeCoordinator", activeCoordinator.Address().Hex()) } proposedCoordinatorTypeAndVersion, err := l.getTypeAndVersion(proposedCoordinatorAddress) if err != nil { + l.lggr.Errorf("LogPollerWrapper: failed to get proposed coordinatorTypeAndVersion: %w", err) return } + var proposedCoordinator Coordinator - if strings.Contains(proposedCoordinatorTypeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING) { + switch { + // proposedCoordinatorTypeAndVersion can be empty due to an empty proposedCoordinatorAddress + case proposedCoordinatorTypeAndVersion == "": + proposedCoordinator = NewCoordinatorV1(proposedCoordinatorAddress, l.client, l.logPoller, l.lggr) + case strings.Contains(proposedCoordinatorTypeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING): proposedCoordinator = NewCoordinatorV1(proposedCoordinatorAddress, l.client, l.logPoller, l.lggr) - } else if strings.Contains(proposedCoordinatorTypeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING) { + case strings.Contains(proposedCoordinatorTypeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING): proposedCoordinator = NewCoordinatorV2(proposedCoordinatorAddress, l.client, l.logPoller, l.lggr) - } - err = proposedCoordinator.RegisterFilters() - if err != nil { - l.lggr.Errorw("LogPollerWrapper: Failed to register proposed coordinator filters", err) - return } - l.lggr.Debugw("LogPollerWrapper: new routes", "activeCoordinator", activeCoordinator.Address().Hex(), "proposedCoordinator", proposedCoordinator.Address().Hex()) - l.activeCoordinator = activeCoordinator - l.proposedCoordinator = proposedCoordinator + if proposedCoordinator != nil { + err = proposedCoordinator.RegisterFilters() + if err != nil { + l.lggr.Errorw("LogPollerWrapper: Failed to register proposed coordinator filters", err) + return + } + l.proposedCoordinator = proposedCoordinator + l.lggr.Debugw("LogPollerWrapper: new routes", "proposedCoordinator", proposedCoordinator.Address().Hex()) + } for _, subscriber := range l.subscribers { err := subscriber.UpdateRoutes(activeCoordinator.Address(), proposedCoordinator.Address()) diff --git a/core/services/relay/evm/functions/logpoller_wrapper_test.go b/core/services/relay/evm/functions/logpoller_wrapper_test.go index ef3cba72a09..b2f196c5da6 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper_test.go +++ b/core/services/relay/evm/functions/logpoller_wrapper_test.go @@ -232,7 +232,20 @@ func TestLogPollerWrapper_LatestEvents_ReorgHandlingV1(t *testing.T) { lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() servicetest.Run(t, lpWrapper) - subscriber.updates.Wait() + + done := make(chan struct{}) + go func() { + subscriber.updates.Wait() + close(done) + }() + + select { + case <-done: // subscriber.updates is 0 + break + case <-time.After(5 * time.Second): // Hit timeout. + t.Log("TestLogPollerWrapper_LatestEvents_ReorgHandlingV1: timeout") + t.FailNow() + } oracleRequests, _, err := lpWrapper.LatestEvents() require.NoError(t, err) From c9f41141e3200628b6d901e4ec71ee524e843dd7 Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Mon, 19 Feb 2024 23:31:46 +0100 Subject: [PATCH 6/6] fix: race conditions due to share variables --- .../relay/evm/functions/coordinator_v2.go | 6 +- .../evm/functions/logpoller_wrapper_test.go | 185 +++++++++--------- 2 files changed, 98 insertions(+), 93 deletions(-) diff --git a/core/services/relay/evm/functions/coordinator_v2.go b/core/services/relay/evm/functions/coordinator_v2.go index 04c02570dc2..4189e575b2a 100644 --- a/core/services/relay/evm/functions/coordinator_v2.go +++ b/core/services/relay/evm/functions/coordinator_v2.go @@ -93,12 +93,12 @@ func (c *CoordinatorV2) LogsToRequests(requestLogs []logpoller.Log) ([]evmRelayT oracleRequest.Commitment.Client, oracleRequest.Commitment.SubscriptionId, oracleRequest.Commitment.CallbackGasLimit, - oracleRequest.Commitment.AdminFee, - oracleRequest.Commitment.DonFee, + oracleRequest.Commitment.AdminFeeJuels, + oracleRequest.Commitment.DonFeeJuels, oracleRequest.Commitment.GasOverheadBeforeCallback, oracleRequest.Commitment.GasOverheadAfterCallback, oracleRequest.Commitment.TimeoutTimestamp, - oracleRequest.Commitment.OperationFee, + oracleRequest.Commitment.OperationFeeJuels, ) if err != nil { c.lggr.Errorw("LogsToRequests: failed to pack Coordinator v2 commitment bytes, skipping", err) diff --git a/core/services/relay/evm/functions/logpoller_wrapper_test.go b/core/services/relay/evm/functions/logpoller_wrapper_test.go index b2f196c5da6..9087356c65b 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper_test.go +++ b/core/services/relay/evm/functions/logpoller_wrapper_test.go @@ -37,11 +37,6 @@ const ( OracleRequestV200 = `[{"constant":true,"inputs":[{"indexed":true,"internalType":"bytes32","name":"requestId","type":"bytes32"},{"indexed":true,"internalType":"address","name":"requestingContract","type":"address"},{"indexed":false,"internalType":"address","name":"requestInitiator","type":"address"},{"indexed":false,"internalType":"uint64","name":"subscriptionId","type":"uint64"},{"indexed":false,"internalType":"address","name":"subscriptionOwner","type":"address"},{"indexed":false,"internalType":"bytes","name":"data","type":"bytes"},{"indexed":false,"internalType":"uint16","name":"dataVersion","type":"uint16"},{"indexed":false,"internalType":"bytes32","name":"flags","type":"bytes32"},{"indexed":false,"internalType":"uint64","name":"callbackGasLimit","type":"uint64"},{"components":[{"internalType":"bytes32","name":"requestId","type":"bytes32"},{"internalType":"address","name":"coordinator","type":"address"},{"internalType":"uint96","name":"estimatedTotalCostJuels","type":"uint96"},{"internalType":"address","name":"client","type":"address"},{"internalType":"uint64","name":"subscriptionId","type":"uint64"},{"internalType":"uint32","name":"callbackGasLimit","type":"uint32"},{"internalType":"uint72","name":"adminFee","type":"uint72"},{"internalType":"uint72","name":"donFee","type":"uint72"},{"internalType":"uint40","name":"gasOverheadBeforeCallback","type":"uint40"},{"internalType":"uint40","name":"gasOverheadAfterCallback","type":"uint40"},{"internalType":"uint32","name":"timeoutTimestamp","type":"uint32"},{"internalType":"uint72","name":"operationFee","type":"uint72"}],"indexed":false,"internalType":"structFunctionsResponse.Commitment","name":"commitment","type":"tuple"}],"name":"OracleRequest","type":"function"}]` ) -var routerAddressBytes []byte -var routerAddressHex common.Address -var coordinatorAddressBytes []byte -var coordinatorAddressHex common.Address - func (s *subscriber) UpdateRoutes(activeCoordinator common.Address, proposedCoordinator common.Address) error { if s.expectedCalls == 0 { panic("unexpected call to UpdateRoutes") @@ -68,24 +63,34 @@ func addr(lastByte string) ([]byte, error) { return contractAddr, nil } -func setUp(t *testing.T, updateFrequencySec uint32) (*lpmocks.LogPoller, types.LogPollerWrapper, *evmclimocks.Client) { +type setupResponse struct { + LogPoller *lpmocks.LogPoller + LogPollerWrapper types.LogPollerWrapper + Client *evmclimocks.Client + RouterAddress common.Address + CoordinatorAddress common.Address + CoordinatorAddressBytes []byte +} + +func setUp(t *testing.T, updateFrequencySec uint32) setupResponse { + s := setupResponse{} lggr := logger.TestLogger(t) - client := evmclimocks.NewClient(t) - lp := lpmocks.NewLogPoller(t) + s.Client = evmclimocks.NewClient(t) + s.LogPoller = lpmocks.NewLogPoller(t) config := config.PluginConfig{ ContractUpdateCheckFrequencySec: updateFrequencySec, ContractVersion: 1, } routerAddressBytes, err := addr("01") require.NoError(t, err) - routerAddressHex = common.BytesToAddress(routerAddressBytes) - coordinatorAddressBytes, err = addr("02") + s.RouterAddress = common.BytesToAddress(routerAddressBytes) + s.CoordinatorAddressBytes, err = addr("02") require.NoError(t, err) - coordinatorAddressHex = common.BytesToAddress(coordinatorAddressBytes) - lpWrapper, err := NewLogPollerWrapper(routerAddressHex, config, client, lp, lggr) + s.CoordinatorAddress = common.BytesToAddress(s.CoordinatorAddressBytes) + s.LogPollerWrapper, err = NewLogPollerWrapper(s.RouterAddress, config, s.Client, s.LogPoller, lggr) require.NoError(t, err) - return lp, lpWrapper, client + return s } func getMockedRequestLogV1(t *testing.T) logpoller.Log { @@ -126,31 +131,31 @@ func getMockedRequestLogV2(t *testing.T) logpoller.Log { func TestLogPollerWrapper_SingleSubscriberEmptyEvents_CoordinatorV1(t *testing.T) { t.Parallel() - lp, lpWrapper, client := setUp(t, 100_000) // check only once - lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) + setup := setUp(t, 100_000) // check only once + setup.LogPoller.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{}, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById - To: &routerAddressHex, + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{}, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById + To: &setup.RouterAddress, Data: []uint8{0xa9, 0xc9, 0xa9, 0x18, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(coordinatorAddressBytes, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById - To: &routerAddressHex, + }, mock.Anything).Return(setup.CoordinatorAddressBytes, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById + To: &setup.RouterAddress, Data: []uint8{0x6a, 0x22, 0x15, 0xde, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, }, mock.Anything).Return(addr("00")) - lp.On("RegisterFilter", mock.Anything).Return(nil) + setup.LogPoller.On("RegisterFilter", mock.Anything).Return(nil) typeAndVersionResponse, err := encodeTypeAndVersion(CoordinatorContractV100) require.NoError(t, err) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion - To: &coordinatorAddressHex, + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion + To: &setup.CoordinatorAddress, Data: hexutil.MustDecode("0x181f5a77"), }, mock.Anything).Return(typeAndVersionResponse, nil) subscriber := newSubscriber(1) - lpWrapper.SubscribeToUpdates("mock_subscriber", subscriber) + setup.LogPollerWrapper.SubscribeToUpdates("mock_subscriber", subscriber) - servicetest.Run(t, lpWrapper) + servicetest.Run(t, setup.LogPollerWrapper) subscriber.updates.Wait() - reqs, resps, err := lpWrapper.LatestEvents() + reqs, resps, err := setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) require.Equal(t, 0, len(reqs)) require.Equal(t, 0, len(resps)) @@ -158,31 +163,31 @@ func TestLogPollerWrapper_SingleSubscriberEmptyEvents_CoordinatorV1(t *testing.T func TestLogPollerWrapper_SingleSubscriberEmptyEvents_CoordinatorV2(t *testing.T) { t.Parallel() - lp, lpWrapper, client := setUp(t, 100_000) // check only once - lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) + setup := setUp(t, 100_000) // check only once + setup.LogPoller.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{}, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById - To: &routerAddressHex, + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{}, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById + To: &setup.RouterAddress, Data: []uint8{0xa9, 0xc9, 0xa9, 0x18, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(coordinatorAddressBytes, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById - To: &routerAddressHex, + }, mock.Anything).Return(setup.CoordinatorAddressBytes, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById + To: &setup.RouterAddress, Data: []uint8{0x6a, 0x22, 0x15, 0xde, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, }, mock.Anything).Return(addr("00")) - lp.On("RegisterFilter", mock.Anything).Return(nil) + setup.LogPoller.On("RegisterFilter", mock.Anything).Return(nil) typeAndVersionResponse, err := encodeTypeAndVersion(CoordinatorContractV200) require.NoError(t, err) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion - To: &coordinatorAddressHex, + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion + To: &setup.CoordinatorAddress, Data: hexutil.MustDecode("0x181f5a77"), }, mock.Anything).Return(typeAndVersionResponse, nil) subscriber := newSubscriber(1) - lpWrapper.SubscribeToUpdates("mock_subscriber", subscriber) + setup.LogPollerWrapper.SubscribeToUpdates("mock_subscriber", subscriber) - servicetest.Run(t, lpWrapper) + servicetest.Run(t, setup.LogPollerWrapper) subscriber.updates.Wait() - reqs, resps, err := lpWrapper.LatestEvents() + reqs, resps, err := setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) require.Equal(t, 0, len(reqs)) require.Equal(t, 0, len(resps)) @@ -190,48 +195,48 @@ func TestLogPollerWrapper_SingleSubscriberEmptyEvents_CoordinatorV2(t *testing.T func TestLogPollerWrapper_ErrorOnZeroAddresses(t *testing.T) { t.Parallel() - lp, lpWrapper, client := setUp(t, 100_000) // check only once - lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) + setup := setUp(t, 100_000) // check only once + setup.LogPoller.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) - client.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Return(addr("00")) + setup.Client.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Return(addr("00")) - servicetest.Run(t, lpWrapper) - _, _, err := lpWrapper.LatestEvents() + servicetest.Run(t, setup.LogPollerWrapper) + _, _, err := setup.LogPollerWrapper.LatestEvents() require.Error(t, err) } func TestLogPollerWrapper_LatestEvents_ReorgHandlingV1(t *testing.T) { t.Parallel() - lp, lpWrapper, client := setUp(t, 100_000) - lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById - To: &routerAddressHex, + setup := setUp(t, 100_000) + setup.LogPoller.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById + To: &setup.RouterAddress, Data: []uint8{0xa9, 0xc9, 0xa9, 0x18, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(coordinatorAddressBytes, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById - To: &routerAddressHex, + }, mock.Anything).Return(setup.CoordinatorAddressBytes, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById + To: &setup.RouterAddress, Data: []uint8{0x6a, 0x22, 0x15, 0xde, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, }, mock.Anything).Return(addr("00")) typeAndVersionResponse, err := encodeTypeAndVersion(CoordinatorContractV100) require.NoError(t, err) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion - To: &coordinatorAddressHex, + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion + To: &setup.CoordinatorAddress, Data: hexutil.MustDecode("0x181f5a77"), }, mock.Anything).Return(typeAndVersionResponse, nil) - lp.On("RegisterFilter", mock.Anything).Return(nil) + setup.LogPoller.On("RegisterFilter", mock.Anything).Return(nil) subscriber := newSubscriber(1) - lpWrapper.SubscribeToUpdates("mock_subscriber", subscriber) + setup.LogPollerWrapper.SubscribeToUpdates("mock_subscriber", subscriber) mockedLog := getMockedRequestLogV1(t) // All logPoller queries for responses return none - lp.On("Logs", mock.Anything, mock.Anything, functions_coordinator_1_1_0.FunctionsCoordinator110OracleResponse{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil) + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, functions_coordinator_1_1_0.FunctionsCoordinator110OracleResponse{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil) // On the first logPoller query for requests, the request log appears - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() // On the 2nd query, the request log disappears - lp.On("Logs", mock.Anything, mock.Anything, functions_coordinator_1_1_0.FunctionsCoordinator110OracleRequest{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil).Once() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, functions_coordinator_1_1_0.FunctionsCoordinator110OracleRequest{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil).Once() // On the 3rd query, the original request log appears again - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() - servicetest.Run(t, lpWrapper) + servicetest.Run(t, setup.LogPollerWrapper) done := make(chan struct{}) go func() { @@ -247,74 +252,74 @@ func TestLogPollerWrapper_LatestEvents_ReorgHandlingV1(t *testing.T) { t.FailNow() } - oracleRequests, _, err := lpWrapper.LatestEvents() + oracleRequests, _, err := setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 1, len(oracleRequests)) - oracleRequests, _, err = lpWrapper.LatestEvents() + oracleRequests, _, err = setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 0, len(oracleRequests)) require.NoError(t, err) - oracleRequests, _, err = lpWrapper.LatestEvents() + oracleRequests, _, err = setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 0, len(oracleRequests)) } func TestLogPollerWrapper_LatestEvents_ReorgHandlingV2(t *testing.T) { t.Parallel() - lp, lpWrapper, client := setUp(t, 100_000) - lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById - To: &routerAddressHex, + setup := setUp(t, 100_000) + setup.LogPoller.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById + To: &setup.RouterAddress, Data: []uint8{0xa9, 0xc9, 0xa9, 0x18, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(coordinatorAddressBytes, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById - To: &routerAddressHex, + }, mock.Anything).Return(setup.CoordinatorAddressBytes, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById + To: &setup.RouterAddress, Data: []uint8{0x6a, 0x22, 0x15, 0xde, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, }, mock.Anything).Return(addr("00")) typeAndVersionResponse, err := encodeTypeAndVersion(CoordinatorContractV200) require.NoError(t, err) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion - To: &coordinatorAddressHex, + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion + To: &setup.CoordinatorAddress, Data: hexutil.MustDecode("0x181f5a77"), }, mock.Anything).Return(typeAndVersionResponse, nil) - lp.On("RegisterFilter", mock.Anything).Return(nil) + setup.LogPoller.On("RegisterFilter", mock.Anything).Return(nil) subscriber := newSubscriber(1) - lpWrapper.SubscribeToUpdates("mock_subscriber", subscriber) + setup.LogPollerWrapper.SubscribeToUpdates("mock_subscriber", subscriber) mockedLog := getMockedRequestLogV2(t) // All logPoller queries for responses return none - lp.On("Logs", mock.Anything, mock.Anything, functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil) + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil) // On the first logPoller query for requests, the request log appears - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() // On the 2nd query, the request log disappears - lp.On("Logs", mock.Anything, mock.Anything, functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil).Once() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil).Once() // On the 3rd query, the original request log appears again - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() - servicetest.Run(t, lpWrapper) + servicetest.Run(t, setup.LogPollerWrapper) subscriber.updates.Wait() - oracleRequests, _, err := lpWrapper.LatestEvents() + oracleRequests, _, err := setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 1, len(oracleRequests)) - oracleRequests, _, err = lpWrapper.LatestEvents() + oracleRequests, _, err = setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 0, len(oracleRequests)) require.NoError(t, err) - oracleRequests, _, err = lpWrapper.LatestEvents() + oracleRequests, _, err = setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 0, len(oracleRequests)) } func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_TruncatesLogs(t *testing.T) { t.Parallel() - _, lpWrapper, _ := setUp(t, 100_000) + setup := setUp(t, 100_000) inputLogs := make([]logpoller.Log, maxLogsToProcess+100) for i := 0; i < 1100; i++ { inputLogs[i] = getMockedRequestLogV1(t) } - functionsLpWrapper := lpWrapper.(*logPollerWrapper) + functionsLpWrapper := setup.LogPollerWrapper.(*logPollerWrapper) mockedDetectedEvents := detectedEvents{isPreviouslyDetected: make(map[[32]byte]struct{})} outputLogs := functionsLpWrapper.filterPreviouslyDetectedEvents(inputLogs, &mockedDetectedEvents, "request") @@ -325,12 +330,12 @@ func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_TruncatesLogs(t *testin func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_SkipsInvalidLog(t *testing.T) { t.Parallel() - _, lpWrapper, _ := setUp(t, 100_000) + setup := setUp(t, 100_000) inputLogs := []logpoller.Log{getMockedRequestLogV1(t)} inputLogs[0].Topics = [][]byte{[]byte("invalid topic")} mockedDetectedEvents := detectedEvents{isPreviouslyDetected: make(map[[32]byte]struct{})} - functionsLpWrapper := lpWrapper.(*logPollerWrapper) + functionsLpWrapper := setup.LogPollerWrapper.(*logPollerWrapper) outputLogs := functionsLpWrapper.filterPreviouslyDetectedEvents(inputLogs, &mockedDetectedEvents, "request") assert.Equal(t, 0, len(outputLogs)) @@ -340,7 +345,7 @@ func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_SkipsInvalidLog(t *test func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_FiltersPreviouslyDetectedEvent(t *testing.T) { t.Parallel() - _, lpWrapper, _ := setUp(t, 100_000) + setup := setUp(t, 100_000) mockedRequestLog := getMockedRequestLogV1(t) inputLogs := []logpoller.Log{mockedRequestLog} var mockedRequestId [32]byte @@ -356,7 +361,7 @@ func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_FiltersPreviouslyDetect timeDetected: time.Now().Add(-time.Second * time.Duration(logPollerCacheDurationSecDefault+1)), } - functionsLpWrapper := lpWrapper.(*logPollerWrapper) + functionsLpWrapper := setup.LogPollerWrapper.(*logPollerWrapper) outputLogs := functionsLpWrapper.filterPreviouslyDetectedEvents(inputLogs, &mockedDetectedEvents, "request") assert.Equal(t, 0, len(outputLogs))