Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cometbft-v1 engine #74

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
798 changes: 798 additions & 0 deletions engines/cometbft-v1/cometbft.go

Large diffs are not rendered by default.

142 changes: 142 additions & 0 deletions engines/cometbft-v1/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package cometbft_v1

import (
"context"
"fmt"
cfg "github.com/KYVENetwork/cometbft/v100/config"
cs "github.com/KYVENetwork/cometbft/v100/consensus"
"github.com/KYVENetwork/cometbft/v100/evidence"
mempl "github.com/KYVENetwork/cometbft/v100/mempool"
"github.com/KYVENetwork/cometbft/v100/proxy"
"github.com/KYVENetwork/cometbft/v100/state"
sm "github.com/KYVENetwork/cometbft/v100/state"
"github.com/KYVENetwork/cometbft/v100/store"
cometTypes "github.com/KYVENetwork/cometbft/v100/types"
dbm "github.com/cometbft/cometbft-db"
"github.com/spf13/viper"
"path/filepath"
)

type DBContext struct {
ID string
Config *Config
}

func LoadConfig(homePath string) (*cfg.Config, error) {
config := cfg.DefaultConfig()

viper.SetConfigName("config")
viper.SetConfigType("toml")
viper.AddConfigPath(homePath)
viper.AddConfigPath(filepath.Join(homePath, "config"))

if err := viper.ReadInConfig(); err != nil {
return nil, err
}

if err := viper.Unmarshal(config); err != nil {
return nil, err
}

config.SetRoot(homePath)

return config, nil
}

func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) {
dbType := dbm.BackendType(ctx.Config.DBBackend)
return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir())
}

func GetStateDBs(config *Config) (dbm.DB, state.Store, error) {
stateDB, err := DefaultDBProvider(&DBContext{"state", config})
if err != nil {
return nil, nil, err
}

stateStore := state.NewStore(stateDB, sm.StoreOptions{
DiscardABCIResponses: config.Storage.DiscardABCIResponses,
})

return stateDB, stateStore, nil
}

func GetBlockstoreDBs(config *Config) (dbm.DB, *store.BlockStore, error) {
blockStoreDB, err := DefaultDBProvider(&DBContext{"blockstore", config})
if err != nil {
return nil, nil, err
}

blockStore := store.NewBlockStore(blockStoreDB)

return blockStoreDB, blockStore, nil
}

func CreateAndStartProxyAppConns(config *Config) (proxy.AppConns, error) {
proxyApp := proxy.NewAppConns(proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()), proxy.NopMetrics())
proxyApp.SetLogger(cometLogger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
return nil, fmt.Errorf("error starting proxy app connections: %v", err)
}
return proxyApp, nil
}

func CreateAndStartEventBus() (*cometTypes.EventBus, error) {
eventBus := cometTypes.NewEventBus()
eventBus.SetLogger(cometLogger.With("module", "events"))
if err := eventBus.Start(); err != nil {
return nil, err
}
return eventBus, nil
}

func DoHandshake(
stateStore sm.Store,
state sm.State,
blockStore sm.BlockStore,
genDoc *GenesisDoc,
eventBus cometTypes.BlockEventPublisher,
proxyApp proxy.AppConns,
) error {
handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc)
handshaker.SetLogger(cometLogger.With("module", "consensus"))
handshaker.SetEventBus(eventBus)
if err := handshaker.Handshake(context.Background(), proxyApp); err != nil {
return fmt.Errorf("error during handshake: %v", err)
}
return nil
}

func CreateMempool(config *Config, proxyApp proxy.AppConns, state sm.State) mempl.Mempool {
logger := cometLogger.With("module", "mempool")
mp := mempl.NewCListMempool(
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempl.WithMetrics(mempl.NopMetrics()),
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)

mp.SetLogger(logger)
if config.Consensus.WaitForTxs() {
mp.EnableTxsAvailable()
}

return mp
}

func CreateEvidenceReactor(config *Config, stateStore sm.Store, blockStore *store.BlockStore) (*evidence.Reactor, *evidence.Pool, error) {
evidenceDB, err := DefaultDBProvider(&DBContext{ID: "evidence", Config: config})
if err != nil {
return nil, nil, err
}
evidenceLogger := cometLogger.With("module", "evidence")
evidencePool, err := evidence.NewPool(evidenceDB, stateStore, blockStore)
if err != nil {
return nil, nil, err
}
evidenceReactor := evidence.NewReactor(evidencePool)
evidenceReactor.SetLogger(evidenceLogger)
return evidenceReactor, evidencePool, nil
}
48 changes: 48 additions & 0 deletions engines/cometbft-v1/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package cometbft_v1

import (
"fmt"
"github.com/KYVENetwork/cometbft/v100/libs/log"
klogger "github.com/KYVENetwork/ksync/utils"
"github.com/rs/zerolog"
)

func CometLogger() (logger log.Logger) {
logger = KsyncCometLogger{logger: klogger.LogFormatter("")}
return
}

type KsyncCometLogger struct {
logger zerolog.Logger
}

func (l KsyncCometLogger) Debug(msg string, keyvals ...interface{}) {}

func (l KsyncCometLogger) Info(msg string, keyvals ...interface{}) {
logger := l.logger.Info()

for i := 0; i < len(keyvals); i = i + 2 {
if keyvals[i] == "hash" || keyvals[i] == "appHash" {
logger = logger.Str(fmt.Sprintf("%v", keyvals[i]), fmt.Sprintf("%x", keyvals[i+1]))
} else {
logger = logger.Str(fmt.Sprintf("%v", keyvals[i]), fmt.Sprintf("%v", keyvals[i+1]))
}
}

logger.Msg(msg)
}

func (l KsyncCometLogger) Error(msg string, keyvals ...interface{}) {
logger := l.logger.Error()

for i := 0; i < len(keyvals); i = i + 2 {
logger = logger.Str(fmt.Sprintf("%v", keyvals[i]), fmt.Sprintf("%v", keyvals[i+1]))
}

logger.Msg(msg)
}

func (l KsyncCometLogger) With(keyvals ...interface{}) (logger log.Logger) {
logger = KsyncCometLogger{logger: klogger.LogFormatter(keyvals)}
return
}
174 changes: 174 additions & 0 deletions engines/cometbft-v1/p2p.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package cometbft_v1

import (
"fmt"
bcproto "github.com/KYVENetwork/cometbft/v100/api/cometbft/blocksync/v1"
bc "github.com/KYVENetwork/cometbft/v100/blocksync"
bcv0 "github.com/KYVENetwork/cometbft/v100/blocksync"
cometLog "github.com/KYVENetwork/cometbft/v100/libs/log"
"github.com/KYVENetwork/cometbft/v100/p2p"
sm "github.com/KYVENetwork/cometbft/v100/state"
"github.com/KYVENetwork/cometbft/v100/version"
log "github.com/KYVENetwork/ksync/utils"
"reflect"
)

const (
BlocksyncChannel = byte(0x40)
)

var (
logger = log.KsyncLogger("p2p")
)

type BlockchainReactor struct {
p2p.BaseReactor

block *Block
nextBlock *Block
}

func NewBlockchainReactor(block *Block, nextBlock *Block) *BlockchainReactor {
bcR := &BlockchainReactor{
block: block,
nextBlock: nextBlock,
}
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
return bcR
}

func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
{
ID: BlocksyncChannel,
Priority: 5,
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: bc.MaxMsgSize,
},
}
}

func (bcR *BlockchainReactor) sendStatusToPeer(src p2p.Peer) (queued bool) {
logger.Info().Int64("base", bcR.block.Height).Int64("height", bcR.block.Height+1).Msg("Sent status to peer")

return src.Send(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusResponse{
Base: bcR.block.Height,
Height: bcR.block.Height + 1,
},
})
}

func (bcR *BlockchainReactor) sendBlockToPeer(msg *bcproto.BlockRequest, src p2p.Peer) (queued bool) {
if msg.Height == bcR.block.Height {
bl, err := bcR.block.ToProto()
if err != nil {
logger.Error().Str("could not convert msg to protobuf", err.Error())
return false
}

logger.Info().Msg(fmt.Sprintf("sent block with height %d to peer", bcR.block.Height))

return src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.BlockResponse{Block: bl},
})
}

if msg.Height == bcR.nextBlock.Height {
bl, err := bcR.nextBlock.ToProto()
if err != nil {
logger.Error().Str("could not convert msg to protobuf", err.Error())
return false
}

logger.Info().Msg(fmt.Sprintf("sent block with height %d to peer", bcR.nextBlock.Height))

return src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.BlockResponse{Block: bl},
})
}

logger.Error().Msg(fmt.Sprintf("peer asked for different block, expected = %d,%d, requested %d", bcR.block.Height, bcR.nextBlock.Height, msg.Height))
return false
}

func (bcR *BlockchainReactor) ReceiveEnvelope(e p2p.Envelope) {
if err := bc.ValidateMsg(e.Message); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
bcR.Switch.StopPeerForError(e.Src, err)
return
}

switch msg := e.Message.(type) {
case *bcproto.StatusRequest:
logger.Info().Msg("Incoming status request")
bcR.sendStatusToPeer(e.Src)
case *bcproto.BlockRequest:
logger.Info().Int64("height", msg.Height).Msg("Incoming block request")
bcR.sendBlockToPeer(msg, e.Src)
case *bcproto.StatusResponse:
logger.Info().Int64("base", msg.Base).Int64("height", msg.Height).Msgf("Incoming status response")
default:
logger.Error().Msg(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
}

func MakeNodeInfo(
config *Config,
nodeKey *p2p.NodeKey,
genDoc *GenesisDoc,
) (p2p.NodeInfo, error) {
nodeInfo := p2p.DefaultNodeInfo{
ProtocolVersion: p2p.NewProtocolVersion(
version.P2PProtocol,
sm.InitStateVersion.Consensus.Block,
sm.InitStateVersion.Consensus.App,
),
DefaultNodeID: nodeKey.ID(),
Network: genDoc.ChainID,
Version: version.CMTSemVer,
Channels: []byte{bcv0.BlocksyncChannel},
Moniker: config.Moniker,
Other: p2p.DefaultNodeInfoOther{
TxIndex: "off",
RPCAddress: config.RPC.ListenAddress,
},
}

lAddr := config.P2P.ExternalAddress

if lAddr == "" {
lAddr = config.P2P.ListenAddress
}

nodeInfo.ListenAddr = lAddr

err := nodeInfo.Validate()
return nodeInfo, err
}

func CreateSwitch(config *Config,
transport p2p.Transport,
bcReactor p2p.Reactor,
nodeInfo p2p.NodeInfo,
nodeKey *p2p.NodeKey,
logger cometLog.Logger) *p2p.Switch {

sw := p2p.NewSwitch(
config.P2P,
transport,
)
sw.SetLogger(logger)
bcReactor.SetLogger(logger)
sw.AddReactor("BLOCKCHAIN", bcReactor)

sw.SetNodeInfo(nodeInfo)
sw.SetNodeKey(nodeKey)

logger.Info("P2P Node ID", "ID", nodeKey.ID())
return sw
}
Loading