diff --git a/core/services/relay/evm/functions/coordinator_v1.go b/core/services/relay/evm/functions/coordinator_v1.go new file mode 100644 index 00000000000..683d39ddd1a --- /dev/null +++ b/core/services/relay/evm/functions/coordinator_v1.go @@ -0,0 +1,147 @@ +package functions + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_coordinator_1_1_0" + "github.com/smartcontractkit/chainlink/v2/core/logger" + evmRelayTypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" +) + +type CoordinatorV1 struct { + address common.Address + + client client.Client + logPoller logpoller.LogPoller + lggr logger.Logger +} + +func NewCoordinatorV1(address common.Address, client client.Client, logPoller logpoller.LogPoller, lggr logger.Logger) *CoordinatorV1 { + return &CoordinatorV1{ + address: address, + client: client, + logPoller: logPoller, + lggr: lggr, + } +} + +func (c *CoordinatorV1) Address() common.Address { + return c.address +} + +func (c *CoordinatorV1) RegisterFilters() error { + if (c.address == common.Address{}) { + return nil + } + + return c.logPoller.RegisterFilter( + logpoller.Filter{ + Name: logpoller.FilterName("FunctionsLogPollerWrapper", c.address.String(), "-v", "1"), + EventSigs: []common.Hash{ + functions_coordinator_1_1_0.FunctionsCoordinator110OracleRequest{}.Topic(), + functions_coordinator_1_1_0.FunctionsCoordinator110OracleResponse{}.Topic(), + }, + Addresses: []common.Address{c.address}, + }) +} + +func (c *CoordinatorV1) OracleRequestLogTopic() (common.Hash, error) { + return functions_coordinator_1_1_0.FunctionsCoordinator110OracleRequest{}.Topic(), nil +} + +func (c *CoordinatorV1) OracleResponseLogTopic() (common.Hash, error) { + return functions_coordinator_1_1_0.FunctionsCoordinator110OracleResponse{}.Topic(), nil +} + +func (c *CoordinatorV1) LogsToRequests(requestLogs []logpoller.Log) ([]evmRelayTypes.OracleRequest, error) { + var requests []evmRelayTypes.OracleRequest + + parsingContract, err := functions_coordinator_1_1_0.NewFunctionsCoordinator110(c.address, c.client) + if err != nil { + return nil, fmt.Errorf("LogsToRequests: creating a contract instance for NewFunctionsCoordinator110 parsing failed: %w", err) + } + + for _, log := range requestLogs { + gethLog := log.ToGethLog() + oracleRequest, err := parsingContract.ParseOracleRequest(gethLog) + if err != nil { + c.lggr.Errorw("LogsToRequests: failed to parse a request log, skipping", "err", err) + continue + } + + commitmentABIV1 := abi.Arguments{ + {Type: bytes32Type}, // RequestId + {Type: addressType}, // Coordinator + {Type: uint96Type}, // EstimatedTotalCostJuels + {Type: addressType}, // Client + {Type: uint64Type}, // SubscriptionId + {Type: uint32Type}, // CallbackGasLimit + {Type: uint72Type}, // AdminFee + {Type: uint72Type}, // DonFee + {Type: uint40Type}, // GasOverheadBeforeCallback + {Type: uint40Type}, // GasOverheadAfterCallback + {Type: uint32Type}, // TimeoutTimestamp + } + + commitmentBytesV1, err := commitmentABIV1.Pack( + oracleRequest.Commitment.RequestId, + oracleRequest.Commitment.Coordinator, + oracleRequest.Commitment.EstimatedTotalCostJuels, + oracleRequest.Commitment.Client, + oracleRequest.Commitment.SubscriptionId, + oracleRequest.Commitment.CallbackGasLimit, + oracleRequest.Commitment.AdminFee, + oracleRequest.Commitment.DonFee, + oracleRequest.Commitment.GasOverheadBeforeCallback, + oracleRequest.Commitment.GasOverheadAfterCallback, + oracleRequest.Commitment.TimeoutTimestamp, + ) + if err != nil { + c.lggr.Errorw("LogsToRequests: failed to pack Coordinator v1 commitment bytes, skipping", err) + } + + OracleRequestV1 := evmRelayTypes.OracleRequest{ + RequestId: oracleRequest.RequestId, + RequestingContract: oracleRequest.RequestingContract, + RequestInitiator: oracleRequest.RequestInitiator, + SubscriptionId: oracleRequest.SubscriptionId, + SubscriptionOwner: oracleRequest.SubscriptionOwner, + Data: oracleRequest.Data, + DataVersion: oracleRequest.DataVersion, + Flags: oracleRequest.Flags, + CallbackGasLimit: oracleRequest.CallbackGasLimit, + TxHash: oracleRequest.Raw.TxHash, + OnchainMetadata: commitmentBytesV1, + CoordinatorContract: c.address, + } + + requests = append(requests, OracleRequestV1) + } + return requests, nil +} + +func (c *CoordinatorV1) LogsToResponses(responseLogs []logpoller.Log) ([]evmRelayTypes.OracleResponse, error) { + var responses []evmRelayTypes.OracleResponse + + parsingContract, err := functions_coordinator_1_1_0.NewFunctionsCoordinator110(c.address, c.client) + if err != nil { + return nil, fmt.Errorf("LogsToResponses: creating a contract instance for parsing failed: %w", err) + } + for _, log := range responseLogs { + gethLog := log.ToGethLog() + oracleResponse, err := parsingContract.ParseOracleResponse(gethLog) + if err != nil { + c.lggr.Errorw("LogsToResponses: failed to parse a response log, skipping") + continue + } + responses = append(responses, evmRelayTypes.OracleResponse{ + RequestId: oracleResponse.RequestId, + }) + } + return responses, nil +} diff --git a/core/services/relay/evm/functions/coordinator_v2.go b/core/services/relay/evm/functions/coordinator_v2.go new file mode 100644 index 00000000000..4189e575b2a --- /dev/null +++ b/core/services/relay/evm/functions/coordinator_v2.go @@ -0,0 +1,146 @@ +package functions + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_coordinator" + "github.com/smartcontractkit/chainlink/v2/core/logger" + evmRelayTypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" +) + +type CoordinatorV2 struct { + address common.Address + + client client.Client + logPoller logpoller.LogPoller + lggr logger.Logger +} + +func NewCoordinatorV2(address common.Address, client client.Client, logPoller logpoller.LogPoller, lggr logger.Logger) *CoordinatorV2 { + return &CoordinatorV2{ + address: address, + client: client, + logPoller: logPoller, + lggr: lggr, + } +} + +func (c *CoordinatorV2) Address() common.Address { + return c.address +} + +func (c *CoordinatorV2) RegisterFilters() error { + if (c.address == common.Address{}) { + return nil + } + + return c.logPoller.RegisterFilter( + logpoller.Filter{ + Name: logpoller.FilterName("FunctionsLogPollerWrapper", c.address.String(), "-v", "2"), + EventSigs: []common.Hash{ + functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), + functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), + }, + Addresses: []common.Address{c.address}, + }) +} +func (c *CoordinatorV2) OracleRequestLogTopic() (common.Hash, error) { + return functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), nil +} +func (c *CoordinatorV2) OracleResponseLogTopic() (common.Hash, error) { + return functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), nil +} +func (c *CoordinatorV2) LogsToRequests(requestLogs []logpoller.Log) ([]evmRelayTypes.OracleRequest, error) { + var requests []evmRelayTypes.OracleRequest + + parsingContract, err := functions_coordinator.NewFunctionsCoordinator(c.address, c.client) + if err != nil { + return nil, fmt.Errorf("LogsToRequests: creating a contract instance for NewFunctionsCoordinator parsing failed: %w", err) + } + + for _, log := range requestLogs { + gethLog := log.ToGethLog() + oracleRequest, err := parsingContract.ParseOracleRequest(gethLog) + if err != nil { + c.lggr.Errorw("LogsToRequests: failed to parse a request log, skipping", "err", err) + continue + } + + commitmentABIV2 := abi.Arguments{ + {Type: bytes32Type}, // RequestId + {Type: addressType}, // Coordinator + {Type: uint96Type}, // EstimatedTotalCostJuels + {Type: addressType}, // Client + {Type: uint64Type}, // SubscriptionId + {Type: uint32Type}, // CallbackGasLimit + {Type: uint72Type}, // AdminFee + {Type: uint72Type}, // DonFee + {Type: uint40Type}, // GasOverheadBeforeCallback + {Type: uint40Type}, // GasOverheadAfterCallback + {Type: uint32Type}, // TimeoutTimestamp + {Type: uint72Type}, // OperationFee + } + + commitmentBytesV2, err := commitmentABIV2.Pack( + oracleRequest.Commitment.RequestId, + oracleRequest.Commitment.Coordinator, + oracleRequest.Commitment.EstimatedTotalCostJuels, + oracleRequest.Commitment.Client, + oracleRequest.Commitment.SubscriptionId, + oracleRequest.Commitment.CallbackGasLimit, + oracleRequest.Commitment.AdminFeeJuels, + oracleRequest.Commitment.DonFeeJuels, + oracleRequest.Commitment.GasOverheadBeforeCallback, + oracleRequest.Commitment.GasOverheadAfterCallback, + oracleRequest.Commitment.TimeoutTimestamp, + oracleRequest.Commitment.OperationFeeJuels, + ) + if err != nil { + c.lggr.Errorw("LogsToRequests: failed to pack Coordinator v2 commitment bytes, skipping", err) + } + + OracleRequestV2 := evmRelayTypes.OracleRequest{ + RequestId: oracleRequest.RequestId, + RequestingContract: oracleRequest.RequestingContract, + RequestInitiator: oracleRequest.RequestInitiator, + SubscriptionId: oracleRequest.SubscriptionId, + SubscriptionOwner: oracleRequest.SubscriptionOwner, + Data: oracleRequest.Data, + DataVersion: oracleRequest.DataVersion, + Flags: oracleRequest.Flags, + CallbackGasLimit: oracleRequest.CallbackGasLimit, + TxHash: oracleRequest.Raw.TxHash, + OnchainMetadata: commitmentBytesV2, + CoordinatorContract: c.address, + } + + requests = append(requests, OracleRequestV2) + } + return requests, nil +} + +func (c *CoordinatorV2) LogsToResponses(responseLogs []logpoller.Log) ([]evmRelayTypes.OracleResponse, error) { + var responses []evmRelayTypes.OracleResponse + + parsingContract, err := functions_coordinator.NewFunctionsCoordinator(c.address, c.client) + if err != nil { + return nil, fmt.Errorf("LogsToResponses: creating a contract instance for parsing failed: %w", err) + } + for _, log := range responseLogs { + gethLog := log.ToGethLog() + oracleResponse, err := parsingContract.ParseOracleResponse(gethLog) + if err != nil { + c.lggr.Errorw("LogsToResponses: failed to parse a response log, skipping") + continue + } + responses = append(responses, evmRelayTypes.OracleResponse{ + RequestId: oracleResponse.RequestId, + }) + } + return responses, nil +} diff --git a/core/services/relay/evm/functions/logpoller_wrapper.go b/core/services/relay/evm/functions/logpoller_wrapper.go index 73c32be7db8..a374f01fa9e 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper.go +++ b/core/services/relay/evm/functions/logpoller_wrapper.go @@ -2,6 +2,7 @@ package functions import ( "context" + "fmt" "strings" "sync" "time" @@ -15,8 +16,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" - "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_coordinator" - "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_coordinator_1_1_0" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" type_and_version "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/type_and_version_interface_wrapper" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -33,8 +32,8 @@ type logPollerWrapper struct { client client.Client logPoller logpoller.LogPoller subscribers map[string]evmRelayTypes.RouteUpdateSubscriber - activeCoordinator coordinator - proposedCoordinator coordinator + activeCoordinator Coordinator + proposedCoordinator Coordinator requestBlockOffset int64 responseBlockOffset int64 pastBlocksToPoll int64 @@ -57,22 +56,39 @@ type detectedEvents struct { detectedEventsOrdered []detectedEvent } -type coordinator struct { - address common.Address - typeAndVersion string +type Coordinator interface { + Address() common.Address + RegisterFilters() error + OracleRequestLogTopic() (common.Hash, error) + OracleResponseLogTopic() (common.Hash, error) + LogsToRequests(requestLogs []logpoller.Log) ([]evmRelayTypes.OracleRequest, error) + LogsToResponses(responseLogs []logpoller.Log) ([]evmRelayTypes.OracleResponse, error) } const FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING = "Functions Coordinator v1" const FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING = "Functions Coordinator v2" -type Functions_Coordinator interface { - *functions_coordinator_1_1_0.FunctionsCoordinator110 | *functions_coordinator.FunctionsCoordinator -} - const logPollerCacheDurationSecDefault = 300 const pastBlocksToPollDefault = 50 const maxLogsToProcess = 1000 +var ( + uint32Type abi.Type + uint40Type abi.Type + uint64Type abi.Type + uint72Type abi.Type + uint96Type abi.Type + addressType abi.Type + bytes32Type abi.Type +) + +func init() { + err := initAbiTypes() + if err != nil { + panic(err) + } +} + var _ evmRelayTypes.LogPollerWrapper = &logPollerWrapper{} func NewLogPollerWrapper(routerContractAddress common.Address, pluginConfig config.PluginConfig, client client.Client, logPoller logpoller.LogPoller, lggr logger.Logger) (evmRelayTypes.LogPollerWrapper, error) { @@ -159,11 +175,11 @@ func (l *logPollerWrapper) Name() string { return l.lggr.Name() } // methods of LogPollerWrapper func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmRelayTypes.OracleResponse, error) { l.mu.Lock() - coordinators := []coordinator{} - if l.activeCoordinator.address != (common.Address{}) { + coordinators := []Coordinator{} + if l.activeCoordinator != nil && l.activeCoordinator.Address() != (common.Address{}) { coordinators = append(coordinators, l.activeCoordinator) } - if l.proposedCoordinator.address != (common.Address{}) && l.activeCoordinator != l.proposedCoordinator { + if l.proposedCoordinator != nil && l.proposedCoordinator.Address() != (common.Address{}) && l.activeCoordinator != l.proposedCoordinator { coordinators = append(coordinators, l.proposedCoordinator) } latest, err := l.logPoller.LatestBlock() @@ -188,12 +204,12 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR for _, coordinator := range coordinators { requestEndBlock := latestBlockNum - l.requestBlockOffset - requestLogTopic, err := oracleRequestLogTopic(coordinator) + requestLogTopic, err := coordinator.OracleRequestLogTopic() if err != nil { l.lggr.Errorw("LatestEvents: ", err) return nil, nil, err } - requestLogs, err := l.logPoller.Logs(startBlockNum, requestEndBlock, requestLogTopic, coordinator.address) + requestLogs, err := l.logPoller.Logs(startBlockNum, requestEndBlock, requestLogTopic, coordinator.Address()) if err != nil { l.lggr.Errorw("LatestEvents: fetching request logs from LogPoller failed", "startBlock", startBlockNum, "endBlock", requestEndBlock) return nil, nil, err @@ -201,12 +217,12 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR l.lggr.Debugw("LatestEvents: fetched request logs", "nRequestLogs", len(requestLogs), "latestBlock", latest, "startBlock", startBlockNum, "endBlock", requestEndBlock) requestLogs = l.filterPreviouslyDetectedEvents(requestLogs, &l.detectedRequests, "requests") responseEndBlock := latestBlockNum - l.responseBlockOffset - responseLogTopic, err := oracleResponseLogTopic(coordinator) + responseLogTopic, err := coordinator.OracleResponseLogTopic() if err != nil { l.lggr.Errorw("LatestEvents: ", err) return nil, nil, err } - responseLogs, err := l.logPoller.Logs(startBlockNum, responseEndBlock, responseLogTopic, coordinator.address) + responseLogs, err := l.logPoller.Logs(startBlockNum, responseEndBlock, responseLogTopic, coordinator.Address()) if err != nil { l.lggr.Errorw("LatestEvents: fetching response logs from LogPoller failed", "startBlock", startBlockNum, "endBlock", responseEndBlock) return nil, nil, err @@ -214,13 +230,13 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR l.lggr.Debugw("LatestEvents: fetched request logs", "nResponseLogs", len(responseLogs), "latestBlock", latest, "startBlock", startBlockNum, "endBlock", responseEndBlock) responseLogs = l.filterPreviouslyDetectedEvents(responseLogs, &l.detectedResponses, "responses") - l.lggr.Debugw("LatestEvents: parsing logs", "nRequestLogs", len(requestLogs), "nResponseLogs", len(responseLogs), "coordinatorAddress", coordinator.address.Hex()) - requests, err := l.logsToRequests(coordinator, requestLogs) + l.lggr.Debugw("LatestEvents: parsing logs", "nRequestLogs", len(requestLogs), "nResponseLogs", len(responseLogs), "coordinatorAddress", coordinator.Address().Hex()) + requests, err := coordinator.LogsToRequests(requestLogs) if err != nil { return nil, nil, err } resultsReq = append(resultsReq, requests...) - responses, err := l.logsToResponses(coordinator, responseLogs) + responses, err := coordinator.LogsToResponses(responseLogs) if err != nil { return nil, nil, err } @@ -333,7 +349,7 @@ func (l *logPollerWrapper) getCurrentCoordinators(ctx context.Context) (common.A Context: ctx, }, donId) if err != nil { - return activeCoordinatorAddress, l.proposedCoordinator.address, nil + return activeCoordinatorAddress, l.proposedCoordinator.Address(), nil } return activeCoordinatorAddress, proposedCoordinator, nil @@ -369,291 +385,104 @@ func (l *logPollerWrapper) handleRouteUpdate(activeCoordinatorAddress common.Add return } - if activeCoordinatorAddress == l.activeCoordinator.address && proposedCoordinatorAddress == l.proposedCoordinator.address { + if (l.activeCoordinator != nil && l.activeCoordinator.Address() == activeCoordinatorAddress) && + (l.proposedCoordinator != nil && l.proposedCoordinator.Address() == proposedCoordinatorAddress) { l.lggr.Debug("LogPollerWrapper: no changes to routes") return } activeCoordinatorTypeAndVersion, err := l.getTypeAndVersion(activeCoordinatorAddress) if err != nil { + l.lggr.Errorf("LogPollerWrapper: failed to get active coordinatorTypeAndVersion: %w", err) return } - activeCoordinator := coordinator{address: activeCoordinatorAddress, typeAndVersion: activeCoordinatorTypeAndVersion} - - proposedCoordinatorTypeAndVersion, err := l.getTypeAndVersion(proposedCoordinatorAddress) - if err != nil { - return - } - proposedCoordinator := coordinator{address: proposedCoordinatorAddress, typeAndVersion: proposedCoordinatorTypeAndVersion} - - errActive := l.registerFilters(activeCoordinator) - errProposed := l.registerFilters(proposedCoordinator) - if errActive != nil || errProposed != nil { - l.lggr.Errorw("LogPollerWrapper: Failed to register filters", "errorActive", errActive, "errorProposed", errProposed) + var activeCoordinator Coordinator + switch { + case strings.Contains(activeCoordinatorTypeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING): + activeCoordinator = NewCoordinatorV1(activeCoordinatorAddress, l.client, l.logPoller, l.lggr) + case strings.Contains(activeCoordinatorTypeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING): + activeCoordinator = NewCoordinatorV2(activeCoordinatorAddress, l.client, l.logPoller, l.lggr) + default: + l.lggr.Errorf("LogPollerWrapper: Invalid active coordinator type and version: %q", activeCoordinatorTypeAndVersion) return } - l.lggr.Debugw("LogPollerWrapper: new routes", "activeCoordinator", activeCoordinator.address.Hex(), "proposedCoordinator", proposedCoordinator.address.Hex()) - l.activeCoordinator = activeCoordinator - l.proposedCoordinator = proposedCoordinator - - for _, subscriber := range l.subscribers { - err := subscriber.UpdateRoutes(activeCoordinator.address, proposedCoordinator.address) + if activeCoordinator != nil { + err = activeCoordinator.RegisterFilters() if err != nil { - l.lggr.Errorw("LogPollerWrapper: Failed to update routes", "err", err) + l.lggr.Errorw("LogPollerWrapper: Failed to register active coordinator filters", err) + return } - } -} - -func filterName(addr common.Address, version string) string { - return logpoller.FilterName("FunctionsLogPollerWrapper", addr.String(), "-v", version) -} - -func (l *logPollerWrapper) registerFilters(coordinator coordinator) error { - if (coordinator.address == common.Address{}) { - return nil + l.activeCoordinator = activeCoordinator + l.lggr.Debugw("LogPollerWrapper: new routes", "activeCoordinator", activeCoordinator.Address().Hex()) } - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING) { - return l.logPoller.RegisterFilter( - logpoller.Filter{ - Name: filterName(coordinator.address, "1"), - EventSigs: []common.Hash{ - functions_coordinator_1_1_0.FunctionsCoordinator110OracleRequest{}.Topic(), - functions_coordinator_1_1_0.FunctionsCoordinator110OracleResponse{}.Topic(), - }, - Addresses: []common.Address{coordinator.address}, - }) - } - - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING) { - return l.logPoller.RegisterFilter( - logpoller.Filter{ - Name: filterName(coordinator.address, "2"), - EventSigs: []common.Hash{ - functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), - functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), - }, - Addresses: []common.Address{coordinator.address}, - }) - } - - l.lggr.Errorw("RegisterFilters: Unsupported Coordinator version ", coordinator.typeAndVersion) - return errors.Errorf("RegisterFilters: Unsupported Coordinator version") -} - -func oracleRequestLogTopic(coordinator coordinator) (common.Hash, error) { - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING) { - return functions_coordinator_1_1_0.FunctionsCoordinator110OracleRequest{}.Topic(), nil - } - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING) { - return functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), nil - } - return common.Hash{}, errors.Errorf("OracleRequestLogTopic: Unsupported Coordinator version %s", coordinator.typeAndVersion) -} - -func oracleResponseLogTopic(coordinator coordinator) (common.Hash, error) { - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING) { - return functions_coordinator_1_1_0.FunctionsCoordinator110OracleResponse{}.Topic(), nil - } - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING) { - return functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), nil + proposedCoordinatorTypeAndVersion, err := l.getTypeAndVersion(proposedCoordinatorAddress) + if err != nil { + l.lggr.Errorf("LogPollerWrapper: failed to get proposed coordinatorTypeAndVersion: %w", err) + return } - return common.Hash{}, errors.Errorf("OracleResponseLogTopic: Unsupported Coordinator version %s", coordinator.typeAndVersion) -} - -func (l *logPollerWrapper) logsToRequests(coordinator coordinator, requestLogs []logpoller.Log) ([]evmRelayTypes.OracleRequest, error) { - var requests []evmRelayTypes.OracleRequest - uint32Type, errType1 := abi.NewType("uint32", "uint32", nil) - uint40Type, errType2 := abi.NewType("uint40", "uint40", nil) - uint64Type, errType3 := abi.NewType("uint64", "uint64", nil) - uint72Type, errType4 := abi.NewType("uint72", "uint72", nil) - uint96Type, errType5 := abi.NewType("uint96", "uint96", nil) - addressType, errType6 := abi.NewType("address", "address", nil) - bytes32Type, errType7 := abi.NewType("bytes32", "bytes32", nil) + var proposedCoordinator Coordinator + switch { + // proposedCoordinatorTypeAndVersion can be empty due to an empty proposedCoordinatorAddress + case proposedCoordinatorTypeAndVersion == "": + proposedCoordinator = NewCoordinatorV1(proposedCoordinatorAddress, l.client, l.logPoller, l.lggr) + case strings.Contains(proposedCoordinatorTypeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING): + proposedCoordinator = NewCoordinatorV1(proposedCoordinatorAddress, l.client, l.logPoller, l.lggr) + case strings.Contains(proposedCoordinatorTypeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING): + proposedCoordinator = NewCoordinatorV2(proposedCoordinatorAddress, l.client, l.logPoller, l.lggr) - if errType1 != nil || errType2 != nil || errType3 != nil || errType4 != nil || errType5 != nil || errType6 != nil || errType7 != nil { - l.lggr.Errorw("LogsToRequests: failed to initialize types", "errType1", errType1, - "errType2", errType2, "errType3", errType3, "errType4", errType4, "errType5", errType5, "errType6", errType6, "errType7", errType7, - ) } - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING) { - parsingContract, err := functions_coordinator_1_1_0.NewFunctionsCoordinator110(coordinator.address, l.client) + if proposedCoordinator != nil { + err = proposedCoordinator.RegisterFilters() if err != nil { - return nil, errors.Errorf("LogsToRequests: creating a contract instance for NewFunctionsCoordinator110 parsing failed") - } - - for _, log := range requestLogs { - gethLog := log.ToGethLog() - oracleRequest, err := parsingContract.ParseOracleRequest(gethLog) - if err != nil { - l.lggr.Errorw("LogsToRequests: failed to parse a request log, skipping", "err", err) - continue - } - - commitmentABIV1 := abi.Arguments{ - {Type: bytes32Type}, // RequestId - {Type: addressType}, // Coordinator - {Type: uint96Type}, // EstimatedTotalCostJuels - {Type: addressType}, // Client - {Type: uint64Type}, // SubscriptionId - {Type: uint32Type}, // CallbackGasLimit - {Type: uint72Type}, // AdminFee - {Type: uint72Type}, // DonFee - {Type: uint40Type}, // GasOverheadBeforeCallback - {Type: uint40Type}, // GasOverheadAfterCallback - {Type: uint32Type}, // TimeoutTimestamp - } - - commitmentBytesV1, err := commitmentABIV1.Pack( - oracleRequest.Commitment.RequestId, - oracleRequest.Commitment.Coordinator, - oracleRequest.Commitment.EstimatedTotalCostJuels, - oracleRequest.Commitment.Client, - oracleRequest.Commitment.SubscriptionId, - oracleRequest.Commitment.CallbackGasLimit, - oracleRequest.Commitment.AdminFee, - oracleRequest.Commitment.DonFee, - oracleRequest.Commitment.GasOverheadBeforeCallback, - oracleRequest.Commitment.GasOverheadAfterCallback, - oracleRequest.Commitment.TimeoutTimestamp, - ) - if err != nil { - l.lggr.Errorw("LogsToRequests: failed to pack Coordinator v1 commitment bytes, skipping", err) - } - - OracleRequestV1 := evmRelayTypes.OracleRequest{ - RequestId: oracleRequest.RequestId, - RequestingContract: oracleRequest.RequestingContract, - RequestInitiator: oracleRequest.RequestInitiator, - SubscriptionId: oracleRequest.SubscriptionId, - SubscriptionOwner: oracleRequest.SubscriptionOwner, - Data: oracleRequest.Data, - DataVersion: oracleRequest.DataVersion, - Flags: oracleRequest.Flags, - CallbackGasLimit: oracleRequest.CallbackGasLimit, - TxHash: oracleRequest.Raw.TxHash, - OnchainMetadata: commitmentBytesV1, - CoordinatorContract: coordinator.address, - } - - requests = append(requests, OracleRequestV1) + l.lggr.Errorw("LogPollerWrapper: Failed to register proposed coordinator filters", err) + return } - return requests, nil + l.proposedCoordinator = proposedCoordinator + l.lggr.Debugw("LogPollerWrapper: new routes", "proposedCoordinator", proposedCoordinator.Address().Hex()) } - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING) { - parsingContract, err := functions_coordinator.NewFunctionsCoordinator(coordinator.address, l.client) + for _, subscriber := range l.subscribers { + err := subscriber.UpdateRoutes(activeCoordinator.Address(), proposedCoordinator.Address()) if err != nil { - return nil, errors.Errorf("LogsToRequests: creating a contract instance for NewFunctionsCoordinator parsing failed") - } - - for _, log := range requestLogs { - gethLog := log.ToGethLog() - oracleRequest, err := parsingContract.ParseOracleRequest(gethLog) - if err != nil { - l.lggr.Errorw("LogsToRequests: failed to parse a request log, skipping", "err", err) - continue - } - - commitmentABIV2 := abi.Arguments{ - {Type: bytes32Type}, // RequestId - {Type: addressType}, // Coordinator - {Type: uint96Type}, // EstimatedTotalCostJuels - {Type: addressType}, // Client - {Type: uint64Type}, // SubscriptionId - {Type: uint32Type}, // CallbackGasLimit - {Type: uint72Type}, // AdminFee - {Type: uint72Type}, // DonFee - {Type: uint40Type}, // GasOverheadBeforeCallback - {Type: uint40Type}, // GasOverheadAfterCallback - {Type: uint32Type}, // TimeoutTimestamp - {Type: uint72Type}, // OperationFee - } - - commitmentBytesV2, err := commitmentABIV2.Pack( - oracleRequest.Commitment.RequestId, - oracleRequest.Commitment.Coordinator, - oracleRequest.Commitment.EstimatedTotalCostJuels, - oracleRequest.Commitment.Client, - oracleRequest.Commitment.SubscriptionId, - oracleRequest.Commitment.CallbackGasLimit, - oracleRequest.Commitment.AdminFeeJuels, - oracleRequest.Commitment.DonFeeJuels, - oracleRequest.Commitment.GasOverheadBeforeCallback, - oracleRequest.Commitment.GasOverheadAfterCallback, - oracleRequest.Commitment.TimeoutTimestamp, - oracleRequest.Commitment.OperationFeeJuels, - ) - if err != nil { - l.lggr.Errorw("LogsToRequests: failed to pack Coordinator v2 commitment bytes, skipping", err) - } - - OracleRequestV2 := evmRelayTypes.OracleRequest{ - RequestId: oracleRequest.RequestId, - RequestingContract: oracleRequest.RequestingContract, - RequestInitiator: oracleRequest.RequestInitiator, - SubscriptionId: oracleRequest.SubscriptionId, - SubscriptionOwner: oracleRequest.SubscriptionOwner, - Data: oracleRequest.Data, - DataVersion: oracleRequest.DataVersion, - Flags: oracleRequest.Flags, - CallbackGasLimit: oracleRequest.CallbackGasLimit, - TxHash: oracleRequest.Raw.TxHash, - OnchainMetadata: commitmentBytesV2, - CoordinatorContract: coordinator.address, - } - - requests = append(requests, OracleRequestV2) + l.lggr.Errorw("LogPollerWrapper: Failed to update routes", "err", err) } - return requests, nil } - - return nil, errors.Errorf("LogsToRequests: Unsupported Coordinator version %s", coordinator.typeAndVersion) } -func (l *logPollerWrapper) logsToResponses(coordinator coordinator, responseLogs []logpoller.Log) ([]evmRelayTypes.OracleResponse, error) { - var responses []evmRelayTypes.OracleResponse - - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_1_SUBSTRING) { - parsingContract, err := functions_coordinator_1_1_0.NewFunctionsCoordinator110(coordinator.address, l.client) - if err != nil { - return nil, errors.Errorf("LogsToResponses: creating a contract instance for parsing failed") - } - for _, log := range responseLogs { - gethLog := log.ToGethLog() - oracleResponse, err := parsingContract.ParseOracleResponse(gethLog) - if err != nil { - l.lggr.Errorw("LogsToResponses: failed to parse a response log, skipping") - continue - } - responses = append(responses, evmRelayTypes.OracleResponse{ - RequestId: oracleResponse.RequestId, - }) - } - return responses, nil +func initAbiTypes() error { + var err error + uint32Type, err = abi.NewType("uint32", "uint32", nil) + if err != nil { + return fmt.Errorf("LogsToRequests: failed to initialize uint32Type type: %w", err) } - - if strings.Contains(coordinator.typeAndVersion, FUNCTIONS_COORDINATOR_VERSION_2_SUBSTRING) { - parsingContract, err := functions_coordinator.NewFunctionsCoordinator(coordinator.address, l.client) - if err != nil { - return nil, errors.Errorf("LogsToResponses: creating a contract instance for parsing failed") - } - for _, log := range responseLogs { - gethLog := log.ToGethLog() - oracleResponse, err := parsingContract.ParseOracleResponse(gethLog) - if err != nil { - l.lggr.Errorw("LogsToResponses: failed to parse a response log, skipping") - continue - } - responses = append(responses, evmRelayTypes.OracleResponse{ - RequestId: oracleResponse.RequestId, - }) - } - return responses, nil + uint40Type, err = abi.NewType("uint40", "uint40", nil) + if err != nil { + return fmt.Errorf("LogsToRequests: failed to initialize uint40Type type: %w", err) + } + uint64Type, err = abi.NewType("uint64", "uint64", nil) + if err != nil { + return fmt.Errorf("LogsToRequests: failed to initialize uint64Type type: %w", err) + } + uint72Type, err = abi.NewType("uint72", "uint72", nil) + if err != nil { + return fmt.Errorf("LogsToRequests: failed to initialize uint72Type type: %w", err) + } + uint96Type, err = abi.NewType("uint96", "uint96", nil) + if err != nil { + return fmt.Errorf("LogsToRequests: failed to initialize uint96Type type: %w", err) + } + addressType, err = abi.NewType("address", "address", nil) + if err != nil { + return fmt.Errorf("LogsToRequests: failed to initialize addressType type: %w", err) + } + bytes32Type, err = abi.NewType("bytes32", "bytes32", nil) + if err != nil { + return fmt.Errorf("LogsToRequests: failed to initialize bytes32Type type: %w", err) } - return nil, errors.Errorf("LogsToResponses: Unsupported Coordinator version %s", coordinator.typeAndVersion) + return nil } diff --git a/core/services/relay/evm/functions/logpoller_wrapper_test.go b/core/services/relay/evm/functions/logpoller_wrapper_test.go index c68739f45a4..9087356c65b 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper_test.go +++ b/core/services/relay/evm/functions/logpoller_wrapper_test.go @@ -37,11 +37,6 @@ const ( OracleRequestV200 = `[{"constant":true,"inputs":[{"indexed":true,"internalType":"bytes32","name":"requestId","type":"bytes32"},{"indexed":true,"internalType":"address","name":"requestingContract","type":"address"},{"indexed":false,"internalType":"address","name":"requestInitiator","type":"address"},{"indexed":false,"internalType":"uint64","name":"subscriptionId","type":"uint64"},{"indexed":false,"internalType":"address","name":"subscriptionOwner","type":"address"},{"indexed":false,"internalType":"bytes","name":"data","type":"bytes"},{"indexed":false,"internalType":"uint16","name":"dataVersion","type":"uint16"},{"indexed":false,"internalType":"bytes32","name":"flags","type":"bytes32"},{"indexed":false,"internalType":"uint64","name":"callbackGasLimit","type":"uint64"},{"components":[{"internalType":"bytes32","name":"requestId","type":"bytes32"},{"internalType":"address","name":"coordinator","type":"address"},{"internalType":"uint96","name":"estimatedTotalCostJuels","type":"uint96"},{"internalType":"address","name":"client","type":"address"},{"internalType":"uint64","name":"subscriptionId","type":"uint64"},{"internalType":"uint32","name":"callbackGasLimit","type":"uint32"},{"internalType":"uint72","name":"adminFee","type":"uint72"},{"internalType":"uint72","name":"donFee","type":"uint72"},{"internalType":"uint40","name":"gasOverheadBeforeCallback","type":"uint40"},{"internalType":"uint40","name":"gasOverheadAfterCallback","type":"uint40"},{"internalType":"uint32","name":"timeoutTimestamp","type":"uint32"},{"internalType":"uint72","name":"operationFee","type":"uint72"}],"indexed":false,"internalType":"structFunctionsResponse.Commitment","name":"commitment","type":"tuple"}],"name":"OracleRequest","type":"function"}]` ) -var routerAddressBytes []byte -var routerAddressHex common.Address -var coordinatorAddressBytes []byte -var coordinatorAddressHex common.Address - func (s *subscriber) UpdateRoutes(activeCoordinator common.Address, proposedCoordinator common.Address) error { if s.expectedCalls == 0 { panic("unexpected call to UpdateRoutes") @@ -60,28 +55,42 @@ func newSubscriber(expectedCalls int) *subscriber { return sub } -func addr(t *testing.T, lastByte string) []byte { +func addr(lastByte string) ([]byte, error) { contractAddr, err := hex.DecodeString("00000000000000000000000000000000000000000000000000000000000000" + lastByte) - require.NoError(t, err) - return contractAddr + if err != nil { + return []byte{}, err + } + return contractAddr, nil +} + +type setupResponse struct { + LogPoller *lpmocks.LogPoller + LogPollerWrapper types.LogPollerWrapper + Client *evmclimocks.Client + RouterAddress common.Address + CoordinatorAddress common.Address + CoordinatorAddressBytes []byte } -func setUp(t *testing.T, updateFrequencySec uint32) (*lpmocks.LogPoller, types.LogPollerWrapper, *evmclimocks.Client) { +func setUp(t *testing.T, updateFrequencySec uint32) setupResponse { + s := setupResponse{} lggr := logger.TestLogger(t) - client := evmclimocks.NewClient(t) - lp := lpmocks.NewLogPoller(t) + s.Client = evmclimocks.NewClient(t) + s.LogPoller = lpmocks.NewLogPoller(t) config := config.PluginConfig{ ContractUpdateCheckFrequencySec: updateFrequencySec, ContractVersion: 1, } - routerAddressBytes = addr(t, "01") - routerAddressHex = common.BytesToAddress(routerAddressBytes) - coordinatorAddressBytes = addr(t, "02") - coordinatorAddressHex = common.BytesToAddress(coordinatorAddressBytes) - lpWrapper, err := NewLogPollerWrapper(routerAddressHex, config, client, lp, lggr) + routerAddressBytes, err := addr("01") + require.NoError(t, err) + s.RouterAddress = common.BytesToAddress(routerAddressBytes) + s.CoordinatorAddressBytes, err = addr("02") + require.NoError(t, err) + s.CoordinatorAddress = common.BytesToAddress(s.CoordinatorAddressBytes) + s.LogPollerWrapper, err = NewLogPollerWrapper(s.RouterAddress, config, s.Client, s.LogPoller, lggr) require.NoError(t, err) - return lp, lpWrapper, client + return s } func getMockedRequestLogV1(t *testing.T) logpoller.Log { @@ -121,177 +130,196 @@ func getMockedRequestLogV2(t *testing.T) logpoller.Log { } func TestLogPollerWrapper_SingleSubscriberEmptyEvents_CoordinatorV1(t *testing.T) { - lp, lpWrapper, client := setUp(t, 100_000) // check only once - lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) + t.Parallel() + setup := setUp(t, 100_000) // check only once + setup.LogPoller.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{}, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById - To: &routerAddressHex, + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{}, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById + To: &setup.RouterAddress, Data: []uint8{0xa9, 0xc9, 0xa9, 0x18, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(coordinatorAddressBytes, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById - To: &routerAddressHex, + }, mock.Anything).Return(setup.CoordinatorAddressBytes, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById + To: &setup.RouterAddress, Data: []uint8{0x6a, 0x22, 0x15, 0xde, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(addr(t, "00"), nil) - lp.On("RegisterFilter", mock.Anything).Return(nil) + }, mock.Anything).Return(addr("00")) + setup.LogPoller.On("RegisterFilter", mock.Anything).Return(nil) typeAndVersionResponse, err := encodeTypeAndVersion(CoordinatorContractV100) require.NoError(t, err) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion - To: &coordinatorAddressHex, + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion + To: &setup.CoordinatorAddress, Data: hexutil.MustDecode("0x181f5a77"), }, mock.Anything).Return(typeAndVersionResponse, nil) subscriber := newSubscriber(1) - lpWrapper.SubscribeToUpdates("mock_subscriber", subscriber) + setup.LogPollerWrapper.SubscribeToUpdates("mock_subscriber", subscriber) - servicetest.Run(t, lpWrapper) + servicetest.Run(t, setup.LogPollerWrapper) subscriber.updates.Wait() - reqs, resps, err := lpWrapper.LatestEvents() + reqs, resps, err := setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) require.Equal(t, 0, len(reqs)) require.Equal(t, 0, len(resps)) } func TestLogPollerWrapper_SingleSubscriberEmptyEvents_CoordinatorV2(t *testing.T) { - lp, lpWrapper, client := setUp(t, 100_000) // check only once - lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) + t.Parallel() + setup := setUp(t, 100_000) // check only once + setup.LogPoller.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{}, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById - To: &routerAddressHex, + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{}, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById + To: &setup.RouterAddress, Data: []uint8{0xa9, 0xc9, 0xa9, 0x18, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(coordinatorAddressBytes, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById - To: &routerAddressHex, + }, mock.Anything).Return(setup.CoordinatorAddressBytes, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById + To: &setup.RouterAddress, Data: []uint8{0x6a, 0x22, 0x15, 0xde, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(addr(t, "00"), nil) - lp.On("RegisterFilter", mock.Anything).Return(nil) + }, mock.Anything).Return(addr("00")) + setup.LogPoller.On("RegisterFilter", mock.Anything).Return(nil) typeAndVersionResponse, err := encodeTypeAndVersion(CoordinatorContractV200) require.NoError(t, err) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion - To: &coordinatorAddressHex, + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion + To: &setup.CoordinatorAddress, Data: hexutil.MustDecode("0x181f5a77"), }, mock.Anything).Return(typeAndVersionResponse, nil) subscriber := newSubscriber(1) - lpWrapper.SubscribeToUpdates("mock_subscriber", subscriber) + setup.LogPollerWrapper.SubscribeToUpdates("mock_subscriber", subscriber) - servicetest.Run(t, lpWrapper) + servicetest.Run(t, setup.LogPollerWrapper) subscriber.updates.Wait() - reqs, resps, err := lpWrapper.LatestEvents() + reqs, resps, err := setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) require.Equal(t, 0, len(reqs)) require.Equal(t, 0, len(resps)) } func TestLogPollerWrapper_ErrorOnZeroAddresses(t *testing.T) { - lp, lpWrapper, client := setUp(t, 100_000) // check only once - lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) + t.Parallel() + setup := setUp(t, 100_000) // check only once + setup.LogPoller.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) - client.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Return(addr(t, "00"), nil) + setup.Client.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Return(addr("00")) - servicetest.Run(t, lpWrapper) - _, _, err := lpWrapper.LatestEvents() + servicetest.Run(t, setup.LogPollerWrapper) + _, _, err := setup.LogPollerWrapper.LatestEvents() require.Error(t, err) } func TestLogPollerWrapper_LatestEvents_ReorgHandlingV1(t *testing.T) { - lp, lpWrapper, client := setUp(t, 100_000) - lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById - To: &routerAddressHex, + t.Parallel() + setup := setUp(t, 100_000) + setup.LogPoller.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById + To: &setup.RouterAddress, Data: []uint8{0xa9, 0xc9, 0xa9, 0x18, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(coordinatorAddressBytes, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById - To: &routerAddressHex, + }, mock.Anything).Return(setup.CoordinatorAddressBytes, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById + To: &setup.RouterAddress, Data: []uint8{0x6a, 0x22, 0x15, 0xde, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(addr(t, "00"), nil) + }, mock.Anything).Return(addr("00")) typeAndVersionResponse, err := encodeTypeAndVersion(CoordinatorContractV100) require.NoError(t, err) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion - To: &coordinatorAddressHex, + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion + To: &setup.CoordinatorAddress, Data: hexutil.MustDecode("0x181f5a77"), }, mock.Anything).Return(typeAndVersionResponse, nil) - lp.On("RegisterFilter", mock.Anything).Return(nil) + setup.LogPoller.On("RegisterFilter", mock.Anything).Return(nil) subscriber := newSubscriber(1) - lpWrapper.SubscribeToUpdates("mock_subscriber", subscriber) + setup.LogPollerWrapper.SubscribeToUpdates("mock_subscriber", subscriber) mockedLog := getMockedRequestLogV1(t) // All logPoller queries for responses return none - lp.On("Logs", mock.Anything, mock.Anything, functions_coordinator_1_1_0.FunctionsCoordinator110OracleResponse{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil) + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, functions_coordinator_1_1_0.FunctionsCoordinator110OracleResponse{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil) // On the first logPoller query for requests, the request log appears - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() // On the 2nd query, the request log disappears - lp.On("Logs", mock.Anything, mock.Anything, functions_coordinator_1_1_0.FunctionsCoordinator110OracleRequest{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil).Once() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, functions_coordinator_1_1_0.FunctionsCoordinator110OracleRequest{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil).Once() // On the 3rd query, the original request log appears again - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() - - servicetest.Run(t, lpWrapper) - subscriber.updates.Wait() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() + + servicetest.Run(t, setup.LogPollerWrapper) + + done := make(chan struct{}) + go func() { + subscriber.updates.Wait() + close(done) + }() + + select { + case <-done: // subscriber.updates is 0 + break + case <-time.After(5 * time.Second): // Hit timeout. + t.Log("TestLogPollerWrapper_LatestEvents_ReorgHandlingV1: timeout") + t.FailNow() + } - oracleRequests, _, err := lpWrapper.LatestEvents() + oracleRequests, _, err := setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 1, len(oracleRequests)) - oracleRequests, _, err = lpWrapper.LatestEvents() + oracleRequests, _, err = setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 0, len(oracleRequests)) require.NoError(t, err) - oracleRequests, _, err = lpWrapper.LatestEvents() + oracleRequests, _, err = setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 0, len(oracleRequests)) } func TestLogPollerWrapper_LatestEvents_ReorgHandlingV2(t *testing.T) { - lp, lpWrapper, client := setUp(t, 100_000) - lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById - To: &routerAddressHex, + t.Parallel() + setup := setUp(t, 100_000) + setup.LogPoller.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById + To: &setup.RouterAddress, Data: []uint8{0xa9, 0xc9, 0xa9, 0x18, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(coordinatorAddressBytes, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById - To: &routerAddressHex, + }, mock.Anything).Return(setup.CoordinatorAddressBytes, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById + To: &setup.RouterAddress, Data: []uint8{0x6a, 0x22, 0x15, 0xde, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(addr(t, "00"), nil) + }, mock.Anything).Return(addr("00")) typeAndVersionResponse, err := encodeTypeAndVersion(CoordinatorContractV200) require.NoError(t, err) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion - To: &coordinatorAddressHex, + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion + To: &setup.CoordinatorAddress, Data: hexutil.MustDecode("0x181f5a77"), }, mock.Anything).Return(typeAndVersionResponse, nil) - lp.On("RegisterFilter", mock.Anything).Return(nil) + setup.LogPoller.On("RegisterFilter", mock.Anything).Return(nil) subscriber := newSubscriber(1) - lpWrapper.SubscribeToUpdates("mock_subscriber", subscriber) + setup.LogPollerWrapper.SubscribeToUpdates("mock_subscriber", subscriber) mockedLog := getMockedRequestLogV2(t) // All logPoller queries for responses return none - lp.On("Logs", mock.Anything, mock.Anything, functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil) + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil) // On the first logPoller query for requests, the request log appears - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() // On the 2nd query, the request log disappears - lp.On("Logs", mock.Anything, mock.Anything, functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil).Once() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil).Once() // On the 3rd query, the original request log appears again - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() - servicetest.Run(t, lpWrapper) + servicetest.Run(t, setup.LogPollerWrapper) subscriber.updates.Wait() - oracleRequests, _, err := lpWrapper.LatestEvents() + oracleRequests, _, err := setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 1, len(oracleRequests)) - oracleRequests, _, err = lpWrapper.LatestEvents() + oracleRequests, _, err = setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 0, len(oracleRequests)) require.NoError(t, err) - oracleRequests, _, err = lpWrapper.LatestEvents() + oracleRequests, _, err = setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 0, len(oracleRequests)) } func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_TruncatesLogs(t *testing.T) { - _, lpWrapper, _ := setUp(t, 100_000) + t.Parallel() + setup := setUp(t, 100_000) inputLogs := make([]logpoller.Log, maxLogsToProcess+100) for i := 0; i < 1100; i++ { inputLogs[i] = getMockedRequestLogV1(t) } - functionsLpWrapper := lpWrapper.(*logPollerWrapper) + functionsLpWrapper := setup.LogPollerWrapper.(*logPollerWrapper) mockedDetectedEvents := detectedEvents{isPreviouslyDetected: make(map[[32]byte]struct{})} outputLogs := functionsLpWrapper.filterPreviouslyDetectedEvents(inputLogs, &mockedDetectedEvents, "request") @@ -301,12 +329,13 @@ func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_TruncatesLogs(t *testin } func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_SkipsInvalidLog(t *testing.T) { - _, lpWrapper, _ := setUp(t, 100_000) + t.Parallel() + setup := setUp(t, 100_000) inputLogs := []logpoller.Log{getMockedRequestLogV1(t)} inputLogs[0].Topics = [][]byte{[]byte("invalid topic")} mockedDetectedEvents := detectedEvents{isPreviouslyDetected: make(map[[32]byte]struct{})} - functionsLpWrapper := lpWrapper.(*logPollerWrapper) + functionsLpWrapper := setup.LogPollerWrapper.(*logPollerWrapper) outputLogs := functionsLpWrapper.filterPreviouslyDetectedEvents(inputLogs, &mockedDetectedEvents, "request") assert.Equal(t, 0, len(outputLogs)) @@ -315,7 +344,8 @@ func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_SkipsInvalidLog(t *test } func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_FiltersPreviouslyDetectedEvent(t *testing.T) { - _, lpWrapper, _ := setUp(t, 100_000) + t.Parallel() + setup := setUp(t, 100_000) mockedRequestLog := getMockedRequestLogV1(t) inputLogs := []logpoller.Log{mockedRequestLog} var mockedRequestId [32]byte @@ -331,7 +361,7 @@ func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_FiltersPreviouslyDetect timeDetected: time.Now().Add(-time.Second * time.Duration(logPollerCacheDurationSecDefault+1)), } - functionsLpWrapper := lpWrapper.(*logPollerWrapper) + functionsLpWrapper := setup.LogPollerWrapper.(*logPollerWrapper) outputLogs := functionsLpWrapper.filterPreviouslyDetectedEvents(inputLogs, &mockedDetectedEvents, "request") assert.Equal(t, 0, len(outputLogs))