-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Process missed blocks on startup #20
Conversation
database/json_file_storage.go
Outdated
} | ||
|
||
// Get the latest chain state from the json database, and retrieve the value from the key | ||
func (s *JsonFileStorage) Get(chainID ids.ID, key []byte) ([]byte, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is my understanding correct that Get
is only called once on start up to see what the latest processed block height was from the previous runs? Then we relay any messages up to the tip of chain, and then rely on the subscriber to push new logs, so we don't need to know what the latest processed block height at any other point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's correct.
vms/evm/subscriber.go
Outdated
@@ -84,7 +117,87 @@ func (s *subscriber) forwardLogs() { | |||
continue | |||
} | |||
s.log <- *messageInfo | |||
|
|||
// Update the database with the latest block height | |||
// TODO: This should also be done in a separate goroutine, rather than waiting for warp messages to be processed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to clarify here, do you mean "rather than waiting for the write to disk to finish before processing future messages"?
I definitely agree with the TODOs here though, we should address them prior to merging I think. Another edge case to consider is if there are multiple messages in the same block. We should be careful to only update the "latest block height" once we are sure that all messages prior to (and including?) that block have been processed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not quite, I more meant we should update the latest processed height on disk periodically, rather than only doing so when we process a warp message. I was thinking we'd want to avoid processing large gaps of blocks without warp messages on startup. Though thinking about it some more, I don't think that's even necessary, since the eth_getLogs
call on startup won't return any logs for that large gap anyway.
To illustrate, say there's a warp message at block 1000, and the next warp message is at block 2000, but the relayer is offline and unable to process that second message. On restart, whether or not the stored block heigh is 1000 or 1999, the eth_getLogs
query will return the same result.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point on updating the latest block height only after all messages have been processed. I'll add that in as part of this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So in the current iteration of this PR, we actually store the latest seen block, not the latest fully processed block (I will rename the variables and keys to be more clear). What this means is that if the relayer falls over while processing a large number of warp messages from a particular block, we'll re-process all of those messages on restart. That's obviously not the most efficient use of gas, as already delivered messages would be re-broadcast, so I created this ticket to address that: #22. IMO we can save that for a separate PR, since it doesn't impact correctness here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah all of that reasoning sounds good to me. I just want to confirm that we don't update the latest seen block until after all messages in that block have been processed. Otherwise, if we update the latest seen block prior to processing all messages in that block, we may end up missing messages in that block if the relayer hits a failure and restarts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eth_getLogs
returns logs from the provided block height inclusive. So say we see block 1000, and only process some of the warp messages contained therein. On restart, we'll reprocess block 1000, potentially re processing warp messages, but definitely not missing any.
relayer/relayer.go
Outdated
go r.RouteToMessageChannel() | ||
|
||
// Initialize the subscriber. This will poll the node for any logs that match the filter query from the stored block height, | ||
// and process the contained warp messages. If initialization fails, continue with normal relayer operation, but log the error. | ||
err = sub.Initialize() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is a possibility to still not process messages from all blocks here because the subscription is opened until after we finish Initialize
, so there could theoretically be new blocks produced/accepted in between those two events.
I'm not sure how complicated the implementation would be, but I think the desired behavior would be:
- Open the subscription to be notified about logs in new blocks.
- Get the current block height on chain.
- Process messages from any blocks from the "latest processed" height up to the current block height.
- Start processing any messages received over the subscription.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, I think that should be fine, as long as we grab the latest processed block from the db before opening the subscription. Message order wouldn't be guaranteed, but that's not part of the relayer spec anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RIght now I think we're running subscribe
and processFromHeight
in two separate go routines concurrently, so if new blocks came before processFromHeight
gets back the latest block height, we'd double process that block right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's correct. I think it would be somewhat tricky to cover all the edge cases here, especially since the eth_subscribe
and eth_ blockNumber
calls cannot be done atomically. The alternative I'm currently in favor of is implementing #22 to guard against double processing, rather than doing so by synchronizing the various goroutines.
vms/evm/subscriber.go
Outdated
} | ||
|
||
// Get the latest processed block height from the database. | ||
heightData, err := s.db.Get(s.chainID, []byte(database.LatestBlockHeightKey)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we handle the "file doesn't exist" error case here for the first initialization of the relayer?
We should clearly document the behavior in this case, but I think it likely makes the most sense to treat the case where the file doesn't exist as the first run of the relayer, and not relay any messages prior to it (since that would involve checking each message from the start of the chain).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If Initialize
fails for any reason, we simply continue with normal operation, opening subscriptions, relaying messages, etc. So if the file doesn't exist, we don't back-process any warp messages, and only process new incoming ones. OTOH, if the file does exist and has latestProcessedBlock: 1
, then yes, we'll deliver every warp message from genesis (though I'm sure eth_getLogs
has some limits built in).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right now when we do db.Get
it looks to me if a file does not exist, we still return an error, and when the relayer.NewRelayer
sees the errror it'll return and not continue processing.
I think we should return a specific file does not exist error from db.Get
, and then in NewRelayer
if we get that specific error back we return without setting latestSeenBlock
and going through processFromHeight
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
something like
// Get the latest processed block height from the database.
latestSeenBlockData, err := r.db.Get(r.sourceChainID, []byte(database.LatestSeenBlockKey))
if err != nil {
if err == FileDoesNotExist {
r.logger.Info("First run of relayer for chain ID %s, db file does not exist")
return &r, sub, nil
}
r.logger.Warn("failed to get latest block from database", zap.Error(err))
return nil, nil, err
}
latestSeenBlock, success := new(big.Int).SetString(string(latestSeenBlockData), 10)
if !success {
r.logger.Error("failed to convert latest block to big.Int", zap.Error(err))
return nil, nil, err
}
// Back-process all warp messages from the latest seen block to the latest block
// This will query the node for any logs that match the filter query from the stored block height,
// and process the contained warp messages. If initialization fails, continue with normal relayer operation, but log the error.
err = sub.ProcessFromHeight(latestSeenBlock)
if err != nil {
logger.Warn(
"Encountered an error when processing historical blocks. Continuing to normal relaying operation.",
zap.Error(err),
)
}
return &r, sub, nil
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch. Fixed this now. In the case in which the file doesn't exist, we return the relayer instance, but skip the sub.ProcessFromHeight
call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
though I'm sure eth_getLogs has some limits built in
That's a good call out. We should check this to make sure we handle any page limits properly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like this limit is imposed by the deployment or client, and not by the method itself. Most services limit the number of blocks to 500-2000. I'd recommend limiting it to 200 as a conservative starting point. We can reevaluate this figure or add batched calls to accomodate larger ranges in the future, but IMO this is not an immediate concern.
|
||
// If the file does not exist, return false, but do not return an error as this | ||
// is an expected case | ||
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would os.ErrNotExist
catch? In the comments and it mentions returning a *PathError
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and how should we handle if an error is returned but it's not os.ErrNoExist
? Right now we continue to read file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
os.Stat
returns a *PathError
that wraps a os.ErrNotExist
if the file is not found. See here for an example: https://go.dev/play/p/qFwPDKFqSZS
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
||
// RelayerDatabase is a key-value store for relayer state, with each chainID maintaining its own state | ||
type RelayerDatabase interface { | ||
Get(chainID ids.ID, key []byte) ([]byte, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we return []byte instead of string directly? When we write in to the file, we convert []byte to string. Also, when we use the result, we convert []byte to string.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We discussed this offline, but for visibility, I chose []byte
in the interface so that the keys and values could be as general as possible. The underlying JSON databse uses strings so that the resulting file is human readable.
5b0cc63
vms/subscriber.go
Outdated
// UpdateLatestSeenBlock retrieves the latest block from the chain and updates the database | ||
UpdateLatestSeenBlock() error | ||
// UpdateLatestProcessedBlock retrieves the latest block from the chain and updates the database | ||
UpdateLatestProcessedBlock() error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Final nit: Could we think of a more descriptive name for this function? My only idea is SetInitialProcessedBlockHeight
, to make it clear that it is only used when there is not an existing processed block height on start up, and that it uses whatever the current block height of the chain is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm more in favor of omitting Initial
from the function name, since the interface imposes no requirement that this function be called at any particular time in the object lifecycle. I agree it could be more descriptive as to what it's actually updating the database value to. I've renamed it SetProcessedBlockHeightToLatest
; let me know if you think another name more accurately reflects the behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, yeah totally agree. Good call 🙏
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
} | ||
// Relay the message to the destination. Messages from a given source chain must be processed in serial in order to | ||
// guarantee that the previous block (n-1) is fully processed by the relayer when processing a given log from block n. | ||
err = messageRelayer.relayMessage(warpMessageInfo, r.currentRequestID, messageManager) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For a future PR: If we are not already, it might be helpful to capture some performance metrics (e.g., latency, wait time) so that we aren't caught off guard if the system is performing poorly. With our design, a relayer that can't keep up with the request rate may eventually not make any progress if it falls too far behind.
relayer/relayer.go
Outdated
zap.String("chainID", r.sourceChainID.String()), | ||
zap.Error(err), | ||
) | ||
return nil, nil, err | ||
} | ||
|
||
// RelayMessage relays a single warp message to the destination chain. Warp message relay requests are concurrent with each other, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Warp message relay requests are concurrent with each other"
Does this need to be updated after the recent change to process messages from a given source serially?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch - updated now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
1e9891d
Why this should be merged
Enables the relayer to process warp messages sent while the relayer was offline. Fixes #14
How this works
Adds a
ProcessFromHeight
method toSubscriber
. The EVM implementation callseth_getLogs
to retrieve Warp logs since the last processed block. Adds JSON storage to persist the last processed block.How this was tested
CI, Teleporter integration tests, added unit tests for JSON storage, manual testing. Will circle back and add end-to-end tests once #15 is complete.