Skip to content

Commit

Permalink
Merge pull request #2366 from OffchainLabs/increase-profiling-threshold
Browse files Browse the repository at this point in the history
Profile only SequenceTransactions [NIT-2578]
  • Loading branch information
PlasmaPower authored Jun 5, 2024
2 parents f0b1013 + d77f405 commit 1bb0cd2
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 63 deletions.
44 changes: 44 additions & 0 deletions execution/gethexec/executionengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ package gethexec
*/
import "C"
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"os"
"path"
"runtime/pprof"
"runtime/trace"
"sync"
"testing"
"time"
Expand All @@ -27,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/google/uuid"
"github.com/offchainlabs/nitro/arbos"
"github.com/offchainlabs/nitro/arbos/arbosState"
"github.com/offchainlabs/nitro/arbos/arbostypes"
Expand Down Expand Up @@ -334,6 +340,44 @@ func (s *ExecutionEngine) SequenceTransactions(header *arbostypes.L1IncomingMess
})
}

// SequenceTransactionsWithProfiling runs SequenceTransactions with tracing and
// CPU profiling enabled. If the block creation takes longer than 2 seconds, it
// keeps both and prints out filenames in an error log line.
func (s *ExecutionEngine) SequenceTransactionsWithProfiling(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) {
pprofBuf, traceBuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
if err := pprof.StartCPUProfile(pprofBuf); err != nil {
log.Error("Starting CPU profiling", "error", err)
}
if err := trace.Start(traceBuf); err != nil {
log.Error("Starting tracing", "error", err)
}
start := time.Now()
res, err := s.SequenceTransactions(header, txes, hooks)
elapsed := time.Since(start)
pprof.StopCPUProfile()
trace.Stop()
if elapsed > 2*time.Second {
writeAndLog(pprofBuf, traceBuf)
return res, err
}
return res, err
}

func writeAndLog(pprof, trace *bytes.Buffer) {
id := uuid.NewString()
pprofFile := path.Join(os.TempDir(), id+".pprof")
if err := os.WriteFile(pprofFile, pprof.Bytes(), 0o600); err != nil {
log.Error("Creating temporary file for pprof", "fileName", pprofFile, "error", err)
return
}
traceFile := path.Join(os.TempDir(), id+".trace")
if err := os.WriteFile(traceFile, trace.Bytes(), 0o600); err != nil {
log.Error("Creating temporary file for trace", "fileName", traceFile, "error", err)
return
}
log.Info("Transactions sequencing took longer than 2 seconds, created pprof and trace files", "pprof", pprofFile, "traceFile", traceFile)
}

func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) {
lastBlockHeader, err := s.getCurrentHeader()
if err != nil {
Expand Down
84 changes: 21 additions & 63 deletions execution/gethexec/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,18 @@
package gethexec

import (
"bytes"
"context"
"errors"
"fmt"
"math"
"math/big"
"os"
"path"
"runtime/debug"
"runtime/pprof"
"runtime/trace"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/util/arbmath"
Expand Down Expand Up @@ -82,7 +76,7 @@ type SequencerConfig struct {
NonceFailureCacheExpiry time.Duration `koanf:"nonce-failure-cache-expiry" reload:"hot"`
ExpectedSurplusSoftThreshold string `koanf:"expected-surplus-soft-threshold" reload:"hot"`
ExpectedSurplusHardThreshold string `koanf:"expected-surplus-hard-threshold" reload:"hot"`
EnableProfiling bool `koanf:"enable-profiling"`
EnableProfiling bool `koanf:"enable-profiling" reload:"hot"`
expectedSurplusSoftThreshold int
expectedSurplusHardThreshold int
}
Expand Down Expand Up @@ -341,7 +335,6 @@ type Sequencer struct {
expectedSurplusMutex sync.RWMutex
expectedSurplus int64
expectedSurplusUpdated bool
enableProfiling bool
}

func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderReader, configFetcher SequencerConfigFetcher) (*Sequencer, error) {
Expand All @@ -368,7 +361,6 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead
l1Timestamp: 0,
pauseChan: nil,
onForwarderSet: make(chan struct{}, 1),
enableProfiling: config.EnableProfiling,
}
s.nonceFailures = &nonceFailureCache{
containers.NewLruCacheWithOnEvict(config.NonceCacheSize, s.onNonceFailureEvict),
Expand Down Expand Up @@ -416,11 +408,12 @@ func ctxWithTimeout(ctx context.Context, timeout time.Duration) (context.Context
}

func (s *Sequencer) PublishTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
config := s.config()
// Only try to acquire Rlock and check for hard threshold if l1reader is not nil
// And hard threshold was enabled, this prevents spamming of read locks when not needed
if s.l1Reader != nil && s.config().ExpectedSurplusHardThreshold != "default" {
if s.l1Reader != nil && config.ExpectedSurplusHardThreshold != "default" {
s.expectedSurplusMutex.RLock()
if s.expectedSurplusUpdated && s.expectedSurplus < int64(s.config().expectedSurplusHardThreshold) {
if s.expectedSurplusUpdated && s.expectedSurplus < int64(config.expectedSurplusHardThreshold) {
return errors.New("currently not accepting transactions due to expected surplus being below threshold")
}
s.expectedSurplusMutex.RUnlock()
Expand Down Expand Up @@ -459,7 +452,7 @@ func (s *Sequencer) PublishTransaction(parentCtx context.Context, tx *types.Tran
return err
}

queueTimeout := s.config().QueueTimeout
queueTimeout := config.QueueTimeout
queueCtx, cancelFunc := ctxWithTimeout(parentCtx, queueTimeout)
defer cancelFunc()

Expand Down Expand Up @@ -787,44 +780,6 @@ func (s *Sequencer) precheckNonces(queueItems []txQueueItem, totalBlockSize int)
return outputQueueItems
}

// createBlockWithProfiling runs create block with tracing and CPU profiling
// enabled. If the block creation takes longer than 5 seconds, it keeps both
// and prints out filenames in an error log line.
func (s *Sequencer) createBlockWithProfiling(ctx context.Context) bool {
pprofBuf, traceBuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
if err := pprof.StartCPUProfile(pprofBuf); err != nil {
log.Error("Starting CPU profiling", "error", err)
}
if err := trace.Start(traceBuf); err != nil {
log.Error("Starting tracing", "error", err)
}
start := time.Now()
res := s.createBlock(ctx)
elapsed := time.Since(start)
pprof.StopCPUProfile()
trace.Stop()
if elapsed > 2*time.Second {
writeAndLog(pprofBuf, traceBuf)
return res
}
return res
}

func writeAndLog(pprof, trace *bytes.Buffer) {
id := uuid.NewString()
pprofFile := path.Join(os.TempDir(), id+".pprof")
if err := os.WriteFile(pprofFile, pprof.Bytes(), 0o600); err != nil {
log.Error("Creating temporary file for pprof", "fileName", pprofFile, "error", err)
return
}
traceFile := path.Join(os.TempDir(), id+".trace")
if err := os.WriteFile(traceFile, trace.Bytes(), 0o600); err != nil {
log.Error("Creating temporary file for trace", "fileName", traceFile, "error", err)
return
}
log.Debug("Block creation took longer than 5 seconds, created pprof and trace files", "pprof", pprofFile, "traceFile", traceFile)
}

func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
var queueItems []txQueueItem
var totalBlockSize int
Expand Down Expand Up @@ -972,7 +927,15 @@ func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
}

start := time.Now()
block, err := s.execEngine.SequenceTransactions(header, txes, hooks)
var (
block *types.Block
err error
)
if config.EnableProfiling {
block, err = s.execEngine.SequenceTransactionsWithProfiling(header, txes, hooks)
} else {
block, err = s.execEngine.SequenceTransactions(header, txes, hooks)
}
elapsed := time.Since(start)
blockCreationTimer.Update(elapsed)
if elapsed >= time.Second*5 {
Expand Down Expand Up @@ -1104,16 +1067,17 @@ func (s *Sequencer) updateExpectedSurplus(ctx context.Context) (int64, error) {
unusedL1GasChargeGauge.Update(backlogL1GasCharged)
currentSurplusGauge.Update(surplus)
expectedSurplusGauge.Update(expectedSurplus)
if s.config().ExpectedSurplusSoftThreshold != "default" && expectedSurplus < int64(s.config().expectedSurplusSoftThreshold) {
log.Warn("expected surplus is below soft threshold", "value", expectedSurplus, "threshold", s.config().expectedSurplusSoftThreshold)
config := s.config()
if config.ExpectedSurplusSoftThreshold != "default" && expectedSurplus < int64(config.expectedSurplusSoftThreshold) {
log.Warn("expected surplus is below soft threshold", "value", expectedSurplus, "threshold", config.expectedSurplusSoftThreshold)
}
return expectedSurplus, nil
}

func (s *Sequencer) Start(ctxIn context.Context) error {
s.StopWaiter.Start(ctxIn, s)

if (s.config().ExpectedSurplusHardThreshold != "default" || s.config().ExpectedSurplusSoftThreshold != "default") && s.l1Reader == nil {
config := s.config()
if (config.ExpectedSurplusHardThreshold != "default" || config.ExpectedSurplusSoftThreshold != "default") && s.l1Reader == nil {
return errors.New("expected surplus soft/hard thresholds are enabled but l1Reader is nil")
}

Expand All @@ -1125,7 +1089,7 @@ func (s *Sequencer) Start(ctxIn context.Context) error {

expectedSurplus, err := s.updateExpectedSurplus(ctxIn)
if err != nil {
if s.config().ExpectedSurplusHardThreshold != "default" {
if config.ExpectedSurplusHardThreshold != "default" {
return fmt.Errorf("expected-surplus-hard-threshold is enabled but error fetching initial expected surplus value: %w", err)
}
log.Error("expected-surplus-soft-threshold is enabled but error fetching initial expected surplus value", "err", err)
Expand Down Expand Up @@ -1168,13 +1132,7 @@ func (s *Sequencer) Start(ctxIn context.Context) error {

s.CallIteratively(func(ctx context.Context) time.Duration {
nextBlock := time.Now().Add(s.config().MaxBlockSpeed)
var madeBlock bool
if s.enableProfiling {
madeBlock = s.createBlockWithProfiling(ctx)
} else {
madeBlock = s.createBlock(ctx)
}
if madeBlock {
if s.createBlock(ctx) {
// Note: this may return a negative duration, but timers are fine with that (they treat negative durations as 0).
return time.Until(nextBlock)
}
Expand Down

0 comments on commit 1bb0cd2

Please sign in to comment.