Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(op-node/op-batcher/op-proposer): add fallbackClient #55

Merged
merged 23 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package batcher

import (
"github.com/ethereum-optimism/optimism/op-service/client"
"time"

"github.com/ethereum/go-ethereum/ethclient"
Expand All @@ -22,7 +23,7 @@ import (
type Config struct {
log log.Logger
metr metrics.Metricer
L1Client *ethclient.Client
L1Client client.EthClient
L2Client *ethclient.Client
RollupNode *sources.RollupClient
TxManager txmgr.TxManager
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri

// Connect to L1 and L2 providers. Perform these last since they are the
// most expensive.
l1Client, err := opclient.DialEthClientWithTimeout(ctx, cfg.L1EthRpc, opclient.DefaultDialTimeout)
l1Client, err := opclient.DialEthClientWithTimeoutAndFallback(ctx, cfg.L1EthRpc, opclient.DefaultDialTimeout, l, opclient.BatcherFallbackThreshold, m)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var (
// Required flags
L1EthRpcFlag = cli.StringFlag{
Name: "l1-eth-rpc",
Usage: "HTTP provider URL for L1",
Usage: "HTTP provider URL for L1. Multiple alternative addresses are supported, separated by commas, and the first address is used by default",
EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "L1_ETH_RPC"),
}
L2EthRpcFlag = cli.StringFlag{
Expand Down
4 changes: 2 additions & 2 deletions op-batcher/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package metrics

import (
"context"
"github.com/ethereum/go-ethereum"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus"

Expand Down Expand Up @@ -185,7 +185,7 @@ func (m *Metrics) Document() []opmetrics.DocumentedMetric {
}

func (m *Metrics) StartBalanceMetrics(ctx context.Context,
l log.Logger, client *ethclient.Client, account common.Address) {
l log.Logger, client ethereum.ChainStateReader, account common.Address) {
opmetrics.LaunchBalanceMetrics(ctx, l, m.registry, m.ns, client, account)
}

Expand Down
3 changes: 3 additions & 0 deletions op-batcher/metrics/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ func (*noopMetrics) RecordChannelTimedOut(derive.ChannelID) {}
func (*noopMetrics) RecordBatchTxSubmitted() {}
func (*noopMetrics) RecordBatchTxSuccess() {}
func (*noopMetrics) RecordBatchTxFailed() {}

func (m *noopMetrics) RecordL1UrlSwitchEvt(url string) {
}
120 changes: 120 additions & 0 deletions op-e2e/actions/fallback_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package actions

import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/testlog"
service_client "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
"math/big"
"testing"
"time"
)

func setupFallbackClientTest(t Testing, sd *e2eutils.SetupData, log log.Logger, l1Url string) (*L1Miner, *L1Replica, *L1Replica, *L2Engine, *L2Sequencer, *sources.FallbackClient) {
jwtPath := e2eutils.WriteDefaultJWT(t)

miner := NewL1MinerWithPort(t, log, sd.L1Cfg, 8545)
l1_2 := NewL1ReplicaWithPort(t, log, sd.L1Cfg, 8546)
l1_3 := NewL1ReplicaWithPort(t, log, sd.L1Cfg, 8547)
isMultiUrl, urlList := service_client.MultiUrlParse(l1Url)
require.True(t, isMultiUrl)
opts := []client.RPCOption{
client.WithHttpPollInterval(0),
client.WithDialBackoff(10),
}
rpc, err := client.NewRPC(t.Ctx(), log, urlList[0], opts...)
require.NoError(t, err)
fallbackClient := sources.NewFallbackClient(t.Ctx(), rpc, urlList, log, sd.RollupCfg.L1ChainID, sd.RollupCfg.Genesis.L1, func(url string) (client.RPC, error) {
return client.NewRPC(t.Ctx(), log, url, opts...)
})
l1F, err := sources.NewL1Client(fallbackClient, log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindBasic))
require.NoError(t, err)
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath)
l2Cl, err := sources.NewEngineClient(engine.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err)

sequencer := NewL2Sequencer(t, log, l1F, l2Cl, sd.RollupCfg, 0)
return miner, l1_2, l1_3, engine, sequencer, fallbackClient.(*sources.FallbackClient)
}

func TestL1FallbackClient_SwitchUrl(gt *testing.T) {
t := NewDefaultTesting(gt)
p := &e2eutils.TestParams{
MaxSequencerDrift: 300,
SequencerWindowSize: 200,
ChannelTimeout: 120,
L1BlockTime: 12,
}
dp := e2eutils.MakeDeployParams(t, p)
sd := e2eutils.Setup(t, dp, defaultAlloc)
logT := testlog.Logger(t, log.LvlDebug)
miner, l1_2, _, engine, sequencer, fallbackClient := setupFallbackClientTest(t, sd, logT, "http://127.0.0.1:8545,http://127.0.0.1:8546,http://127.0.0.1:8547")
miner.ActL1SetFeeRecipient(common.Address{'A'})

sequencer.ActL2PipelineFull(t)

signer := types.LatestSigner(sd.L2Cfg.Config)
cl := engine.EthClient()
aliceTx := func() {
n, err := cl.PendingNonceAt(t.Ctx(), dp.Addresses.Alice)
require.NoError(t, err)
tx := types.MustSignNewTx(dp.Secrets.Alice, signer, &types.DynamicFeeTx{
ChainID: sd.L2Cfg.Config.ChainID,
Nonce: n,
GasTipCap: big.NewInt(2 * params.GWei),
GasFeeCap: new(big.Int).Add(miner.l1Chain.CurrentBlock().BaseFee, big.NewInt(2*params.GWei)),
Gas: params.TxGas,
To: &dp.Addresses.Bob,
Value: e2eutils.Ether(2),
})
require.NoError(gt, cl.SendTransaction(t.Ctx(), tx))
}
makeL2BlockWithAliceTx := func() {
aliceTx()
sequencer.ActL2StartBlock(t)
engine.ActL2IncludeTx(dp.Addresses.Alice)(t) // include a test tx from alice
sequencer.ActL2EndBlock(t)
}

errRpc := miner.RPCClient().CallContext(t.Ctx(), nil, "admin_stopHTTP")
require.NoError(t, errRpc)

l2BlockCount := 0
for i := 0; i < 6; i++ {
miner.ActL1StartBlock(12)(t)
miner.ActL1EndBlock(t)
newBlock := miner.l1Chain.GetBlockByHash(miner.l1Chain.CurrentBlock().Hash())
_, err := l1_2.l1Chain.InsertChain([]*types.Block{newBlock})
require.NoError(t, err)

sequencer.L2Verifier.l1State.HandleNewL1HeadBlock(eth.L1BlockRef{
Hash: newBlock.Hash(),
Number: newBlock.NumberU64(),
ParentHash: newBlock.ParentHash(),
Time: newBlock.Time(),
})
origin := miner.l1Chain.CurrentBlock()

for sequencer.SyncStatus().UnsafeL2.Time+sd.RollupCfg.BlockTime < origin.Time {
makeL2BlockWithAliceTx()
//require.Equal(t, uint64(i), sequencer.SyncStatus().UnsafeL2.L1Origin.Number, "no L1 origin change before time matches")
l2BlockCount++
if l2BlockCount == 23 {
require.Equal(t, 1, fallbackClient.GetCurrentIndex(), "fallback client should switch url to second url")
errRpc2 := miner.RPCClient().CallContext(t.Ctx(), nil, "admin_startHTTP", "127.0.0.1", 8545, "*", "eth,net,web3,debug,admin,txpool", "*")
require.NoError(t, errRpc2)
}
if l2BlockCount == 34 {
require.Equal(t, 0, fallbackClient.GetCurrentIndex(), "fallback client should recover url to first url")
}
time.Sleep(500 * time.Millisecond)
}
}
}
7 changes: 7 additions & 0 deletions op-e2e/actions/l1_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ func NewL1Miner(t Testing, log log.Logger, genesis *core.Genesis) *L1Miner {
}
}

func NewL1MinerWithPort(t Testing, log log.Logger, genesis *core.Genesis, port int) *L1Miner {
rep := NewL1ReplicaWithPort(t, log, genesis, port)
return &L1Miner{
L1Replica: *rep,
}
}

// ActL1StartBlock returns an action to build a new L1 block on top of the head block,
// with timeDelta added to the head block time.
func (s *L1Miner) ActL1StartBlock(timeDelta uint64) Action {
Expand Down
45 changes: 45 additions & 0 deletions op-e2e/actions/l1_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,51 @@ func NewL1Replica(t Testing, log log.Logger, genesis *core.Genesis) *L1Replica {
}
}

func NewL1ReplicaWithPort(t Testing, log log.Logger, genesis *core.Genesis, port int) *L1Replica {
ethCfg := &ethconfig.Config{
NetworkId: genesis.Config.ChainID.Uint64(),
Genesis: genesis,
RollupDisableTxPoolGossip: true,
}
nodeCfg := &node.Config{
Name: "l1-geth",
WSHost: "127.0.0.1",
WSPort: port,
HTTPHost: "127.0.0.1",
HTTPPort: port,
WSModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal"},
HTTPModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal"},
DataDir: "", // in-memory
P2P: p2p.Config{
NoDiscovery: true,
NoDial: true,
},
}
n, err := node.New(nodeCfg)
require.NoError(t, err)
t.Cleanup(func() {
_ = n.Close()
})

backend, err := eth.New(n, ethCfg)
require.NoError(t, err)
backend.Merger().FinalizePoS()

n.RegisterAPIs(tracers.APIs(backend.APIBackend))

require.NoError(t, n.Start(), "failed to start L1 geth node")
return &L1Replica{
log: log,
node: n,
eth: backend,
l1Chain: backend.BlockChain(),
l1Database: backend.ChainDb(),
l1Cfg: genesis,
l1Signer: types.LatestSigner(genesis.Config),
failL1RPC: nil,
}
}

// ActL1RewindToParent rewinds the L1 chain to parent block of head
func (s *L1Replica) ActL1RewindToParent(t Testing) {
s.ActL1RewindDepth(1)(t)
Expand Down
2 changes: 1 addition & 1 deletion op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var (
/* Required Flags */
L1NodeAddr = cli.StringFlag{
Name: "l1",
Usage: "Address of L1 User JSON-RPC endpoint to use (eth namespace required)",
Usage: "Address of L1 User JSON-RPC endpoint to use (eth namespace required). Multiple alternative addresses are supported, separated by commas, and the first address is used by default",
Value: "http://127.0.0.1:8545",
EnvVar: prefixEnvVar("L1_ETH_RPC"),
}
Expand Down
10 changes: 10 additions & 0 deletions op-node/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Metricer interface {
RecordIPUnban()
RecordDial(allow bool)
RecordAccept(allow bool)
RecordL1UrlSwitchEvent()
}

// Metrics tracks all the metrics for the op-node.
Expand All @@ -99,6 +100,7 @@ type Metrics struct {
DerivationErrors *EventMetrics
SequencingErrors *EventMetrics
PublishingErrors *EventMetrics
L1UrlSwitchEvent *EventMetrics

P2PReqDurationSeconds *prometheus.HistogramVec
P2PReqTotal *prometheus.CounterVec
Expand Down Expand Up @@ -236,6 +238,7 @@ func NewMetrics(procName string) *Metrics {
DerivationErrors: NewEventMetrics(factory, ns, "derivation_errors", "derivation errors"),
SequencingErrors: NewEventMetrics(factory, ns, "sequencing_errors", "sequencing errors"),
PublishingErrors: NewEventMetrics(factory, ns, "publishing_errors", "p2p publishing errors"),
L1UrlSwitchEvent: NewEventMetrics(factory, ns, "l1_url_switch", "L1 URL switch events"),

SequencerInconsistentL1Origin: NewEventMetrics(factory, ns, "sequencer_inconsistent_l1_origin", "events when the sequencer selects an inconsistent L1 origin"),
SequencerResets: NewEventMetrics(factory, ns, "sequencer_resets", "sequencer resets"),
Expand Down Expand Up @@ -725,6 +728,10 @@ func (m *Metrics) RecordAccept(allow bool) {
}
}

func (m *Metrics) RecordL1UrlSwitchEvent() {
m.L1UrlSwitchEvent.RecordEvent()
}

type noopMetricer struct{}

var NoopMetrics Metricer = new(noopMetricer)
Expand Down Expand Up @@ -845,3 +852,6 @@ func (n *noopMetricer) RecordDial(allow bool) {

func (n *noopMetricer) RecordAccept(allow bool) {
}

func (n *noopMetricer) RecordL1UrlSwitchEvent() {
}
19 changes: 19 additions & 0 deletions op-node/node/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/sources"
service_client "github.com/ethereum-optimism/optimism/op-service/client"

"github.com/ethereum/go-ethereum/log"
gn "github.com/ethereum/go-ethereum/node"
Expand Down Expand Up @@ -175,6 +176,11 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf
opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize))
}

isMultiUrl, urlList := service_client.MultiUrlParse(cfg.L1NodeAddr)
if isMultiUrl {
return fallbackClientWrap(ctx, log, urlList, cfg, rollupCfg, opts...)
}

l1Node, err := client.NewRPC(ctx, log, cfg.L1NodeAddr, opts...)
if err != nil {
return nil, nil, fmt.Errorf("failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err)
Expand All @@ -184,6 +190,19 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf
return l1Node, rpcCfg, nil
}

func fallbackClientWrap(ctx context.Context, logger log.Logger, urlList []string, cfg *L1EndpointConfig, rollupCfg *rollup.Config, opts ...client.RPCOption) (client.RPC, *sources.L1ClientConfig, error) {
l1Node, err := client.NewRPC(ctx, logger, urlList[0], opts...)
if err != nil {
return nil, nil, fmt.Errorf("failed to dial L1 address (%s): %w", urlList[0], err)
}
l1Node = sources.NewFallbackClient(ctx, l1Node, urlList, logger, rollupCfg.L1ChainID, rollupCfg.Genesis.L1, func(url string) (client.RPC, error) {
return client.NewRPC(ctx, logger, url, opts...)
})
rpcCfg := sources.L1ClientDefaultConfig(rollupCfg, cfg.L1TrustRPC, cfg.L1RPCKind)
rpcCfg.MaxRequestsPerBatch = cfg.BatchSize
return l1Node, rpcCfg, nil
}

// PreparedL1Endpoint enables testing with an in-process pre-setup RPC connection to L1
type PreparedL1Endpoint struct {
Client client.RPC
Expand Down
7 changes: 7 additions & 0 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
}
return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewL1Head)
})

if fallbackClient, ok := l1Node.(*sources.FallbackClient); ok {
fallbackClient.RegisterSubscribeFunc(func() (event.Subscription, error) {
return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewL1Head)
}, &n.l1HeadsSub)
fallbackClient.RegisterMetrics(n.metrics)
}
go func() {
err, ok := <-n.l1HeadsSub.Err()
if !ok {
Expand Down
Loading