Skip to content

Commit

Permalink
Merge branch 'master' into retryable-gas-usage-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
PlasmaPower authored Nov 27, 2023
2 parents 8faa55a + a1cd46c commit 25bcb40
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 54 deletions.
89 changes: 59 additions & 30 deletions execution/gethexec/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"net"
"net/http"
"regexp"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -79,19 +80,23 @@ func AddOptionsForForwarderConfigImpl(prefix string, defaultConfig *ForwarderCon
}

type TxForwarder struct {
ctx context.Context

enabled atomic.Bool
target string
timeout time.Duration
transport *http.Transport
rpcClient *rpc.Client
ethClient *ethclient.Client

healthMutex sync.Mutex
healthErr error
healthChecked time.Time

targets []string
rpcClients []*rpc.Client
ethClients []*ethclient.Client
tryNewForwarderErrors *regexp.Regexp
}

func NewForwarder(target string, config *ForwarderConfig) *TxForwarder {
func NewForwarder(targets []string, config *ForwarderConfig) *TxForwarder {
dialer := net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 2 * time.Second,
Expand All @@ -116,34 +121,45 @@ func NewForwarder(target string, config *ForwarderConfig) *TxForwarder {
ExpectContinueTimeout: 1 * time.Second,
}
return &TxForwarder{
target: target,
timeout: config.ConnectionTimeout,
transport: transport,
targets: targets,
timeout: config.ConnectionTimeout,
transport: transport,
tryNewForwarderErrors: regexp.MustCompile(`(?i)(^http:|^json:|^i/0|timeout exceeded|no such host)`),
}
}

func (f *TxForwarder) ctxWithTimeout(inctx context.Context) (context.Context, context.CancelFunc) {
func (f *TxForwarder) ctxWithTimeout() (context.Context, context.CancelFunc) {
if f.timeout == time.Duration(0) {
return context.WithCancel(inctx)
return context.WithCancel(f.ctx)
}
return context.WithTimeout(inctx, f.timeout)
return context.WithTimeout(f.ctx, f.timeout)
}

func (f *TxForwarder) PublishTransaction(inctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
if !f.enabled.Load() {
return ErrNoSequencer
}
ctx, cancelFunc := f.ctxWithTimeout(inctx)
ctx, cancelFunc := f.ctxWithTimeout()
defer cancelFunc()
if options == nil {
return f.ethClient.SendTransaction(ctx, tx)
for pos, rpcClient := range f.rpcClients {
var err error
if options == nil {
err = f.ethClients[pos].SendTransaction(ctx, tx)
} else {
err = arbitrum.SendConditionalTransactionRPC(ctx, rpcClient, tx, options)
}
if err == nil || !f.tryNewForwarderErrors.MatchString(err.Error()) {
return err
}
log.Info("error forwarding transaction to a backup target", "target", f.targets[pos], "err", err)
}
return arbitrum.SendConditionalTransactionRPC(ctx, f.rpcClient, tx, options)
return errors.New("failed to publish transaction to any of the forwarding targets")
}

const cacheUpstreamHealth = 2 * time.Second
const maxHealthTimeout = 10 * time.Second

// CheckHealth returns health of the highest priority forwarding target
func (f *TxForwarder) CheckHealth(inctx context.Context) error {
if !f.enabled.Load() {
return ErrNoSequencer
Expand All @@ -157,28 +173,41 @@ func (f *TxForwarder) CheckHealth(inctx context.Context) error {
}
ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
defer cancelFunc()
f.healthErr = f.rpcClient.CallContext(ctx, nil, "arb_checkPublisherHealth")
f.healthErr = f.rpcClients[0].CallContext(ctx, nil, "arb_checkPublisherHealth")
f.healthChecked = time.Now()
}
return f.healthErr
}

func (f *TxForwarder) Initialize(inctx context.Context) error {
if f.target == "" {
f.rpcClient = nil
f.ethClient = nil
f.enabled.Store(false)
return nil
if f.ctx == nil {
f.ctx = inctx
}
ctx, cancelFunc := f.ctxWithTimeout(inctx)
ctx, cancelFunc := f.ctxWithTimeout()
defer cancelFunc()
rpcClient, err := rpc.DialTransport(ctx, f.target, f.transport)
if err != nil {
return err
var targets []string
var lastError error
for _, target := range f.targets {
if target == "" {
continue
}
rpcClient, err := rpc.DialTransport(ctx, target, f.transport)
if err != nil {
log.Warn("error initializing a forwarding client in txForwarder", "forwarding url", target, "err", err)
lastError = err
continue
}
targets = append(targets, target)
ethClient := ethclient.NewClient(rpcClient)
f.rpcClients = append(f.rpcClients, rpcClient)
f.ethClients = append(f.ethClients, ethClient)
}
f.targets = targets
if len(f.rpcClients) > 0 {
f.enabled.Store(true)
} else {
return lastError
}
f.rpcClient = rpcClient
f.ethClient = ethclient.NewClient(rpcClient)
f.enabled.Store(true)
return nil
}

Expand All @@ -192,8 +221,8 @@ func (f *TxForwarder) Start(ctx context.Context) error {
}

func (f *TxForwarder) StopAndWait() {
if f.ethClient != nil {
f.ethClient.Close() // internally closes also the rpc client
for _, ethClient := range f.ethClients {
ethClient.Close() // internally closes also the rpc client
}
}

Expand Down Expand Up @@ -346,7 +375,7 @@ func (f *RedisTxForwarder) update(ctx context.Context) time.Duration {
}
var newForwarder *TxForwarder
for {
newForwarder = NewForwarder(newSequencerUrl, f.config)
newForwarder = NewForwarder([]string{newSequencerUrl}, f.config)
err := newForwarder.Initialize(ctx)
if err == nil {
break
Expand Down
46 changes: 25 additions & 21 deletions execution/gethexec/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,17 @@ func DangerousConfigAddOptions(prefix string, f *flag.FlagSet) {
}

type Config struct {
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
Sequencer SequencerConfig `koanf:"sequencer" reload:"hot"`
RecordingDatabase arbitrum.RecordingDatabaseConfig `koanf:"recording-database"`
TxPreChecker TxPreCheckerConfig `koanf:"tx-pre-checker" reload:"hot"`
Forwarder ForwarderConfig `koanf:"forwarder"`
ForwardingTarget string `koanf:"forwarding-target"`
Caching CachingConfig `koanf:"caching"`
RPC arbitrum.Config `koanf:"rpc"`
TxLookupLimit uint64 `koanf:"tx-lookup-limit"`
Dangerous DangerousConfig `koanf:"dangerous"`
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
Sequencer SequencerConfig `koanf:"sequencer" reload:"hot"`
RecordingDatabase arbitrum.RecordingDatabaseConfig `koanf:"recording-database"`
TxPreChecker TxPreCheckerConfig `koanf:"tx-pre-checker" reload:"hot"`
Forwarder ForwarderConfig `koanf:"forwarder"`
ForwardingTarget string `koanf:"forwarding-target"`
SecondaryForwardingTarget []string `koanf:"secondary-forwarding-target"`
Caching CachingConfig `koanf:"caching"`
RPC arbitrum.Config `koanf:"rpc"`
TxLookupLimit uint64 `koanf:"tx-lookup-limit"`
Dangerous DangerousConfig `koanf:"dangerous"`

forwardingTarget string
}
Expand Down Expand Up @@ -77,6 +78,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
headerreader.AddOptions(prefix+".parent-chain-reader", f)
arbitrum.RecordingDatabaseConfigAddOptions(prefix+".recording-database", f)
f.String(prefix+".forwarding-target", ConfigDefault.ForwardingTarget, "transaction forwarding target URL, or \"null\" to disable forwarding (iff not sequencer)")
f.StringSlice(prefix+".secondary-forwarding-target", ConfigDefault.SecondaryForwardingTarget, "secondary transaction forwarding target URL")
AddOptionsForNodeForwarderConfig(prefix+".forwarder", f)
TxPreCheckerConfigAddOptions(prefix+".tx-pre-checker", f)
CachingConfigAddOptions(prefix+".caching", f)
Expand All @@ -85,16 +87,17 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
}

var ConfigDefault = Config{
RPC: arbitrum.DefaultConfig,
Sequencer: DefaultSequencerConfig,
ParentChainReader: headerreader.DefaultConfig,
RecordingDatabase: arbitrum.DefaultRecordingDatabaseConfig,
ForwardingTarget: "",
TxPreChecker: DefaultTxPreCheckerConfig,
TxLookupLimit: 126_230_400, // 1 year at 4 blocks per second
Caching: DefaultCachingConfig,
Dangerous: DefaultDangerousConfig,
Forwarder: DefaultNodeForwarderConfig,
RPC: arbitrum.DefaultConfig,
Sequencer: DefaultSequencerConfig,
ParentChainReader: headerreader.DefaultConfig,
RecordingDatabase: arbitrum.DefaultRecordingDatabaseConfig,
ForwardingTarget: "",
SecondaryForwardingTarget: []string{},
TxPreChecker: DefaultTxPreCheckerConfig,
TxLookupLimit: 126_230_400, // 1 year at 4 blocks per second
Caching: DefaultCachingConfig,
Dangerous: DefaultDangerousConfig,
Forwarder: DefaultNodeForwarderConfig,
}

func ConfigDefaultNonSequencerTest() *Config {
Expand Down Expand Up @@ -175,7 +178,8 @@ func CreateExecutionNode(
} else if config.forwardingTarget == "" {
txPublisher = NewTxDropper()
} else {
txPublisher = NewForwarder(config.forwardingTarget, &config.Forwarder)
targets := append([]string{config.forwardingTarget}, config.SecondaryForwardingTarget...)
txPublisher = NewForwarder(targets, &config.Forwarder)
}
}

Expand Down
6 changes: 3 additions & 3 deletions execution/gethexec/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,20 +490,20 @@ func (s *Sequencer) ForwardTarget() string {
if s.forwarder == nil {
return ""
}
return s.forwarder.target
return s.forwarder.targets[0]
}

func (s *Sequencer) ForwardTo(url string) error {
s.activeMutex.Lock()
defer s.activeMutex.Unlock()
if s.forwarder != nil {
if s.forwarder.target == url {
if s.forwarder.targets[0] == url {
log.Warn("attempted to update sequencer forward target with existing target", "url", url)
return nil
}
s.forwarder.Disable()
}
s.forwarder = NewForwarder(url, &s.config().Forwarder)
s.forwarder = NewForwarder([]string{url}, &s.config().Forwarder)
err := s.forwarder.Initialize(s.GetContext())
if err != nil {
log.Error("failed to set forward agent", "err", err)
Expand Down

0 comments on commit 25bcb40

Please sign in to comment.