Add loggers for ephemeral errors
These loggers can be set up to log through one underlying logging function
while the time or count threshold since the error was first logged has not
yet been reached, and through another logging function once the threshold
has been reached. An example use case would be to log a message at Debug
level until the same message has been seen 5 times, or for 1 minute, after
which it would be logged at Error level until reset.
Tristan-Wilson committed Sep 21, 2023
1 parent b359dbc commit a322ae1
Showing 4 changed files with 210 additions and 43 deletions.
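A minimal usage sketch (not part of this commit) of the two constructors added in util/ephemeralerror below. It assumes go-ethereum's log package supplies the underlying log functions, which is how the updated batch_poster.go and syncing_fallback_storage.go use them.

package main

import (
    "errors"
    "time"

    "github.com/ethereum/go-ethereum/log"

    "github.com/offchainlabs/nitro/util/ephemeralerror"
)

func main() {
    // Time-based: log at Debug until the same error has persisted for a
    // minute, then escalate to Error until Reset is called.
    timeLogger := ephemeralerror.NewTimeEphemeralErrorLogger(log.Debug, log.Error, time.Minute)

    // Count-based: stay silent for the first 5 errors, then log at Error.
    countLogger := ephemeralerror.NewCountEphemeralErrorLogger(ephemeralerror.NoLog, log.Error, 5)

    err := errors.New("transient failure")
    for i := 0; i < 10; i++ {
        timeLogger.Error("error posting batch", "err", err)
        countLogger.Error("error trying to sync from L1", "err", err)
    }

    // Once the underlying condition clears, Reset restores each logger's
    // pre-trigger behavior.
    timeLogger.Reset()
    countLogger.Reset()
}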
69 changes: 33 additions & 36 deletions arbnode/batch_poster.go
@@ -39,6 +39,7 @@ import (
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/ephemeralerror"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
@@ -57,20 +58,20 @@ type batchPosterPosition struct {

type BatchPoster struct {
stopwaiter.StopWaiter
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
bridge *bridgegen.Bridge
syncMonitor *SyncMonitor
seqInboxABI *abi.ABI
seqInboxAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
bridge *bridgegen.Bridge
syncMonitor *SyncMonitor
seqInboxABI *abi.ABI
seqInboxAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
batchPostErrorLogger ephemeralerror.Logger
// An estimate of the number of batches we want to post but haven't yet.
// This doesn't include batches which we don't want to post yet due to the L1 bounds.
backlog uint64
@@ -238,17 +239,18 @@ func NewBatchPoster(dataPosterDB ethdb.Database, l1Reader *headerreader.HeaderRe
return nil, err
}
b := &BatchPoster{
l1Reader: l1Reader,
inbox: inbox,
streamer: streamer,
syncMonitor: syncMonitor,
config: config,
bridge: bridge,
seqInbox: seqInbox,
seqInboxABI: seqInboxABI,
seqInboxAddr: deployInfo.SequencerInbox,
daWriter: daWriter,
redisLock: redisLock,
l1Reader: l1Reader,
inbox: inbox,
streamer: streamer,
syncMonitor: syncMonitor,
config: config,
bridge: bridge,
seqInbox: seqInbox,
seqInboxABI: seqInboxABI,
seqInboxAddr: deployInfo.SequencerInbox,
daWriter: daWriter,
redisLock: redisLock,
batchPostErrorLogger: ephemeralerror.NewTimeEphemeralErrorLogger(log.Debug, log.Error, time.Minute),
}
dataPosterConfigFetcher := func() *dataposter.DataPosterConfig {
return &config().DataPoster
@@ -981,22 +983,17 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
posted, err := b.maybePostSequencerBatch(ctx)
ephemeralError := errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, storage.ErrStorageRace)
if !ephemeralError {
b.firstEphemeralError = time.Time{}
b.batchPostErrorLogger.Reset()
}
if err != nil {
b.building = nil
logLevel := log.Error
if ephemeralError {
// Likely the inbox tracker just isn't caught up.
// Let's see if this error disappears naturally.
if b.firstEphemeralError == (time.Time{}) {
b.firstEphemeralError = time.Now()
logLevel = log.Debug
} else if time.Since(b.firstEphemeralError) < time.Minute {
logLevel = log.Debug
}
// The inbox tracker may just not be caught up.
// Use an ephemeral logger in case this error disappears naturally.
b.batchPostErrorLogger.Error("error posting batch", "err", err)
} else {
log.Error("error posting batch", "err", err)
}
logLevel("error posting batch", "err", err)
return b.config().ErrorDelay
} else if posted {
return 0
13 changes: 6 additions & 7 deletions das/syncing_fallback_storage.go
@@ -24,6 +24,7 @@ import (
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/ephemeralerror"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/stopwaiter"
flag "github.com/spf13/pflag"
@@ -370,25 +371,23 @@ func (s *l1SyncService) readMore(ctx context.Context) error {
func (s *l1SyncService) mainThread(ctx context.Context) {
headerChan, unsubscribe := s.l1Reader.Subscribe(false)
defer unsubscribe()
errCount := 0
syncErrorLogger := ephemeralerror.NewCountEphemeralErrorLogger(ephemeralerror.NoLog, log.Error, 5)
for {
err := s.readMore(ctx)
if err != nil {
if ctx.Err() != nil {
return
}
errCount++
if errCount > 5 {
log.Error("error trying to sync from L1", "err", err)
}
syncErrorLogger.Error("error trying to sync from L1", "err", err)
select {
case <-ctx.Done():
return
case <-time.After(s.config.DelayOnError * time.Duration(errCount)):
case <-time.After(s.config.DelayOnError):
}
continue
}
errCount = 0
syncErrorLogger.Reset()

if s.catchingUp {
// we're behind. Don't wait.
continue
91 changes: 91 additions & 0 deletions util/ephemeralerror/ephemeralerror.go
@@ -0,0 +1,91 @@
package ephemeralerror

import (
"math"
"time"

"go.uber.org/atomic"
)

type LogFn func(msg string, ctx ...interface{})

// NoLog is a LogFn that discards the message, for use when errors should not
// be logged at all before the trigger threshold is reached.
func NoLog(msg string, ctx ...interface{}) {
}

const notTriggered = math.MinInt64

// Logger logs errors through one underlying log function until a trigger
// condition is reached, and through another afterwards; Reset returns the
// logger to its pre-trigger state.
type Logger interface {
Error(msg string, ctx ...interface{})
Reset()
}

// TimeEphemeralErrorLogger logs with logFnBeforeTriggered until errors have
// been reported continuously (without a Reset) for longer than
// continuousDurationTrigger, after which it logs with logFnAfterTriggered.
type TimeEphemeralErrorLogger struct {
logFnBeforeTriggered LogFn
logFnAfterTriggered LogFn
continuousDurationTrigger time.Duration

firstTriggerTime atomic.Int64
}

func NewTimeEphemeralErrorLogger(
logFnBeforeTriggered LogFn,
logFnAfterTriggered LogFn,
continuousDurationTrigger time.Duration,
) *TimeEphemeralErrorLogger {
e := TimeEphemeralErrorLogger{
logFnBeforeTriggered: logFnBeforeTriggered,
logFnAfterTriggered: logFnAfterTriggered,
continuousDurationTrigger: continuousDurationTrigger,
}
e.Reset()
return &e
}

func (e *TimeEphemeralErrorLogger) Error(msg string, ctx ...interface{}) {
now := time.Now()
first := e.firstTriggerTime.CompareAndSwap(notTriggered, now.UnixMicro())
if !first && e.firstTriggerTime.Load() < now.Add(-e.continuousDurationTrigger).UnixMicro() {
e.logFnAfterTriggered(msg, ctx...)
} else {
e.logFnBeforeTriggered(msg, ctx...)
}
}

func (e *TimeEphemeralErrorLogger) Reset() {
e.firstTriggerTime.Store(notTriggered)
}

// CountEphemeralErrorLogger logs with logFnBeforeTriggered for the first
// errorCountTrigger errors since the last Reset, and with logFnAfterTriggered
// thereafter.
type CountEphemeralErrorLogger struct {
logFnBeforeTriggered LogFn
logFnAfterTriggered LogFn
errorCountTrigger int64

errorCount atomic.Int64
}

func NewCountEphemeralErrorLogger(
logFnBeforeTriggered LogFn,
logFnAfterTriggered LogFn,
errorCountTrigger int64,
) *CountEphemeralErrorLogger {
e := CountEphemeralErrorLogger{
logFnBeforeTriggered: logFnBeforeTriggered,
logFnAfterTriggered: logFnAfterTriggered,
errorCountTrigger: errorCountTrigger,
}
e.Reset()
return &e
}

func (e *CountEphemeralErrorLogger) Error(msg string, ctx ...interface{}) {
if e.errorCount.Add(1) > e.errorCountTrigger {
e.logFnAfterTriggered(msg, ctx...)
} else {
e.logFnBeforeTriggered(msg, ctx...)
}
}

func (e *CountEphemeralErrorLogger) Reset() {
e.errorCount.Store(0)
}
80 changes: 80 additions & 0 deletions util/ephemeralerror/ephemeralerror_test.go
@@ -0,0 +1,80 @@
package ephemeralerror

import (
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"
)

func TestCountEphemeralError(t *testing.T) {
var warnCount, errorCount atomic.Int64
e := NewCountEphemeralErrorLogger(
func(msg string, ctx ...interface{}) { warnCount.Add(1) },
func(msg string, ctx ...interface{}) { errorCount.Add(1) },
10,
)

for run := 0; run < 1000; run++ {
var errorCountTrigger, nEvents int64 = rand.Int63n(100), rand.Int63n(100)
e.errorCountTrigger = errorCountTrigger
expectedWarns := errorCountTrigger
expectedErrors := nEvents - expectedWarns

if expectedErrors < 0 {
expectedWarns = nEvents
expectedErrors = 0
}

wg := sync.WaitGroup{}
for i := int64(0); i < nEvents; i++ {
wg.Add(1)
go func() {
e.Error("bbq!")
wg.Done()
}()
}

wg.Wait()

if warnCount.Load() != expectedWarns || errorCount.Load() != expectedErrors {
t.Fatalf("unexpected warnCount, errorCount (%d, %d), expected (%d, %d), %d", warnCount.Load(), errorCount.Load(), expectedWarns, expectedErrors, nEvents)
}

e.Reset()
warnCount.Store(0)
errorCount.Store(0)
}
}

func TestTimeEphemeralError(t *testing.T) {
var warnCount, errorCount atomic.Int64
e := NewTimeEphemeralErrorLogger(
func(msg string, ctx ...interface{}) { warnCount.Add(1) },
func(msg string, ctx ...interface{}) { errorCount.Add(1) },
time.Second,
)

for run := 0; run < 10; run++ {
totalDuration := (time.Duration(rand.Int63n(20)) + 5) * time.Millisecond * 50
e.continuousDurationTrigger = totalDuration

var expectedWarns, expectedErrors int64 = rand.Int63n(9) + 1, rand.Int63n(9) + 1
totalEvents := expectedWarns + expectedErrors
period := totalDuration / time.Duration(expectedWarns)
for i := int64(0); i < totalEvents; i++ {
e.Error("bbq!")
time.Sleep(period)
}

if warnCount.Load() != expectedWarns || errorCount.Load() != expectedErrors {
t.Fatalf("unexpected warnCount, errorCount (%d, %d), expected (%d, %d), %v, %v", warnCount.Load(), errorCount.Load(), expectedWarns, expectedErrors, totalDuration, period)
}

e.Reset()
warnCount.Store(0)
errorCount.Store(0)
}
}
