Skip to content

Commit

Permalink
feat(op-node/op-batcher/op-proposer): add fallbackClient (#55)
Browse files Browse the repository at this point in the history
* FallbackClient impl

* double check fail count

* RegisterSubscribeFunc

* FallbackClient for op-batcher,op-proposer

* miss currentClient

* add log and change order

* fallback client add fallbackThreshold

* add validateRpc

* add document

* add document

* Put the switching logic into a goroutine and modify the code according to the review comments

* add metrics and don't switch url when error is Rpc.Error

* use const to remove magic number

* fix NoopTxMetrics

* add TestL1FallbackClient_SwitchUrl e2e case

* should be >= threshold

* change threshold to 20

* log->logT

* miss make channel

* fix lint

---------

Co-authored-by: Welkin <[email protected]>
  • Loading branch information
welkin22 and Welkin authored Oct 17, 2023
1 parent 628b517 commit 6421d48
Show file tree
Hide file tree
Showing 25 changed files with 861 additions and 18 deletions.
4 changes: 3 additions & 1 deletion op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package batcher
import (
"time"

"github.com/ethereum-optimism/optimism/op-service/client"

"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli"
Expand All @@ -22,7 +24,7 @@ import (
type Config struct {
log log.Logger
metr metrics.Metricer
L1Client *ethclient.Client
L1Client client.EthClient
L2Client *ethclient.Client
RollupNode *sources.RollupClient
TxManager txmgr.TxManager
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri

// Connect to L1 and L2 providers. Perform these last since they are the
// most expensive.
l1Client, err := opclient.DialEthClientWithTimeout(ctx, cfg.L1EthRpc, opclient.DefaultDialTimeout)
l1Client, err := opclient.DialEthClientWithTimeoutAndFallback(ctx, cfg.L1EthRpc, opclient.DefaultDialTimeout, l, opclient.BatcherFallbackThreshold, m)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var (
// Required flags
L1EthRpcFlag = cli.StringFlag{
Name: "l1-eth-rpc",
Usage: "HTTP provider URL for L1",
Usage: "HTTP provider URL for L1. Multiple alternative addresses are supported, separated by commas, and the first address is used by default",
EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "L1_ETH_RPC"),
}
L2EthRpcFlag = cli.StringFlag{
Expand Down
5 changes: 3 additions & 2 deletions op-batcher/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package metrics
import (
"context"

"github.com/ethereum/go-ethereum"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus"

Expand Down Expand Up @@ -185,7 +186,7 @@ func (m *Metrics) Document() []opmetrics.DocumentedMetric {
}

func (m *Metrics) StartBalanceMetrics(ctx context.Context,
l log.Logger, client *ethclient.Client, account common.Address) {
l log.Logger, client ethereum.ChainStateReader, account common.Address) {
opmetrics.LaunchBalanceMetrics(ctx, l, m.registry, m.ns, client, account)
}

Expand Down
3 changes: 3 additions & 0 deletions op-batcher/metrics/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ func (*noopMetrics) RecordChannelTimedOut(derive.ChannelID) {}
func (*noopMetrics) RecordBatchTxSubmitted() {}
func (*noopMetrics) RecordBatchTxSuccess() {}
func (*noopMetrics) RecordBatchTxFailed() {}

// RecordL1UrlSwitchEvt is a no-op; the URL argument is ignored.
func (*noopMetrics) RecordL1UrlSwitchEvt(_ string) {}
120 changes: 120 additions & 0 deletions op-e2e/actions/fallback_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package actions

import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/testlog"
service_client "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
"math/big"
"testing"
"time"
)

// setupFallbackClientTest builds the actors used by the fallback-client test:
// an L1 miner on port 8545, two L1 replicas on ports 8546 and 8547, an L2
// engine, and an L2 sequencer whose L1 source is backed by a FallbackClient.
//
// l1Url must parse as a multi-URL list (the test fails otherwise); the first
// entry is dialed immediately and handed to the fallback client as its initial
// RPC, together with a dial function it can use for the remaining URLs.
func setupFallbackClientTest(t Testing, sd *e2eutils.SetupData, log log.Logger, l1Url string) (*L1Miner, *L1Replica, *L1Replica, *L2Engine, *L2Sequencer, *sources.FallbackClient) {
	jwtPath := e2eutils.WriteDefaultJWT(t)

	// Three L1 nodes, one per URL expected in l1Url.
	miner := NewL1MinerWithPort(t, log, sd.L1Cfg, 8545)
	secondL1 := NewL1ReplicaWithPort(t, log, sd.L1Cfg, 8546)
	thirdL1 := NewL1ReplicaWithPort(t, log, sd.L1Cfg, 8547)

	multi, urls := service_client.MultiUrlParse(l1Url)
	require.True(t, multi)

	// The same options are used for the initial dial and for every re-dial.
	rpcOpts := []client.RPCOption{
		client.WithHttpPollInterval(0),
		client.WithDialBackoff(10),
	}
	dial := func(url string) (client.RPC, error) {
		return client.NewRPC(t.Ctx(), log, url, rpcOpts...)
	}

	primary, err := dial(urls[0])
	require.NoError(t, err)
	fallback := sources.NewFallbackClient(t.Ctx(), primary, urls, log, sd.RollupCfg.L1ChainID, sd.RollupCfg.Genesis.L1, dial)

	l1Source, err := sources.NewL1Client(fallback, log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindBasic))
	require.NoError(t, err)

	engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath)
	engineClient, err := sources.NewEngineClient(engine.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
	require.NoError(t, err)

	sequencer := NewL2Sequencer(t, log, l1Source, engineClient, sd.RollupCfg, 0)

	// Callers need the concrete type to inspect the currently selected URL index.
	return miner, secondL1, thirdL1, engine, sequencer, fallback.(*sources.FallbackClient)
}

// TestL1FallbackClient_SwitchUrl checks that the fallback client moves to the
// second L1 URL after the HTTP server of the first L1 node is stopped, and
// moves back to the first URL after that HTTP server is started again.
func TestL1FallbackClient_SwitchUrl(gt *testing.T) {
t := NewDefaultTesting(gt)
p := &e2eutils.TestParams{
MaxSequencerDrift: 300,
SequencerWindowSize: 200,
ChannelTimeout: 120,
L1BlockTime: 12,
}
dp := e2eutils.MakeDeployParams(t, p)
sd := e2eutils.Setup(t, dp, defaultAlloc)
logT := testlog.Logger(t, log.LvlDebug)
// Three comma-separated L1 URLs; the third replica is not used directly by this test.
miner, l1_2, _, engine, sequencer, fallbackClient := setupFallbackClientTest(t, sd, logT, "http://127.0.0.1:8545,http://127.0.0.1:8546,http://127.0.0.1:8547")
miner.ActL1SetFeeRecipient(common.Address{'A'})

sequencer.ActL2PipelineFull(t)

signer := types.LatestSigner(sd.L2Cfg.Config)
cl := engine.EthClient()
// aliceTx signs a 2-ether transfer from Alice to Bob with Alice's pending
// nonce and submits it through the L2 engine's eth client.
aliceTx := func() {
n, err := cl.PendingNonceAt(t.Ctx(), dp.Addresses.Alice)
require.NoError(t, err)
tx := types.MustSignNewTx(dp.Secrets.Alice, signer, &types.DynamicFeeTx{
ChainID: sd.L2Cfg.Config.ChainID,
Nonce: n,
GasTipCap: big.NewInt(2 * params.GWei),
GasFeeCap: new(big.Int).Add(miner.l1Chain.CurrentBlock().BaseFee, big.NewInt(2*params.GWei)),
Gas: params.TxGas,
To: &dp.Addresses.Bob,
Value: e2eutils.Ether(2),
})
require.NoError(gt, cl.SendTransaction(t.Ctx(), tx))
}
// makeL2BlockWithAliceTx builds one L2 block that includes a fresh Alice transaction.
makeL2BlockWithAliceTx := func() {
aliceTx()
sequencer.ActL2StartBlock(t)
engine.ActL2IncludeTx(dp.Addresses.Alice)(t) // include a test tx from alice
sequencer.ActL2EndBlock(t)
}

// Stop the HTTP RPC server of the miner (the node behind the first URL).
errRpc := miner.RPCClient().CallContext(t.Ctx(), nil, "admin_stopHTTP")
require.NoError(t, errRpc)

l2BlockCount := 0
for i := 0; i < 6; i++ {
// Mine one L1 block (time delta 12) and copy it into the second L1 node's chain.
miner.ActL1StartBlock(12)(t)
miner.ActL1EndBlock(t)
newBlock := miner.l1Chain.GetBlockByHash(miner.l1Chain.CurrentBlock().Hash())
_, err := l1_2.l1Chain.InsertChain([]*types.Block{newBlock})
require.NoError(t, err)

// Hand the new L1 head to the sequencer's L1 state directly.
sequencer.L2Verifier.l1State.HandleNewL1HeadBlock(eth.L1BlockRef{
Hash: newBlock.Hash(),
Number: newBlock.NumberU64(),
ParentHash: newBlock.ParentHash(),
Time: newBlock.Time(),
})
origin := miner.l1Chain.CurrentBlock()

// Build L2 blocks until the next L2 block time would reach the L1 head time.
for sequencer.SyncStatus().UnsafeL2.Time+sd.RollupCfg.BlockTime < origin.Time {
makeL2BlockWithAliceTx()
//require.Equal(t, uint64(i), sequencer.SyncStatus().UnsafeL2.L1Origin.Number, "no L1 origin change before time matches")
l2BlockCount++
// NOTE(review): 23 is presumably chosen so that enough failed requests have
// accumulated to pass the fallback threshold — confirm against the client.
if l2BlockCount == 23 {
require.Equal(t, 1, fallbackClient.GetCurrentIndex(), "fallback client should switch url to second url")
// Bring the first node's HTTP server back up on port 8545.
errRpc2 := miner.RPCClient().CallContext(t.Ctx(), nil, "admin_startHTTP", "127.0.0.1", 8545, "*", "eth,net,web3,debug,admin,txpool", "*")
require.NoError(t, errRpc2)
}
// NOTE(review): 34 presumably leaves enough time for the client to detect
// the recovered first URL — confirm against the client's recovery logic.
if l2BlockCount == 34 {
require.Equal(t, 0, fallbackClient.GetCurrentIndex(), "fallback client should recover url to first url")
}
// NOTE(review): the pause presumably gives the fallback client's background
// switching logic time to run between L2 blocks — confirm.
time.Sleep(500 * time.Millisecond)
}
}
}
7 changes: 7 additions & 0 deletions op-e2e/actions/l1_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ func NewL1Miner(t Testing, log log.Logger, genesis *core.Genesis) *L1Miner {
}
}

// NewL1MinerWithPort creates an L1Miner on top of a replica built by
// NewL1ReplicaWithPort with the given port. The replica is stored by value
// in the returned miner.
func NewL1MinerWithPort(t Testing, log log.Logger, genesis *core.Genesis, port int) *L1Miner {
	replica := NewL1ReplicaWithPort(t, log, genesis, port)
	miner := new(L1Miner)
	miner.L1Replica = *replica
	return miner
}

// ActL1StartBlock returns an action to build a new L1 block on top of the head block,
// with timeDelta added to the head block time.
func (s *L1Miner) ActL1StartBlock(timeDelta uint64) Action {
Expand Down
45 changes: 45 additions & 0 deletions op-e2e/actions/l1_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,51 @@ func NewL1Replica(t Testing, log log.Logger, genesis *core.Genesis) *L1Replica {
}
}

// NewL1ReplicaWithPort creates and starts an in-memory L1 geth node for
// testing and wraps it in an L1Replica. Both the HTTP and the WS endpoint are
// configured on 127.0.0.1 with the given port. The node is closed through
// t.Cleanup, and the test fails if the node cannot be created or started.
func NewL1ReplicaWithPort(t Testing, log log.Logger, genesis *core.Genesis, port int) *L1Replica {
	const host = "127.0.0.1"

	ethCfg := &ethconfig.Config{
		NetworkId:                 genesis.Config.ChainID.Uint64(),
		Genesis:                   genesis,
		RollupDisableTxPoolGossip: true,
	}

	stack, err := node.New(&node.Config{
		Name:        "l1-geth",
		WSHost:      host,
		WSPort:      port,
		HTTPHost:    host,
		HTTPPort:    port,
		WSModules:   []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal"},
		HTTPModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal"},
		DataDir:     "", // in-memory
		P2P: p2p.Config{
			NoDiscovery: true,
			NoDial:      true,
		},
	})
	require.NoError(t, err)
	t.Cleanup(func() { _ = stack.Close() })

	backend, err := eth.New(stack, ethCfg)
	require.NoError(t, err)
	backend.Merger().FinalizePoS()

	// Tracer APIs must be registered before the node is started.
	stack.RegisterAPIs(tracers.APIs(backend.APIBackend))

	require.NoError(t, stack.Start(), "failed to start L1 geth node")

	replica := &L1Replica{
		log:        log,
		node:       stack,
		eth:        backend,
		l1Chain:    backend.BlockChain(),
		l1Database: backend.ChainDb(),
		l1Cfg:      genesis,
		l1Signer:   types.LatestSigner(genesis.Config),
		failL1RPC:  nil,
	}
	return replica
}

// ActL1RewindToParent rewinds the L1 chain to parent block of head
func (s *L1Replica) ActL1RewindToParent(t Testing) {
s.ActL1RewindDepth(1)(t)
Expand Down
2 changes: 1 addition & 1 deletion op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var (
/* Required Flags */
L1NodeAddr = cli.StringFlag{
Name: "l1",
Usage: "Address of L1 User JSON-RPC endpoint to use (eth namespace required)",
Usage: "Address of L1 User JSON-RPC endpoint to use (eth namespace required). Multiple alternative addresses are supported, separated by commas, and the first address is used by default",
Value: "http://127.0.0.1:8545",
EnvVar: prefixEnvVar("L1_ETH_RPC"),
}
Expand Down
10 changes: 10 additions & 0 deletions op-node/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Metricer interface {
RecordIPUnban()
RecordDial(allow bool)
RecordAccept(allow bool)
RecordL1UrlSwitchEvent()
}

// Metrics tracks all the metrics for the op-node.
Expand All @@ -99,6 +100,7 @@ type Metrics struct {
DerivationErrors *EventMetrics
SequencingErrors *EventMetrics
PublishingErrors *EventMetrics
L1UrlSwitchEvent *EventMetrics

P2PReqDurationSeconds *prometheus.HistogramVec
P2PReqTotal *prometheus.CounterVec
Expand Down Expand Up @@ -236,6 +238,7 @@ func NewMetrics(procName string) *Metrics {
DerivationErrors: NewEventMetrics(factory, ns, "derivation_errors", "derivation errors"),
SequencingErrors: NewEventMetrics(factory, ns, "sequencing_errors", "sequencing errors"),
PublishingErrors: NewEventMetrics(factory, ns, "publishing_errors", "p2p publishing errors"),
L1UrlSwitchEvent: NewEventMetrics(factory, ns, "l1_url_switch", "L1 URL switch events"),

SequencerInconsistentL1Origin: NewEventMetrics(factory, ns, "sequencer_inconsistent_l1_origin", "events when the sequencer selects an inconsistent L1 origin"),
SequencerResets: NewEventMetrics(factory, ns, "sequencer_resets", "sequencer resets"),
Expand Down Expand Up @@ -725,6 +728,10 @@ func (m *Metrics) RecordAccept(allow bool) {
}
}

// RecordL1UrlSwitchEvent records one event on the L1UrlSwitchEvent metric.
func (m *Metrics) RecordL1UrlSwitchEvent() {
m.L1UrlSwitchEvent.RecordEvent()
}

type noopMetricer struct{}

var NoopMetrics Metricer = new(noopMetricer)
Expand Down Expand Up @@ -845,3 +852,6 @@ func (n *noopMetricer) RecordDial(allow bool) {

func (n *noopMetricer) RecordAccept(allow bool) {
}

// RecordL1UrlSwitchEvent is a no-op.
func (n *noopMetricer) RecordL1UrlSwitchEvent() {
}
19 changes: 19 additions & 0 deletions op-node/node/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/sources"
service_client "github.com/ethereum-optimism/optimism/op-service/client"

"github.com/ethereum/go-ethereum/log"
gn "github.com/ethereum/go-ethereum/node"
Expand Down Expand Up @@ -175,6 +176,11 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf
opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize))
}

isMultiUrl, urlList := service_client.MultiUrlParse(cfg.L1NodeAddr)
if isMultiUrl {
return fallbackClientWrap(ctx, log, urlList, cfg, rollupCfg, opts...)
}

l1Node, err := client.NewRPC(ctx, log, cfg.L1NodeAddr, opts...)
if err != nil {
return nil, nil, fmt.Errorf("failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err)
Expand All @@ -184,6 +190,19 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf
return l1Node, rpcCfg, nil
}

// fallbackClientWrap dials the first URL in urlList and wraps the resulting
// RPC in a fallback client that is given the full URL list and a dial function
// using the same context, logger and options. It returns the wrapped client
// together with the default L1 client config for rollupCfg, with
// MaxRequestsPerBatch set from cfg.BatchSize.
//
// urlList must be non-empty. An error is returned only when the initial dial
// of the first URL fails.
func fallbackClientWrap(ctx context.Context, logger log.Logger, urlList []string, cfg *L1EndpointConfig, rollupCfg *rollup.Config, opts ...client.RPCOption) (client.RPC, *sources.L1ClientConfig, error) {
	// One dial function serves both the initial connection and later re-dials.
	dial := func(url string) (client.RPC, error) {
		return client.NewRPC(ctx, logger, url, opts...)
	}

	firstURL := urlList[0]
	primary, err := dial(firstURL)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to dial L1 address (%s): %w", firstURL, err)
	}

	wrapped := sources.NewFallbackClient(ctx, primary, urlList, logger, rollupCfg.L1ChainID, rollupCfg.Genesis.L1, dial)

	l1Cfg := sources.L1ClientDefaultConfig(rollupCfg, cfg.L1TrustRPC, cfg.L1RPCKind)
	l1Cfg.MaxRequestsPerBatch = cfg.BatchSize
	return wrapped, l1Cfg, nil
}

// PreparedL1Endpoint enables testing with an in-process pre-setup RPC connection to L1
type PreparedL1Endpoint struct {
Client client.RPC
Expand Down
7 changes: 7 additions & 0 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
}
return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewL1Head)
})

if fallbackClient, ok := l1Node.(*sources.FallbackClient); ok {
fallbackClient.RegisterSubscribeFunc(func() (event.Subscription, error) {
return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewL1Head)
}, &n.l1HeadsSub)
fallbackClient.RegisterMetrics(n.metrics)
}
go func() {
err, ok := <-n.l1HeadsSub.Err()
if !ok {
Expand Down
Loading

0 comments on commit 6421d48

Please sign in to comment.