Skip to content

Commit

Permalink
off-chain changes
Browse files Browse the repository at this point in the history
  • Loading branch information
claravanstaden committed Dec 4, 2024
1 parent b23885e commit 994ad9f
Show file tree
Hide file tree
Showing 17 changed files with 385 additions and 564 deletions.
2 changes: 1 addition & 1 deletion relayer/cmd/generate_beacon_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func getEthereumEvent(ctx context.Context, gatewayContract *contracts.Gateway, c
for event == nil {
log.Info("looking for Ethereum event")

iter, err := gatewayContract.FilterOutboundMessageAccepted(&opts, [][32]byte{channelID}, [][32]byte{})
iter, err := gatewayContract.FilterOutboundMessageAccepted(&opts)
if err != nil {
return nil, err
}
Expand Down
19 changes: 14 additions & 5 deletions relayer/cmd/run/parachain/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,19 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/sirupsen/logrus"
"github.com/snowfork/snowbridge/relayer/chain/ethereum"
para "github.com/snowfork/snowbridge/relayer/chain/parachain"
"github.com/snowfork/snowbridge/relayer/relays/parachain"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"golang.org/x/sync/errgroup"
)

var (
configFile string
privateKey string
privateKeyFile string
privateKeyID string
configFile string
privateKey string
privateKeyFile string
privateKeyID string
parachainPrivateKey string
)

func Command() *cobra.Command {
Expand All @@ -42,6 +44,8 @@ func Command() *cobra.Command {
cmd.Flags().StringVar(&privateKeyFile, "ethereum.private-key-file", "", "The file from which to read the private key")
cmd.Flags().StringVar(&privateKeyID, "ethereum.private-key-id", "", "The secret id to lookup the private key in AWS Secrets Manager")

cmd.Flags().StringVar(&parachainPrivateKey, "substrate.private-key", "", "substrate private key")

return cmd
}

Expand Down Expand Up @@ -70,7 +74,12 @@ func run(_ *cobra.Command, _ []string) error {
return err
}

relay, err := parachain.NewRelay(&config, keypair)
keypair2, err := para.ResolvePrivateKey(parachainPrivateKey, "", "")
if err != nil {
return err
}

relay, err := parachain.NewRelay(&config, keypair, keypair2)
if err != nil {
return err
}
Expand Down
25 changes: 19 additions & 6 deletions relayer/relays/execution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
"channelId": r.config.Source.ChannelID,
}).Info("Polling Nonces")

paraNonces, err := r.fetchUnprocessedParachainNonces()
ethNonce, err := r.fetchEthereumNonce(ctx)
if err != nil {
return err
}

ethNonce, err := r.fetchEthereumNonce(ctx)
paraNonces, err := r.fetchUnprocessedParachainNonces(ethNonce)
if err != nil {
return err
}
Expand Down Expand Up @@ -195,7 +195,8 @@ func (r *Relay) writeToParachain(ctx context.Context, proof scale.ProofPayload,
return nil
}

func (r *Relay) fetchUnprocessedParachainNonces() ([]uint64, error) {
func (r *Relay) fetchUnprocessedParachainNonces(latest uint64) ([]uint64, error) {
log.WithField("latest", latest).Info("latest nonce is")
unprocessedNonces := []uint64{}
startKey, err := types.CreateStorageKey(r.paraconn.Metadata(), "EthereumInboundQueue", "NonceBitmap", nil)
if err != nil {
Expand All @@ -221,10 +222,18 @@ func (r *Relay) fetchUnprocessedParachainNonces() ([]uint64, error) {
continue // Skip empty buckets
}

unprocessedNonces = extractUnprocessedNonces(value)
unprocessedNonces = extractUnprocessedNonces(value, latest)

if len(unprocessedNonces) > 0 && unprocessedNonces[len(unprocessedNonces)-1] >= latest {
break
}

fmt.Printf("Unprocessed nonces for bucket %s: %v\n", key.Hex(), unprocessedNonces)
}

log.WithFields(logrus.Fields{
"unprocessedNonces": unprocessedNonces,
}).Debug("unprocessed nonces")
return unprocessedNonces, nil
}

Expand Down Expand Up @@ -256,9 +265,13 @@ func checkBitState(bucketValue types.U128, bitPosition uint64) bool {
return new(big.Int).And(bucketValue.Int, mask).Cmp(big.NewInt(0)) != 0
}

func extractUnprocessedNonces(bitmap types.U128) []uint64 {
func extractUnprocessedNonces(bitmap types.U128, latest uint64) []uint64 {
var unprocessed []uint64
for i := 0; i < 128; i++ {
if uint64(i) > latest {
break // Stop processing if index exceeds `latest`
}

mask := new(big.Int).Lsh(big.NewInt(1), uint(i))
bit := new(big.Int).And(bitmap.Int, mask)
if bit.Cmp(big.NewInt(0)) == 0 {
Expand Down Expand Up @@ -496,7 +509,7 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa

// isMessageProcessed checks if the provided event nonce has already been processed on-chain.
func (r *Relay) isMessageProcessed(eventNonce uint64) (bool, error) {
paraNonces, err := r.fetchUnprocessedParachainNonces()
paraNonces, err := r.fetchUnprocessedParachainNonces(eventNonce)
if err != nil {
return false, fmt.Errorf("fetch latest parachain nonce: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions relayer/relays/parachain/beefy-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) er
}
for _, task := range tasks {
paraNonce := (*task.MessageProofs)[0].Message.Nonce
waitingPeriod := (paraNonce + li.scheduleConfig.TotalRelayerCount - li.scheduleConfig.ID) % li.scheduleConfig.TotalRelayerCount
waitingPeriod := (uint64(paraNonce) + li.scheduleConfig.TotalRelayerCount - li.scheduleConfig.ID) % li.scheduleConfig.TotalRelayerCount
err = li.waitAndSend(ctx, task, waitingPeriod)
if err != nil {
return fmt.Errorf("wait task for nonce %d: %w", paraNonce, err)
Expand Down Expand Up @@ -326,11 +326,11 @@ func (li *BeefyListener) waitAndSend(ctx context.Context, task *Task, waitingPer
var cnt uint64
var err error
for {
ethInboundNonce, err := li.scanner.findLatestNonce(ctx)
isRelayed, err := li.scanner.isNonceRelayed(ctx, uint64(paraNonce))
if err != nil {
return err
}
if ethInboundNonce >= paraNonce {
if isRelayed {
log.Info(fmt.Sprintf("nonce %d picked up by another relayer, just skip", paraNonce))
return nil
}
Expand Down
25 changes: 14 additions & 11 deletions relayer/relays/parachain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,22 @@ import (
"fmt"

"github.com/snowfork/snowbridge/relayer/config"
beaconconf "github.com/snowfork/snowbridge/relayer/relays/beacon/config"
)

type Config struct {
Source SourceConfig `mapstructure:"source"`
Sink SinkConfig `mapstructure:"sink"`
Schedule ScheduleConfig `mapstructure:"schedule"`
Source SourceConfig `mapstructure:"source"`
Sink SinkConfig `mapstructure:"sink"`
Schedule ScheduleConfig `mapstructure:"schedule"`
RewardAddress string `mapstructure:"reward-address"`
}

type SourceConfig struct {
Polkadot config.PolkadotConfig `mapstructure:"polkadot"`
Parachain config.ParachainConfig `mapstructure:"parachain"`
Ethereum config.EthereumConfig `mapstructure:"ethereum"`
Contracts SourceContractsConfig `mapstructure:"contracts"`
ChannelID ChannelID `mapstructure:"channel-id"`
Polkadot config.PolkadotConfig `mapstructure:"polkadot"`
Parachain config.ParachainConfig `mapstructure:"parachain"`
Ethereum config.EthereumConfig `mapstructure:"ethereum"`
Contracts SourceContractsConfig `mapstructure:"contracts"`
Beacon beaconconf.BeaconConfig `mapstructure:"beacon"`
}

type SourceContractsConfig struct {
Expand Down Expand Up @@ -76,9 +78,6 @@ func (c Config) Validate() error {
if c.Source.Contracts.Gateway == "" {
return fmt.Errorf("source contracts setting [Gateway] is not set")
}
if c.Source.ChannelID == [32]byte{} {
return fmt.Errorf("source setting [channel-id] is not set")
}

// Sink
err = c.Sink.Ethereum.Validate()
Expand All @@ -95,5 +94,9 @@ func (c Config) Validate() error {
return fmt.Errorf("relay config: %w", err)
}

if c.RewardAddress == "" {
return fmt.Errorf("reward address is not set")
}

return nil
}
36 changes: 22 additions & 14 deletions relayer/relays/parachain/ethereum-writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,34 @@ import (
"github.com/snowfork/snowbridge/relayer/chain/ethereum"
"github.com/snowfork/snowbridge/relayer/contracts"
"github.com/snowfork/snowbridge/relayer/crypto/keccak"
"github.com/snowfork/snowbridge/relayer/relays/util"

gsrpcTypes "github.com/snowfork/go-substrate-rpc-client/v4/types"

log "github.com/sirupsen/logrus"
)

type EthereumWriter struct {
config *SinkConfig
conn *ethereum.Connection
gateway *contracts.Gateway
tasks <-chan *Task
gatewayABI abi.ABI
config *SinkConfig
conn *ethereum.Connection
gateway *contracts.Gateway
tasks <-chan *Task
gatewayABI abi.ABI
relayConfig *Config
}

func NewEthereumWriter(
config *SinkConfig,
conn *ethereum.Connection,
tasks <-chan *Task,
relayConfig *Config,
) (*EthereumWriter, error) {
return &EthereumWriter{
config: config,
conn: conn,
gateway: nil,
tasks: tasks,
config: config,
conn: conn,
gateway: nil,
tasks: tasks,
relayConfig: relayConfig,
}, nil
}

Expand Down Expand Up @@ -143,8 +147,13 @@ func (wr *EthereumWriter) WriteChannel(
LeafProofOrder: new(big.Int).SetUint64(proof.MMRProof.MerkleProofOrder),
}

tx, err := wr.gateway.SubmitV1(
options, message, commitmentProof.Proof.InnerHashes, verificationProof,
rewardAddress, err := util.HexStringTo32Bytes(wr.relayConfig.RewardAddress)
if err != nil {
return fmt.Errorf("convert to reward address: %w", err)
}

tx, err := wr.gateway.V2Submit(
options, message, commitmentProof.Proof.InnerHashes, verificationProof, rewardAddress,
)
if err != nil {
return fmt.Errorf("send transaction Gateway.submit: %w", err)
Expand Down Expand Up @@ -183,9 +192,8 @@ func (wr *EthereumWriter) WriteChannel(
return fmt.Errorf("unpack event log: %w", err)
}
log.WithFields(log.Fields{
"channelID": Hex(holder.ChannelID[:]),
"nonce": holder.Nonce,
"success": holder.Success,
"nonce": holder.Nonce,
"success": holder.Success,
}).Info("Message dispatched")
}
}
Expand Down
7 changes: 3 additions & 4 deletions relayer/relays/parachain/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,9 @@ func (wr *EthereumWriter) logFieldsForSubmission(

params := log.Fields{
"message": log.Fields{
"channelID": Hex(message.ChannelID[:]),
"nonce": message.Nonce,
"command": message.Command,
"params": Hex(message.Params),
"nonce": message.Nonce,
"commands": message.Commands,
"origin": Hex(message.Origin[:]),
},
"messageProof": messageProofHexes,
"proof": log.Fields{
Expand Down
49 changes: 48 additions & 1 deletion relayer/relays/parachain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import (
"github.com/snowfork/snowbridge/relayer/chain/parachain"
"github.com/snowfork/snowbridge/relayer/chain/relaychain"
"github.com/snowfork/snowbridge/relayer/crypto/secp256k1"
"github.com/snowfork/snowbridge/relayer/crypto/sr25519"

"github.com/snowfork/snowbridge/relayer/relays/beacon/header"
"github.com/snowfork/snowbridge/relayer/relays/beacon/header/syncer/api"
"github.com/snowfork/snowbridge/relayer/relays/beacon/protocol"
"github.com/snowfork/snowbridge/relayer/relays/beacon/store"

log "github.com/sirupsen/logrus"
)
Expand All @@ -23,9 +29,12 @@ type Relay struct {
ethereumConnBeefy *ethereum.Connection
ethereumChannelWriter *EthereumWriter
beefyListener *BeefyListener
parachainWriter *parachain.ParachainWriter
beaconHeader *header.Header
headerCache *ethereum.HeaderCache
}

func NewRelay(config *Config, keypair *secp256k1.Keypair) (*Relay, error) {
func NewRelay(config *Config, keypair *secp256k1.Keypair, keypair2 *sr25519.Keypair) (*Relay, error) {
log.Info("Creating worker")

parachainConn := parachain.NewConnection(config.Source.Parachain.Endpoint, nil)
Expand All @@ -41,6 +50,7 @@ func NewRelay(config *Config, keypair *secp256k1.Keypair) (*Relay, error) {
&config.Sink,
ethereumConnWriter,
tasks,
config,
)
if err != nil {
return nil, err
Expand All @@ -55,6 +65,30 @@ func NewRelay(config *Config, keypair *secp256k1.Keypair) (*Relay, error) {
tasks,
)

parachainWriterConn := parachain.NewConnection(config.Source.Parachain.Endpoint, keypair2.AsKeyringPair())

parachainWriter := parachain.NewParachainWriter(
parachainWriterConn,
8,
)
headerCache, err := ethereum.NewHeaderBlockCache(
&ethereum.DefaultBlockLoader{Conn: ethereumConnWriter},
)
if err != nil {
return nil, err
}
p := protocol.New(config.Source.Beacon.Spec, 20)
store := store.New(config.Source.Beacon.DataStore.Location, config.Source.Beacon.DataStore.MaxEntries, *p)
store.Connect()
beaconAPI := api.NewBeaconClient(config.Source.Beacon.Endpoint, config.Source.Beacon.StateEndpoint)
beaconHeader := header.New(
parachainWriter,
beaconAPI,
config.Source.Beacon.Spec,
&store,
p,
0, // setting is not used in the execution relay
)
return &Relay{
config: config,
parachainConn: parachainConn,
Expand All @@ -63,6 +97,9 @@ func NewRelay(config *Config, keypair *secp256k1.Keypair) (*Relay, error) {
ethereumConnBeefy: ethereumConnBeefy,
ethereumChannelWriter: ethereumChannelWriter,
beefyListener: beefyListener,
parachainWriter: parachainWriter,
beaconHeader: &beaconHeader,
headerCache: headerCache,
}, nil
}

Expand Down Expand Up @@ -99,6 +136,16 @@ func (relay *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
return err
}

err = relay.parachainWriter.Start(ctx, eg)
if err != nil {
return err
}

//err = relay.startDeliverProof(ctx, eg)
//if err != nil {
// return err
//}

log.Info("Current relay's ID:", relay.config.Schedule.ID)

return nil
Expand Down
Loading

0 comments on commit 994ad9f

Please sign in to comment.