Skip to content

Commit

Permalink
faster beefy sync
Browse files Browse the repository at this point in the history
  • Loading branch information
vgeddes committed Nov 29, 2024
1 parent 5e9bfdd commit c743ec7
Showing 1 changed file with 20 additions and 21 deletions.
41 changes: 20 additions & 21 deletions relayer/relays/beefy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,14 @@ func (relay *Relay) RateLimitedSync(ctx context.Context) error {
var parachainConn *parachain.Connection

// Initialize parachainConn
parachainConn := parachain.NewConnection(relay.config.Source.BridgeHubEndpoint)
err = parachainConn.Connect(ctx)
parachainConn = parachain.NewConnection(relay.config.Source.BridgeHubEndpoint, nil)
err := parachainConn.ConnectWithHeartBeat(ctx, 30*time.Second)
if err != nil {
return fmt.Errorf("create parachain connection: %w", err)
}

// Initialize relaychainConn
err = relay.relaychainConn.Connect(ctx)
err = relay.relaychainConn.ConnectWithHeartBeat(ctx, 30*time.Second)
if err != nil {
return fmt.Errorf("create relaychain connection: %w", err)
}
Expand All @@ -192,19 +192,19 @@ func (relay *Relay) RateLimitedSync(ctx context.Context) error {
return fmt.Errorf("initialize EthereumWriter: %w", err)
}

gatewayAddress := common.HexToAddress(relay.)
gatewayContract, err := contracts.NewGateway(address, wr.conn.Client())
gatewayAddress := common.HexToAddress(relay.config.Sink.Contracts.Gateway)
gatewayContract, err := contracts.NewGateway(gatewayAddress, relay.ethereumConn.Client())
if err != nil {
return fmt.Errorf("create gateway client: %w", err)
}

for {
paraNonce, ethNonce, err := relay.queryNonces(ctx)
paraNonce, ethNonce, err := relay.queryNonces(ctx, parachainConn, gatewayContract)
if err != nil {
return fmt.Errorf("require sync: %w", err)
}

if requireSync {
if paraNonce > ethNonce {
beefyBlockHash, err := relay.relaychainConn.API().RPC.Beefy.GetFinalizedHead()
if err != nil {
return fmt.Errorf("fetch latest beefy block: %w", err)
Expand All @@ -218,45 +218,44 @@ func (relay *Relay) RateLimitedSync(ctx context.Context) error {
relay.doSync(ctx, uint64(header.Number))
}

// Sleep for 1 minute
// Sleep for 5 minute
select {
case <-ctx.Done():
return nil
case <-time.After(time.Second * 60):
case <-time.After(time.Second * 300):
}
}
}

func (relay *Relay) queryNonces(ctx context.Context) (uint64, uint64, error) {
func (relay *Relay) queryNonces(ctx context.Context, parachainConn *parachain.Connection, gatewayContract *contracts.Gateway) (uint64, uint64, error) {
data, err := types.HexDecodeString("0xc173fac324158e77fb5840738a1a541f633cbec8884c6a601c567d2b376a0539")
if err != nil {
return 0, fmt.Errorf("hex decode assethub channel: %w", err)
return 0, 0, fmt.Errorf("hex decode assethub channel: %w", err)
}

assetHubChannelID := *(*[32]byte)(data)

paraNonce, err := relay.fetchLatestParachainNonce(ctx, assetHubChannelID)
paraNonce, err := relay.fetchLatestParachainNonce(ctx, assetHubChannelID, parachainConn)
if err != nil {
return false, fmt.Errorf("fetch latest parachain nonce: %w", err)
return 0, 0, fmt.Errorf("fetch latest parachain nonce: %w", err)
}

ethNonce, err := relay.fetchEthereumNonce(ctx, assetHubChannelID)
ethNonce, err := relay.fetchEthereumNonce(ctx, assetHubChannelID, gatewayContract)
if err != nil {
return false, fmt.Errorf("fetch latest ethereum nonce: %w", err)
return 0, 0, fmt.Errorf("fetch latest ethereum nonce: %w", err)
}


return paraNonce, ethNonce, nil
}

func (r *Relay) fetchLatestParachainNonce(_ context.Context, channelId [32]byte) (uint64, error) {
paraNonceKey, err := types.CreateStorageKey(r.parachainConn.Metadata(), "EthereumOutboundQueue", "Nonce", channelId[:], nil)
func (r *Relay) fetchLatestParachainNonce(_ context.Context, channelId [32]byte, parachainConn *parachain.Connection) (uint64, error) {
paraNonceKey, err := types.CreateStorageKey(parachainConn.Metadata(), "EthereumOutboundQueue", "Nonce", channelId[:], nil)
if err != nil {
return 0, fmt.Errorf("create storage key for EthereumOutboundQueue.Nonce(%v): %w",
channelId, err)
}
var paraOutboundNonce uint64
ok, err := r.parachainConn.API().RPC.State.GetStorageLatest(paraNonceKey, &paraOutboundNonce)
ok, err := parachainConn.API().RPC.State.GetStorageLatest(paraNonceKey, &paraOutboundNonce)
if err != nil {
return 0, fmt.Errorf("fetch storage EthereumOutboundQueue.Nonce(%v): %w",
channelId, err)
Expand All @@ -268,11 +267,11 @@ func (r *Relay) fetchLatestParachainNonce(_ context.Context, channelId [32]byte)
return paraOutboundNonce, nil
}

func (r *Relay) fetchEthereumNonce(ctx context.Context, channelId [32]byte) (uint64, error) {
func (r *Relay) fetchEthereumNonce(ctx context.Context, channelId [32]byte, gatewayContract *contracts.Gateway) (uint64, error) {
opts := bind.CallOpts{
Context: ctx,
}
ethInboundNonce, _, err := r.gatewayContract.ChannelNoncesOf(&opts, channelId)
ethInboundNonce, _, err := gatewayContract.ChannelNoncesOf(&opts, channelId)
if err != nil {
return 0, fmt.Errorf("fetch Gateway.ChannelNoncesOf(%v): %w", channelId, err)
}
Expand Down

0 comments on commit c743ec7

Please sign in to comment.