Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add loggers for ephemeral errors #1876

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 33 additions & 36 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/ephemeralerror"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
Expand All @@ -57,20 +58,20 @@ type batchPosterPosition struct {

type BatchPoster struct {
stopwaiter.StopWaiter
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
bridge *bridgegen.Bridge
syncMonitor *SyncMonitor
seqInboxABI *abi.ABI
seqInboxAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
bridge *bridgegen.Bridge
syncMonitor *SyncMonitor
seqInboxABI *abi.ABI
seqInboxAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
batchPostErrorLogger ephemeralerror.Logger
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's already part of the BatchPoster struct, so how about just errorLogger?

// An estimate of the number of batches we want to post but haven't yet.
// This doesn't include batches which we don't want to post yet due to the L1 bounds.
backlog uint64
Expand Down Expand Up @@ -238,17 +239,18 @@ func NewBatchPoster(dataPosterDB ethdb.Database, l1Reader *headerreader.HeaderRe
return nil, err
}
b := &BatchPoster{
l1Reader: l1Reader,
inbox: inbox,
streamer: streamer,
syncMonitor: syncMonitor,
config: config,
bridge: bridge,
seqInbox: seqInbox,
seqInboxABI: seqInboxABI,
seqInboxAddr: deployInfo.SequencerInbox,
daWriter: daWriter,
redisLock: redisLock,
l1Reader: l1Reader,
inbox: inbox,
streamer: streamer,
syncMonitor: syncMonitor,
config: config,
bridge: bridge,
seqInbox: seqInbox,
seqInboxABI: seqInboxABI,
seqInboxAddr: deployInfo.SequencerInbox,
daWriter: daWriter,
redisLock: redisLock,
batchPostErrorLogger: ephemeralerror.NewTimeEphemeralErrorLogger(log.Debug, log.Error, time.Minute),
}
dataPosterConfigFetcher := func() *dataposter.DataPosterConfig {
return &config().DataPoster
Expand Down Expand Up @@ -981,22 +983,17 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
posted, err := b.maybePostSequencerBatch(ctx)
ephemeralError := errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, storage.ErrStorageRace)
if !ephemeralError {
b.firstEphemeralError = time.Time{}
b.batchPostErrorLogger.Reset()
}
if err != nil {
b.building = nil
logLevel := log.Error
if ephemeralError {
// Likely the inbox tracker just isn't caught up.
// Let's see if this error disappears naturally.
if b.firstEphemeralError == (time.Time{}) {
b.firstEphemeralError = time.Now()
logLevel = log.Debug
} else if time.Since(b.firstEphemeralError) < time.Minute {
logLevel = log.Debug
}
// The inbox tracker may just not be caught up.
// Use an ephemeral logger in case this error disappears naturally.
b.batchPostErrorLogger.Error("error posting batch", "err", err)
} else {
log.Error("error posting batch", "err", err)
}
logLevel("error posting batch", "err", err)
return b.config().ErrorDelay
} else if posted {
return 0
Expand Down
13 changes: 6 additions & 7 deletions das/syncing_fallback_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/ephemeralerror"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/stopwaiter"
flag "github.com/spf13/pflag"
Expand Down Expand Up @@ -370,25 +371,23 @@ func (s *l1SyncService) readMore(ctx context.Context) error {
func (s *l1SyncService) mainThread(ctx context.Context) {
headerChan, unsubscribe := s.l1Reader.Subscribe(false)
defer unsubscribe()
errCount := 0
syncErrorLogger := ephemeralerror.NewCountEphemeralErrorLogger(ephemeralerror.NoLog, log.Error, 5)
for {
err := s.readMore(ctx)
if err != nil {
if ctx.Err() != nil {
return
}
errCount++
if errCount > 5 {
log.Error("error trying to sync from L1", "err", err)
}
syncErrorLogger.Error("error trying to sync from L1", "err", err)
select {
case <-ctx.Done():
return
case <-time.After(s.config.DelayOnError * time.Duration(errCount)):
case <-time.After(s.config.DelayOnError):
}
continue
}
errCount = 0
syncErrorLogger.Reset()

if s.catchingUp {
// we're behind. Don't wait.
continue
Expand Down
91 changes: 91 additions & 0 deletions util/ephemeralerror/ephemeralerror.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package ephemeralerror

import (
"math"
"time"

"go.uber.org/atomic"
)

// LogFn is the signature of a logging function: a message followed by a
// variadic list of context values.
type LogFn func(msg string, ctx ...interface{})

// NoLog is a LogFn that discards the message and its context.
func NoLog(msg string, ctx ...interface{}) {
}

// notTriggered is the sentinel stored in TimeEphemeralErrorLogger.firstTriggerTime
// while no error has been recorded since construction or the last Reset.
const notTriggered = math.MinInt64

// Logger is the interface implemented by the ephemeral error loggers in this
// package.
type Logger interface {
// Error reports one occurrence of the error being tracked.
Error(msg string, ctx ...interface{})
// Reset clears the tracked state so the next Error starts a new streak.
Reset()
}

// TimeEphemeralErrorLogger logs through logFnBeforeTriggered until errors have
// been reported, without an intervening Reset, for longer than
// continuousDurationTrigger; from then on it logs through logFnAfterTriggered.
type TimeEphemeralErrorLogger struct {
	logFnBeforeTriggered      LogFn
	logFnAfterTriggered       LogFn
	continuousDurationTrigger time.Duration

	// firstTriggerTime is the Unix time, in microseconds, of the first Error
	// call since the last Reset, or notTriggered if there has been none.
	firstTriggerTime atomic.Int64
}

// NewTimeEphemeralErrorLogger returns a logger in the not-triggered state.
//
// logFnBeforeTriggered is used while the current error streak is no older than
// continuousDurationTrigger; logFnAfterTriggered is used once it is older.
func NewTimeEphemeralErrorLogger(
	logFnBeforeTriggered LogFn,
	logFnAfterTriggered LogFn,
	continuousDurationTrigger time.Duration,
) *TimeEphemeralErrorLogger {
	e := &TimeEphemeralErrorLogger{
		logFnBeforeTriggered:      logFnBeforeTriggered,
		logFnAfterTriggered:       logFnAfterTriggered,
		continuousDurationTrigger: continuousDurationTrigger,
	}
	e.Reset()
	return e
}

// Error records an occurrence of the error and logs msg with ctx through the
// log function that matches the age of the current error streak.
func (e *TimeEphemeralErrorLogger) Error(msg string, ctx ...interface{}) {
	now := time.Now()
	if e.firstTriggerTime.CompareAndSwap(notTriggered, now.UnixMicro()) {
		// This call started the streak, so it cannot have aged yet.
		e.logFnBeforeTriggered(msg, ctx...)
		return
	}

	first := e.firstTriggerTime.Load()
	threshold := now.Add(-e.continuousDurationTrigger).UnixMicro()
	// A concurrent Reset may have stored the sentinel between the CAS and the
	// Load; the sentinel is the minimum int64 and must not be mistaken for a
	// very old streak.
	if first != notTriggered && first < threshold {
		e.logFnAfterTriggered(msg, ctx...)
		return
	}
	// ctx is expanded so the sink receives the individual context values
	// rather than a single slice argument.
	e.logFnBeforeTriggered(msg, ctx...)
}

// Reset ends the current error streak; the next Error call starts a new one.
func (e *TimeEphemeralErrorLogger) Reset() {
	e.firstTriggerTime.Store(notTriggered)
}

// CountEphemeralErrorLogger logs through logFnBeforeTriggered for the first
// errorCountTrigger errors reported since the last Reset, and through
// logFnAfterTriggered for every error after that.
type CountEphemeralErrorLogger struct {
	logFnBeforeTriggered LogFn
	logFnAfterTriggered  LogFn
	errorCountTrigger    int64

	// errorCount is the number of Error calls since the last Reset.
	errorCount atomic.Int64
}

// NewCountEphemeralErrorLogger returns a logger with an error count of zero.
//
// logFnBeforeTriggered is used for the first errorCountTrigger errors of a
// streak; logFnAfterTriggered is used for the rest.
func NewCountEphemeralErrorLogger(
	logFnBeforeTriggered LogFn,
	logFnAfterTriggered LogFn,
	errorCountTrigger int64,
) *CountEphemeralErrorLogger {
	e := &CountEphemeralErrorLogger{
		logFnBeforeTriggered: logFnBeforeTriggered,
		logFnAfterTriggered:  logFnAfterTriggered,
		errorCountTrigger:    errorCountTrigger,
	}
	e.Reset()
	return e
}

// Error counts an occurrence of the error and logs msg with ctx through the
// log function that matches the current count.
func (e *CountEphemeralErrorLogger) Error(msg string, ctx ...interface{}) {
	// ctx is expanded so the sink receives the individual context values
	// rather than a single slice argument.
	if e.errorCount.Add(1) > e.errorCountTrigger {
		e.logFnAfterTriggered(msg, ctx...)
		return
	}
	e.logFnBeforeTriggered(msg, ctx...)
}

// Reset sets the error count back to zero.
func (e *CountEphemeralErrorLogger) Reset() {
	e.errorCount.Store(0)
}
80 changes: 80 additions & 0 deletions util/ephemeralerror/ephemeralerror_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package ephemeralerror

import (
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"
)

// TestCountEphemeralError fires a random number of concurrent Error calls at a
// CountEphemeralErrorLogger with a random trigger and checks how many of them
// were routed to the before-trigger and after-trigger log functions.
func TestCountEphemeralError(t *testing.T) {
	var warnCount, errorCount atomic.Int64
	countWarn := func(msg string, ctx ...interface{}) { warnCount.Add(1) }
	countError := func(msg string, ctx ...interface{}) { errorCount.Add(1) }
	e := NewCountEphemeralErrorLogger(countWarn, countError, 10)

	for run := 0; run < 1000; run++ {
		errorCountTrigger := rand.Int63n(100)
		nEvents := rand.Int63n(100)
		e.errorCountTrigger = errorCountTrigger

		// With fewer events than the trigger, every event is a warning.
		expectedWarns, expectedErrors := nEvents, int64(0)
		if nEvents > errorCountTrigger {
			expectedWarns = errorCountTrigger
			expectedErrors = nEvents - errorCountTrigger
		}

		var wg sync.WaitGroup
		for i := int64(0); i < nEvents; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				e.Error("bbq!")
			}()
		}
		wg.Wait()

		gotWarns, gotErrors := warnCount.Load(), errorCount.Load()
		if gotWarns != expectedWarns || gotErrors != expectedErrors {
			t.Fatalf("unexpected warnCount, errorCount (%d, %d), expected (%d, %d), %d", gotWarns, gotErrors, expectedWarns, expectedErrors, nEvents)
		}

		// Start the next run from a clean slate.
		e.Reset()
		warnCount.Store(0)
		errorCount.Store(0)
	}
}

// TestTimeEphemeralError emits evenly spaced Error calls at a
// TimeEphemeralErrorLogger and checks how many were routed to the
// before-trigger and after-trigger log functions.
// NOTE(review): the assertions depend on wall-clock timing via time.Sleep, so
// the test may be sensitive to scheduling delays — confirm on loaded CI hosts.
func TestTimeEphemeralError(t *testing.T) {
var warnCount, errorCount atomic.Int64
e := NewTimeEphemeralErrorLogger(
func(msg string, ctx ...interface{}) { warnCount.Add(1) },
func(msg string, ctx ...interface{}) { errorCount.Add(1) },
time.Second,
)

for run := 0; run < 10; run++ {
// Random trigger duration between 250ms and 1.2s, in 50ms steps.
totalDuration := (time.Duration(rand.Int63n(20)) + 5) * time.Millisecond * 50
e.continuousDurationTrigger = totalDuration

// Each expected count is drawn from 1..9.
var expectedWarns, expectedErrors int64 = rand.Int63n(9) + 1, rand.Int63n(9) + 1
totalEvents := expectedWarns + expectedErrors
// Spacing chosen so that expectedWarns events are emitted before
// totalDuration has elapsed since the first event; the remaining events are
// emitted after at least expectedWarns sleeps of this length.
period := totalDuration / time.Duration(expectedWarns)
for i := int64(0); i < totalEvents; i++ {
e.Error("bbq!")
time.Sleep(period)
}

if warnCount.Load() != expectedWarns || errorCount.Load() != expectedErrors {
t.Fatalf("unexpected warnCount, errorCount (%d, %d), expected (%d, %d), %v, %v", warnCount.Load(), errorCount.Load(), expectedWarns, expectedErrors, totalDuration, period)
}

// Clear the logger and the counters before the next run.
e.Reset()
warnCount.Store(0)
errorCount.Store(0)
}

}