Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ethereum Relayer V2 #1341

Draft
wants to merge 2 commits into
base: vincent/v2
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions contracts/src/Gateway.sol
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,14 @@ contract Gateway is IGatewayBase, IGatewayV1, IGatewayV2, IInitializable, IUpgra
return Functions.ensureAgent(agentID);
}

function outboundNonce()
external
view
returns (uint64)
{
return CallsV2.outboundNonce();
}

function pricingParameters() external view returns (UD60x18, uint128) {
return CallsV1.pricingParameters();
}
Expand Down
4 changes: 4 additions & 0 deletions contracts/src/v2/Calls.sol
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,8 @@ library CallsV2 {
);
}
}

function outboundNonce() external view returns (uint64) {
return CoreStorage.layout().outboundNonce;
}
}
2 changes: 2 additions & 0 deletions contracts/src/v2/IGateway.sol
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ interface IGatewayV2 {

function agentOf(bytes32 agentID) external view returns (address);

function outboundNonce() external view returns (uint64);

/**
* Events
*/
Expand Down
2 changes: 1 addition & 1 deletion relayer/cmd/generate_beacon_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func getEthereumEvent(ctx context.Context, gatewayContract *contracts.Gateway, c
for event == nil {
log.Info("looking for Ethereum event")

iter, err := gatewayContract.FilterOutboundMessageAccepted(&opts, [][32]byte{channelID}, [][32]byte{})
iter, err := gatewayContract.FilterOutboundMessageAccepted(&opts)
if err != nil {
return nil, err
}
Expand Down
19 changes: 14 additions & 5 deletions relayer/cmd/run/parachain/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,19 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/sirupsen/logrus"
"github.com/snowfork/snowbridge/relayer/chain/ethereum"
para "github.com/snowfork/snowbridge/relayer/chain/parachain"
"github.com/snowfork/snowbridge/relayer/relays/parachain"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"golang.org/x/sync/errgroup"
)

var (
configFile string
privateKey string
privateKeyFile string
privateKeyID string
configFile string
privateKey string
privateKeyFile string
privateKeyID string
parachainPrivateKey string
)

func Command() *cobra.Command {
Expand All @@ -42,6 +44,8 @@ func Command() *cobra.Command {
cmd.Flags().StringVar(&privateKeyFile, "ethereum.private-key-file", "", "The file from which to read the private key")
cmd.Flags().StringVar(&privateKeyID, "ethereum.private-key-id", "", "The secret id to lookup the private key in AWS Secrets Manager")

cmd.Flags().StringVar(&parachainPrivateKey, "substrate.private-key", "", "substrate private key")

return cmd
}

Expand Down Expand Up @@ -70,7 +74,12 @@ func run(_ *cobra.Command, _ []string) error {
return err
}

relay, err := parachain.NewRelay(&config, keypair)
keypair2, err := para.ResolvePrivateKey(parachainPrivateKey, "", "")
if err != nil {
return err
}

relay, err := parachain.NewRelay(&config, keypair, keypair2)
if err != nil {
return err
}
Expand Down
1,873 changes: 161 additions & 1,712 deletions relayer/contracts/gateway.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion relayer/generate.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//go:generate bash -c "jq .abi ../contracts/out/BeefyClient.sol/BeefyClient.json | abigen --abi - --type BeefyClient --pkg contracts --out contracts/beefy_client.go"
//go:generate bash -c "jq .abi ../contracts/out/IGateway.sol/IGateway.json | abigen --abi - --type Gateway --pkg contracts --out contracts/gateway.go"
//go:generate bash -c "jq .abi ../contracts/out/IGateway.sol/IGatewayV2.json | abigen --abi - --type Gateway --pkg contracts --out contracts/gateway.go"

package main
160 changes: 110 additions & 50 deletions relayer/relays/execution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,44 +118,42 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
"channelId": r.config.Source.ChannelID,
}).Info("Polling Nonces")

paraNonce, err := r.fetchLatestParachainNonce()
ethNonce, err := r.fetchEthereumNonce(ctx)
if err != nil {
return err
}

ethNonce, err := r.fetchEthereumNonce(ctx)
paraNonces, err := r.fetchUnprocessedParachainNonces(ethNonce)
if err != nil {
return err
}

log.WithFields(log.Fields{
"channelId": types.H256(r.config.Source.ChannelID).Hex(),
"paraNonce": paraNonce,
"paraNonces": paraNonces,
"ethNonce": ethNonce,
"instantVerification": r.config.InstantVerification,
}).Info("Polled Nonces")

if paraNonce == ethNonce {
continue
}

blockNumber, err := ethconn.Client().BlockNumber(ctx)
if err != nil {
return fmt.Errorf("get last block number: %w", err)
}

events, err := r.findEvents(ctx, blockNumber, paraNonce+1)
if err != nil {
return fmt.Errorf("find events: %w", err)
}
for _, paraNonce := range paraNonces {
events, err := r.findEvents(ctx, blockNumber, paraNonce+1)
if err != nil {
return fmt.Errorf("find events: %w", err)
}

for _, ev := range events {
err := r.waitAndSend(ctx, ev)
if errors.Is(err, header.ErrBeaconHeaderNotFinalized) {
log.WithField("nonce", ev.Nonce).Info("beacon header not finalized yet")
continue
} else if err != nil {
return fmt.Errorf("submit event: %w", err)
for _, ev := range events {
err := r.waitAndSend(ctx, ev)
if errors.Is(err, header.ErrBeaconHeaderNotFinalized) {
log.WithField("nonce", ev.Nonce).Info("beacon header not finalized yet")
continue
} else if err != nil {
return fmt.Errorf("submit event: %w", err)
}
}
}
}
Expand Down Expand Up @@ -197,38 +195,99 @@ func (r *Relay) writeToParachain(ctx context.Context, proof scale.ProofPayload,
return nil
}

func (r *Relay) fetchLatestParachainNonce() (uint64, error) {
paraID := r.config.Source.ChannelID
encodedParaID, err := types.EncodeToBytes(r.config.Source.ChannelID)
func (r *Relay) fetchUnprocessedParachainNonces(latest uint64) ([]uint64, error) {
log.WithField("latest", latest).Info("latest nonce is")
unprocessedNonces := []uint64{}
startKey, err := types.CreateStorageKey(r.paraconn.Metadata(), "EthereumInboundQueue", "NonceBitmap", nil)
if err != nil {
return 0, err
return unprocessedNonces, fmt.Errorf("create storage key for EthereumInboundQueue.NonceBitmap: %w", err)
}

paraNonceKey, err := types.CreateStorageKey(r.paraconn.Metadata(), "EthereumInboundQueue", "Nonce", encodedParaID, nil)
blockHash, err := r.paraconn.API().RPC.Chain.GetBlockHashLatest()
if err != nil {
return 0, fmt.Errorf("create storage key for EthereumInboundQueue.Nonce(%v): %w",
paraID, err)
return unprocessedNonces, fmt.Errorf("fetch latest parachain block hash: %w", err)
}
var paraNonce uint64
ok, err := r.paraconn.API().RPC.State.GetStorageLatest(paraNonceKey, &paraNonce)

keys, err := r.paraconn.API().RPC.State.GetKeysPaged(startKey, 100, nil, blockHash)
if err != nil {
return 0, fmt.Errorf("fetch storage EthereumInboundQueue.Nonce(%v): %w",
paraID, err)
return unprocessedNonces, fmt.Errorf("fetch storage EthereumInboundQueue.NonceBitmap keys: %w", err)
}
for _, key := range keys {
var value types.U128
ok, err := r.paraconn.API().RPC.State.GetStorageLatest(key, &value)
if err != nil {
return unprocessedNonces, fmt.Errorf("fetch latest parachain value: %w", err)
}
if !ok || value.Int.Cmp(big.NewInt(0)) == 0 {
continue // Skip empty buckets
}

unprocessedNonces = extractUnprocessedNonces(value, latest)

if len(unprocessedNonces) > 0 && unprocessedNonces[len(unprocessedNonces)-1] >= latest {
break
}

fmt.Printf("Unprocessed nonces for bucket %s: %v\n", key.Hex(), unprocessedNonces)
}

log.WithFields(logrus.Fields{
"unprocessedNonces": unprocessedNonces,
}).Debug("unprocessed nonces")
return unprocessedNonces, nil
}

func (r *Relay) isParachainNonceSet(index uint64) (bool, error) {
// Calculate the bucket and bit position
bucket := index / 128
bitPosition := index % 128

encodedBucket, err := types.EncodeToBytes(types.NewU64(bucket))
bucketKey, err := types.CreateStorageKey(r.paraconn.Metadata(), "EthereumInboundQueue", "NonceBitmap", encodedBucket)
if err != nil {
return false, fmt.Errorf("create storage key for EthereumInboundQueue.NonceBitmap: %w", err)
}

var bucketValue types.U128
ok, err := r.paraconn.API().RPC.State.GetStorageLatest(bucketKey, &bucketValue)
if err != nil {
return false, fmt.Errorf("fetch storage EthereumInboundQueue.NonceBitmap keys: %w", err)
}
if !ok {
paraNonce = 0
return false, fmt.Errorf("bucket does not exist: %w", err)
}

return paraNonce, nil
return checkBitState(bucketValue, bitPosition), nil
}

func checkBitState(bucketValue types.U128, bitPosition uint64) bool {
mask := new(big.Int).Lsh(big.NewInt(1), uint(bitPosition)) // Create mask for the bit position
return new(big.Int).And(bucketValue.Int, mask).Cmp(big.NewInt(0)) != 0
}

func extractUnprocessedNonces(bitmap types.U128, latest uint64) []uint64 {
var unprocessed []uint64
for i := 0; i < 128; i++ {
if uint64(i) > latest {
break // Stop processing if index exceeds `latest`
}

mask := new(big.Int).Lsh(big.NewInt(1), uint(i))
bit := new(big.Int).And(bitmap.Int, mask)
if bit.Cmp(big.NewInt(0)) == 0 {
unprocessed = append(unprocessed, uint64(i))
}
}
return unprocessed
}

func (r *Relay) fetchEthereumNonce(ctx context.Context) (uint64, error) {
opts := bind.CallOpts{
Context: ctx,
}
_, ethOutboundNonce, err := r.gatewayContract.ChannelNoncesOf(&opts, r.config.Source.ChannelID)
ethOutboundNonce, err := r.gatewayContract.OutboundNonce(&opts)
if err != nil {
return 0, fmt.Errorf("fetch Gateway.ChannelNoncesOf(%v): %w", r.config.Source.ChannelID, err)
return 0, fmt.Errorf("fetch Gateway.OutboundNonce: %w", err)
}

return ethOutboundNonce, nil
Expand All @@ -241,9 +300,6 @@ func (r *Relay) findEvents(
latestFinalizedBlockNumber uint64,
start uint64,
) ([]*contracts.GatewayOutboundMessageAccepted, error) {

channelID := r.config.Source.ChannelID

var allEvents []*contracts.GatewayOutboundMessageAccepted

blockNumber := latestFinalizedBlockNumber
Expand All @@ -262,7 +318,7 @@ func (r *Relay) findEvents(
Context: ctx,
}

done, events, err := r.findEventsWithFilter(&opts, channelID, start)
done, events, err := r.findEventsWithFilter(&opts, start)
if err != nil {
return nil, fmt.Errorf("filter events: %w", err)
}
Expand All @@ -285,8 +341,8 @@ func (r *Relay) findEvents(
return allEvents, nil
}

func (r *Relay) findEventsWithFilter(opts *bind.FilterOpts, channelID [32]byte, start uint64) (bool, []*contracts.GatewayOutboundMessageAccepted, error) {
iter, err := r.gatewayContract.FilterOutboundMessageAccepted(opts, [][32]byte{channelID}, [][32]byte{})
func (r *Relay) findEventsWithFilter(opts *bind.FilterOpts, start uint64) (bool, []*contracts.GatewayOutboundMessageAccepted, error) {
iter, err := r.gatewayContract.FilterOutboundMessageAccepted(opts)
if err != nil {
return false, nil, err
}
Expand Down Expand Up @@ -409,7 +465,6 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa
"blockNumber": ev.Raw.BlockNumber,
"txHash": ev.Raw.TxHash.Hex(),
"txIndex": ev.Raw.TxIndex,
"channelID": types.H256(ev.ChannelID).Hex(),
})

nextBlockNumber := new(big.Int).SetUint64(ev.Raw.BlockNumber + 1)
Expand Down Expand Up @@ -443,11 +498,8 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa
return fmt.Errorf("write to parachain: %w", err)
}

paraNonce, err := r.fetchLatestParachainNonce()
if err != nil {
return fmt.Errorf("fetch latest parachain nonce: %w", err)
}
if paraNonce != ev.Nonce {
ok, err := r.isParachainNonceSet(ev.Nonce)
if !ok {
return fmt.Errorf("inbound message fail to execute")
}
logger.Info("inbound message executed successfully")
Expand All @@ -457,17 +509,21 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa

// isMessageProcessed checks if the provided event nonce has already been processed on-chain.
func (r *Relay) isMessageProcessed(eventNonce uint64) (bool, error) {
paraNonce, err := r.fetchLatestParachainNonce()
paraNonces, err := r.fetchUnprocessedParachainNonces(eventNonce)
if err != nil {
return false, fmt.Errorf("fetch latest parachain nonce: %w", err)
}
// Check the nonce again in case another relayer processed the message while this relayer downloading beacon state
if eventNonce <= paraNonce {
log.WithField("nonce", paraNonce).Info("message picked up by another relayer, skipped")
return true, nil

for _, paraNonce := range paraNonces {
if eventNonce == paraNonce {
log.WithField("nonce", paraNonce).Info("unprocessed message found")
return false, nil
}
}

return false, nil
log.WithField("nonce", eventNonce).Info("processed message found")
return true, nil
}

// isInFinalizedBlock checks if the block containing the event is a finalized block.
Expand All @@ -481,3 +537,7 @@ func (r *Relay) isInFinalizedBlock(ctx context.Context, event *contracts.Gateway

return r.beaconHeader.CheckHeaderFinalized(*blockHeader.ParentBeaconRoot, r.config.InstantVerification)
}

func (r *Relay) UnprocessedNonces() {

}
6 changes: 3 additions & 3 deletions relayer/relays/parachain/beefy-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) er
}
for _, task := range tasks {
paraNonce := (*task.MessageProofs)[0].Message.Nonce
waitingPeriod := (paraNonce + li.scheduleConfig.TotalRelayerCount - li.scheduleConfig.ID) % li.scheduleConfig.TotalRelayerCount
waitingPeriod := (uint64(paraNonce) + li.scheduleConfig.TotalRelayerCount - li.scheduleConfig.ID) % li.scheduleConfig.TotalRelayerCount
err = li.waitAndSend(ctx, task, waitingPeriod)
if err != nil {
return fmt.Errorf("wait task for nonce %d: %w", paraNonce, err)
Expand Down Expand Up @@ -326,11 +326,11 @@ func (li *BeefyListener) waitAndSend(ctx context.Context, task *Task, waitingPer
var cnt uint64
var err error
for {
ethInboundNonce, err := li.scanner.findLatestNonce(ctx)
isRelayed, err := li.scanner.isNonceRelayed(ctx, uint64(paraNonce))
if err != nil {
return err
}
if ethInboundNonce >= paraNonce {
if isRelayed {
log.Info(fmt.Sprintf("nonce %d picked up by another relayer, just skip", paraNonce))
return nil
}
Expand Down
Loading
Loading