From ed3d7c779d3e4d85fc1dadd9c70bcac74bb32132 Mon Sep 17 00:00:00 2001 From: pablomendezroyo Date: Fri, 20 Dec 2024 13:09:56 +0100 Subject: [PATCH 1/2] skip events scanners --- cmd/main.go | 4 +- .../adapters/execution/execution_adapter.go | 44 +++++++++++++++++++ .../execution_adapter_integration_test.go | 20 +++++++++ internal/application/ports/execution_port.go | 3 ++ .../distributionLogUpdatedEventScanner.go | 17 ++++++- .../validatorExitRequestEventScanner.go | 22 +++++++++- internal/config/config_loader.go | 5 +++ 7 files changed, 111 insertions(+), 4 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 32ee190..c675d45 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -94,8 +94,8 @@ func main() { // Initialize domain services eventsWatcherService := services.NewEventsWatcherService(veboAdapter, csModuleAdapter, csFeeDistributorAdapter, notifierAdapter) - distributionLogUpdatedScannerService := services.NewDistributionLogUpdatedEventScanner(storageAdapter, notifierAdapter, executionAdapter, csFeeDistributorImplAdapter, networkConfig.CsFeeDistributorBlockDeployment) - validatorExitRequestScannerService := services.NewValidatorExitRequestEventScanner(storageAdapter, notifierAdapter, veboAdapter, executionAdapter, beaconchainAdapter, networkConfig.VeboBlockDeployment) + distributionLogUpdatedScannerService := services.NewDistributionLogUpdatedEventScanner(storageAdapter, notifierAdapter, executionAdapter, csFeeDistributorImplAdapter, networkConfig.CsFeeDistributorBlockDeployment, networkConfig.CSModuleTxReceipt) + validatorExitRequestScannerService := services.NewValidatorExitRequestEventScanner(storageAdapter, notifierAdapter, veboAdapter, executionAdapter, beaconchainAdapter, networkConfig.VeboBlockDeployment, networkConfig.CSModuleTxReceipt) validatorEjectorService := services.NewValidatorEjectorService(storageAdapter, notifierAdapter, exitValidatorAdapter, beaconchainAdapter) pendingHashesLoaderService := services.NewPendingHashesLoader(storageAdapter, notifierAdapter, ipfsAdapter, networkConfig.MinGenesisTime) // relaysCheckerService := services.NewRelayCronService(relaysAllowedAdapter, relaysUsedAdapter, notifierAdapter) diff --git a/internal/adapters/execution/execution_adapter.go b/internal/adapters/execution/execution_adapter.go index 275d27d..a1d5c10 100644 --- a/internal/adapters/execution/execution_adapter.go +++ b/internal/adapters/execution/execution_adapter.go @@ -6,6 +6,7 @@ import ( "fmt" "net/http" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" ) @@ -158,3 +159,46 @@ func (e *ExecutionAdapter) IsSyncing() (bool, error) { // If result is a map or object, the node is syncing return true, nil } + +// GetTransactionReceipt retrieves the transaction receipt for a given transaction hash. +func (e *ExecutionAdapter) GetTransactionReceipt(txHash common.Hash) (map[string]interface{}, error) { + // Create the request payload for eth_getTransactionReceipt + payload := map[string]interface{}{ + "jsonrpc": "2.0", + "method": "eth_getTransactionReceipt", + "params": []interface{}{txHash}, + "id": 1, + } + + // Marshal the payload to JSON + jsonPayload, err := json.Marshal(payload) + if err != nil { + return nil, fmt.Errorf("failed to marshal request payload for eth_getTransactionReceipt: %w", err) + } + + // Send the request to the execution client + resp, err := http.Post(e.rpcURL, "application/json", bytes.NewBuffer(jsonPayload)) + if err != nil { + return nil, fmt.Errorf("failed to send request to execution client at %s: %w", e.rpcURL, err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code %d received from execution client", resp.StatusCode) + } + + // Parse the response + var result struct { + Result map[string]interface{} `json:"result"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("failed to decode response from execution client: %w", err) + } + + // Check if the result is null + if result.Result == nil { + return nil, nil // Returning nil to indicate no receipt is available + } + + return result.Result, nil +} diff --git a/internal/adapters/execution/execution_adapter_integration_test.go b/internal/adapters/execution/execution_adapter_integration_test.go index de9f822..60e3809 100644 --- a/internal/adapters/execution/execution_adapter_integration_test.go +++ b/internal/adapters/execution/execution_adapter_integration_test.go @@ -9,6 +9,7 @@ import ( "lido-events/internal/adapters/execution" + "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" ) @@ -81,3 +82,22 @@ func TestIsSyncingIntegration(t *testing.T) { t.Log("The Ethereum node is not syncing.") } } + +// TestGetTransactionReceiptIntegration tests retrieving the transaction receipt +func TestGetTransactionReceiptIntegration(t *testing.T) { + adapter, err := setupExecutionAdapter(t) + assert.NoError(t, err) + + // Specify a transaction hash to test + txHash := common.HexToHash("0x1475719ecbb73b28bc531bb54b37695df1bf6b71c6d2bf1d28b4efa404867e26") + + // Call the GetTransactionReceipt method + receipt, err := adapter.GetTransactionReceipt(txHash) + assert.NoError(t, err) + + // Ensure receipt is not nil + assert.NotNil(t, receipt, "Expected a non-nil transaction receipt") + + // Log the receipt for debugging + t.Logf("Transaction receipt: %+v", receipt) +} diff --git a/internal/application/ports/execution_port.go b/internal/application/ports/execution_port.go index cb6f32c..5f9eb91 100644 --- a/internal/application/ports/execution_port.go +++ b/internal/application/ports/execution_port.go @@ -1,7 +1,10 @@ package ports +import "github.com/ethereum/go-ethereum/common" + type ExecutionPort interface { GetMostRecentBlockNumber() (uint64, error) GetBlockTimestampByNumber(blockNumber uint64) (uint64, error) IsSyncing() (bool, error) + GetTransactionReceipt(txHash common.Hash) (map[string]interface{}, error) } diff --git a/internal/application/services/distributionLogUpdatedEventScanner.go b/internal/application/services/distributionLogUpdatedEventScanner.go index 2963326..30afb16 100644 --- a/internal/application/services/distributionLogUpdatedEventScanner.go +++ b/internal/application/services/distributionLogUpdatedEventScanner.go @@ -8,6 +8,8 @@ import ( "lido-events/internal/logger" "sync" "time" + + "github.com/ethereum/go-ethereum/common" ) type DistributionLogUpdatedEventScanner struct { @@ -16,16 +18,18 @@ type DistributionLogUpdatedEventScanner struct { executionPort ports.ExecutionPort csFeeDistributorImplPort ports.CsFeeDistributorImplPort csFeeDistributorBlockDeployment uint64 + csModuleTxReceipt common.Hash servicePrefix string } -func NewDistributionLogUpdatedEventScanner(storagePort ports.StoragePort, notifierPort ports.NotifierPort, executionPort ports.ExecutionPort, csFeeDistributorImplPort ports.CsFeeDistributorImplPort, csFeeDistributorBlockDeployment uint64) *DistributionLogUpdatedEventScanner { +func NewDistributionLogUpdatedEventScanner(storagePort ports.StoragePort, notifierPort ports.NotifierPort, executionPort ports.ExecutionPort, csFeeDistributorImplPort ports.CsFeeDistributorImplPort, csFeeDistributorBlockDeployment uint64, csModuleTxReceipt common.Hash) *DistributionLogUpdatedEventScanner { return &DistributionLogUpdatedEventScanner{ storagePort, notifierPort, executionPort, csFeeDistributorImplPort, csFeeDistributorBlockDeployment, + csModuleTxReceipt, "DistributionLogUpdatedEventScanner", } } @@ -70,6 +74,17 @@ func (ds *DistributionLogUpdatedEventScanner) runScan(ctx context.Context) { return } + // Skip if tx receipt not found (nil). This means that the node does not store log receipts and there are no logs at all + receipt, err := ds.executionPort.GetTransactionReceipt(ds.csModuleTxReceipt) + if err != nil { + logger.ErrorWithPrefix(ds.servicePrefix, "Error getting transaction receipt for csModule deployment: %v", err) + return + } + if receipt == nil { + logger.WarnWithPrefix(ds.servicePrefix, "Transaction receipt for csModule deployment not found, skipping ValidatorExitRequest event scan. This means that the node does not store log receipts and there are no logs at all") + return + } + // Retrieve start and end blocks for scanning start, err := ds.storagePort.GetDistributionLogLastProcessedBlock() if err != nil { diff --git a/internal/application/services/validatorExitRequestEventScanner.go b/internal/application/services/validatorExitRequestEventScanner.go index eb3ccad..d133a22 100644 --- a/internal/application/services/validatorExitRequestEventScanner.go +++ b/internal/application/services/validatorExitRequestEventScanner.go @@ -9,6 +9,8 @@ import ( "lido-events/internal/logger" "sync" "time" + + "github.com/ethereum/go-ethereum/common" ) type ValidatorExitRequestEventScanner struct { @@ -18,10 +20,11 @@ type ValidatorExitRequestEventScanner struct { executionPort ports.ExecutionPort beaconchainPort ports.Beaconchain veboBlockDeployment uint64 + csModuleTxReceipt common.Hash servicePrefix string } -func NewValidatorExitRequestEventScanner(storagePort ports.StoragePort, notifierPort ports.NotifierPort, veboPort ports.VeboPort, executionPort ports.ExecutionPort, beaconchainPort ports.Beaconchain, veboBlockDeployment uint64) *ValidatorExitRequestEventScanner { +func NewValidatorExitRequestEventScanner(storagePort ports.StoragePort, notifierPort ports.NotifierPort, veboPort ports.VeboPort, executionPort ports.ExecutionPort, beaconchainPort ports.Beaconchain, veboBlockDeployment uint64, csModuleTxReceipt common.Hash) *ValidatorExitRequestEventScanner { return &ValidatorExitRequestEventScanner{ storagePort, notifierPort, @@ -29,6 +32,7 @@ func NewValidatorExitRequestEventScanner(storagePort ports.StoragePort, notifier executionPort, beaconchainPort, veboBlockDeployment, + csModuleTxReceipt, "ValidatorExitRequestEventScanner", } } @@ -82,6 +86,22 @@ func (vs *ValidatorExitRequestEventScanner) runScan(ctx context.Context) { return } + // Skip if tx receipt not found (nil). This means that the node does not store log receipts and there are no logs at all + receipt, err := vs.executionPort.GetTransactionReceipt(vs.csModuleTxReceipt) + if err != nil { + logger.ErrorWithPrefix(vs.servicePrefix, "Error getting transaction receipt for csModule deployment: %v", err) + return + } + if receipt == nil { + logger.WarnWithPrefix(vs.servicePrefix, "Transaction receipt for csModule deployment not found, skipping ValidatorExitRequest event scan. This means that the node does not store log receipts and there are no logs at all") + // notify the user to switch to an execution client that does store the log receipts + message := "- 🚨 The node does not store log receipts and there are no logs at all. ValidatorExitRequest events cannot be scanned. We highly recommend switching to a Execution Client that does store the log receipts" + if err := vs.notifierPort.SendNotification(message); err != nil { + logger.ErrorWithPrefix(vs.servicePrefix, "Error sending notification: %v", err) + } + return + } + // Retrieve start and end blocks for scanning start, err := vs.storagePort.GetValidatorExitRequestLastProcessedBlock() if err != nil { diff --git a/internal/config/config_loader.go b/internal/config/config_loader.go index 71ae481..de03eee 100644 --- a/internal/config/config_loader.go +++ b/internal/config/config_loader.go @@ -38,6 +38,9 @@ type Config struct { VeboBlockDeployment uint64 CsFeeDistributorBlockDeployment uint64 + // tx receipts + CSModuleTxReceipt common.Hash + // Lido specifics LidoKeysApiUrl string ProxyApiPort uint64 @@ -161,6 +164,7 @@ func LoadNetworkConfig() (Config, error) { VeboBlockDeployment: uint64(30701), CsFeeDistributorBlockDeployment: uint64(1774650), CSModuleAddress: common.HexToAddress("0x4562c3e63c2e586cD1651B958C22F88135aCAd4f"), + CSModuleTxReceipt: common.HexToHash("0x1475719ecbb73b28bc531bb54b37695df1bf6b71c6d2bf1d28b4efa404867e26"), LidoKeysApiUrl: "https://keys-api-holesky.testnet.fi", ProxyApiPort: proxyApiPort, MinGenesisTime: uint64(1695902400), @@ -198,6 +202,7 @@ func LoadNetworkConfig() (Config, error) { VeboBlockDeployment: uint64(17172556), CsFeeDistributorBlockDeployment: uint64(20935463), CSModuleAddress: common.HexToAddress("0xdA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F"), + CSModuleTxReceipt: common.HexToHash("0x85d995eba9763907fdf35cd2034144dd9d53ce32cbec21349d4b12823c6860c5"), LidoKeysApiUrl: "https://keys-api.lido.fi", ProxyApiPort: proxyApiPort, MinGenesisTime: uint64(1606824023), From 324106548b70b739cead2503e746722eae7ad2ec Mon Sep 17 00:00:00 2001 From: pablomendezroyo Date: Mon, 23 Dec 2024 10:26:14 +0100 Subject: [PATCH 2/2] abstract function --- .../adapters/execution/execution_adapter.go | 30 +++++++++++++++++++ .../execution_adapter_integration_test.go | 19 ++++++++++++ internal/application/ports/execution_port.go | 2 +- .../distributionLogUpdatedEventScanner.go | 10 +++---- .../validatorExitRequestEventScanner.go | 12 ++++---- 5 files changed, 61 insertions(+), 12 deletions(-) diff --git a/internal/adapters/execution/execution_adapter.go b/internal/adapters/execution/execution_adapter.go index a1d5c10..31500e5 100644 --- a/internal/adapters/execution/execution_adapter.go +++ b/internal/adapters/execution/execution_adapter.go @@ -202,3 +202,33 @@ func (e *ExecutionAdapter) GetTransactionReceipt(txHash common.Hash) (map[string return result.Result, nil } + +// GetTransactionReceiptExists checks if the transaction receipt exists for a given transaction hash. +// - Reth running as fullnode returns "result": null if the transaction receipt does not exist in the database. +// TODO: test erigon response running it with config not to store receipts +func (e *ExecutionAdapter) GetTransactionReceiptExists(txHash common.Hash) (bool, error) { + receipt, err := e.GetTransactionReceipt(txHash) + if err != nil { + return false, err + } + + // Check if the receipt is nil + if receipt == nil { + return false, nil + } + + // Check if the receipt is an empty map + if len(receipt) == 0 { + return false, nil + } + + // Check specific fields in the receipt to ensure it is valid + if _, ok := receipt["transactionHash"]; !ok { + return false, nil + } + if _, ok := receipt["blockNumber"]; !ok { + return false, nil + } + + return true, nil +} \ No newline at end of file diff --git a/internal/adapters/execution/execution_adapter_integration_test.go b/internal/adapters/execution/execution_adapter_integration_test.go index 60e3809..fe3a868 100644 --- a/internal/adapters/execution/execution_adapter_integration_test.go +++ b/internal/adapters/execution/execution_adapter_integration_test.go @@ -101,3 +101,22 @@ func TestGetTransactionReceiptIntegration(t *testing.T) { // Log the receipt for debugging t.Logf("Transaction receipt: %+v", receipt) } + +// TestGetTransactionReceiptExistsIntegration tests checking if a transaction receipt exists +func TestGetTransactionReceiptExistsIntegration(t *testing.T) { + adapter, err := setupExecutionAdapter(t) + assert.NoError(t, err) + + // Specify a transaction hash to test + txHash := common.HexToHash("0x1475719ecbb73b28bc531bb54b37695df1bf6b71c6d2bf1d28b4efa404867e26") + + // Call the GetTransactionReceiptExists method + exists, err := adapter.GetTransactionReceiptExists(txHash) + assert.NoError(t, err) + + // Ensure exists is true + assert.True(t, exists, "Expected the transaction receipt to exist") + + // Log the result for debugging + t.Logf("Transaction receipt exists for hash %s: %v", txHash.Hex(), exists) +} diff --git a/internal/application/ports/execution_port.go b/internal/application/ports/execution_port.go index 5f9eb91..1326f50 100644 --- a/internal/application/ports/execution_port.go +++ b/internal/application/ports/execution_port.go @@ -6,5 +6,5 @@ type ExecutionPort interface { GetMostRecentBlockNumber() (uint64, error) GetBlockTimestampByNumber(blockNumber uint64) (uint64, error) IsSyncing() (bool, error) - GetTransactionReceipt(txHash common.Hash) (map[string]interface{}, error) + GetTransactionReceiptExists(txHash common.Hash) (bool, error) } diff --git a/internal/application/services/distributionLogUpdatedEventScanner.go b/internal/application/services/distributionLogUpdatedEventScanner.go index 30afb16..a792992 100644 --- a/internal/application/services/distributionLogUpdatedEventScanner.go +++ b/internal/application/services/distributionLogUpdatedEventScanner.go @@ -74,14 +74,14 @@ func (ds *DistributionLogUpdatedEventScanner) runScan(ctx context.Context) { return } - // Skip if tx receipt not found (nil). This means that the node does not store log receipts and there are no logs at all - receipt, err := ds.executionPort.GetTransactionReceipt(ds.csModuleTxReceipt) + // Skip if tx receipt not found. This means that the node does not store log receipts and there are no logs at all + receiptExists, err := ds.executionPort.GetTransactionReceiptExists(ds.csModuleTxReceipt) if err != nil { - logger.ErrorWithPrefix(ds.servicePrefix, "Error getting transaction receipt for csModule deployment: %v", err) + logger.ErrorWithPrefix(ds.servicePrefix, "Error checking if transaction receipt exists: %v", err) return } - if receipt == nil { - logger.WarnWithPrefix(ds.servicePrefix, "Transaction receipt for csModule deployment not found, skipping ValidatorExitRequest event scan. This means that the node does not store log receipts and there are no logs at all") + if !receiptExists { + logger.WarnWithPrefix(ds.servicePrefix, "Transaction receipt for csModule deployment not found. This probably means your node does not store log receipts, check out the official documentation of your node and configure the node to store them. Skipping DistributionLog event scan") return } diff --git a/internal/application/services/validatorExitRequestEventScanner.go b/internal/application/services/validatorExitRequestEventScanner.go index d133a22..fab1e21 100644 --- a/internal/application/services/validatorExitRequestEventScanner.go +++ b/internal/application/services/validatorExitRequestEventScanner.go @@ -86,16 +86,16 @@ func (vs *ValidatorExitRequestEventScanner) runScan(ctx context.Context) { return } - // Skip if tx receipt not found (nil). This means that the node does not store log receipts and there are no logs at all - receipt, err := vs.executionPort.GetTransactionReceipt(vs.csModuleTxReceipt) + // Skip if tx receipt not found. This means that the node does not store log receipts and there are no logs at all + receiptExists, err := vs.executionPort.GetTransactionReceiptExists(vs.csModuleTxReceipt) if err != nil { - logger.ErrorWithPrefix(vs.servicePrefix, "Error getting transaction receipt for csModule deployment: %v", err) + logger.ErrorWithPrefix(vs.servicePrefix, "Error checking if transaction receipt exists: %v", err) return } - if receipt == nil { - logger.WarnWithPrefix(vs.servicePrefix, "Transaction receipt for csModule deployment not found, skipping ValidatorExitRequest event scan. This means that the node does not store log receipts and there are no logs at all") + if !receiptExists { + logger.WarnWithPrefix(vs.servicePrefix, "Transaction receipt for csModule deployment not found. This probably means your node does not store log receipts, check out the official documentation of your node and configure the node to store them. Skipping ValidatorExitRequests event scan") // notify the user to switch to an execution client that does store the log receipts - message := "- 🚨 The node does not store log receipts and there are no logs at all. ValidatorExitRequest events cannot be scanned. We highly recommend switching to a Execution Client that does store the log receipts" + message := "- 🚨 Your Execution Client appears to be missing log receipt storage. As a result, ValidatorExitRequest events cannot be scanned. To resolve this issue, consider switching to an Execution Client that supports log receipt storage or updating your node configuration to enable this feature" if err := vs.notifierPort.SendNotification(message); err != nil { logger.ErrorWithPrefix(vs.servicePrefix, "Error sending notification: %v", err) }