diff --git a/.github/workflows/release-relayer.yml b/.github/workflows/release-relayer.yml index 114c99ed10..d69249d660 100644 --- a/.github/workflows/release-relayer.yml +++ b/.github/workflows/release-relayer.yml @@ -2,6 +2,8 @@ name: release-relayer on: push: + paths: + - "relayer/**" branches: - main workflow_dispatch: @@ -69,6 +71,7 @@ jobs: id: new_version run: | # Get the most recent tag in the format relayer-- + current_tag=$(git tag --list "relayer-${{ steps.branch_name.outputs.branch }}-*" --sort=-v:refname | head -n 1 current_version=$(git tag --list "relayer-${{ steps.branch_name.outputs.branch }}-*" --sort=-v:refname | head -n 1 | sed -E 's/relayer-${{ steps.branch_name.outputs.branch }}-//') echo "Current version: $current_version" @@ -81,6 +84,7 @@ jobs: echo "New version: $new_version" echo "version=$new_version" >> $GITHUB_OUTPUT + echo "from_tag=$current_tag" >> $GITHUB_OUTPUT - name: Create new tag id: create_tag @@ -97,6 +101,9 @@ jobs: - name: "Build Changelog" id: build_changelog uses: mikepenz/release-changelog-builder-action@v4 + with: + fromTag: ${{ steps.new_version.outputs.from_tag }} + toTag: ${{ steps.create_tag.outputs.tag }} - name: Create a GitHub Release id: create_release diff --git a/relayer/relays/beacon/config/config.go b/relayer/relays/beacon/config/config.go index f0c9c86374..cd56105196 100644 --- a/relayer/relays/beacon/config/config.go +++ b/relayer/relays/beacon/config/config.go @@ -97,7 +97,7 @@ func (p ParachainConfig) Validate() error { return errors.New("[maxWatchedExtrinsics] is not set") } if p.HeaderRedundancy == 0 { - return errors.New("[HeaderRedundancy] is not set") + return errors.New("[headerRedundancy] is not set") } return nil } diff --git a/relayer/relays/beacon/header/header.go b/relayer/relays/beacon/header/header.go index e7dca2c828..81735284e2 100644 --- a/relayer/relays/beacon/header/header.go +++ b/relayer/relays/beacon/header/header.go @@ -510,6 +510,35 @@ func (h *Header) FetchExecutionProof(blockRoot common.Hash, instantVerification } +func (h *Header) CheckHeaderFinalized(blockRoot common.Hash, instantVerification bool) error { + header, err := h.syncer.Client.GetHeaderByBlockRoot(blockRoot) + if err != nil { + return fmt.Errorf("get beacon header by blockRoot: %w", err) + } + lastFinalizedHeaderState, err := h.writer.GetLastFinalizedHeaderState() + if err != nil { + return fmt.Errorf("fetch last finalized header state: %w", err) + } + + // The latest finalized header on-chain is older than the header containing the message, so we need to sync the + // finalized header with the message. + finalizedHeader, err := h.syncer.GetFinalizedHeader() + if err != nil { + return err + } + + // If the header is not finalized yet, we can't do anything further. + if header.Slot > uint64(finalizedHeader.Slot) { + return fmt.Errorf("chain not finalized yet: %w", ErrBeaconHeaderNotFinalized) + } + + if header.Slot > lastFinalizedHeaderState.BeaconSlot && !instantVerification { + return fmt.Errorf("on-chain header not recent enough and instantVerification is off: %w", ErrBeaconHeaderNotFinalized) + } + + return nil +} + func (h *Header) isInitialSyncPeriod() bool { initialPeriod := h.protocol.ComputeSyncPeriodAtSlot(h.cache.InitialCheckpointSlot) lastFinalizedPeriod := h.protocol.ComputeSyncPeriodAtSlot(h.cache.Finalized.LastSyncedSlot) diff --git a/relayer/relays/execution/main.go b/relayer/relays/execution/main.go index 39618e6410..df152cf335 100644 --- a/relayer/relays/execution/main.go +++ b/relayer/relays/execution/main.go @@ -103,7 +103,11 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error { ) r.beaconHeader = &beaconHeader - log.Info("Current relay's ID:", r.config.Schedule.ID) + log.WithFields(log.Fields{ + "relayerId": r.config.Schedule.ID, + "relayerCount": r.config.Schedule.TotalRelayerCount, + "sleepInterval": r.config.Schedule.SleepInterval, + }).Info("decentralization config") for { select { @@ -146,9 +150,12 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error { } for _, ev := range events { - err = r.waitAndSend(ctx, ev) - if err != nil { - return fmt.Errorf("submit message: %w", err) + err := r.waitAndSend(ctx, ev) + if errors.Is(err, header.ErrBeaconHeaderNotFinalized) { + log.WithField("nonce", ev.Nonce).Info("beacon header not finalized yet") + continue + } else if err != nil { + return fmt.Errorf("submit event: %w", err) } } } @@ -351,23 +358,32 @@ func (r *Relay) makeInboundMessage( } func (r *Relay) waitAndSend(ctx context.Context, ev *contracts.GatewayOutboundMessageAccepted) (err error) { - var paraNonce uint64 ethNonce := ev.Nonce waitingPeriod := (ethNonce + r.config.Schedule.TotalRelayerCount - r.config.Schedule.ID) % r.config.Schedule.TotalRelayerCount + log.WithFields(logrus.Fields{ + "waitingPeriod": waitingPeriod, + }).Info("relayer waiting period") var cnt uint64 for { - paraNonce, err = r.fetchLatestParachainNonce() + // Check the nonce again in case another relayer processed the message while this relayer downloading beacon state + isProcessed, err := r.isMessageProcessed(ev.Nonce) if err != nil { - return fmt.Errorf("fetch latest parachain nonce: %w", err) + return fmt.Errorf("is message procssed: %w", err) } - if ethNonce <= paraNonce { - log.Info(fmt.Sprintf("nonce %d picked up by another relayer, just skip", paraNonce)) + // If the message is already processed we shouldn't submit it again + if isProcessed { return nil } + // Check if the beacon header is finalized + if r.isInFinalizedBlock(ctx, ev) != nil { + return err + } if cnt == waitingPeriod { break } + log.Info(fmt.Sprintf("sleeping for %d seconds.", time.Duration(r.config.Schedule.SleepInterval))) + time.Sleep(time.Duration(r.config.Schedule.SleepInterval) * time.Second) cnt++ } @@ -406,13 +422,22 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa // ParentBeaconRoot in https://eips.ethereum.org/EIPS/eip-4788 from Deneb onward proof, err := r.beaconHeader.FetchExecutionProof(*blockHeader.ParentBeaconRoot, r.config.InstantVerification) if errors.Is(err, header.ErrBeaconHeaderNotFinalized) { - logger.Warn("beacon header not finalized, just skipped") - return nil + return err } if err != nil { return fmt.Errorf("fetch execution header proof: %w", err) } + // Check the nonce again in case another relayer processed the message while this relayer downloading beacon state + isProcessed, err := r.isMessageProcessed(ev.Nonce) + if err != nil { + return fmt.Errorf("is message processed: %w", err) + } + // If the message is already processed we shouldn't submit it again + if isProcessed { + return nil + } + err = r.writeToParachain(ctx, proof, inboundMsg) if err != nil { return fmt.Errorf("write to parachain: %w", err) @@ -429,3 +454,30 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa return nil } + +// isMessageProcessed checks if the provided event nonce has already been processed on-chain. +func (r *Relay) isMessageProcessed(eventNonce uint64) (bool, error) { + paraNonce, err := r.fetchLatestParachainNonce() + if err != nil { + return false, fmt.Errorf("fetch latest parachain nonce: %w", err) + } + // Check the nonce again in case another relayer processed the message while this relayer downloading beacon state + if eventNonce <= paraNonce { + log.WithField("nonce", paraNonce).Info("message picked up by another relayer, skipped") + return true, nil + } + + return false, nil +} + +// isInFinalizedBlock checks if the block containing the event is a finalized block. +func (r *Relay) isInFinalizedBlock(ctx context.Context, event *contracts.GatewayOutboundMessageAccepted) error { + nextBlockNumber := new(big.Int).SetUint64(event.Raw.BlockNumber + 1) + + blockHeader, err := r.ethconn.Client().HeaderByNumber(ctx, nextBlockNumber) + if err != nil { + return fmt.Errorf("get block header: %w", err) + } + + return r.beaconHeader.CheckHeaderFinalized(*blockHeader.ParentBeaconRoot, r.config.InstantVerification) +}