From 43969a2ad37e0554172ce789aa5f7ec5e10dac59 Mon Sep 17 00:00:00 2001 From: corver <29249923+corverroos@users.noreply.github.com> Date: Thu, 21 Nov 2024 18:01:44 +0200 Subject: [PATCH] fix(octane/evmengine): fix next proposer algo (#2526) Fixes next proposer selection algo: - Leverage comet light client that does heavy lifting of fetching the current validator set. - Use `CopyIncrementProposerPriority` which is a built-in function to automatically select next proposer. - Add debug logs - Fix missing local address in lazy voter issue: #2463 --- halo/app/lazyvoter.go | 2 +- halo/app/start.go | 2 +- halo/comet/comet.go | 110 ++++-------------- octane/evmengine/keeper/abci.go | 4 +- octane/evmengine/keeper/abci_internal_test.go | 15 +-- octane/evmengine/keeper/keeper.go | 16 +-- .../evmengine/keeper/keeper_internal_test.go | 106 ++++++----------- 7 files changed, 69 insertions(+), 186 deletions(-) diff --git a/halo/app/lazyvoter.go b/halo/app/lazyvoter.go index f2f30e907..0a6a7145e 100644 --- a/halo/app/lazyvoter.go +++ b/halo/app/lazyvoter.go @@ -245,7 +245,7 @@ func (l *voterLoader) LocalAddress() common.Address { return v.LocalAddress() } - return common.Address{} + return l.localAddr } func (l *voterLoader) TrimBehind(minsByChain map[xchain.ChainVersion]uint64) int { diff --git a/halo/app/start.go b/halo/app/start.go index 3badddcf3..d9ee3db44 100644 --- a/halo/app/start.go +++ b/halo/app/start.go @@ -175,7 +175,7 @@ func Start(ctx context.Context, cfg Config) (<-chan error, func(context.Context) } rpcClient := rpclocal.New(cmtNode) - cmtAPI := comet.NewAPI(rpcClient) + cmtAPI := comet.NewAPI(rpcClient, cfg.Network.Static().OmniConsensusChainIDStr()) app.SetCometAPI(cmtAPI) clientCtx := app.ClientContext(ctx).WithClient(rpcClient).WithHomeDir(cfg.HomeDir) diff --git a/halo/comet/comet.go b/halo/comet/comet.go index 30f040198..bc9c4d928 100644 --- a/halo/comet/comet.go +++ b/halo/comet/comet.go @@ -4,118 +4,52 @@ import ( "context" 
"github.com/omni-network/omni/lib/errors" - "github.com/omni-network/omni/lib/k1util" "github.com/omni-network/omni/lib/tracer" + lightprovider "github.com/cometbft/cometbft/light/provider" + lighthttp "github.com/cometbft/cometbft/light/provider/http" rpcclient "github.com/cometbft/cometbft/rpc/client" cmttypes "github.com/cometbft/cometbft/types" - "github.com/ethereum/go-ethereum/common" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) -const perPageConst = 100 - var _ API = adapter{} type API interface { - // Validators returns the cometBFT validators at the given height or false if not - // available (probably due to snapshot sync after height). - Validators(ctx context.Context, height int64) (*cmttypes.ValidatorSet, bool, error) - - // IsValidator returns true if the given address is a validator at the latest height. - // It is best-effort, so returns false on any error. - IsValidator(ctx context.Context, valAddress common.Address) bool + // Validators returns the cometBFT validators at the given height. + Validators(ctx context.Context, height int64) (*cmttypes.ValidatorSet, error) } -func NewAPI(cl rpcclient.Client) API { - return adapter{cl: cl} +func NewAPI(cl rpcclient.Client, chainID string) API { + return adapter{ + cl: lighthttp.NewWithClient(chainID, remoteCl{cl}), + } } type adapter struct { - cl rpcclient.Client + cl lightprovider.Provider } -// IsValidator returns true if the given address is a validator at the latest height. -// It is best-effort, so returns false on any error. 
-func (a adapter) IsValidator(ctx context.Context, valAddress common.Address) bool { - ctx, span := tracer.Start(ctx, "comet/is_validator") - defer span.End() - - status, err := a.cl.Status(ctx) - if err != nil || status.SyncInfo.CatchingUp { - return false // Best effort - } - - valset, ok, err := a.Validators(ctx, status.SyncInfo.LatestBlockHeight) - if !ok || err != nil { - return false // Best effort - } - - for _, val := range valset.Validators { - addr, err := k1util.PubKeyToAddress(val.PubKey) - if err != nil { - continue // Best effort - } - - if addr == valAddress { - return true - } - } - - return false -} - -// Validators returns the cometBFT validators at the given height or false if not -// available (probably due to snapshot sync after height). -func (a adapter) Validators(ctx context.Context, height int64) (*cmttypes.ValidatorSet, bool, error) { +// Validators returns the cometBFT validators at the given height. +func (a adapter) Validators(ctx context.Context, height int64) (*cmttypes.ValidatorSet, error) { ctx, span := tracer.Start(ctx, "comet/validators", trace.WithAttributes(attribute.Int64("height", height))) defer span.End() - perPage := perPageConst // Can't take a pointer to a const directly. - - var vals []*cmttypes.Validator - for page := 1; ; page++ { // Pages are 1-indexed. - if page > 10 { // Sanity check. - return nil, false, errors.New("too many validators [BUG]") - } - - status, err := a.cl.Status(ctx) - if err != nil { - return nil, false, errors.Wrap(err, "fetch status") - } else if height < status.SyncInfo.EarliestBlockHeight { - // This can happen if height is before snapshot restore. 
- return nil, false, nil - } - - valResp, err := a.cl.Validators(ctx, &height, &page, &perPage) - if err != nil { - return nil, false, errors.Wrap(err, "fetch validators") - } - - for _, v := range valResp.Validators { - vals = append(vals, cmttypes.NewValidator(v.PubKey, v.VotingPower)) - } - - if len(vals) == valResp.Total { - break - } + block, err := a.cl.LightBlock(ctx, height) // LightBlock does all the heavy lifting to query the validator set. + if err != nil { + return nil, errors.Wrap(err, "fetch light block") } - // cmttypes.NewValidatorSet() panics on error, so manually construct it for proper error handling. - valset := new(cmttypes.ValidatorSet) - if err := valset.UpdateWithChangeSet(vals); err != nil { - return nil, false, errors.Wrap(err, "update with change set") - } - if len(vals) > 0 { - valset.IncrementProposerPriority(1) // See cmttypes.NewValidatorSet - } + return block.ValidatorSet, nil +} - if err := valset.ValidateBasic(); err != nil { - return nil, false, errors.Wrap(err, "validate basic") - } +// remoteCl is a wrapper around rpcclient.Client to implement rpcclient.RemoteClient. 
+type remoteCl struct { + rpcclient.Client +} - return valset, true, nil +func (remoteCl) Remote() string { + return "" } diff --git a/octane/evmengine/keeper/abci.go b/octane/evmengine/keeper/abci.go index 85c92cb20..d874c9e86 100644 --- a/octane/evmengine/keeper/abci.go +++ b/octane/evmengine/keeper/abci.go @@ -84,6 +84,7 @@ func (k *Keeper) PrepareProposal(ctx sdk.Context, req *abci.RequestPreparePropos return nil, err } triggeredAt = time.Now() + log.Debug(ctx, "Started non-optimistic payload", "height", req.Height, "payload", payloadID.String()) } else { log.Debug(ctx, "Using optimistic payload", "height", height, "payload", payloadID.String()) } @@ -181,7 +182,6 @@ func (k *Keeper) PostFinalize(ctx sdk.Context) error { // Extract context values height := ctx.BlockHeight() - proposer := ctx.BlockHeader().ProposerAddress timestamp := ctx.BlockTime() appHash, err := cast.EthHash(ctx.BlockHeader().AppHash) // This is the app hash after the block is finalized. if err != nil { @@ -189,7 +189,7 @@ func (k *Keeper) PostFinalize(ctx sdk.Context) error { } // Maybe start building the next block if we are the next proposer. - isNext, err := k.isNextProposer(ctx, proposer, height) + isNext, err := k.isNextProposer(ctx, height) if err != nil { // IsNextProposer does non-deterministic cometBFT queries, don't stall node due to errors. 
log.Warn(ctx, "Next proposer failed, skipping optimistic EVM payload build", err) diff --git a/octane/evmengine/keeper/abci_internal_test.go b/octane/evmengine/keeper/abci_internal_test.go index 9187d12df..45735459b 100644 --- a/octane/evmengine/keeper/abci_internal_test.go +++ b/octane/evmengine/keeper/abci_internal_test.go @@ -347,19 +347,17 @@ func TestOptimistic(t *testing.T) { mockEngine, err := newMockEngineAPI(0) require.NoError(t, err) - vals, ok, err := cmtAPI.Validators(ctx, height) + vals, err := cmtAPI.Validators(ctx, height) require.NoError(t, err) - require.True(t, ok) - // Proposer is val0 - val0 := vals.Validators[0].Address - // Optimistic build will trigger if we are next proposer; ie. val1 - val1, err := k1util.PubKeyToAddress(vals.Validators[1].PubKey) + // Optimistic build will trigger if we are next proposer + nextProposer := vals.CopyIncrementProposerPriority(1).Proposer + localAddr, err := k1util.PubKeyToAddress(nextProposer.PubKey) require.NoError(t, err) - ap := mockAddressProvider{ - address: val1, + address: localAddr, } + frp := newRandomFeeRecipientProvider() keeper, err := NewKeeper(cdc, storeService, &mockEngine, txConfig, ap, frp, mockLogProvider{}) require.NoError(t, err) @@ -370,7 +368,6 @@ func TestOptimistic(t *testing.T) { timestamp := time.Now() ctx = ctx. - WithProposer(val0.Bytes()). WithBlockHeight(height). WithBlockTime(timestamp) diff --git a/octane/evmengine/keeper/keeper.go b/octane/evmengine/keeper/keeper.go index 577a60e36..7d8750883 100644 --- a/octane/evmengine/keeper/keeper.go +++ b/octane/evmengine/keeper/keeper.go @@ -165,29 +165,21 @@ func (k *Keeper) parseAndVerifyProposedPayload(ctx context.Context, msg *types.M } // isNextProposer returns true if the local node is the proposer -// for the next block. It also returns the next block height. +// for the next block. // // Note that the validator set can change, so this is an optimistic check. 
-func (k *Keeper) isNextProposer(ctx context.Context, currentProposer []byte, currentHeight int64) (bool, error) { +func (k *Keeper) isNextProposer(ctx context.Context, currentHeight int64) (bool, error) { // cometAPI is lazily set and may be nil on startup (e.g. rollbacks). if k.cmtAPI == nil { return false, nil } - valset, ok, err := k.cmtAPI.Validators(ctx, currentHeight) + valset, err := k.cmtAPI.Validators(ctx, currentHeight) if err != nil { return false, err - } else if !ok || len(valset.Validators) == 0 { - return false, errors.New("validators not available") } - idx, _ := valset.GetByAddress(currentProposer) - if idx < 0 { - return false, errors.New("proposer not in validator set") - } - - nextIdx := int(idx+1) % len(valset.Validators) - nextProposer := valset.Validators[nextIdx] + nextProposer := valset.CopyIncrementProposerPriority(1).Proposer nextAddr, err := k1util.PubKeyToAddress(nextProposer.PubKey) if err != nil { return false, err diff --git a/octane/evmengine/keeper/keeper_internal_test.go b/octane/evmengine/keeper/keeper_internal_test.go index 0c4c00edd..6e703220f 100644 --- a/octane/evmengine/keeper/keeper_internal_test.go +++ b/octane/evmengine/keeper/keeper_internal_test.go @@ -22,88 +22,49 @@ func TestKeeper_isNextProposer(t *testing.T) { t.Parallel() type args struct { height int64 - validatorsFunc func(context.Context, int64) (*cmttypes.ValidatorSet, bool, error) - current int - next int - header func(height int64, address []byte) cmtproto.Header + validatorsFunc func(context.Context, int64) (*cmttypes.ValidatorSet, error) + incMoreTimes int32 + header func(height int64) cmtproto.Header } height := int64(1) tests := []struct { - name string - args args - want bool - wantHeight uint64 - wantErr bool + name string + args args + want bool + wantErr bool }{ { - name: "is next proposer", + name: "not proposer", args: args{ - height: height, - current: 0, - next: 1, - header: func(height int64, address []byte) cmtproto.Header { - return 
cmtproto.Header{Height: height, ProposerAddress: address} + height: height, + incMoreTimes: 9, + header: func(height int64) cmtproto.Header { + return cmtproto.Header{Height: height} }, }, - want: true, - wantHeight: 2, - wantErr: false, + want: false, + wantErr: false, }, { - name: "proposer false", + name: "next proposer", args: args{ - height: height, - current: 0, - next: 2, - header: func(height int64, address []byte) cmtproto.Header { - return cmtproto.Header{Height: height, ProposerAddress: address} + height: height, + header: func(height int64) cmtproto.Header { + return cmtproto.Header{Height: height} }, }, - want: false, - wantHeight: 2, - wantErr: false, + want: true, + wantErr: false, }, { name: "validatorsFunc error", args: args{ - height: height, - current: 0, - next: 1, - validatorsFunc: func(ctx context.Context, i int64) (*cmttypes.ValidatorSet, bool, error) { - return nil, false, errors.New("error") - }, - header: func(height int64, address []byte) cmtproto.Header { - return cmtproto.Header{Height: height, ProposerAddress: address} - }, - }, - want: false, - wantErr: true, - }, - { - name: "validatorsFunc not ok", - args: args{ - height: height, - current: 0, - next: 1, - validatorsFunc: func(ctx context.Context, i int64) (*cmttypes.ValidatorSet, bool, error) { - return nil, false, nil + height: height, + validatorsFunc: func(ctx context.Context, i int64) (*cmttypes.ValidatorSet, error) { + return nil, errors.New("error") }, - header: func(height int64, address []byte) cmtproto.Header { - return cmtproto.Header{Height: height, ProposerAddress: address} - }, - }, - want: false, - wantErr: true, - }, - { - name: "invalid val index", - args: args{ - height: height, - current: 0, - next: 1, - - header: func(height int64, address []byte) cmtproto.Header { - return cmtproto.Header{Height: height, ProposerAddress: []byte("invalid")} + header: func(height int64) cmtproto.Header { + return cmtproto.Header{Height: height} }, }, want: false, @@ -120,9 +81,9 
@@ func TestKeeper_isNextProposer(t *testing.T) { require.NoError(t, err) cmtAPI := newMockCometAPI(t, tt.args.validatorsFunc) - header := tt.args.header(height, cmtAPI.validatorSet.Validators[tt.args.current].Address) + header := tt.args.header(height) - nxtAddr, err := k1util.PubKeyToAddress(cmtAPI.validatorSet.Validators[tt.args.next].PubKey) + nxtAddr, err := k1util.PubKeyToAddress(cmtAPI.validatorSet.CopyIncrementProposerPriority(1 + tt.args.incMoreTimes).Proposer.PubKey) require.NoError(t, err) ctx, storeService := setupCtxStore(t, &header) @@ -136,7 +97,7 @@ func TestKeeper_isNextProposer(t *testing.T) { keeper.SetCometAPI(cmtAPI) populateGenesisHead(ctx, t, keeper) - got, err := keeper.isNextProposer(ctx, ctx.BlockHeader().ProposerAddress, ctx.BlockHeader().Height) + got, err := keeper.isNextProposer(ctx, ctx.BlockHeader().Height) if (err != nil) != tt.wantErr { t.Errorf("isNextProposer() error = %v, wantErr %v", err, tt.wantErr) return @@ -156,11 +117,11 @@ type mockCometAPI struct { comet.API fuzzer *fuzz.Fuzzer validatorSet *cmttypes.ValidatorSet - validatorsFunc func(context.Context, int64) (*cmttypes.ValidatorSet, bool, error) + validatorsFunc func(context.Context, int64) (*cmttypes.ValidatorSet, error) height int64 } -func newMockCometAPI(t *testing.T, valFun func(context.Context, int64) (*cmttypes.ValidatorSet, bool, error)) *mockCometAPI { +func newMockCometAPI(t *testing.T, valFun func(context.Context, int64) (*cmttypes.ValidatorSet, error)) *mockCometAPI { t.Helper() fuzzer := newFuzzer(0) valSet := fuzzValidators(t, fuzzer) @@ -178,20 +139,19 @@ func fuzzValidators(t *testing.T, fuzzer *fuzz.Fuzzer) *cmttypes.ValidatorSet { fuzzer.NilChance(0).NumElements(3, 7).Fuzz(&validators) - valSet := new(cmttypes.ValidatorSet) - err := valSet.UpdateWithChangeSet(validators) + valSet, err := cmttypes.ValidatorSetFromExistingValidators(validators) require.NoError(t, err) return valSet } -func (m *mockCometAPI) Validators(ctx context.Context, height int64) 
(*cmttypes.ValidatorSet, bool, error) { +func (m *mockCometAPI) Validators(ctx context.Context, height int64) (*cmttypes.ValidatorSet, error) { m.height = height if m.validatorsFunc != nil { return m.validatorsFunc(ctx, height) } - return m.validatorSet, true, nil + return m.validatorSet, nil } // newFuzzer - create a new custom cmttypes.Validator fuzzer.