Skip to content

Commit

Permalink
Add interface between batcher -> proposer to speed up proposals of sa…
Browse files Browse the repository at this point in the history
…fe head
  • Loading branch information
mdehoog committed Oct 5, 2024
1 parent bc026e5 commit a7cf583
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 61 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,4 @@ require (

replace github.com/ethereum/go-ethereum => github.com/mdehoog/op-geth v0.0.0-20241003075401-d8f4cde5a852

replace github.com/ethereum-optimism/optimism => github.com/mdehoog/optimism v0.0.0-20241004232359-14da93be19e5
replace github.com/ethereum-optimism/optimism => github.com/mdehoog/optimism v0.0.0-20241005031606-445adf498c86
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWV
github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mdehoog/op-geth v0.0.0-20241003075401-d8f4cde5a852 h1:sxczPjBB5xXnQWhdSrLdQLLIi/ZVv22pVOmd6xrHq8A=
github.com/mdehoog/op-geth v0.0.0-20241003075401-d8f4cde5a852/go.mod h1:7S4pp8KHBmEmKkRjL1BPOc6jY9hW+64YeMUjR3RVLw4=
github.com/mdehoog/optimism v0.0.0-20241004232359-14da93be19e5 h1:9Lm/4Jq+YEBWgEw9dX8QUCB8CvyZ8fDSi62Al7ebZns=
github.com/mdehoog/optimism v0.0.0-20241004232359-14da93be19e5/go.mod h1:cg1f0VUXn/9jfek2sWOrVRxxlxH7ia4UEZvEMa1X5/4=
github.com/mdehoog/optimism v0.0.0-20241005031606-445adf498c86 h1:FXEWfWyCxCrtUhJmlqljN7nwF6SLl2CdvhxOJYsQ5Lc=
github.com/mdehoog/optimism v0.0.0-20241005031606-445adf498c86/go.mod h1:cg1f0VUXn/9jfek2sWOrVRxxlxH7ia4UEZvEMa1X5/4=
github.com/mdlayher/socket v0.4.1 h1:eM9y2/jlbs1M615oshPQOHZzj6R6wMT7bX5NPiQvn2U=
github.com/mdlayher/socket v0.4.1/go.mod h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8KuoJGIReA=
github.com/mdlayher/vsock v1.2.1 h1:pC1mTJTvjo1r9n9fbm7S1j04rCgCzhCOS5DY0zqHlnQ=
Expand Down
15 changes: 12 additions & 3 deletions op-batcher/batcher/batch_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum/go-ethereum/rpc"
thisflags "github.com/mdehoog/op-enclave/op-batcher/flags"
"github.com/urfave/cli/v2"
)

Expand All @@ -26,11 +28,18 @@ func Main(version string) cliapp.LifecycleAction {

l := oplog.NewLogger(oplog.AppOut(cliCtx), cfg.LogConfig)
oplog.SetGlobalLogHandler(l.Handler())
opservice.ValidateEnvVars(flags.EnvVarPrefix, flags.Flags, l)
opservice.ValidateEnvVars(flags.EnvVarPrefix, thisflags.Flags, l)

proposerClient, err := rpc.DialContext(cliCtx.Context, cliCtx.String(thisflags.ProposerRpcFlag.Name))
if err != nil {
return nil, fmt.Errorf("failed to connect to Proposer: %w", err)
}

l.Info("Initializing Batch Submitter")
channelFactoryOpt := func(service *batcher.BatcherService, cfg *batcher.CLIConfig) {
service.ChannelFactory = NewChannel
channelFactoryOpt := func(setup *batcher.DriverSetup) {
metricer := NewMetricer(setup.Metr, setup.Log, proposerClient)
setup.Metr = metricer
setup.ChannelOutFactory = ChannelOutFactory(metricer)
}
return batcher.BatcherServiceFromCLIConfig(cliCtx.Context, version, cfg, l, channelFactoryOpt)
}
Expand Down
44 changes: 0 additions & 44 deletions op-batcher/batcher/channel.go

This file was deleted.

57 changes: 57 additions & 0 deletions op-batcher/batcher/channel_out.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package batcher

import (
"errors"

"github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/predeploys"
"github.com/ethereum/go-ethereum/core/types"
)

var ErrWithdrawalDetected = errors.New("withdrawal detected")

type ChannelOut interface {
derive.ChannelOut
Blocks() []*types.Block
}

func ChannelOutFactory(metricer Metricer) batcher.ChannelOutFactory {
return func(cfg batcher.ChannelConfig, rollupCfg *rollup.Config) (derive.ChannelOut, error) {
co, err := batcher.NewChannelOut(cfg, rollupCfg)
if err != nil {
return nil, err
}
wrapped := &channelOut{
ChannelOut: co,
}
metricer.RegisterChannel(wrapped)
return wrapped, nil
}
}

type channelOut struct {
derive.ChannelOut
fullErr error
blocks []*types.Block
}

func (c *channelOut) Blocks() []*types.Block {
return c.blocks
}

func (c *channelOut) AddBlock(config *rollup.Config, block *types.Block) (*derive.L1BlockInfo, error) {
c.blocks = append(c.blocks, block)
if block.Bloom().Test(predeploys.L2ToL1MessagePasserAddr.Bytes()) {
c.fullErr = ErrWithdrawalDetected
}
return c.ChannelOut.AddBlock(config, block)
}

func (c *channelOut) FullErr() error {
if c.fullErr != nil {
return c.fullErr
}
return c.ChannelOut.FullErr()
}
55 changes: 55 additions & 0 deletions op-batcher/batcher/metricer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package batcher

import (
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)

type Metricer interface {
metrics.Metricer
RegisterChannel(out ChannelOut)
}

type metricer struct {
metrics.Metricer
log log.Logger
proposerClient *rpc.Client
channels map[derive.ChannelID]ChannelOut
}

func NewMetricer(m metrics.Metricer, log log.Logger, proposerClient *rpc.Client) Metricer {
return &metricer{
Metricer: m,
log: log,
proposerClient: proposerClient,
channels: make(map[derive.ChannelID]ChannelOut),
}
}

func (m *metricer) RegisterChannel(out ChannelOut) {
m.channels[out.ID()] = out
}

func (m *metricer) RecordChannelFullySubmitted(id derive.ChannelID) {
m.Metricer.RecordChannelFullySubmitted(id)

channel, ok := m.channels[id]
if !ok {
return
}
delete(m.channels, id)
var numbers []uint64
for _, b := range channel.Blocks() {
numbers = append(numbers, b.NumberU64())
}
if err := m.proposerClient.Call(nil, "admin_blocksBatched", numbers); err != nil {
m.log.Error("failed to notify Proposer of batched blocks", "err", err)
}
}

func (m *metricer) RecordChannelTimedOut(id derive.ChannelID) {
m.Metricer.RecordChannelTimedOut(id)
delete(m.channels, id)
}
2 changes: 1 addition & 1 deletion op-batcher/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"os"

"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/mdehoog/op-enclave/op-batcher/batcher"
"github.com/mdehoog/op-enclave/op-batcher/flags"
"github.com/urfave/cli/v2"

opservice "github.com/ethereum-optimism/optimism/op-service"
Expand Down
30 changes: 30 additions & 0 deletions op-batcher/flags/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package flags

import (
"github.com/ethereum-optimism/optimism/op-batcher/flags"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/urfave/cli/v2"
)

func prefixEnvVar(name string) []string {
return opservice.PrefixEnvVar(flags.EnvVarPrefix, name)
}

var (
ProposerRpcFlag = &cli.StringFlag{
Name: "proposer-rpc",
Usage: "HTTP provider URL for the Proposer",
EnvVars: prefixEnvVar("PROPOSER_RPC"),
Required: true,
}
)

var requiredFlags = []cli.Flag{
ProposerRpcFlag,
}

func init() {
Flags = append(requiredFlags, flags.Flags...)
}

var Flags []cli.Flag
59 changes: 49 additions & 10 deletions op-proposer/proposer/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type L2OutputSubmitter struct {
ooABI *abi.ABI

prover *Prover

blocksBatched map[uint64]struct{}
blocksBatchedMutex sync.Mutex
}

// NewL2OutputSubmitter creates a new L2 Output Submitter
Expand Down Expand Up @@ -109,6 +112,8 @@ func newL2OOSubmitter(ctx context.Context, cancel context.CancelFunc, setup Driv
ooContract: ooContract,
ooABI: parsed,
prover: prover,

blocksBatched: make(map[uint64]struct{}),
}, nil
}

Expand Down Expand Up @@ -157,6 +162,43 @@ func (l *L2OutputSubmitter) StopL2OutputSubmitting() error {
return nil
}

func (l *L2OutputSubmitter) BlocksBatched(numbers []uint64) error {
l.blocksBatchedMutex.Lock()
defer l.blocksBatchedMutex.Unlock()
for _, number := range numbers {
l.blocksBatched[number] = struct{}{}
}
return nil
}

func (l *L2OutputSubmitter) LatestBlockBatched(ctx context.Context) (uint64, error) {
syncStatus, err := l.RollupClient.SyncStatus(ctx)
if err != nil {
return 0, fmt.Errorf("failed to get sync status from Rollup: %w", err)
}
batched := syncStatus.FinalizedL2.Number
if l.Cfg.AllowNonFinalized {
batched = syncStatus.PendingSafeL2.Number

l.blocksBatchedMutex.Lock()
defer l.blocksBatchedMutex.Unlock()

for number := range l.blocksBatched {
if number <= batched {
delete(l.blocksBatched, number)
}
}

// iterate through the batched blocks to find the last contiguous batched block number
for i := batched + 1; ; i++ {
if _, ok := l.blocksBatched[i]; !ok {
return i - 1, nil
}
}
}
return batched, nil
}

// loop is responsible for creating & submitting the next outputs
// The loop regularly polls the L2 chain to infer whether to make the next proposal.
func (l *L2OutputSubmitter) loop() {
Expand Down Expand Up @@ -224,31 +266,28 @@ func (l *L2OutputSubmitter) generateNextProposal(ctx context.Context, lastPropos
}

// generate new proposals up to the latest block
syncStatus, err := l.RollupClient.SyncStatus(ctx)
batchedBlockNumber, err := l.LatestBlockBatched(ctx)
if err != nil {
return nil, false, fmt.Errorf("failed to get sync status from Rollup: %w", err)
}
latestBlockNumber := syncStatus.FinalizedL2.Number
if l.Cfg.AllowNonFinalized {
latestBlockNumber = syncStatus.PendingSafeL2.Number
return nil, false, err
}

// TODO implement proposal array limit (aggregate in chunks)
// TODO implement a pool of go-routines for parallel proof generation
// TODO generate proofs for unsafe blocks ahead of time
var proposals []*Proposal
if lastProposal != nil {
proposals = append(proposals, lastProposal)
}
shouldPropose := lastProposalBlockNumber < latestBlockNumber &&
l.Cfg.MinProposalInterval > 0 && latestBlockNumber-proposedBlockNumber > l.Cfg.MinProposalInterval
for i := lastProposalBlockNumber + 1; i <= latestBlockNumber; i++ {
shouldPropose := lastProposalBlockNumber < batchedBlockNumber &&
l.Cfg.MinProposalInterval > 0 && batchedBlockNumber-proposedBlockNumber > l.Cfg.MinProposalInterval
for i := lastProposalBlockNumber + 1; i <= batchedBlockNumber; i++ {
proposal, anyWithdrawals, err := l.prover.Generate(ctx, i)
if err != nil {
return nil, false, fmt.Errorf("failed to generate proof for block %d: %w", i, err)
}
proposals = append(proposals, proposal)
shouldPropose = shouldPropose || anyWithdrawals
l.Log.Info("Generated proof for block", "block", i, "latest", latestBlockNumber, "shouldPropose", shouldPropose, "output", proposal.Output.OutputRoot.String())
l.Log.Info("Generated proof for block", "block", i, "batched", batchedBlockNumber, "shouldPropose", shouldPropose, "output", proposal.Output.OutputRoot.String())
}

if len(proposals) == 0 {
Expand Down
34 changes: 34 additions & 0 deletions op-proposer/proposer/rpc/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package rpc

import (
"context"

"github.com/ethereum/go-ethereum/log"
gethrpc "github.com/ethereum/go-ethereum/rpc"

"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/rpc"
)

type ProposerDriver interface {
BlocksBatched(numbers []uint64) error
}

type adminAPI struct {
*rpc.CommonAdminAPI
b ProposerDriver
}

func NewAdminAPI(dr ProposerDriver, m metrics.RPCMetricer, log log.Logger) gethrpc.API {
return gethrpc.API{
Namespace: "admin",
Service: &adminAPI{
CommonAdminAPI: rpc.NewCommonAdminAPI(m, log),
b: dr,
},
}
}

func (a *adminAPI) BlocksBatched(_ context.Context, numbers []uint64) error {
return a.b.BlocksBatched(numbers)
}
Loading

0 comments on commit a7cf583

Please sign in to comment.