Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add backup transaction forwarding URLs #1975

Merged
merged 5 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 62 additions & 30 deletions execution/gethexec/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"net"
"net/http"
"regexp"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -79,19 +80,23 @@ func AddOptionsForForwarderConfigImpl(prefix string, defaultConfig *ForwarderCon
}

type TxForwarder struct {
ctx context.Context

enabled atomic.Bool
target string
timeout time.Duration
transport *http.Transport
rpcClient *rpc.Client
ethClient *ethclient.Client

healthMutex sync.Mutex
healthErr error
healthChecked time.Time

targets []string
rpcClients []*rpc.Client
ethClients []*ethclient.Client
tryNewForwarderErrors *regexp.Regexp
}

func NewForwarder(target string, config *ForwarderConfig) *TxForwarder {
func NewForwarder(targets []string, config *ForwarderConfig) *TxForwarder {
dialer := net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 2 * time.Second,
Expand All @@ -116,34 +121,45 @@ func NewForwarder(target string, config *ForwarderConfig) *TxForwarder {
ExpectContinueTimeout: 1 * time.Second,
}
return &TxForwarder{
target: target,
timeout: config.ConnectionTimeout,
transport: transport,
targets: targets,
timeout: config.ConnectionTimeout,
transport: transport,
tryNewForwarderErrors: regexp.MustCompile(`(?i)(http:|json:|.*Timeout exceeded).*`),
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (f *TxForwarder) ctxWithTimeout(inctx context.Context) (context.Context, context.CancelFunc) {
func (f *TxForwarder) ctxWithTimeout() (context.Context, context.CancelFunc) {
if f.timeout == time.Duration(0) {
return context.WithCancel(inctx)
return context.WithCancel(f.ctx)
}
return context.WithTimeout(inctx, f.timeout)
return context.WithTimeout(f.ctx, f.timeout)
}

func (f *TxForwarder) PublishTransaction(inctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
if !f.enabled.Load() {
return ErrNoSequencer
}
ctx, cancelFunc := f.ctxWithTimeout(inctx)
ctx, cancelFunc := f.ctxWithTimeout()
defer cancelFunc()
if options == nil {
return f.ethClient.SendTransaction(ctx, tx)
for pos, rpcClient := range f.rpcClients {
var err error
if options == nil {
err = f.ethClients[pos].SendTransaction(ctx, tx)
} else {
err = arbitrum.SendConditionalTransactionRPC(ctx, rpcClient, tx, options)
}
if err == nil || !f.tryNewForwarderErrors.MatchString(err.Error()) {
return err
}
log.Info("error forwarding transaction to a backup target", "target", f.targets[pos], "err", err)
}
return arbitrum.SendConditionalTransactionRPC(ctx, f.rpcClient, tx, options)
return errors.New("failed to publish transaction to any of the forwarding targets")
}

const cacheUpstreamHealth = 2 * time.Second
const maxHealthTimeout = 10 * time.Second

// CheckHealth returns health of the highest priority forwarding target
func (f *TxForwarder) CheckHealth(inctx context.Context) error {
if !f.enabled.Load() {
return ErrNoSequencer
Expand All @@ -157,28 +173,44 @@ func (f *TxForwarder) CheckHealth(inctx context.Context) error {
}
ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
defer cancelFunc()
f.healthErr = f.rpcClient.CallContext(ctx, nil, "arb_checkPublisherHealth")
f.healthErr = f.rpcClients[0].CallContext(ctx, nil, "arb_checkPublisherHealth")
f.healthChecked = time.Now()
}
return f.healthErr
}

func (f *TxForwarder) Initialize(inctx context.Context) error {
if f.target == "" {
f.rpcClient = nil
f.ethClient = nil
f.enabled.Store(false)
return nil
if f.ctx == nil {
f.ctx = inctx
}

pos := 0
for pos < len(f.targets) {
if f.targets[pos] == "" {
f.targets = append(f.targets[:pos], f.targets[pos+1:]...)
} else {
pos += 1
}
}
ctx, cancelFunc := f.ctxWithTimeout(inctx)

ctx, cancelFunc := f.ctxWithTimeout()
defer cancelFunc()
rpcClient, err := rpc.DialTransport(ctx, f.target, f.transport)
if err != nil {
return err
pos = 0
for pos < len(f.targets) {
rpcClient, err := rpc.DialTransport(ctx, f.targets[pos], f.transport)
if err != nil {
log.Warn("error initializing a forwarding client in txForwarder", "forwarding url", f.targets[pos], "err", err)
f.targets = append(f.targets[:pos], f.targets[pos+1:]...)
continue
}
ethClient := ethclient.NewClient(rpcClient)
f.rpcClients = append(f.rpcClients, rpcClient)
f.ethClients = append(f.ethClients, ethClient)
pos += 1
}
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
if len(f.rpcClients) > 0 {
f.enabled.Store(true)
}
f.rpcClient = rpcClient
f.ethClient = ethclient.NewClient(rpcClient)
f.enabled.Store(true)
return nil
}

Expand All @@ -192,8 +224,8 @@ func (f *TxForwarder) Start(ctx context.Context) error {
}

func (f *TxForwarder) StopAndWait() {
if f.ethClient != nil {
f.ethClient.Close() // internally closes also the rpc client
for _, ethClient := range f.ethClients {
ethClient.Close() // internally closes also the rpc client
}
}

Expand Down Expand Up @@ -346,7 +378,7 @@ func (f *RedisTxForwarder) update(ctx context.Context) time.Duration {
}
var newForwarder *TxForwarder
for {
newForwarder = NewForwarder(newSequencerUrl, f.config)
newForwarder = NewForwarder([]string{newSequencerUrl}, f.config)
err := newForwarder.Initialize(ctx)
if err == nil {
break
Expand Down
46 changes: 25 additions & 21 deletions execution/gethexec/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,17 @@ func DangerousConfigAddOptions(prefix string, f *flag.FlagSet) {
}

type Config struct {
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
Sequencer SequencerConfig `koanf:"sequencer" reload:"hot"`
RecordingDatabase arbitrum.RecordingDatabaseConfig `koanf:"recording-database"`
TxPreChecker TxPreCheckerConfig `koanf:"tx-pre-checker" reload:"hot"`
Forwarder ForwarderConfig `koanf:"forwarder"`
ForwardingTarget string `koanf:"forwarding-target"`
Caching CachingConfig `koanf:"caching"`
RPC arbitrum.Config `koanf:"rpc"`
TxLookupLimit uint64 `koanf:"tx-lookup-limit"`
Dangerous DangerousConfig `koanf:"dangerous"`
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
Sequencer SequencerConfig `koanf:"sequencer" reload:"hot"`
RecordingDatabase arbitrum.RecordingDatabaseConfig `koanf:"recording-database"`
TxPreChecker TxPreCheckerConfig `koanf:"tx-pre-checker" reload:"hot"`
Forwarder ForwarderConfig `koanf:"forwarder"`
ForwardingTarget string `koanf:"forwarding-target"`
SecondaryForwardingTarget []string `koanf:"secondary-forwarding-target"`
Caching CachingConfig `koanf:"caching"`
RPC arbitrum.Config `koanf:"rpc"`
TxLookupLimit uint64 `koanf:"tx-lookup-limit"`
Dangerous DangerousConfig `koanf:"dangerous"`

forwardingTarget string
}
Expand Down Expand Up @@ -77,6 +78,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
headerreader.AddOptions(prefix+".parent-chain-reader", f)
arbitrum.RecordingDatabaseConfigAddOptions(prefix+".recording-database", f)
f.String(prefix+".forwarding-target", ConfigDefault.ForwardingTarget, "transaction forwarding target URL, or \"null\" to disable forwarding (iff not sequencer)")
f.StringSlice(prefix+".secondary-forwarding-target", ConfigDefault.SecondaryForwardingTarget, "secondary transaction forwarding target URL")
AddOptionsForNodeForwarderConfig(prefix+".forwarder", f)
TxPreCheckerConfigAddOptions(prefix+".tx-pre-checker", f)
CachingConfigAddOptions(prefix+".caching", f)
Expand All @@ -85,16 +87,17 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
}

var ConfigDefault = Config{
RPC: arbitrum.DefaultConfig,
Sequencer: DefaultSequencerConfig,
ParentChainReader: headerreader.DefaultConfig,
RecordingDatabase: arbitrum.DefaultRecordingDatabaseConfig,
ForwardingTarget: "",
TxPreChecker: DefaultTxPreCheckerConfig,
TxLookupLimit: 126_230_400, // 1 year at 4 blocks per second
Caching: DefaultCachingConfig,
Dangerous: DefaultDangerousConfig,
Forwarder: DefaultNodeForwarderConfig,
RPC: arbitrum.DefaultConfig,
Sequencer: DefaultSequencerConfig,
ParentChainReader: headerreader.DefaultConfig,
RecordingDatabase: arbitrum.DefaultRecordingDatabaseConfig,
ForwardingTarget: "",
SecondaryForwardingTarget: []string{},
TxPreChecker: DefaultTxPreCheckerConfig,
TxLookupLimit: 126_230_400, // 1 year at 4 blocks per second
Caching: DefaultCachingConfig,
Dangerous: DefaultDangerousConfig,
Forwarder: DefaultNodeForwarderConfig,
}

func ConfigDefaultNonSequencerTest() *Config {
Expand Down Expand Up @@ -176,7 +179,8 @@ func CreateExecutionNode(
} else if config.forwardingTarget == "" {
txPublisher = NewTxDropper()
} else {
txPublisher = NewForwarder(config.forwardingTarget, &config.Forwarder)
targets := append([]string{config.forwardingTarget}, config.SecondaryForwardingTarget...)
txPublisher = NewForwarder(targets, &config.Forwarder)
}
}

Expand Down
6 changes: 3 additions & 3 deletions execution/gethexec/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,20 +489,20 @@ func (s *Sequencer) ForwardTarget() string {
if s.forwarder == nil {
return ""
}
return s.forwarder.target
return s.forwarder.targets[0]
}

func (s *Sequencer) ForwardTo(url string) error {
s.activeMutex.Lock()
defer s.activeMutex.Unlock()
if s.forwarder != nil {
if s.forwarder.target == url {
if s.forwarder.targets[0] == url {
log.Warn("attempted to update sequencer forward target with existing target", "url", url)
return nil
}
s.forwarder.Disable()
}
s.forwarder = NewForwarder(url, &s.config().Forwarder)
s.forwarder = NewForwarder([]string{url}, &s.config().Forwarder)
err := s.forwarder.Initialize(s.GetContext())
if err != nil {
log.Error("failed to set forward agent", "err", err)
Expand Down
Loading