Skip to content

Commit

Permalink
chore(*): few tiny fixes (#2626)
Browse files Browse the repository at this point in the history
Few fixes and improvements:
- add a `expbackoff.Retry` util function. (will refactor existing
retries to this in next PR)
 - add debug logs to relayer cursors
 - Enable chaos errors in relayer.

issue: none
  • Loading branch information
corverroos authored Dec 4, 2024
1 parent 6b8a0a5 commit 1be29e1
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 13 deletions.
11 changes: 5 additions & 6 deletions e2e/app/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/omni-network/omni/e2e/docker"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/expbackoff"
"github.com/omni-network/omni/lib/log"
)

Expand Down Expand Up @@ -55,10 +56,8 @@ func CleanupDir(ctx context.Context, dir string) error {
}
}

err = os.RemoveAll(dir)
if err != nil {
return errors.Wrap(err, "remove dir")
}

return nil
// Retry remove all since it sometimes fails due to temp file locks.
return expbackoff.Retry(ctx, func() error {
return os.RemoveAll(dir)
})
}
2 changes: 1 addition & 1 deletion lib/chaos/chaos.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

var (
networkErrorProb = map[netconf.ID]float64{
netconf.Devnet: 0.001, // 0.1% error rate in devnet (1 in 1_000)
netconf.Devnet: 0.05, // 5% error rate in devnet (5 in 100)
netconf.Staging: 0.001, // 0.1% error rate in staging (1 in 1_000)
netconf.Omega: 0.0001, // 0.01% error rate in omega (1 in 10_000)
netconf.Mainnet: 0.00, // No chaos errors in mainnet
Expand Down
4 changes: 4 additions & 0 deletions lib/expbackoff/expbackoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type Config struct {
Jitter float64
// MaxDelay is the upper bound of backoff delay.
MaxDelay time.Duration

// retryCount defines the number of retries. Note this is only applicable to Retry.
retryCount int
}

// DefaultConfig is a backoff configuration with the default values specified
Expand All @@ -42,6 +45,7 @@ var DefaultConfig = Config{
Multiplier: 1.6,
Jitter: 0.2,
MaxDelay: 120 * time.Second,
retryCount: defaultRetries,
}

// FastConfig is a common configuration for fast backoff.
Expand Down
46 changes: 46 additions & 0 deletions lib/expbackoff/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package expbackoff

import (
"context"

"github.com/omni-network/omni/lib/errors"
)

const defaultRetries = 3

// WithRetryCount sets the number of retries to n.
// Note this is only applicable for use with Retry.
func WithRetryCount(n int) func(config *Config) {
return func(c *Config) {
c.retryCount = n
}
}

// Retry calls the provided function multiple times (default=3) with backoff until:
// - The function returns nil (Retry returns nil)
// - The context is canceled (Retry returns the context error)
// - The retry count is exhausted (Retry returns the last error).
func Retry(ctx context.Context, fn func() error, opts ...func(*Config)) error {
var remaining int // Workaround to extract retry count from options.
opts = append(opts, func(c *Config) {
remaining = c.retryCount
})

backoff := New(ctx, opts...)
for {
remaining--

err := fn()
if err == nil {
return nil
} else if remaining <= 0 {
return errors.Wrap(err, "max retries")
}

backoff()

if ctx.Err() != nil {
return errors.Wrap(ctx.Err(), "retry timeout")
}
}
}
60 changes: 60 additions & 0 deletions lib/expbackoff/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//nolint:paralleltest // Parallel tests not supported since test-alias globals are used.
package expbackoff_test

import (
"context"
"io"
"testing"
"time"

"github.com/omni-network/omni/lib/expbackoff"

"github.com/stretchr/testify/require"
)

func Test(t *testing.T) {
// Disable backoff for testing.
expbackoff.SetAfterForT(t, func(d time.Duration) <-chan time.Time {
ch := make(chan time.Time, 1)
ch <- time.Now()

return ch
})

ctx := context.Background()

t.Run("default", func(t *testing.T) {
var count int
err := expbackoff.Retry(ctx, func() error {
count++
return io.EOF
})
require.ErrorIs(t, err, io.EOF)
require.Equal(t, 3, count) // Default backoff count
})

t.Run("with count", func(t *testing.T) {
const maxRetries = 5
var count int
err := expbackoff.Retry(ctx, func() error {
count++
return io.EOF
}, expbackoff.WithRetryCount(maxRetries))
require.ErrorIs(t, err, io.EOF)
require.Equal(t, maxRetries, count) // Default backoff count
})

t.Run("context cancel", func(t *testing.T) {
// Cancel the context
ctx, cancel := context.WithCancel(ctx)
cancel()

var count int
err := expbackoff.Retry(ctx, func() error {
count++
return io.EOF
})
require.ErrorIs(t, err, context.Canceled)
require.Equal(t, 1, count) // No retries
})
}
2 changes: 2 additions & 0 deletions relayer/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/omni-network/omni/halo/genutil/evm/predeploys"
"github.com/omni-network/omni/lib/buildinfo"
cprovider "github.com/omni-network/omni/lib/cchain/provider"
"github.com/omni-network/omni/lib/chaos"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/ethclient"
"github.com/omni-network/omni/lib/log"
Expand All @@ -28,6 +29,7 @@ func Run(ctx context.Context, cfg Config) error {
log.Info(ctx, "Starting relayer")

buildinfo.Instrument(ctx)
ctx = chaos.WithErrProbability(ctx, cfg.Network)

// Start metrics first, so app is "up"
monitorChan := serveMonitoring(cfg.MonitoringAddr)
Expand Down
15 changes: 9 additions & 6 deletions relayer/app/cursor/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ func (s *Store) Insert(
offsetsByShard[uint64(streamID.ShardID)] = msgs[len(msgs)-1].StreamOffset
}

ctx = log.WithCtx(ctx,
"src_chain_version", s.network.ChainVersionName(srcVersion),
"dest_chain", s.network.ChainName(destChain),
"attest_offset", attestOffset,
"stream_offsets", offsetsByShard,
)

c := &Cursor{
SrcChainId: srcVersion.ID,
ConfLevel: uint32(srcVersion.ConfLevel),
Expand All @@ -134,13 +141,9 @@ func (s *Store) Insert(
existing, err := s.db.Get(ctx, srcVersion.ID, uint32(srcVersion.ConfLevel), destChain, attestOffset)
if err != nil {
return errors.Wrap(err, "get cursor")
} else if !maps.Equal(offsetsByShard, existing.GetStreamOffsetsByShard()) { // For now just log a bug if this happens.
log.Error(ctx, "Unexpected existing cursor offset mismatch [BUG]", nil,
"src_chain_version", s.network.ChainVersionName(srcVersion),
"dest_chain", s.network.ChainName(destChain),
"attest_offset", attestOffset,
} else if !maps.Equal(offsetsByShard, existing.GetStreamOffsetsByShard()) { // For now just log an error if this happens.
log.Error(ctx, "Unexpected existing cursor offset mismatch", nil,
"existing", existing.GetStreamOffsetsByShard(),
"new", offsetsByShard,
)
}

Expand Down

0 comments on commit 1be29e1

Please sign in to comment.