Skip to content

Commit

Permalink
fix(octane/evmengine): fix next proposer algo (#2526)
Browse files Browse the repository at this point in the history
Fixes next proposer selection algo:
- Leverage the comet light client, which does the heavy lifting of fetching
the current validator set.
- Use `CopyIncrementProposerPriority`, which is a built-in function, to
automatically select the next proposer.
- Add debug logs
- Fix missing local address in lazy voter

issue: #2463
  • Loading branch information
corverroos authored Nov 21, 2024
1 parent a81f18c commit 43969a2
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 186 deletions.
2 changes: 1 addition & 1 deletion halo/app/lazyvoter.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (l *voterLoader) LocalAddress() common.Address {
return v.LocalAddress()
}

return common.Address{}
return l.localAddr
}

func (l *voterLoader) TrimBehind(minsByChain map[xchain.ChainVersion]uint64) int {
Expand Down
2 changes: 1 addition & 1 deletion halo/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func Start(ctx context.Context, cfg Config) (<-chan error, func(context.Context)
}

rpcClient := rpclocal.New(cmtNode)
cmtAPI := comet.NewAPI(rpcClient)
cmtAPI := comet.NewAPI(rpcClient, cfg.Network.Static().OmniConsensusChainIDStr())
app.SetCometAPI(cmtAPI)

clientCtx := app.ClientContext(ctx).WithClient(rpcClient).WithHomeDir(cfg.HomeDir)
Expand Down
110 changes: 22 additions & 88 deletions halo/comet/comet.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,118 +4,52 @@ import (
"context"

"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/k1util"
"github.com/omni-network/omni/lib/tracer"

lightprovider "github.com/cometbft/cometbft/light/provider"
lighthttp "github.com/cometbft/cometbft/light/provider/http"
rpcclient "github.com/cometbft/cometbft/rpc/client"
cmttypes "github.com/cometbft/cometbft/types"

"github.com/ethereum/go-ethereum/common"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

const perPageConst = 100

var _ API = adapter{}

type API interface {
// Validators returns the cometBFT validators at the given height or false if not
// available (probably due to snapshot sync after height).
Validators(ctx context.Context, height int64) (*cmttypes.ValidatorSet, bool, error)

// IsValidator returns true if the given address is a validator at the latest height.
// It is best-effort, so returns false on any error.
IsValidator(ctx context.Context, valAddress common.Address) bool
// Validators returns the cometBFT validators at the given height.
Validators(ctx context.Context, height int64) (*cmttypes.ValidatorSet, error)
}

func NewAPI(cl rpcclient.Client) API {
return adapter{cl: cl}
func NewAPI(cl rpcclient.Client, chainID string) API {
return adapter{
cl: lighthttp.NewWithClient(chainID, remoteCl{cl}),
}
}

type adapter struct {
cl rpcclient.Client
cl lightprovider.Provider
}

// IsValidator returns true if the given address is a validator at the latest height.
// It is best-effort, so returns false on any error.
func (a adapter) IsValidator(ctx context.Context, valAddress common.Address) bool {
ctx, span := tracer.Start(ctx, "comet/is_validator")
defer span.End()

status, err := a.cl.Status(ctx)
if err != nil || status.SyncInfo.CatchingUp {
return false // Best effort
}

valset, ok, err := a.Validators(ctx, status.SyncInfo.LatestBlockHeight)
if !ok || err != nil {
return false // Best effort
}

for _, val := range valset.Validators {
addr, err := k1util.PubKeyToAddress(val.PubKey)
if err != nil {
continue // Best effort
}

if addr == valAddress {
return true
}
}

return false
}

// Validators returns the cometBFT validators at the given height or false if not
// available (probably due to snapshot sync after height).
func (a adapter) Validators(ctx context.Context, height int64) (*cmttypes.ValidatorSet, bool, error) {
// Validators returns the cometBFT validators at the given height.
func (a adapter) Validators(ctx context.Context, height int64) (*cmttypes.ValidatorSet, error) {
ctx, span := tracer.Start(ctx, "comet/validators", trace.WithAttributes(attribute.Int64("height", height)))
defer span.End()

perPage := perPageConst // Can't take a pointer to a const directly.

var vals []*cmttypes.Validator
for page := 1; ; page++ { // Pages are 1-indexed.
if page > 10 { // Sanity check.
return nil, false, errors.New("too many validators [BUG]")
}

status, err := a.cl.Status(ctx)
if err != nil {
return nil, false, errors.Wrap(err, "fetch status")
} else if height < status.SyncInfo.EarliestBlockHeight {
// This can happen if height is before snapshot restore.
return nil, false, nil
}

valResp, err := a.cl.Validators(ctx, &height, &page, &perPage)
if err != nil {
return nil, false, errors.Wrap(err, "fetch validators")
}

for _, v := range valResp.Validators {
vals = append(vals, cmttypes.NewValidator(v.PubKey, v.VotingPower))
}

if len(vals) == valResp.Total {
break
}
block, err := a.cl.LightBlock(ctx, height) // LightBlock does all the heavy lifting to query the validator set.
if err != nil {
return nil, errors.Wrap(err, "fetch light block")
}

// cmttypes.NewValidatorSet() panics on error, so manually construct it for proper error handling.
valset := new(cmttypes.ValidatorSet)
if err := valset.UpdateWithChangeSet(vals); err != nil {
return nil, false, errors.Wrap(err, "update with change set")
}
if len(vals) > 0 {
valset.IncrementProposerPriority(1) // See cmttypes.NewValidatorSet
}
return block.ValidatorSet, nil
}

if err := valset.ValidateBasic(); err != nil {
return nil, false, errors.Wrap(err, "validate basic")
}
// remoteCl is a wrapper around rpcclient.Client to implement rpcclient.RemoteClient.
//
// It embeds the wrapped client, so all of that client's methods are promoted
// unchanged; the only method this type adds itself is Remote. NewAPI wraps its
// rpcclient.Client in a remoteCl before handing it to lighthttp.NewWithClient.
type remoteCl struct {
rpcclient.Client
}

return valset, true, nil
// Remote always returns the empty string; it does not consult the wrapped client.
//
// NOTE(review): presumably there is no meaningful remote address to report
// because callers pass an in-process (local) RPC client — confirm against the
// NewAPI call sites.
func (remoteCl) Remote() string {
return ""
}
4 changes: 2 additions & 2 deletions octane/evmengine/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (k *Keeper) PrepareProposal(ctx sdk.Context, req *abci.RequestPreparePropos
return nil, err
}
triggeredAt = time.Now()
log.Debug(ctx, "Started non-optimistic payload", "height", req.Height, "payload", payloadID.String())
} else {
log.Debug(ctx, "Using optimistic payload", "height", height, "payload", payloadID.String())
}
Expand Down Expand Up @@ -181,15 +182,14 @@ func (k *Keeper) PostFinalize(ctx sdk.Context) error {

// Extract context values
height := ctx.BlockHeight()
proposer := ctx.BlockHeader().ProposerAddress
timestamp := ctx.BlockTime()
appHash, err := cast.EthHash(ctx.BlockHeader().AppHash) // This is the app hash after the block is finalized.
if err != nil {
return err
}

// Maybe start building the next block if we are the next proposer.
isNext, err := k.isNextProposer(ctx, proposer, height)
isNext, err := k.isNextProposer(ctx, height)
if err != nil {
// IsNextProposer does non-deterministic cometBFT queries, don't stall node due to errors.
log.Warn(ctx, "Next proposer failed, skipping optimistic EVM payload build", err)
Expand Down
15 changes: 6 additions & 9 deletions octane/evmengine/keeper/abci_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,19 +347,17 @@ func TestOptimistic(t *testing.T) {
mockEngine, err := newMockEngineAPI(0)
require.NoError(t, err)

vals, ok, err := cmtAPI.Validators(ctx, height)
vals, err := cmtAPI.Validators(ctx, height)
require.NoError(t, err)
require.True(t, ok)

// Proposer is val0
val0 := vals.Validators[0].Address
// Optimistic build will trigger if we are next proposer; ie. val1
val1, err := k1util.PubKeyToAddress(vals.Validators[1].PubKey)
// Optimistic build will trigger if we are next proposer
nextProposer := vals.CopyIncrementProposerPriority(1).Proposer
localAddr, err := k1util.PubKeyToAddress(nextProposer.PubKey)
require.NoError(t, err)

ap := mockAddressProvider{
address: val1,
address: localAddr,
}

frp := newRandomFeeRecipientProvider()
keeper, err := NewKeeper(cdc, storeService, &mockEngine, txConfig, ap, frp, mockLogProvider{})
require.NoError(t, err)
Expand All @@ -370,7 +368,6 @@ func TestOptimistic(t *testing.T) {

timestamp := time.Now()
ctx = ctx.
WithProposer(val0.Bytes()).
WithBlockHeight(height).
WithBlockTime(timestamp)

Expand Down
16 changes: 4 additions & 12 deletions octane/evmengine/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,29 +165,21 @@ func (k *Keeper) parseAndVerifyProposedPayload(ctx context.Context, msg *types.M
}

// isNextProposer returns true if the local node is the proposer
// for the next block. It also returns the next block height.
// for the next block.
//
// Note that the validator set can change, so this is an optimistic check.
func (k *Keeper) isNextProposer(ctx context.Context, currentProposer []byte, currentHeight int64) (bool, error) {
func (k *Keeper) isNextProposer(ctx context.Context, currentHeight int64) (bool, error) {
// cometAPI is lazily set and may be nil on startup (e.g. rollbacks).
if k.cmtAPI == nil {
return false, nil
}

valset, ok, err := k.cmtAPI.Validators(ctx, currentHeight)
valset, err := k.cmtAPI.Validators(ctx, currentHeight)
if err != nil {
return false, err
} else if !ok || len(valset.Validators) == 0 {
return false, errors.New("validators not available")
}

idx, _ := valset.GetByAddress(currentProposer)
if idx < 0 {
return false, errors.New("proposer not in validator set")
}

nextIdx := int(idx+1) % len(valset.Validators)
nextProposer := valset.Validators[nextIdx]
nextProposer := valset.CopyIncrementProposerPriority(1).Proposer
nextAddr, err := k1util.PubKeyToAddress(nextProposer.PubKey)
if err != nil {
return false, err
Expand Down
Loading

0 comments on commit 43969a2

Please sign in to comment.