From f4d2c8304af093365f44f66f3221fab3cedd7950 Mon Sep 17 00:00:00 2001 From: cam-schultz Date: Tue, 5 Sep 2023 19:02:55 +0000 Subject: [PATCH] write latest block to db on initialization --- relayer/relayer.go | 43 ++++++++++++++++++----------- vms/evm/subscriber.go | 63 ++++++++++++++++++++++++------------------- 2 files changed, 63 insertions(+), 43 deletions(-) diff --git a/relayer/relayer.go b/relayer/relayer.go index 4be40273..f965db78 100644 --- a/relayer/relayer.go +++ b/relayer/relayer.go @@ -125,29 +125,42 @@ func NewRelayer( return nil, nil, err } - // Get the latest processed block height from the database. If the database doesn't have a value for the latest block height, - // for this chain, return here without an error. This will cause the subscriber to begin processing new incoming warp messages. - latestSeenBlockData, err := r.db.Get(r.sourceChainID, []byte(database.LatestSeenBlockKey)) - if errors.Is(err, database.ErrChainNotFound) { + // Get the latest processed block height from the database. + var ( + latestSeenBlockData []byte + latestSeenBlock *big.Int // initialized to nil + ) + latestSeenBlockData, err = r.db.Get(r.sourceChainID, []byte(database.LatestSeenBlockKey)) + + // The following cases are treated as successful: + // 1) The database contains the latest seen block data for the chain + // - In this case, we parse the block height and process warp logs from that height to the current block + // 2) The database has been configured for the chain, but does not contain the latest seen block data + // - In this case, we save the current block height in the database, but do not process any historical warp logs + if err == nil { + // If the database contains the latest seen block data, then back-process all warp messages from the + // latest seen block to the latest block + // This will query the node for any logs that match the filter query from the stored block height, + r.logger.Info("latest processed block", zap.String("block", string(latestSeenBlockData))) + var success bool + latestSeenBlock, success = new(big.Int).SetString(string(latestSeenBlockData), 10) + if !success { + r.logger.Error("failed to convert latest block to big.Int", zap.Error(err)) + return nil, nil, err + } + } else if errors.Is(err, database.ErrChainNotFound) || errors.Is(err, database.ErrKeyNotFound) { + // Otherwise, latestSeenBlock is nil, so the call to ProcessFromHeight will simply update the database with the + // latest block height logger.Info( "Latest seen block not found in database. Starting from latest block.", zap.String("chainID", r.sourceChainID.String()), ) - return &r, sub, nil - } - if err != nil { + } else { r.logger.Warn("failed to get latest block from database", zap.Error(err)) return nil, nil, err } - latestSeenBlock, success := new(big.Int).SetString(string(latestSeenBlockData), 10) - if !success { - r.logger.Error("failed to convert latest block to big.Int", zap.Error(err)) - return nil, nil, err - } - // Back-process all warp messages from the latest seen block to the latest block - // This will query the node for any logs that match the filter query from the stored block height, - // and process the contained warp messages. If initialization fails, continue with normal relayer operation, but log the error. + // Process historical logs. If this fails for any reason, continue with normal relayer operation, but log the error. err = sub.ProcessFromHeight(latestSeenBlock) if err != nil { logger.Warn( diff --git a/vms/evm/subscriber.go b/vms/evm/subscriber.go index 2f7bb505..f6304f12 100644 --- a/vms/evm/subscriber.go +++ b/vms/evm/subscriber.go @@ -139,6 +139,9 @@ func (s *subscriber) forwardLogs() { } } +// Process logs from the given block height to the latest block +// If height is nil, then simply store the latest block height in the database, +// but do not process any logs func (s *subscriber) ProcessFromHeight(height *big.Int) error { ethClient, err := ethclient.Dial(s.nodeRPCURL) if err != nil { @@ -155,39 +158,43 @@ func (s *subscriber) ProcessFromHeight(height *big.Int) error { return err } - // Filter logs from the latest processed block to the latest block - // Since initializationFilterQuery does not modify existing fields of warpFilterQuery, - // we can safely reuse warpFilterQuery with only a shallow copy - initializationFilterQuery := interfaces.FilterQuery{ - Topics: warpFilterQuery.Topics, - Addresses: warpFilterQuery.Addresses, - FromBlock: height, - } - logs, err := ethClient.FilterLogs(context.Background(), initializationFilterQuery) - if err != nil { - s.logger.Error( - "Failed to get logs on initialization", - zap.Error(err), - ) - return err - } - - // Queue each of the logs to be processed - s.logger.Info( - "Processing logs on initialization", - zap.String("fromBlockHeight", height.String()), - zap.String("toBlockHeight", strconv.Itoa(int(latestBlock))), - ) - for _, log := range logs { - messageInfo, err := s.NewWarpLogInfo(log) + // Only process logs if the provided height is not nil. Otherwise, simply update the database with + // the latest block height + if height != nil { + // Filter logs from the latest processed block to the latest block + // Since initializationFilterQuery does not modify existing fields of warpFilterQuery, + // we can safely reuse warpFilterQuery with only a shallow copy + initializationFilterQuery := interfaces.FilterQuery{ + Topics: warpFilterQuery.Topics, + Addresses: warpFilterQuery.Addresses, + FromBlock: height, + } + logs, err := ethClient.FilterLogs(context.Background(), initializationFilterQuery) if err != nil { s.logger.Error( - "Invalid log when processing from height. Continuing.", + "Failed to get logs on initialization", zap.Error(err), ) - continue + return err + } + + // Queue each of the logs to be processed + s.logger.Info( + "Processing logs on initialization", + zap.String("fromBlockHeight", height.String()), + zap.String("toBlockHeight", strconv.Itoa(int(latestBlock))), + ) + for _, log := range logs { + messageInfo, err := s.NewWarpLogInfo(log) + if err != nil { + s.logger.Error( + "Invalid log when processing from height. Continuing.", + zap.Error(err), + ) + continue + } + s.log <- *messageInfo } - s.log <- *messageInfo } // Update the database with the latest seen block height