Skip to content

Commit

Permalink
Merge branch 'andrius/bump-gas-on-retry' into 'main'
Browse files Browse the repository at this point in the history
Submitter: increase gas price multiplier on retry

See merge request flarenetwork/flare-system-client!42
  • Loading branch information
mboben committed Oct 3, 2024
2 parents 352f062 + 005d052 commit 3b831a6
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 13 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,15 @@ api_endpoint = "http://localhost:3000/ftso2"
enabled = true # (optional) set to false to disable a specific submitter, default: true
start_offset = "5s" # start fetching data and submitting txs after this offset from the start of the epoch
tx_submit_retries = 1 # (optional) number of retries for submitting txs, default: 1
tx_submit_timeout = "5s" # (optional) timeout for waiting tx to be mined, default: 5s
data_fetch_retries = 1 # (optional) number of retries for fetching data from the API, default: 1
data_fetch_timeout = "5s" # (optional) timeout for fetching data from the API, default: 5s

[submit2]
enabled = true
start_offset = "15s" # start fetching data and submitting txs after this offset from the start of the NEXT epoch
tx_submit_retries = 1
tx_submit_timeout = "5s"
data_fetch_retries = 1
data_fetch_timeout = "5s"

Expand Down
8 changes: 8 additions & 0 deletions client/config/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type CredentialsConfig struct {

var defaultSubmitConfig = SubmitConfig{
Enabled: true,
TxSubmitRetries: 1,
TxSubmitTimeout: 5 * time.Second,
DataFetchRetries: 1,
DataFetchTimeout: 5 * time.Second,
}
Expand All @@ -72,6 +74,7 @@ type SubmitConfig struct {
Enabled bool `toml:"enabled"`
StartOffset time.Duration `toml:"start_offset"` // offset from the start of the epoch
TxSubmitRetries int `toml:"tx_submit_retries"`
TxSubmitTimeout time.Duration `toml:"tx_submit_timeout"`
DataFetchRetries int `toml:"data_fetch_retries"`
DataFetchTimeout time.Duration `toml:"data_fetch_timeout"`
}
Expand Down Expand Up @@ -137,6 +140,8 @@ func newConfig() *ClientConfig {
SubmitSignatures: SubmitSignaturesConfig{
SubmitConfig: defaultSubmitConfig,
},
SubmitGas: GasConfig{GasPriceFixed: big.NewInt(0)},
RegisterGas: GasConfig{GasPriceFixed: big.NewInt(0)},
Uptime: UptimeConfig{
SigningWindow: 2,
},
Expand Down Expand Up @@ -187,5 +192,8 @@ func validateGasConfig(cfg *GasConfig) error {
if cfg.GasPriceFixed.Cmp(common.Big0) != 0 && cfg.GasPriceMultiplier != 0.0 {
return errors.New("only one of gas_price_fixed and gas_price_multiplier can be set to a non-zero value")
}
if cfg.GasPriceMultiplier != 0.0 && cfg.GasPriceMultiplier < 1 {
return errors.New("if set, gas_price_multiplier value cannot be less than 1")
}
return nil
}
2 changes: 1 addition & 1 deletion client/finalizer/relay_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type relayEthClientImpl struct {
}

func (eth relayEthClientImpl) SendRawTx(privateKey *ecdsa.PrivateKey, to common.Address, data []byte, dryRun bool) error {
return chain.SendRawTx(eth.client, privateKey, to, data, dryRun, &config.GasConfig{GasPriceFixed: common.Big0})
return chain.SendRawTx(eth.client, privateKey, to, data, dryRun, &config.GasConfig{GasPriceFixed: common.Big0}, chain.DefaultTxTimeout)
}

type signingPolicyListenerResponse struct {
Expand Down
5 changes: 3 additions & 2 deletions client/protocol/protocol_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestSubmitter(t *testing.T) {

base := SubmitterBase{
ethClient: &ethClient,
gasConfig: &clientConfig.GasConfig{},
gasConfig: &clientConfig.GasConfig{GasPriceFixed: common.Big0},
protocolContext: &protocolContext{
submitPrivateKey: privKey,
signerPrivateKey: privKey,
Expand All @@ -76,6 +76,7 @@ func TestSubmitter(t *testing.T) {
epoch: &utils.Epoch{Start: time.Unix(0, 0), Period: time.Hour},
subProtocols: []*SubProtocol{subProtocol},
submitRetries: 1,
submitTimeout: 1 * time.Second,
dataFetchRetries: 1,
dataFetchTimeout: 1 * time.Second,
name: "test",
Expand Down Expand Up @@ -170,7 +171,7 @@ type sentTxInfo struct {
}

func (c *testEthClient) SendRawTx(
privateKey *ecdsa.PrivateKey, to common.Address, payload []byte, gasConfig *clientConfig.GasConfig,
privateKey *ecdsa.PrivateKey, to common.Address, payload []byte, _ *clientConfig.GasConfig, _ time.Duration,
) error {
c.sentTxs = append(c.sentTxs, &sentTxInfo{
privateKey: privateKey,
Expand Down
40 changes: 32 additions & 8 deletions client/protocol/submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"flare-tlc/utils"
"flare-tlc/utils/chain"
"fmt"
"math"
"time"

mapset "github.com/deckarep/golang-set/v2"
Expand All @@ -31,24 +32,25 @@ type SubmitterBase struct {
subProtocols []*SubProtocol

startOffset time.Duration
submitRetries int // number of retries for submitting tx
name string // e.g., "submit1", "submit2", "submit3", "signatureSubmitter"
submitRetries int // number of retries for submitting tx
submitTimeout time.Duration // timeout for waiting for tx to be mined
name string // e.g., "submit1", "submit2", "submit3", "signatureSubmitter"
submitPrivateKey *ecdsa.PrivateKey

dataFetchRetries int // number of retries for fetching data of each provider
dataFetchTimeout time.Duration // timeout for fetching data of each provider
}

type submitterEthClient interface {
SendRawTx(*ecdsa.PrivateKey, common.Address, []byte, *config.GasConfig) error
SendRawTx(*ecdsa.PrivateKey, common.Address, []byte, *config.GasConfig, time.Duration) error
}

type submitterEthClientImpl struct {
ethClient *ethclient.Client
}

func (c submitterEthClientImpl) SendRawTx(privateKey *ecdsa.PrivateKey, to common.Address, payload []byte, gasConfig *config.GasConfig) error {
return chain.SendRawTx(c.ethClient, privateKey, to, payload, true, gasConfig)
func (c submitterEthClientImpl) SendRawTx(privateKey *ecdsa.PrivateKey, to common.Address, payload []byte, gasConfig *config.GasConfig, timeout time.Duration) error {
return chain.SendRawTx(c.ethClient, privateKey, to, payload, false, gasConfig, timeout)
}

type Submitter struct {
Expand All @@ -64,13 +66,15 @@ type SignatureSubmitter struct {
}

func (s *SubmitterBase) submit(payload []byte) bool {
sendResult := <-shared.ExecuteWithRetry(func() (any, error) {
err := s.ethClient.SendRawTx(s.submitPrivateKey, s.protocolContext.submitContractAddress, payload, s.gasConfig)
sendResult := <-shared.ExecuteWithRetryAttempts(func(ri int) (any, error) {
gasConfig := gasConfigForAttempt(s.gasConfig, ri)
logger.Debug("[Attempt %d] Submitter %s sending tx with gas config: %+v, timeout: %s", ri, s.name, gasConfig, s.submitTimeout)
err := s.ethClient.SendRawTx(s.submitPrivateKey, s.protocolContext.submitContractAddress, payload, gasConfig, s.submitTimeout)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("error sending submit tx for submitter %s tx", s.name))
}
return nil, nil
}, s.submitRetries, shared.TxRetryInterval)
}, s.submitRetries, 1*time.Second)
if sendResult.Success {
logger.Info("Submitter %s successfully sent tx", s.name)
}
Expand Down Expand Up @@ -98,6 +102,7 @@ func newSubmitter(
subProtocols: subProtocols,
startOffset: submitCfg.StartOffset,
submitRetries: max(1, submitCfg.TxSubmitRetries),
submitTimeout: max(1*time.Second, submitCfg.TxSubmitTimeout),
name: name,
submitPrivateKey: pc.submitPrivateKey,
dataFetchRetries: submitCfg.DataFetchRetries,
Expand Down Expand Up @@ -176,6 +181,7 @@ func newSignatureSubmitter(
selector: selector,
subProtocols: subProtocols,
submitRetries: max(1, submitCfg.TxSubmitRetries),
submitTimeout: max(1*time.Second, submitCfg.TxSubmitTimeout),
name: "submitSignatures",
submitPrivateKey: pc.submitSignaturesPrivateKey,
dataFetchTimeout: submitCfg.DataFetchTimeout,
Expand Down Expand Up @@ -269,3 +275,21 @@ func (s *SignatureSubmitter) RunEpoch(currentEpoch int64) {
}
}
}

// gasConfigForAttempt bumps up the gas price multiplier for each retry attempt by 50%,
// up to a maximum of 10x the original value.
//
// Note: If GasPriceFixed is used, the retry multiplier will not be applied.
func gasConfigForAttempt(cfg *config.GasConfig, ri int) *config.GasConfig {
if cfg.GasPriceFixed.Cmp(common.Big0) != 0 {
return cfg
}

retryMultiplier := min(10.0, math.Pow(1.5, float64(ri)))

return &config.GasConfig{
GasPriceMultiplier: max(1.0, cfg.GasPriceMultiplier) * float32(retryMultiplier),
GasPriceFixed: cfg.GasPriceFixed,
GasLimit: cfg.GasLimit,
}
}
89 changes: 89 additions & 0 deletions client/protocol/submitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package protocol

import (
"flare-tlc/client/config"
"math/big"
"testing"
)

func TestGasConfigForAttempt(t *testing.T) {
tests := []struct {
name string
cfg config.GasConfig
ri int
expected config.GasConfig
}{
{
name: "retry 0",
cfg: config.GasConfig{
GasPriceFixed: big.NewInt(0),
GasPriceMultiplier: 1.0,
},
ri: 0,
expected: config.GasConfig{
GasPriceFixed: big.NewInt(0),
GasPriceMultiplier: 1.0,
},
},
{
name: "retry 1",
cfg: config.GasConfig{
GasPriceFixed: big.NewInt(0),
GasPriceMultiplier: 1.0,
},
ri: 1,
expected: config.GasConfig{
GasPriceFixed: big.NewInt(0),
GasPriceMultiplier: 1.5,
},
},
{
name: "retry 1 - no config",
cfg: config.GasConfig{
GasPriceFixed: big.NewInt(0),
GasPriceMultiplier: 0,
},
ri: 1,
expected: config.GasConfig{
GasPriceFixed: big.NewInt(0),
GasPriceMultiplier: 1.5,
},
},
{
name: "retry 2",
cfg: config.GasConfig{
GasPriceFixed: big.NewInt(0),
GasPriceMultiplier: 1.0,
},
ri: 2,
expected: config.GasConfig{
GasPriceFixed: big.NewInt(0),
GasPriceMultiplier: 2.25,
},
},
{
name: "retry 1 - fixed",
cfg: config.GasConfig{
GasPriceFixed: big.NewInt(100),
GasPriceMultiplier: 0,
},
ri: 1,
expected: config.GasConfig{
GasPriceFixed: big.NewInt(100),
GasPriceMultiplier: 0,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := gasConfigForAttempt(&tt.cfg, tt.ri)
if got.GasPriceFixed.Cmp(tt.expected.GasPriceFixed) != 0 {
t.Errorf("GasPriceFixed = %v, want %v", got.GasPriceFixed, tt.expected.GasPriceFixed)
}
if got.GasPriceMultiplier != tt.expected.GasPriceMultiplier {
t.Errorf("GasPriceMultiplier = %v, want %v", got.GasPriceMultiplier, tt.expected.GasPriceMultiplier)
}
})
}
}
18 changes: 18 additions & 0 deletions client/shared/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,24 @@ func ExecuteWithRetry[T any](f func() (T, error), maxRetries int, delay time.Dur
return out
}

func ExecuteWithRetryAttempts[T any](f func(int) (T, error), maxRetries int, delay time.Duration) <-chan ExecuteStatus[T] {
out := make(chan ExecuteStatus[T])
go func() {
for ri := 0; ri < maxRetries; ri++ {
result, err := f(ri)
if err == nil {
out <- ExecuteStatus[T]{Success: true, Value: result}
return
} else {
logger.Error("error executing in retry no. %d: %v", ri, err)
}
time.Sleep(delay)
}
out <- ExecuteStatus[T]{Success: false, Message: "max retries reached"}
}()
return out
}

// ExistsAsSubstring returns true if any of the strings in the slice is a substring of s
func ExistsAsSubstring(slice []string, s string) bool {
for _, item := range slice {
Expand Down
4 changes: 2 additions & 2 deletions utils/chain/tx_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func unpackError(result []byte) (string, error) {
return vs[0].(string), nil
}

func SendRawTx(client *ethclient.Client, privateKey *ecdsa.PrivateKey, toAddress common.Address, data []byte, dryRun bool, gasConfig *config.GasConfig) error {
func SendRawTx(client *ethclient.Client, privateKey *ecdsa.PrivateKey, toAddress common.Address, data []byte, dryRun bool, gasConfig *config.GasConfig, timeout time.Duration) error {
publicKey := privateKey.Public()
publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey)
if !ok {
Expand Down Expand Up @@ -133,7 +133,7 @@ func SendRawTx(client *ethclient.Client, privateKey *ecdsa.PrivateKey, toAddress
verifier := NewTxVerifier(client)

logger.Debug("Waiting for tx to be mined...")
err = verifier.WaitUntilMined(fromAddress, signedTx, DefaultTxTimeout)
err = verifier.WaitUntilMined(fromAddress, signedTx, timeout)
if err != nil {
return err
}
Expand Down

0 comments on commit 3b831a6

Please sign in to comment.