Skip to content

Commit

Permalink
Merge branch 'master' into merkle-perf
Browse files Browse the repository at this point in the history
  • Loading branch information
eljobe committed May 27, 2024
2 parents db24c7c + 3deb9ca commit 2efab2a
Show file tree
Hide file tree
Showing 14 changed files with 201 additions and 38 deletions.
13 changes: 8 additions & 5 deletions cmd/daserver/daserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ import (
)

type DAServerConfig struct {
EnableRPC bool `koanf:"enable-rpc"`
RPCAddr string `koanf:"rpc-addr"`
RPCPort uint64 `koanf:"rpc-port"`
RPCServerTimeouts genericconf.HTTPServerTimeoutConfig `koanf:"rpc-server-timeouts"`
EnableRPC bool `koanf:"enable-rpc"`
RPCAddr string `koanf:"rpc-addr"`
RPCPort uint64 `koanf:"rpc-port"`
RPCServerTimeouts genericconf.HTTPServerTimeoutConfig `koanf:"rpc-server-timeouts"`
RPCServerBodyLimit int `koanf:"rpc-server-body-limit"`

EnableREST bool `koanf:"enable-rest"`
RESTAddr string `koanf:"rest-addr"`
Expand All @@ -58,6 +59,7 @@ var DefaultDAServerConfig = DAServerConfig{
RPCAddr: "localhost",
RPCPort: 9876,
RPCServerTimeouts: genericconf.HTTPServerTimeoutConfigDefault,
RPCServerBodyLimit: genericconf.HTTPServerBodyLimitDefault,
EnableREST: false,
RESTAddr: "localhost",
RESTPort: 9877,
Expand Down Expand Up @@ -88,6 +90,7 @@ func parseDAServer(args []string) (*DAServerConfig, error) {
f.Bool("enable-rpc", DefaultDAServerConfig.EnableRPC, "enable the HTTP-RPC server listening on rpc-addr and rpc-port")
f.String("rpc-addr", DefaultDAServerConfig.RPCAddr, "HTTP-RPC server listening interface")
f.Uint64("rpc-port", DefaultDAServerConfig.RPCPort, "HTTP-RPC server listening port")
f.Int("rpc-server-body-limit", DefaultDAServerConfig.RPCServerBodyLimit, "HTTP-RPC server maximum request body size in bytes; the default (0) uses geth's 5MB limit")
genericconf.HTTPServerTimeoutConfigAddOptions("rpc-server-timeouts", f)

f.Bool("enable-rest", DefaultDAServerConfig.EnableREST, "enable the REST server listening on rest-addr and rest-port")
Expand Down Expand Up @@ -250,7 +253,7 @@ func startup() error {
if serverConfig.EnableRPC {
log.Info("Starting HTTP-RPC server", "addr", serverConfig.RPCAddr, "port", serverConfig.RPCPort, "revision", vcsRevision, "vcs.time", vcsTime)

rpcServer, err = das.StartDASRPCServer(ctx, serverConfig.RPCAddr, serverConfig.RPCPort, serverConfig.RPCServerTimeouts, daReader, daWriter, daHealthChecker)
rpcServer, err = das.StartDASRPCServer(ctx, serverConfig.RPCAddr, serverConfig.RPCPort, serverConfig.RPCServerTimeouts, serverConfig.RPCServerBodyLimit, daReader, daWriter, daHealthChecker)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/genericconf/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ var HTTPServerTimeoutConfigDefault = HTTPServerTimeoutConfig{
IdleTimeout: 120 * time.Second,
}

const HTTPServerBodyLimitDefault = 0 // Use default from go-ethereum

func (c HTTPConfig) Apply(stackConf *node.Config) {
stackConf.HTTPHost = c.Addr
stackConf.HTTPPort = c.Port
Expand Down
9 changes: 6 additions & 3 deletions das/dasRpcServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,22 @@ type DASRPCServer struct {
daHealthChecker DataAvailabilityServiceHealthChecker
}

func StartDASRPCServer(ctx context.Context, addr string, portNum uint64, rpcServerTimeouts genericconf.HTTPServerTimeoutConfig, daReader DataAvailabilityServiceReader, daWriter DataAvailabilityServiceWriter, daHealthChecker DataAvailabilityServiceHealthChecker) (*http.Server, error) {
func StartDASRPCServer(ctx context.Context, addr string, portNum uint64, rpcServerTimeouts genericconf.HTTPServerTimeoutConfig, rpcServerBodyLimit int, daReader DataAvailabilityServiceReader, daWriter DataAvailabilityServiceWriter, daHealthChecker DataAvailabilityServiceHealthChecker) (*http.Server, error) {
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", addr, portNum))
if err != nil {
return nil, err
}
return StartDASRPCServerOnListener(ctx, listener, rpcServerTimeouts, daReader, daWriter, daHealthChecker)
return StartDASRPCServerOnListener(ctx, listener, rpcServerTimeouts, rpcServerBodyLimit, daReader, daWriter, daHealthChecker)
}

func StartDASRPCServerOnListener(ctx context.Context, listener net.Listener, rpcServerTimeouts genericconf.HTTPServerTimeoutConfig, daReader DataAvailabilityServiceReader, daWriter DataAvailabilityServiceWriter, daHealthChecker DataAvailabilityServiceHealthChecker) (*http.Server, error) {
func StartDASRPCServerOnListener(ctx context.Context, listener net.Listener, rpcServerTimeouts genericconf.HTTPServerTimeoutConfig, rpcServerBodyLimit int, daReader DataAvailabilityServiceReader, daWriter DataAvailabilityServiceWriter, daHealthChecker DataAvailabilityServiceHealthChecker) (*http.Server, error) {
if daWriter == nil {
return nil, errors.New("No writer backend was configured for DAS RPC server. Has the BLS signing key been set up (--data-availability.key.key-dir or --data-availability.key.priv-key options)?")
}
rpcServer := rpc.NewServer()
if rpcServerBodyLimit > 0 {
rpcServer.SetHTTPBodyLimit(rpcServerBodyLimit)
}
err := rpcServer.RegisterName("das", &DASRPCServer{
daReader: daReader,
daWriter: daWriter,
Expand Down
2 changes: 1 addition & 1 deletion das/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestRPC(t *testing.T) {
testhelpers.RequireImpl(t, err)
localDas, err := NewSignAfterStoreDASWriterWithSeqInboxCaller(privKey, nil, storageService, "")
testhelpers.RequireImpl(t, err)
dasServer, err := StartDASRPCServerOnListener(ctx, lis, genericconf.HTTPServerTimeoutConfigDefault, storageService, localDas, storageService)
dasServer, err := StartDASRPCServerOnListener(ctx, lis, genericconf.HTTPServerTimeoutConfigDefault, genericconf.HTTPServerBodyLimitDefault, storageService, localDas, storageService)
defer func() {
if err := dasServer.Shutdown(ctx); err != nil {
panic(err)
Expand Down
57 changes: 56 additions & 1 deletion execution/gethexec/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,24 @@
package gethexec

import (
"bytes"
"context"
"errors"
"fmt"
"math"
"math/big"
"os"
"path"
"runtime/debug"
"runtime/pprof"
"runtime/trace"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/util/arbmath"
Expand Down Expand Up @@ -76,6 +82,7 @@ type SequencerConfig struct {
NonceFailureCacheExpiry time.Duration `koanf:"nonce-failure-cache-expiry" reload:"hot"`
ExpectedSurplusSoftThreshold string `koanf:"expected-surplus-soft-threshold" reload:"hot"`
ExpectedSurplusHardThreshold string `koanf:"expected-surplus-hard-threshold" reload:"hot"`
EnableProfiling bool `koanf:"enable-profiling"`
expectedSurplusSoftThreshold int
expectedSurplusHardThreshold int
}
Expand Down Expand Up @@ -125,6 +132,7 @@ var DefaultSequencerConfig = SequencerConfig{
NonceFailureCacheExpiry: time.Second,
ExpectedSurplusSoftThreshold: "default",
ExpectedSurplusHardThreshold: "default",
EnableProfiling: true,
}

var TestSequencerConfig = SequencerConfig{
Expand All @@ -142,6 +150,7 @@ var TestSequencerConfig = SequencerConfig{
NonceFailureCacheExpiry: time.Second,
ExpectedSurplusSoftThreshold: "default",
ExpectedSurplusHardThreshold: "default",
EnableProfiling: false,
}

func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) {
Expand All @@ -159,6 +168,7 @@ func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Duration(prefix+".nonce-failure-cache-expiry", DefaultSequencerConfig.NonceFailureCacheExpiry, "maximum amount of time to wait for a predecessor before rejecting a tx with nonce too high")
f.String(prefix+".expected-surplus-soft-threshold", DefaultSequencerConfig.ExpectedSurplusSoftThreshold, "if expected surplus is lower than this value, warnings are posted")
f.String(prefix+".expected-surplus-hard-threshold", DefaultSequencerConfig.ExpectedSurplusHardThreshold, "if expected surplus is lower than this value, new incoming transactions will be denied")
f.Bool(prefix+".enable-profiling", DefaultSequencerConfig.EnableProfiling, "enable CPU profiling and tracing")
}

type txQueueItem struct {
Expand Down Expand Up @@ -327,6 +337,7 @@ type Sequencer struct {
expectedSurplusMutex sync.RWMutex
expectedSurplus int64
expectedSurplusUpdated bool
enableProfiling bool
}

func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderReader, configFetcher SequencerConfigFetcher) (*Sequencer, error) {
Expand All @@ -353,6 +364,7 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead
l1Timestamp: 0,
pauseChan: nil,
onForwarderSet: make(chan struct{}, 1),
enableProfiling: config.EnableProfiling,
}
s.nonceFailures = &nonceFailureCache{
containers.NewLruCacheWithOnEvict(config.NonceCacheSize, s.onNonceFailureEvict),
Expand Down Expand Up @@ -758,6 +770,44 @@ func (s *Sequencer) precheckNonces(queueItems []txQueueItem) []txQueueItem {
return outputQueueItems
}

// createBlockWithProfiling runs create block with tracing and CPU profiling
// enabled. If the block creation takes longer than 5 seconds, it keeps both
// and prints out filenames in an error log line.
func (s *Sequencer) createBlockWithProfiling(ctx context.Context) bool {
pprofBuf, traceBuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
if err := pprof.StartCPUProfile(pprofBuf); err != nil {
log.Error("Starting CPU profiling", "error", err)
}
if err := trace.Start(traceBuf); err != nil {
log.Error("Starting tracing", "error", err)
}
start := time.Now()
res := s.createBlock(ctx)
elapsed := time.Since(start)
pprof.StopCPUProfile()
trace.Stop()
if elapsed > 2*time.Second {
writeAndLog(pprofBuf, traceBuf)
return res
}
return res
}

func writeAndLog(pprof, trace *bytes.Buffer) {
id := uuid.NewString()
pprofFile := path.Join(os.TempDir(), id+".pprof")
if err := os.WriteFile(pprofFile, pprof.Bytes(), 0o600); err != nil {
log.Error("Creating temporary file for pprof", "fileName", pprofFile, "error", err)
return
}
traceFile := path.Join(os.TempDir(), id+".trace")
if err := os.WriteFile(traceFile, trace.Bytes(), 0o600); err != nil {
log.Error("Creating temporary file for trace", "fileName", traceFile, "error", err)
return
}
log.Debug("Block creation took longer than 5 seconds, created pprof and trace files", "pprof", pprofFile, "traceFile", traceFile)
}

func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
var queueItems []txQueueItem
var totalBatchSize int
Expand Down Expand Up @@ -1088,7 +1138,12 @@ func (s *Sequencer) Start(ctxIn context.Context) error {

s.CallIteratively(func(ctx context.Context) time.Duration {
nextBlock := time.Now().Add(s.config().MaxBlockSpeed)
madeBlock := s.createBlock(ctx)
var madeBlock bool
if s.enableProfiling {
madeBlock = s.createBlockWithProfiling(ctx)
} else {
madeBlock = s.createBlock(ctx)
}
if madeBlock {
// Note: this may return a negative duration, but timers are fine with that (they treat negative durations as 0).
return time.Until(nextBlock)
Expand Down
29 changes: 29 additions & 0 deletions pubsub/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package pubsub

import (
"context"

"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
)

// CreateStream tries to create stream with given name, if it already exists
// does not return an error.
func CreateStream(ctx context.Context, streamName string, client redis.UniversalClient) error {
_, err := client.XGroupCreateMkStream(ctx, streamName, streamName, "$").Result()
if err != nil && !StreamExists(ctx, streamName, client) {
return err
}
return nil
}

// StreamExists returns whether there are any consumer group for specified
// redis stream.
func StreamExists(ctx context.Context, streamName string, client redis.UniversalClient) bool {
got, err := client.Do(ctx, "XINFO", "STREAM", streamName).Result()
if err != nil {
log.Error("Reading redis streams", "error", err)
return false
}
return got != nil
}
8 changes: 8 additions & 0 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ func heartBeatKey(id string) string {
return fmt.Sprintf("consumer:%s:heartbeat", id)
}

func (c *Consumer[Request, Response]) RedisClient() redis.UniversalClient {
return c.client
}

func (c *Consumer[Request, Response]) StreamName() string {
return c.redisStream
}

func (c *Consumer[Request, Response]) heartBeatKey() string {
return heartBeatKey(c.id)
}
Expand Down
2 changes: 1 addition & 1 deletion staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1089,7 +1089,7 @@ func (v *BlockValidator) Initialize(ctx context.Context) error {
}
// First spawner is always RedisValidationClient if RedisStreams are enabled.
if v.redisValidator != nil {
err := v.redisValidator.Initialize(moduleRoots)
err := v.redisValidator.Initialize(ctx, moduleRoots)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions system_tests/block_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops
redisURL = redisutil.CreateTestRedis(ctx, t)
validatorConfig.BlockValidator.RedisValidationClientConfig = redis.TestValidationClientConfig
validatorConfig.BlockValidator.RedisValidationClientConfig.RedisURL = redisURL
} else {
validatorConfig.BlockValidator.RedisValidationClientConfig = redis.ValidationClientConfig{}
}

AddDefaultValNode(t, ctx, validatorConfig, !arbitrator, redisURL)
Expand Down
2 changes: 1 addition & 1 deletion system_tests/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1128,7 +1128,7 @@ func setupConfigWithDAS(
Require(t, err)
restLis, err := net.Listen("tcp", "localhost:0")
Require(t, err)
_, err = das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault, daReader, daWriter, daHealthChecker)
_, err = das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault, genericconf.HTTPServerBodyLimitDefault, daReader, daWriter, daHealthChecker)
Require(t, err)
_, err = das.NewRestfulDasServerOnListener(restLis, genericconf.HTTPServerTimeoutConfigDefault, daReader, daHealthChecker)
Require(t, err)
Expand Down
4 changes: 2 additions & 2 deletions system_tests/das_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func startLocalDASServer(
Require(t, err)
rpcLis, err := net.Listen("tcp", "localhost:0")
Require(t, err)
rpcServer, err := das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault, storageService, daWriter, storageService)
rpcServer, err := das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault, genericconf.HTTPServerBodyLimitDefault, storageService, daWriter, storageService)
Require(t, err)
restLis, err := net.Listen("tcp", "localhost:0")
Require(t, err)
Expand Down Expand Up @@ -284,7 +284,7 @@ func TestDASComplexConfigAndRestMirror(t *testing.T) {
defer lifecycleManager.StopAndWaitUntil(time.Second)
rpcLis, err := net.Listen("tcp", "localhost:0")
Require(t, err)
_, err = das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault, daReader, daWriter, daHealthChecker)
_, err = das.StartDASRPCServerOnListener(ctx, rpcLis, genericconf.HTTPServerTimeoutConfigDefault, genericconf.HTTPServerBodyLimitDefault, daReader, daWriter, daHealthChecker)
Require(t, err)
restLis, err := net.Listen("tcp", "localhost:0")
Require(t, err)
Expand Down
13 changes: 12 additions & 1 deletion validator/client/redis/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type ValidationClientConfig struct {
Room int32 `koanf:"room"`
RedisURL string `koanf:"redis-url"`
ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"`
CreateStreams bool `koanf:"create-streams"`
}

func (c ValidationClientConfig) Enabled() bool {
Expand All @@ -34,19 +35,22 @@ var DefaultValidationClientConfig = ValidationClientConfig{
Room: 2,
RedisURL: "",
ProducerConfig: pubsub.DefaultProducerConfig,
CreateStreams: true,
}

var TestValidationClientConfig = ValidationClientConfig{
Name: "test redis validation client",
Room: 2,
RedisURL: "",
ProducerConfig: pubsub.TestProducerConfig,
CreateStreams: false,
}

func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".name", DefaultValidationClientConfig.Name, "validation client name")
f.Int32(prefix+".room", DefaultValidationClientConfig.Room, "validation client room")
pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f)
f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist")
}

// ValidationClient implements validation client through redis streams.
Expand All @@ -59,6 +63,7 @@ type ValidationClient struct {
producerConfig pubsub.ProducerConfig
redisClient redis.UniversalClient
moduleRoots []common.Hash
createStreams bool
}

func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) {
Expand All @@ -75,11 +80,17 @@ func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error)
producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]),
producerConfig: cfg.ProducerConfig,
redisClient: redisClient,
createStreams: cfg.CreateStreams,
}, nil
}

func (c *ValidationClient) Initialize(moduleRoots []common.Hash) error {
func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error {
for _, mr := range moduleRoots {
if c.createStreams {
if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(mr), c.redisClient); err != nil {
return fmt.Errorf("creating redis stream: %w", err)
}
}
if _, exists := c.producers[mr]; exists {
log.Warn("Producer already existsw for module root", "hash", mr)
continue
Expand Down
2 changes: 1 addition & 1 deletion validator/server_common/machine_locator.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewMachineLocator(rootPath string) (*MachineLocator, error) {
for _, dir := range dirs {
fInfo, err := os.Stat(dir)
if err != nil {
log.Warn("Getting file info", "error", err)
log.Warn("Getting file info", "dir", dir, "error", err)
continue
}
if !fInfo.IsDir() {
Expand Down
Loading

0 comments on commit 2efab2a

Please sign in to comment.