
Process missed blocks on startup #20

Merged · 52 commits · Sep 13, 2023

Commits:
c74ef48
add json database
cam-schultz Aug 29, 2023
1280b4a
initialize subscriber from stored block height
cam-schultz Aug 29, 2023
551b90e
subscriber initialization failure is non fatal
cam-schultz Aug 29, 2023
5ca0bd0
Merge branch 'main' into catch-up-blocks
cam-schultz Aug 29, 2023
ba3f2fb
add storage location cfg option
cam-schultz Aug 29, 2023
39fdc21
better read error handling
cam-schultz Aug 30, 2023
a1aab00
add comments
cam-schultz Aug 30, 2023
6913fb9
properly set default storage location
cam-schultz Aug 30, 2023
505b337
fix update json db vals bug
cam-schultz Aug 30, 2023
de7a347
add initialization log message
cam-schultz Aug 30, 2023
349ac1e
typo
cam-schultz Aug 30, 2023
162ca0d
validate rpc endpoint in config
cam-schultz Aug 30, 2023
7d07d82
create subscription before processing back events
cam-schultz Aug 30, 2023
9a0baa0
rename type
cam-schultz Aug 30, 2023
dc7ae3c
rename latestSeenBlock
cam-schultz Aug 30, 2023
860f595
json storage unit tests
cam-schultz Aug 30, 2023
aa8dd8b
correct comment
cam-schultz Aug 30, 2023
ac5d57d
better error messages
cam-schultz Aug 30, 2023
9a51727
more unit tests
cam-schultz Aug 31, 2023
16af742
error on invalid log
cam-schultz Sep 1, 2023
2bb9605
save constructed urls
cam-schultz Sep 1, 2023
1727f90
make json db writes atomic
cam-schultz Sep 1, 2023
e49395b
add license header
cam-schultz Sep 1, 2023
ebdfc79
properly handle json file missing case
cam-schultz Sep 5, 2023
532094c
cleanup
cam-schultz Sep 5, 2023
f4d2c83
write latest block to db on initialization
cam-schultz Sep 5, 2023
17879c0
Merge branch 'main' into catch-up-blocks
cam-schultz Sep 5, 2023
4e5419d
consolidate endpoint methods
cam-schultz Sep 5, 2023
cf9a0e7
clean up comments
cam-schultz Sep 5, 2023
9fdd275
rename var
cam-schultz Sep 5, 2023
32146a4
cap the number of blocks to process on startup
cam-schultz Sep 5, 2023
ce66b82
cleanup
cam-schultz Sep 5, 2023
e012c4e
refactor subscriber interface
cam-schultz Sep 5, 2023
44384ca
fix test
cam-schultz Sep 5, 2023
e9c0b01
use Int:Sub
cam-schultz Sep 6, 2023
878d4c6
process logs in serial for each source chain
cam-schultz Sep 6, 2023
85cf76c
return nil error in success case
cam-schultz Sep 6, 2023
813a8e9
remove irrelevant comment
cam-schultz Sep 6, 2023
fde1675
improve logs
cam-schultz Sep 7, 2023
c4e36bd
sort logs from eth_getLogs
cam-schultz Sep 7, 2023
b8902c5
fix logs
cam-schultz Sep 7, 2023
9bda673
sort by log index
cam-schultz Sep 11, 2023
48ec689
cleanup
cam-schultz Sep 11, 2023
4ed7671
cache current state
cam-schultz Sep 11, 2023
4cdc487
clarify latestSeenBlock comments
cam-schultz Sep 11, 2023
55bc03c
clarify latestSeenBlock comments
cam-schultz Sep 11, 2023
232f6fe
fix test
cam-schultz Sep 11, 2023
5b0cc63
rename latestProcessedBlock
cam-schultz Sep 12, 2023
995d204
rename SetProcessedBlockHeightToLatest
cam-schultz Sep 13, 2023
e30a4ac
update comment
cam-schultz Sep 13, 2023
1e9891d
Merge branch 'main' into catch-up-blocks
cam-schultz Sep 13, 2023
8870d8b
appease linter
cam-schultz Sep 13, 2023
database/database.go (2 changes: 1 addition & 1 deletion)

@@ -9,7 +9,7 @@ import (
 )

 const (
-	LatestSeenBlockKey = "latestSeenBlock"
+	LatestProcessedBlockKey = "latestProcessedBlock"
 )

 var (
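As the rename above shows, the stored value under this key is a block height persisted as a base-10 string and parsed back with big.Int.SetString. A minimal round-trip sketch (the helper names here are illustrative, not from the PR):

```go
package main

import (
	"fmt"
	"math/big"
	"strconv"
)

// encodeHeight stores a block height the way the relayer's database does:
// as a base-10 string in a byte slice.
func encodeHeight(height uint64) []byte {
	return []byte(strconv.FormatUint(height, 10))
}

// decodeHeight parses the stored bytes back into a big.Int, mirroring the
// SetString calls in the diff; ok is false if the bytes are not a valid
// base-10 number.
func decodeHeight(data []byte) (*big.Int, bool) {
	return new(big.Int).SetString(string(data), 10)
}

func main() {
	raw := encodeHeight(11)
	height, ok := decodeHeight(raw)
	fmt.Println(height.Uint64(), ok) // 11 true
}
```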
database/json_file_storage_test.go (14 changes: 7 additions & 7 deletions)

@@ -38,15 +38,15 @@ func TestConcurrentWriteReadSingleChain(t *testing.T) {
 	finalTargetValue := uint64(11)
 	testWrite(jsonStorage, networks[0], finalTargetValue)

-	latestSeenBlockData, err := jsonStorage.Get(networks[0], []byte(LatestSeenBlockKey))
+	latestProcessedBlockData, err := jsonStorage.Get(networks[0], []byte(LatestProcessedBlockKey))
 	if err != nil {
 		t.Fatalf("failed to retrieve from JSON storage. err: %v", err)
 	}
-	latestSeenBlock, success := new(big.Int).SetString(string(latestSeenBlockData), 10)
+	latestProcessedBlock, success := new(big.Int).SetString(string(latestProcessedBlockData), 10)
 	if !success {
 		t.Fatalf("failed to convert latest block to big.Int. err: %v", err)
 	}
-	assert.Equal(t, finalTargetValue, latestSeenBlock.Uint64(), "latest seen block height is not correct.")
+	assert.Equal(t, finalTargetValue, latestProcessedBlock.Uint64(), "latest processed block height is not correct.")

 }

@@ -78,15 +78,15 @@ func TestConcurrentWriteReadMultipleChains(t *testing.T) {
 	}

 	for i, id := range networks {
-		latestSeenBlockData, err := jsonStorage.Get(id, []byte(LatestSeenBlockKey))
+		latestProcessedBlockData, err := jsonStorage.Get(id, []byte(LatestProcessedBlockKey))
 		if err != nil {
 			t.Fatalf("failed to retrieve from JSON storage. networkID: %d err: %v", i, err)
 		}
-		latestSeenBlock, success := new(big.Int).SetString(string(latestSeenBlockData), 10)
+		latestProcessedBlock, success := new(big.Int).SetString(string(latestProcessedBlockData), 10)
 		if !success {
 			t.Fatalf("failed to convert latest block to big.Int. err: %v", err)
 		}
-		assert.Equal(t, finalTargetValue, latestSeenBlock.Uint64(), fmt.Sprintf("latest seen block height is not correct. networkID: %d", i))
+		assert.Equal(t, finalTargetValue, latestProcessedBlock.Uint64(), fmt.Sprintf("latest processed block height is not correct. networkID: %d", i))
 	}
 }

@@ -110,7 +110,7 @@ func setupJsonStorage(t *testing.T, networks []ids.ID) *JSONFileStorage {

 func testWrite(storage *JSONFileStorage, chainID ids.ID, height uint64) {
 	fmt.Println(chainID, height)
-	err := storage.Put(chainID, []byte(LatestSeenBlockKey), []byte(strconv.FormatUint(height, 10)))
+	err := storage.Put(chainID, []byte(LatestProcessedBlockKey), []byte(strconv.FormatUint(height, 10)))
 	if err != nil {
 		fmt.Printf("failed to put data: %v", err)
 		return
relayer/relayer.go (32 changes: 16 additions & 16 deletions)

@@ -116,26 +116,26 @@ func NewRelayer(
 		return nil, nil, err
 	}

-	// Get the latest seen block height from the database.
-	latestSeenBlockData, err := r.db.Get(r.sourceChainID, []byte(database.LatestSeenBlockKey))
+	// Get the latest processed block height from the database.
+	latestProcessedBlockData, err := r.db.Get(r.sourceChainID, []byte(database.LatestProcessedBlockKey))

Reviewer: can you move this to a helper similar to processPreviousBlocks?

 	// The following cases are treated as successful:
-	// 1) The database contains the latest seen block data for the chain
+	// 1) The database contains the latest processed block data for the chain
 	//    - In this case, we parse the block height and process warp logs from that height to the current block
-	// 2) The database has been configured for the chain, but does not contain the latest seen block data
+	// 2) The database has been configured for the chain, but does not contain the latest processed block data
 	//    - In this case, we save the current block height in the database, but do not process any historical warp logs
 	if err == nil {
-		// If the database contains the latest seen block data, then back-process all warp messages from that block to the latest block
-		// Note that the latest seen block may have already been partially (or fully) processed by the relayer on a previous run. When
-		// processing a warp message in real time, which is when we update the latest seen block in the database, we have no way of knowing
+		// If the database contains the latest processed block data, then back-process all warp messages from that block to the latest block
+		// Note that the retrieved latest processed block may have already been partially (or fully) processed by the relayer on a previous run. When
+		// processing a warp message in real time, which is when we update the latest processed block in the database, we have no way of knowing
 		// if that is the last warp message in the block
-		latestSeenBlock, success := new(big.Int).SetString(string(latestSeenBlockData), 10)
+		latestProcessedBlock, success := new(big.Int).SetString(string(latestProcessedBlockData), 10)
 		if !success {
 			r.logger.Error("failed to convert latest block to big.Int", zap.Error(err))
 			return nil, nil, err
 		}

-		err = sub.ProcessFromHeight(latestSeenBlock)
+		err = sub.ProcessFromHeight(latestProcessedBlock)
 		if err != nil {
 			logger.Warn(
 				"Encountered an error when processing historical blocks. Continuing to normal relaying operation.",

@@ -146,16 +146,16 @@
 		return &r, sub, nil
 	}
 	if errors.Is(err, database.ErrChainNotFound) || errors.Is(err, database.ErrKeyNotFound) {
-		// Otherwise, latestSeenBlock is nil, so we instead store the latest block height.
+		// Otherwise, latestProcessedBlock is nil, so we instead store the latest block height.
 		logger.Info(
-			"Latest seen block not found in database. Starting from latest block.",
+			"Latest processed block not found in database. Starting from latest block.",
 			zap.String("chainID", r.sourceChainID.String()),
 		)

-		err := sub.UpdateLatestSeenBlock()
+		err := sub.UpdateLatestProcessedBlock()
 		if err != nil {
 			logger.Warn(
-				"Failed to update latest seen block. Continuing to normal relaying operation",
+				"Failed to update latest processed block. Continuing to normal relaying operation",
 				zap.String("chainID", r.sourceChainID.String()),
 				zap.Error(err),
 			)

@@ -234,11 +234,11 @@ func (r *Relayer) RelayMessage(warpLogInfo *vmtypes.WarpLogInfo, metrics *Messag
 	// Increment the request ID for the next message relay request
 	r.currentRequestID++

-	// Update the database with the latest seen block height
-	err = r.db.Put(r.sourceChainID, []byte(database.LatestSeenBlockKey), []byte(strconv.FormatUint(warpLogInfo.BlockNumber, 10)))
+	// Update the database with the latest processed block height
+	err = r.db.Put(r.sourceChainID, []byte(database.LatestProcessedBlockKey), []byte(strconv.FormatUint(warpLogInfo.BlockNumber, 10)))
 	if err != nil {
 		r.logger.Error(
-			fmt.Sprintf("failed to put %s into database", database.LatestSeenBlockKey),
+			fmt.Sprintf("failed to put %s into database", database.LatestProcessedBlockKey),
 			zap.Error(err),
 		)
 	}
vms/evm/subscriber.go (10 changes: 5 additions & 5 deletions)

@@ -176,7 +176,7 @@ func (s *subscriber) ProcessFromHeight(height *big.Int) error {
 		height = big.NewInt(0).Add(toBlock, big.NewInt(-MaxBlocksToProcess))
 	}

-	// Filter logs from the latest seen block to the latest block
+	// Filter logs from the latest processed block to the latest block
 	// Since initializationFilterQuery does not modify existing fields of warpFilterQuery,
 	// we can safely reuse warpFilterQuery with only a shallow copy
 	initializationFilterQuery := interfaces.FilterQuery{

@@ -226,9 +226,9 @@ func (s *subscriber) ProcessFromHeight(height *big.Int) error {
 	return nil
 }

-func (s *subscriber) UpdateLatestSeenBlock() error {
+func (s *subscriber) UpdateLatestProcessedBlock() error {
 	s.logger.Info(
-		"Updating latest seen block in database",
+		"Updating latest processed block in database",
 		zap.String("chainID", s.chainID.String()),
 	)
 	ethClient, err := ethclient.Dial(s.nodeRPCURL)

@@ -251,10 +251,10 @@ func (s *subscriber) UpdateLatestSeenBlock() error {
 		return err
 	}

-	err = s.db.Put(s.chainID, []byte(database.LatestSeenBlockKey), []byte(strconv.FormatUint(latestBlock, 10)))
+	err = s.db.Put(s.chainID, []byte(database.LatestProcessedBlockKey), []byte(strconv.FormatUint(latestBlock, 10)))
 	if err != nil {
 		s.logger.Error(
-			fmt.Sprintf("failed to put %s into database", database.LatestSeenBlockKey),
+			fmt.Sprintf("failed to put %s into database", database.LatestProcessedBlockKey),
 			zap.String("chainID", s.chainID.String()),
 			zap.Error(err),
 		)
vms/subscriber.go (4 changes: 2 additions & 2 deletions)

@@ -20,8 +20,8 @@ type Subscriber interface {
 	// ProcessFromHeight processes events from {height} to the latest block
 	ProcessFromHeight(height *big.Int) error

-	// UpdateLatestSeenBlock retrieves the latest block from the chain and updates the database
-	UpdateLatestSeenBlock() error
+	// UpdateLatestProcessedBlock retrieves the latest block from the chain and updates the database
+	UpdateLatestProcessedBlock() error

Reviewer: Final nit: Could we think of a more descriptive name for this function? My only idea is SetInitialProcessedBlockHeight, to make it clear that it is only used when there is not an existing processed block height on start up, and that it uses whatever the current block height of the chain is.

Author (cam-schultz): I'm more in favor of omitting Initial from the function name, since the interface imposes no requirement that this function be called at any particular time in the object lifecycle. I agree it could be more descriptive as to what it's actually updating the database value to. I've renamed it SetProcessedBlockHeightToLatest; let me know if you think another name more accurately reflects the behavior.

Reviewer: Nice, yeah totally agree. Good call 🙏

 	// Subscribe registers a subscription. After Subscribe is called,
 	// log events that match [filter] are written to the channel returned