From 7985488ab71341ea4ac7ff4e82ba7b4ec8b9b595 Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Wed, 13 Dec 2023 19:42:57 +0530 Subject: [PATCH 01/15] Pause nitro node block validator validation when memory is running low --- .../resourcemanager/resource_management.go | 32 ++++++------- .../resource_management_test.go | 8 ++-- staker/block_validator.go | 47 +++++++++++++++++++ 3 files changed, 67 insertions(+), 20 deletions(-) diff --git a/arbnode/resourcemanager/resource_management.go b/arbnode/resourcemanager/resource_management.go index cb1ae9d6ea..aba823cc25 100644 --- a/arbnode/resourcemanager/resource_management.go +++ b/arbnode/resourcemanager/resource_management.go @@ -39,14 +39,14 @@ func Init(conf *Config) error { return nil } - limit, err := parseMemLimit(conf.MemFreeLimit) + limit, err := ParseMemLimit(conf.MemFreeLimit) if err != nil { return err } node.WrapHTTPHandler = func(srv http.Handler) (http.Handler, error) { - var c limitChecker - c, err := newCgroupsMemoryLimitCheckerIfSupported(limit) + var c LimitChecker + c, err := NewCgroupsMemoryLimitCheckerIfSupported(limit) if errors.Is(err, errNotSupported) { log.Error("No method for determining memory usage and limits was discovered, disabled memory limit RPC throttling") c = &trivialLimitChecker{} @@ -57,7 +57,7 @@ func Init(conf *Config) error { return nil } -func parseMemLimit(limitStr string) (int, error) { +func ParseMemLimit(limitStr string) (int, error) { var ( limit int = 1 s string @@ -105,10 +105,10 @@ func ConfigAddOptions(prefix string, f *pflag.FlagSet) { // limit check. type httpServer struct { inner http.Handler - c limitChecker + c LimitChecker } -func newHttpServer(inner http.Handler, c limitChecker) *httpServer { +func newHttpServer(inner http.Handler, c LimitChecker) *httpServer { return &httpServer{inner: inner, c: c} } @@ -116,7 +116,7 @@ func newHttpServer(inner http.Handler, c limitChecker) *httpServer { // limit is exceeded, in which case it returns a HTTP 429 error. func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { start := time.Now() - exceeded, err := s.c.isLimitExceeded() + exceeded, err := s.c.IsLimitExceeded() limitCheckDurationHistogram.Update(time.Since(start).Nanoseconds()) if err != nil { log.Error("Error checking memory limit", "err", err, "checker", s.c.String()) @@ -130,19 +130,19 @@ func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { s.inner.ServeHTTP(w, req) } -type limitChecker interface { - isLimitExceeded() (bool, error) +type LimitChecker interface { + IsLimitExceeded() (bool, error) String() string } -func isSupported(c limitChecker) bool { - _, err := c.isLimitExceeded() +func isSupported(c LimitChecker) bool { + _, err := c.IsLimitExceeded() return err == nil } -// newCgroupsMemoryLimitCheckerIfSupported attempts to auto-discover whether +// NewCgroupsMemoryLimitCheckerIfSupported attempts to auto-discover whether // Cgroups V1 or V2 is supported for checking system memory limits. -func newCgroupsMemoryLimitCheckerIfSupported(memLimitBytes int) (*cgroupsMemoryLimitChecker, error) { +func NewCgroupsMemoryLimitCheckerIfSupported(memLimitBytes int) (*cgroupsMemoryLimitChecker, error) { c := newCgroupsMemoryLimitChecker(cgroupsV1MemoryFiles, memLimitBytes) if isSupported(c) { log.Info("Cgroups v1 detected, enabling memory limit RPC throttling") @@ -161,7 +161,7 @@ func newCgroupsMemoryLimitCheckerIfSupported(memLimitBytes int) (*cgroupsMemoryL // trivialLimitChecker checks no limits, so its limits are never exceeded. type trivialLimitChecker struct{} -func (_ trivialLimitChecker) isLimitExceeded() (bool, error) { +func (_ trivialLimitChecker) IsLimitExceeded() (bool, error) { return false, nil } @@ -202,7 +202,7 @@ func newCgroupsMemoryLimitChecker(files cgroupsMemoryFiles, memLimitBytes int) * } } -// isLimitExceeded checks if the system memory free is less than the limit. +// IsLimitExceeded checks if the system memory free is less than the limit. // It returns true if the limit is exceeded. // // container_memory_working_set_bytes in prometheus is calculated as @@ -223,7 +223,7 @@ func newCgroupsMemoryLimitChecker(files cgroupsMemoryFiles, memLimitBytes int) * // free memory for the page cache, to avoid cache thrashing on chain state // access. How much "reasonable" is will depend on access patterns, state // size, and your application's tolerance for latency. -func (c *cgroupsMemoryLimitChecker) isLimitExceeded() (bool, error) { +func (c *cgroupsMemoryLimitChecker) IsLimitExceeded() (bool, error) { var limit, usage, active, inactive int var err error if limit, err = readIntFromFile(c.files.limitFile); err != nil { diff --git a/arbnode/resourcemanager/resource_management_test.go b/arbnode/resourcemanager/resource_management_test.go index 4f52ad017e..4495396063 100644 --- a/arbnode/resourcemanager/resource_management_test.go +++ b/arbnode/resourcemanager/resource_management_test.go @@ -52,7 +52,7 @@ func makeCgroupsTestDir(cgroupDir string) cgroupsMemoryFiles { func TestCgroupsFailIfCantOpen(t *testing.T) { testFiles := makeCgroupsTestDir(t.TempDir()) c := newCgroupsMemoryLimitChecker(testFiles, 1024*1024*512) - if _, err := c.isLimitExceeded(); err == nil { + if _, err := c.IsLimitExceeded(); err == nil { t.Fatal("Should fail open if can't read files") } } @@ -124,7 +124,7 @@ func TestCgroupsMemoryLimit(t *testing.T) { } { t.Run(tc.desc, func(t *testing.T) { testFiles := makeCgroupsTestDir(t.TempDir()) - memLimit, err := parseMemLimit(tc.memLimit) + memLimit, err := ParseMemLimit(tc.memLimit) if err != nil { t.Fatalf("Parsing memory limit failed: %v", err) } @@ -132,12 +132,12 @@ func TestCgroupsMemoryLimit(t *testing.T) { if err := updateFakeCgroupFiles(c, tc.sysLimit, tc.usage, tc.inactive, tc.active); err != nil { t.Fatalf("Updating cgroup files: %v", err) } - exceeded, err := c.isLimitExceeded() + exceeded, err := c.IsLimitExceeded() if err != nil { t.Fatalf("Checking if limit exceeded: %v", err) } if exceeded != tc.want { - t.Errorf("isLimitExceeded() = %t, want %t", exceeded, tc.want) + t.Errorf("IsLimitExceeded() = %t, want %t", exceeded, tc.want) } }, ) diff --git a/staker/block_validator.go b/staker/block_validator.go index 61e5ed519b..d54fa60c11 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -20,6 +20,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" + "github.com/offchainlabs/nitro/arbnode/resourcemanager" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/util/containers" "github.com/offchainlabs/nitro/util/rpcclient" @@ -75,6 +76,8 @@ type BlockValidator struct { testingProgressMadeChan chan struct{} fatalErr chan<- error + + MemoryFreeLimitChecker resourcemanager.LimitChecker } type BlockValidatorConfig struct { @@ -87,6 +90,7 @@ type BlockValidatorConfig struct { PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"` Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"` + MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"` } func (c *BlockValidatorConfig) Validate() error { @@ -109,6 +113,7 @@ func BlockValidatorConfigAddOptions(prefix string, f *flag.FlagSet) { f.String(prefix+".pending-upgrade-module-root", DefaultBlockValidatorConfig.PendingUpgradeModuleRoot, "pending upgrade wasm module root to additionally validate (hash, 'latest' or empty)") f.Bool(prefix+".failure-is-fatal", DefaultBlockValidatorConfig.FailureIsFatal, "failing a validation is treated as a fatal error") BlockValidatorDangerousConfigAddOptions(prefix+".dangerous", f) + f.String(prefix+".memory-free-limit", DefaultBlockValidatorConfig.MemoryFreeLimit, "minimum free-memory limit after reaching which the blockvalidator pauses validation") } func BlockValidatorDangerousConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -125,6 +130,7 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{ PendingUpgradeModuleRoot: "latest", FailureIsFatal: true, Dangerous: DefaultBlockValidatorDangerousConfig, + MemoryFreeLimit: "1GB", } var TestBlockValidatorConfig = BlockValidatorConfig{ @@ -137,6 +143,7 @@ var TestBlockValidatorConfig = BlockValidatorConfig{ PendingUpgradeModuleRoot: "latest", FailureIsFatal: true, Dangerous: DefaultBlockValidatorDangerousConfig, + MemoryFreeLimit: "1GB", } var DefaultBlockValidatorDangerousConfig = BlockValidatorDangerousConfig{ @@ -215,6 +222,19 @@ func NewBlockValidator( } streamer.SetBlockValidator(ret) inbox.SetBlockValidator(ret) + if config().MemoryFreeLimit != "" { + limit, err := resourcemanager.ParseMemLimit(config().MemoryFreeLimit) + if err != nil { + return nil, fmt.Errorf("failed to parse MemoryFreeLimit string from config: %w", err) + } + limtchecker, err := resourcemanager.NewCgroupsMemoryLimitCheckerIfSupported(limit) + if err != nil { + log.Warn("failed to create MemoryFreeLimitChecker, Cgroups V1 or V2 is unsupported") + } + if limtchecker != nil { + ret.MemoryFreeLimitChecker = limtchecker + } + } return ret, nil } @@ -521,6 +541,15 @@ func (v *BlockValidator) iterativeValidationEntryCreator(ctx context.Context, ig } func (v *BlockValidator) sendNextRecordRequests(ctx context.Context) (bool, error) { + if v.MemoryFreeLimitChecker != nil { + exceeded, err := v.MemoryFreeLimitChecker.IsLimitExceeded() + if err != nil { + log.Error("error checking if free-memory limit exceeded using MemoryFreeLimitChecker", "err", err) + } + if exceeded { + return false, nil + } + } v.reorgMutex.RLock() pos := v.recordSent() created := v.created() @@ -550,6 +579,15 @@ func (v *BlockValidator) sendNextRecordRequests(ctx context.Context) (bool, erro return true, nil } for pos <= recordUntil { + if v.MemoryFreeLimitChecker != nil { + exceeded, err := v.MemoryFreeLimitChecker.IsLimitExceeded() + if err != nil { + log.Error("error checking if free-memory limit exceeded using MemoryFreeLimitChecker", "err", err) + } + if exceeded { + return false, nil + } + } validationStatus, found := v.validations.Load(pos) if !found { return false, fmt.Errorf("not found entry for pos %d", pos) @@ -699,6 +737,15 @@ validationsLoop: log.Trace("advanceValidations: no more room", "pos", pos) return nil, nil } + if v.MemoryFreeLimitChecker != nil { + exceeded, err := v.MemoryFreeLimitChecker.IsLimitExceeded() + if err != nil { + log.Error("error checking if free-memory limit exceeded using MemoryFreeLimitChecker", "err", err) + } + if exceeded { + return nil, nil + } + } if currentStatus == Prepared { input, err := validationStatus.Entry.ToInput() if err != nil && ctx.Err() == nil { From 7e4e4e7f77e8fb4c7b473dc0eab746e2aab95cd9 Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Fri, 15 Dec 2023 15:36:01 +0530 Subject: [PATCH 02/15] address PR comments --- staker/block_validator.go | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/staker/block_validator.go b/staker/block_validator.go index d54fa60c11..e649b112b5 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -91,9 +91,25 @@ type BlockValidatorConfig struct { FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"` Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"` MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"` + + memoryFreeLimit int } func (c *BlockValidatorConfig) Validate() error { + if c.MemoryFreeLimit == "default" { + c.memoryFreeLimit = 1073741824 // 1GB + _, err := resourcemanager.NewCgroupsMemoryLimitCheckerIfSupported(c.memoryFreeLimit) + if err != nil { + log.Warn("Cgroups V1 or V2 is unsupported, memory-free-limit feature inside block-validator is disabled") + c.MemoryFreeLimit = "" + } + } else if c.MemoryFreeLimit != "" { + limit, err := resourcemanager.ParseMemLimit(c.MemoryFreeLimit) + if err != nil { + return fmt.Errorf("failed to parse block-validator config memory-free-limit string: %w", err) + } + c.memoryFreeLimit = limit + } return c.ValidationServer.Validate() } @@ -113,7 +129,7 @@ func BlockValidatorConfigAddOptions(prefix string, f *flag.FlagSet) { f.String(prefix+".pending-upgrade-module-root", DefaultBlockValidatorConfig.PendingUpgradeModuleRoot, "pending upgrade wasm module root to additionally validate (hash, 'latest' or empty)") f.Bool(prefix+".failure-is-fatal", DefaultBlockValidatorConfig.FailureIsFatal, "failing a validation is treated as a fatal error") BlockValidatorDangerousConfigAddOptions(prefix+".dangerous", f) - f.String(prefix+".memory-free-limit", DefaultBlockValidatorConfig.MemoryFreeLimit, "minimum free-memory limit after reaching which the blockvalidator pauses validation") + f.String(prefix+".memory-free-limit", DefaultBlockValidatorConfig.MemoryFreeLimit, "minimum free-memory limit after reaching which the blockvalidator pauses validation. Enabled by default as 1GB, to disable provide empty string") } func BlockValidatorDangerousConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -130,7 +146,7 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{ PendingUpgradeModuleRoot: "latest", FailureIsFatal: true, Dangerous: DefaultBlockValidatorDangerousConfig, - MemoryFreeLimit: "1GB", + MemoryFreeLimit: "default", } var TestBlockValidatorConfig = BlockValidatorConfig{ @@ -143,7 +159,7 @@ var TestBlockValidatorConfig = BlockValidatorConfig{ PendingUpgradeModuleRoot: "latest", FailureIsFatal: true, Dangerous: DefaultBlockValidatorDangerousConfig, - MemoryFreeLimit: "1GB", + MemoryFreeLimit: "default", } var DefaultBlockValidatorDangerousConfig = BlockValidatorDangerousConfig{ @@ -223,17 +239,11 @@ func NewBlockValidator( streamer.SetBlockValidator(ret) inbox.SetBlockValidator(ret) if config().MemoryFreeLimit != "" { - limit, err := resourcemanager.ParseMemLimit(config().MemoryFreeLimit) - if err != nil { - return nil, fmt.Errorf("failed to parse MemoryFreeLimit string from config: %w", err) - } - limtchecker, err := resourcemanager.NewCgroupsMemoryLimitCheckerIfSupported(limit) + limtchecker, err := resourcemanager.NewCgroupsMemoryLimitCheckerIfSupported(config().memoryFreeLimit) if err != nil { - log.Warn("failed to create MemoryFreeLimitChecker, Cgroups V1 or V2 is unsupported") - } - if limtchecker != nil { - ret.MemoryFreeLimitChecker = limtchecker + return nil, fmt.Errorf("failed to create MemoryFreeLimitChecker, Cgroups V1 or V2 is unsupported") } + ret.MemoryFreeLimitChecker = limtchecker } return ret, nil } From efe6f059557baa9b9377cd52bdb736a80d5db34d Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Fri, 15 Dec 2023 21:09:40 +0530 Subject: [PATCH 03/15] Add loglevel for ephemeral errors, make data poster nonce ahead of on-chain error an ephemeral error --- arbnode/batch_poster.go | 48 ++++++++++++++++++++--------------------- staker/staker.go | 9 ++++++-- util/log.go | 42 ++++++++++++++++++++++++++++++++++++ 3 files changed, 72 insertions(+), 27 deletions(-) create mode 100644 util/log.go diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 7a4cfc21c2..513f573644 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -37,6 +37,7 @@ import ( "github.com/offchainlabs/nitro/cmd/genericconf" "github.com/offchainlabs/nitro/das" "github.com/offchainlabs/nitro/solgen/go/bridgegen" + "github.com/offchainlabs/nitro/util" "github.com/offchainlabs/nitro/util/arbmath" "github.com/offchainlabs/nitro/util/headerreader" "github.com/offchainlabs/nitro/util/redisutil" @@ -57,22 +58,22 @@ type batchPosterPosition struct { type BatchPoster struct { stopwaiter.StopWaiter - l1Reader *headerreader.HeaderReader - inbox *InboxTracker - streamer *TransactionStreamer - config BatchPosterConfigFetcher - seqInbox *bridgegen.SequencerInbox - bridge *bridgegen.Bridge - syncMonitor *SyncMonitor - seqInboxABI *abi.ABI - seqInboxAddr common.Address - bridgeAddr common.Address - gasRefunderAddr common.Address - building *buildingBatch - daWriter das.DataAvailabilityServiceWriter - dataPoster *dataposter.DataPoster - redisLock *redislock.Simple - firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred + l1Reader *headerreader.HeaderReader + inbox *InboxTracker + streamer *TransactionStreamer + config BatchPosterConfigFetcher + seqInbox *bridgegen.SequencerInbox + bridge *bridgegen.Bridge + syncMonitor *SyncMonitor + seqInboxABI *abi.ABI + seqInboxAddr common.Address + bridgeAddr common.Address + gasRefunderAddr common.Address + building *buildingBatch + daWriter das.DataAvailabilityServiceWriter + dataPoster *dataposter.DataPoster + redisLock *redislock.Simple + // An estimate of the number of batches we want to post but haven't yet. // This doesn't include batches which we don't want to post yet due to the L1 bounds. backlog uint64 @@ -1103,6 +1104,8 @@ func (b *BatchPoster) Start(ctxIn context.Context) { b.redisLock.Start(ctxIn) b.StopWaiter.Start(ctxIn, b) b.LaunchThread(b.pollForReverts) + commonEphemeralError := time.Time{} + exceedMaxMempoolSizeEphemeralError := time.Time{} b.CallIteratively(func(ctx context.Context) time.Duration { var err error if common.HexToAddress(b.config().GasRefunderAddress) != (common.Address{}) { @@ -1127,21 +1130,16 @@ func (b *BatchPoster) Start(ctxIn context.Context) { } posted, err := b.maybePostSequencerBatch(ctx) if err == nil { - b.firstEphemeralError = time.Time{} + commonEphemeralError = time.Time{} + exceedMaxMempoolSizeEphemeralError = time.Time{} } if err != nil { b.building = nil logLevel := log.Error // Likely the inbox tracker just isn't caught up. // Let's see if this error disappears naturally. - if b.firstEphemeralError == (time.Time{}) { - b.firstEphemeralError = time.Now() - logLevel = log.Warn - } else if time.Since(b.firstEphemeralError) < time.Minute { - logLevel = log.Warn - } else if time.Since(b.firstEphemeralError) < time.Minute*5 && strings.Contains(err.Error(), "will exceed max mempool size") { - logLevel = log.Warn - } + logLevel = util.LogLevelEphemeralError(err, "", time.Minute, &commonEphemeralError, logLevel) + logLevel = util.LogLevelEphemeralError(err, "will exceed max mempool size", 5*time.Minute, &exceedMaxMempoolSizeEphemeralError, logLevel) logLevel("error posting batch", "err", err) return b.config().ErrorDelay } else if posted { diff --git a/staker/staker.go b/staker/staker.go index 4f35c1bc9a..522f5e0c59 100644 --- a/staker/staker.go +++ b/staker/staker.go @@ -25,6 +25,7 @@ import ( "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/cmd/genericconf" "github.com/offchainlabs/nitro/staker/txbuilder" + "github.com/offchainlabs/nitro/util" "github.com/offchainlabs/nitro/util/arbmath" "github.com/offchainlabs/nitro/util/headerreader" "github.com/offchainlabs/nitro/util/stopwaiter" @@ -406,6 +407,7 @@ func (s *Staker) Start(ctxIn context.Context) { } s.StopWaiter.Start(ctxIn, s) backoff := time.Second + ephemeralError := time.Time{} s.CallIteratively(func(ctx context.Context) (returningWait time.Duration) { defer func() { panicErr := recover() @@ -438,6 +440,7 @@ func (s *Staker) Start(ctxIn context.Context) { } } if err == nil { + ephemeralError = time.Time{} backoff = time.Second stakerLastSuccessfulActionGauge.Update(time.Now().Unix()) stakerActionSuccessCounter.Inc(1) @@ -449,12 +452,14 @@ func (s *Staker) Start(ctxIn context.Context) { } stakerActionFailureCounter.Inc(1) backoff *= 2 + logLevel := log.Error if backoff > time.Minute { backoff = time.Minute - log.Error("error acting as staker", "err", err) } else { - log.Warn("error acting as staker", "err", err) + logLevel = log.Warn } + logLevel = util.LogLevelEphemeralError(err, "is ahead of on-chain nonce", 10*time.Minute, &ephemeralError, logLevel) + logLevel("error acting as staker", "err", err) return backoff }) s.CallIteratively(func(ctx context.Context) time.Duration { diff --git a/util/log.go b/util/log.go new file mode 100644 index 0000000000..4e0453638e --- /dev/null +++ b/util/log.go @@ -0,0 +1,42 @@ +package util + +import ( + "strings" + "time" + + "github.com/ethereum/go-ethereum/log" +) + +// LogLevelEphemeralError is a convenient intermediary level between log levels Warn and Error +// +// For a given error, errorSubstring, duration, firstOccuranceTime and logLevel +// the function defaults to returning the given logLevel if the error doesnt contain the errorSubstring, +// but if it does, then returns one of the corresponding loglevels as follows +// - Warn: For firstOccuranceTime of error being less than the duration amount of time from Now +// - Error: Otherwise +// +// # Usage Examples +// +// log.LogLevelEphemeralError(err, "not supported yet", 5*time.Minute, &firstEphemeralError, log.Error)("msg") +// log.LogLevelEphemeralError(err, "not supported yet", 5*time.Minute, &firstEphemeralError, log.Error)("msg", "key1", val1) +// log.LogLevelEphemeralError(err, "not supported yet", 5*time.Minute, &firstEphemeralError, log.Error)("msg", "key1", val1, "key2", val2) +func LogLevelEphemeralError( + err error, + errorSubstring string, + ephemeralDuration time.Duration, + firstOccuranceTime *time.Time, + currentLogLevel func(msg string, ctx ...interface{})) func(string, ...interface{}) { + if strings.Contains(err.Error(), errorSubstring) || errorSubstring == "" { + logLevel := log.Error + if *firstOccuranceTime == (time.Time{}) { + *firstOccuranceTime = time.Now() + logLevel = log.Warn + } else if time.Since(*firstOccuranceTime) < ephemeralDuration { + logLevel = log.Warn + } + return logLevel + } else { + *firstOccuranceTime = time.Time{} + return currentLogLevel + } +} From e25a59ecb379bbc2b1ec6fd91d90c88165a00936 Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Mon, 18 Dec 2023 17:46:01 +0530 Subject: [PATCH 04/15] fix typo --- util/log.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/util/log.go b/util/log.go index 4e0453638e..9bf508e532 100644 --- a/util/log.go +++ b/util/log.go @@ -9,10 +9,10 @@ import ( // LogLevelEphemeralError is a convenient intermediary level between log levels Warn and Error // -// For a given error, errorSubstring, duration, firstOccuranceTime and logLevel +// For a given error, errorSubstring, duration, firstOccurrenceTime and logLevel // the function defaults to returning the given logLevel if the error doesnt contain the errorSubstring, // but if it does, then returns one of the corresponding loglevels as follows -// - Warn: For firstOccuranceTime of error being less than the duration amount of time from Now +// - Warn: For firstOccurrenceTime of error being less than the duration amount of time from Now // - Error: Otherwise // // # Usage Examples @@ -24,19 +24,19 @@ func LogLevelEphemeralError( err error, errorSubstring string, ephemeralDuration time.Duration, - firstOccuranceTime *time.Time, + firstOccurrenceTime *time.Time, currentLogLevel func(msg string, ctx ...interface{})) func(string, ...interface{}) { if strings.Contains(err.Error(), errorSubstring) || errorSubstring == "" { logLevel := log.Error - if *firstOccuranceTime == (time.Time{}) { - *firstOccuranceTime = time.Now() + if *firstOccurrenceTime == (time.Time{}) { + *firstOccurrenceTime = time.Now() logLevel = log.Warn - } else if time.Since(*firstOccuranceTime) < ephemeralDuration { + } else if time.Since(*firstOccurrenceTime) < ephemeralDuration { logLevel = log.Warn } return logLevel } else { - *firstOccuranceTime = time.Time{} + *firstOccurrenceTime = time.Time{} return currentLogLevel } } From a4b4bddd3a8ba743c7665932071f380f5e93fe55 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Wed, 20 Dec 2023 20:28:47 +0530 Subject: [PATCH 05/15] Add a metric for memory used by the replay binary WASM in JIT --- arbitrator/jit/src/machine.rs | 3 ++- arbitrator/jit/src/main.rs | 3 ++- validator/server_jit/jit_machine.go | 33 +++++++++++++++++++------- validator/server_jit/machine_loader.go | 12 ++++++---- validator/server_jit/spawner.go | 10 ++++++-- 5 files changed, 43 insertions(+), 18 deletions(-) diff --git a/arbitrator/jit/src/machine.rs b/arbitrator/jit/src/machine.rs index ed22e12ef9..c9119dd16e 100644 --- a/arbitrator/jit/src/machine.rs +++ b/arbitrator/jit/src/machine.rs @@ -280,7 +280,7 @@ impl WasmEnv { Ok(env) } - pub fn send_results(&mut self, error: Option) { + pub fn send_results(&mut self, error: Option, memory_used: u64) { let writer = match &mut self.process.socket { Some((writer, _)) => writer, None => return, @@ -307,6 +307,7 @@ impl WasmEnv { check!(socket::write_u64(writer, self.small_globals[1])); check!(socket::write_bytes32(writer, &self.large_globals[0])); check!(socket::write_bytes32(writer, &self.large_globals[1])); + check!(socket::write_u64(writer, memory_used)); check!(writer.flush()); } } diff --git a/arbitrator/jit/src/main.rs b/arbitrator/jit/src/main.rs index 513cd067c4..968da2a978 100644 --- a/arbitrator/jit/src/main.rs +++ b/arbitrator/jit/src/main.rs @@ -114,8 +114,9 @@ fn main() { true => None, false => Some(message), }; + let memory_used = memory.size().0 as u64 * 65_536; - env.send_results(error); + env.send_results(error, memory_used); } // require a usize be at least 32 bits wide diff --git a/validator/server_jit/jit_machine.go b/validator/server_jit/jit_machine.go index f763ce3ea0..a41e249cdb 100644 --- a/validator/server_jit/jit_machine.go +++ b/validator/server_jit/jit_machine.go @@ -16,17 +16,21 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/offchainlabs/nitro/util/arbmath" "github.com/offchainlabs/nitro/validator" ) +var jitWasmMemoryUsage = metrics.NewRegisteredHistogram("jit/wasm/memoryusage", nil, metrics.NewBoundedHistogramSample()) + type JitMachine struct { - binary string - process *exec.Cmd - stdin io.WriteCloser + binary string + process *exec.Cmd + stdin io.WriteCloser + wasmMemoryUsageLimit int } -func createJitMachine(jitBinary string, binaryPath string, cranelift bool, moduleRoot common.Hash, fatalErrChan chan error) (*JitMachine, error) { +func createJitMachine(jitBinary string, binaryPath string, cranelift bool, wasmMemoryUsageLimit int, moduleRoot common.Hash, fatalErrChan chan error) (*JitMachine, error) { invocation := []string{"--binary", binaryPath, "--forks"} if cranelift { invocation = append(invocation, "--cranelift") @@ -45,9 +49,10 @@ func createJitMachine(jitBinary string, binaryPath string, cranelift bool, modul }() machine := &JitMachine{ - binary: binaryPath, - process: process, - stdin: stdin, + binary: binaryPath, + process: process, + stdin: stdin, + wasmMemoryUsageLimit: wasmMemoryUsageLimit, } return machine, nil } @@ -258,8 +263,18 @@ func (machine *JitMachine) prove( if state.BlockHash, err = readHash(); err != nil { return state, err } - state.SendRoot, err = readHash() - return state, err + if state.SendRoot, err = readHash(); err != nil { + return state, err + } + memoryUsed, err := readUint64() + if err != nil { + return state, fmt.Errorf("failed to read memory usage from Jit machine: %w", err) + } + if memoryUsed > uint64(machine.wasmMemoryUsageLimit) { + log.Warn("memory used by jit wasm exceeds the wasm memory usage limit", "limit", machine.wasmMemoryUsageLimit, "memoryUsed", memoryUsed) + } + jitWasmMemoryUsage.Update(int64(memoryUsed)) + return state, nil default: message := "inter-process communication failure" log.Error("Jit Machine Failure", "message", message) diff --git a/validator/server_jit/machine_loader.go b/validator/server_jit/machine_loader.go index 5705a9a387..3a831928b7 100644 --- a/validator/server_jit/machine_loader.go +++ b/validator/server_jit/machine_loader.go @@ -13,13 +13,15 @@ import ( ) type JitMachineConfig struct { - ProverBinPath string - JitCranelift bool + ProverBinPath string + JitCranelift bool + WasmMemoryUsageLimit int } var DefaultJitMachineConfig = JitMachineConfig{ - JitCranelift: true, - ProverBinPath: "replay.wasm", + JitCranelift: true, + ProverBinPath: "replay.wasm", + WasmMemoryUsageLimit: 4294967296, } func getJitPath() (string, error) { @@ -57,7 +59,7 @@ func NewJitMachineLoader(config *JitMachineConfig, locator *server_common.Machin } createMachineThreadFunc := func(ctx context.Context, moduleRoot common.Hash) (*JitMachine, error) { binPath := filepath.Join(locator.GetMachinePath(moduleRoot), config.ProverBinPath) - return createJitMachine(jitPath, binPath, config.JitCranelift, moduleRoot, fatalErrChan) + return createJitMachine(jitPath, binPath, config.JitCranelift, config.WasmMemoryUsageLimit, moduleRoot, fatalErrChan) } return &JitMachineLoader{ MachineLoader: *server_common.NewMachineLoader[JitMachine](locator, createMachineThreadFunc), diff --git a/validator/server_jit/spawner.go b/validator/server_jit/spawner.go index ff1749506a..6489821b5b 100644 --- a/validator/server_jit/spawner.go +++ b/validator/server_jit/spawner.go @@ -18,18 +18,23 @@ import ( type JitSpawnerConfig struct { Workers int `koanf:"workers" reload:"hot"` Cranelift bool `koanf:"cranelift"` + + // TODO: change WasmMemoryUsageLimit to a string and use resourcemanager.ParseMemLimit + WasmMemoryUsageLimit int `koanf:"wasm-memory-usage-limit"` } type JitSpawnerConfigFecher func() *JitSpawnerConfig var DefaultJitSpawnerConfig = JitSpawnerConfig{ - Workers: 0, - Cranelift: true, + Workers: 0, + Cranelift: true, + WasmMemoryUsageLimit: 4294967296, // 2^32 WASM memeory limit } func JitSpawnerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".workers", DefaultJitSpawnerConfig.Workers, "number of concurrent validation threads") f.Bool(prefix+".cranelift", DefaultJitSpawnerConfig.Cranelift, "use Cranelift instead of LLVM when validating blocks using the jit-accelerated block validator") + f.Int(prefix+".wasm-memory-usage-limit", DefaultJitSpawnerConfig.WasmMemoryUsageLimit, "if memory used by a jit wasm exceeds this limit, a warning is logged") } type JitSpawner struct { @@ -44,6 +49,7 @@ func NewJitSpawner(locator *server_common.MachineLocator, config JitSpawnerConfi // TODO - preload machines machineConfig := DefaultJitMachineConfig machineConfig.JitCranelift = config().Cranelift + machineConfig.WasmMemoryUsageLimit = config().WasmMemoryUsageLimit loader, err := NewJitMachineLoader(&machineConfig, locator, fatalErrChan) if err != nil { return nil, err From 4eeb994a8773c374968a1e6f44d9e627875a12b2 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Thu, 21 Dec 2023 09:56:33 -0800 Subject: [PATCH 06/15] Fix help text for batch limits We switched to using geth's batch limiting rather than our own, and the help text was wrong. --- cmd/genericconf/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/genericconf/config.go b/cmd/genericconf/config.go index 6b44ace974..50aafbe223 100644 --- a/cmd/genericconf/config.go +++ b/cmd/genericconf/config.go @@ -121,6 +121,6 @@ func (c *RpcConfig) Apply(stackConf *node.Config) { } func RpcConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Int(prefix+".max-batch-response-size", DefaultRpcConfig.MaxBatchResponseSize, "the maximum response size for a JSON-RPC request measured in bytes (-1 means no limit)") - f.Int(prefix+".batch-request-limit", DefaultRpcConfig.BatchRequestLimit, "the maximum number of requests in a batch") + f.Int(prefix+".max-batch-response-size", DefaultRpcConfig.MaxBatchResponseSize, "the maximum response size for a JSON-RPC request measured in bytes (0 means no limit)") + f.Int(prefix+".batch-request-limit", DefaultRpcConfig.BatchRequestLimit, "the maximum number of requests in a batch (0 means no limit)") } From ce3a42560fa3a64560475c2cc927254a8ddec231 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Sat, 23 Dec 2023 07:12:36 +0530 Subject: [PATCH 07/15] fix typo --- arbnode/batch_poster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 4c708c302c..827609ee6f 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -1164,7 +1164,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) { // Likely the inbox tracker just isn't caught up. // Let's see if this error disappears naturally. logLevel = commonEphemeralErrorHandler.LogLevel(err, logLevel) - logLevel = commonEphemeralErrorHandler.LogLevel(err, logLevel) + logLevel = exceedMaxMempoolSizeEphemeralErrorHandler.LogLevel(err, logLevel) logLevel("error posting batch", "err", err) return b.config().ErrorDelay } else if posted { From c95a374c37742d7cfe9e247e7396765e86c1d305 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Fri, 22 Dec 2023 22:11:54 -0700 Subject: [PATCH 08/15] Stop execution node after everything else --- arbnode/node.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/arbnode/node.go b/arbnode/node.go index accca10e47..c26d78b89e 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -864,9 +864,6 @@ func (n *Node) Start(ctx context.Context) error { } func (n *Node) StopAndWait() { - if n.Execution != nil { - n.Execution.StopAndWait() - } if n.MaintenanceRunner != nil && n.MaintenanceRunner.Started() { n.MaintenanceRunner.StopAndWait() } @@ -919,7 +916,10 @@ func (n *Node) StopAndWait() { if n.DASLifecycleManager != nil { n.DASLifecycleManager.StopAndWaitUntil(2 * time.Second) } + if n.Execution != nil { + n.Execution.StopAndWait() + } if err := n.Stack.Close(); err != nil { - log.Error("error on stak close", "err", err) + log.Error("error on stack close", "err", err) } } From 3d4111ca9b6bd9759c66c1182b0e693ae827da9b Mon Sep 17 00:00:00 2001 From: Rachel Bousfield Date: Sat, 30 Dec 2023 17:21:57 -0600 Subject: [PATCH 09/15] update change date --- LICENSE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LICENSE b/LICENSE index 0739c2d8bf..e16976d32d 100644 --- a/LICENSE +++ b/LICENSE @@ -30,7 +30,7 @@ Additional Use Grant: You may use the Licensed Work in a production environment -Change Date: Dec 31, 2027 +Change Date: Dec 31, 2028 Change License: Apache License Version 2.0 From 172765d50b35c7fbd77031893bf37c35a3a50f12 Mon Sep 17 00:00:00 2001 From: Rachel Bousfield Date: Sat, 30 Dec 2023 17:45:29 -0600 Subject: [PATCH 10/15] update license year --- LICENSE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LICENSE b/LICENSE index e16976d32d..c96c4d6cb2 100644 --- a/LICENSE +++ b/LICENSE @@ -10,7 +10,7 @@ Parameters Licensor: Offchain Labs Licensed Work: Arbitrum Nitro - The Licensed Work is (c) 2021-2023 Offchain Labs + The Licensed Work is (c) 2021-2024 Offchain Labs Additional Use Grant: You may use the Licensed Work in a production environment solely to provide a point of interface to permit end users or applications From ef961aef3af2e5b1651024428af51a85f0d2655b Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Tue, 2 Jan 2024 11:26:11 +0530 Subject: [PATCH 11/15] enable EphemeralErrorHandler to ignore error for a certain duration --- arbnode/batch_poster.go | 26 ++++++++++++++++++++------ staker/staker.go | 2 +- util/log.go | 20 ++++++++++++++++---- 3 files changed, 37 insertions(+), 11 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 005012c5a1..8885dab13d 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -1145,8 +1145,18 @@ func (b *BatchPoster) Start(ctxIn context.Context) { b.redisLock.Start(ctxIn) b.StopWaiter.Start(ctxIn, b) b.LaunchThread(b.pollForReverts) - commonEphemeralErrorHandler := util.NewEphemeralErrorHandler(time.Minute, "") - exceedMaxMempoolSizeEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, "will exceed max mempool size") + commonEphemeralErrorHandler := util.NewEphemeralErrorHandler(time.Minute, "", 0, nil) + exceedMaxMempoolSizeEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, dataposter.ErrExceedsMaxMempoolSize.Error(), time.Minute, log.Debug) + storageRaceEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, storage.ErrStorageRace.Error(), time.Minute, log.Debug) + normalGasEstimationFailedEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, ErrNormalGasEstimationFailed.Error(), time.Minute, log.Debug) + accumulatorNotFoundEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, AccumulatorNotFoundErr.Error(), time.Minute, log.Debug) + resetAllEphemeralErrs := func() { + commonEphemeralErrorHandler.Reset() + exceedMaxMempoolSizeEphemeralErrorHandler.Reset() + storageRaceEphemeralErrorHandler.Reset() + normalGasEstimationFailedEphemeralErrorHandler.Reset() + accumulatorNotFoundEphemeralErrorHandler.Reset() + } b.CallIteratively(func(ctx context.Context) time.Duration { var err error if common.HexToAddress(b.config().GasRefunderAddress) != (common.Address{}) { @@ -1174,14 +1184,12 @@ func (b *BatchPoster) Start(ctxIn context.Context) { if !couldLock { log.Debug("Not posting batches right now because another batch poster has the lock or this node is behind") b.building = nil - commonEphemeralErrorHandler.Reset() - exceedMaxMempoolSizeEphemeralErrorHandler.Reset() + resetAllEphemeralErrs() return b.config().PollInterval } posted, err := b.maybePostSequencerBatch(ctx) if err == nil { - commonEphemeralErrorHandler.Reset() - exceedMaxMempoolSizeEphemeralErrorHandler.Reset() + resetAllEphemeralErrs() } if err != nil { if ctx.Err() != nil { @@ -1193,7 +1201,13 @@ func (b *BatchPoster) Start(ctxIn context.Context) { // Likely the inbox tracker just isn't caught up. // Let's see if this error disappears naturally. logLevel = commonEphemeralErrorHandler.LogLevel(err, logLevel) + // If the error matches one of these, it's only logged at debug for the first minute, + // then at warn for the next 4 minutes, then at error. If the error isn't one of these, + // it'll be logged at warn for the first minute, then at error. logLevel = exceedMaxMempoolSizeEphemeralErrorHandler.LogLevel(err, logLevel) + logLevel = storageRaceEphemeralErrorHandler.LogLevel(err, logLevel) + logLevel = normalGasEstimationFailedEphemeralErrorHandler.LogLevel(err, logLevel) + logLevel = accumulatorNotFoundEphemeralErrorHandler.LogLevel(err, logLevel) logLevel("error posting batch", "err", err) return b.config().ErrorDelay } else if posted { diff --git a/staker/staker.go b/staker/staker.go index 7a2342c260..002dd1f754 100644 --- a/staker/staker.go +++ b/staker/staker.go @@ -402,7 +402,7 @@ func (s *Staker) Start(ctxIn context.Context) { } s.StopWaiter.Start(ctxIn, s) backoff := time.Second - ephemeralErrorHandler := util.NewEphemeralErrorHandler(10*time.Minute, "is ahead of on-chain nonce") + ephemeralErrorHandler := util.NewEphemeralErrorHandler(10*time.Minute, "is ahead of on-chain nonce", 0, nil) s.CallIteratively(func(ctx context.Context) (returningWait time.Duration) { defer func() { panicErr := recover() diff --git a/util/log.go b/util/log.go index b68f503f54..0d87b55977 100644 --- a/util/log.go +++ b/util/log.go @@ -13,13 +13,18 @@ type EphemeralErrorHandler struct { Duration time.Duration ErrorString string FirstOccurrence *time.Time + + IgnoreDuration time.Duration + IgnoredErrLogLevel func(string, ...interface{}) // Default IgnoredErrLogLevel is log.Debug } -func NewEphemeralErrorHandler(duration time.Duration, errorString string) *EphemeralErrorHandler { +func NewEphemeralErrorHandler(duration time.Duration, errorString string, ignoreDuration time.Duration, ignoredErrLogLevel func(msg string, ctx ...interface{})) *EphemeralErrorHandler { return &EphemeralErrorHandler{ - Duration: duration, - ErrorString: errorString, - FirstOccurrence: &time.Time{}, + Duration: duration, + ErrorString: errorString, + FirstOccurrence: &time.Time{}, + IgnoreDuration: ignoreDuration, + IgnoredErrLogLevel: ignoredErrLogLevel, } } @@ -39,6 +44,13 @@ func (h *EphemeralErrorHandler) LogLevel(err error, currentLogLevel func(msg str return currentLogLevel } + if h.IgnoreDuration != 0 && time.Since(*h.FirstOccurrence) < h.IgnoreDuration { + if h.IgnoredErrLogLevel != nil { + return h.IgnoredErrLogLevel + } + return log.Debug + } + logLevel := log.Error if *h.FirstOccurrence == (time.Time{}) { *h.FirstOccurrence = time.Now() From b19326290ce9518ebfd704b8a87eb2021b9a2769 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Tue, 2 Jan 2024 20:01:07 +0530 Subject: [PATCH 12/15] code refactor --- arbnode/batch_poster.go | 10 +++--- staker/staker.go | 2 +- util/log.go | 21 +++++++------ util/log_test.go | 70 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 87 insertions(+), 16 deletions(-) create mode 100644 util/log_test.go diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 8885dab13d..4bd0e2490a 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -1145,11 +1145,11 @@ func (b *BatchPoster) Start(ctxIn context.Context) { b.redisLock.Start(ctxIn) b.StopWaiter.Start(ctxIn, b) b.LaunchThread(b.pollForReverts) - commonEphemeralErrorHandler := util.NewEphemeralErrorHandler(time.Minute, "", 0, nil) - exceedMaxMempoolSizeEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, dataposter.ErrExceedsMaxMempoolSize.Error(), time.Minute, log.Debug) - storageRaceEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, storage.ErrStorageRace.Error(), time.Minute, log.Debug) - normalGasEstimationFailedEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, ErrNormalGasEstimationFailed.Error(), time.Minute, log.Debug) - accumulatorNotFoundEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, AccumulatorNotFoundErr.Error(), time.Minute, log.Debug) + commonEphemeralErrorHandler := util.NewEphemeralErrorHandler(time.Minute, "", 0) + exceedMaxMempoolSizeEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, dataposter.ErrExceedsMaxMempoolSize.Error(), time.Minute) + storageRaceEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, storage.ErrStorageRace.Error(), time.Minute) + normalGasEstimationFailedEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, ErrNormalGasEstimationFailed.Error(), time.Minute) + accumulatorNotFoundEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, AccumulatorNotFoundErr.Error(), time.Minute) resetAllEphemeralErrs := func() { commonEphemeralErrorHandler.Reset() exceedMaxMempoolSizeEphemeralErrorHandler.Reset() diff --git a/staker/staker.go b/staker/staker.go index 002dd1f754..2a95e9c9f7 100644 --- a/staker/staker.go +++ b/staker/staker.go @@ -402,7 +402,7 @@ func (s *Staker) Start(ctxIn context.Context) { } s.StopWaiter.Start(ctxIn, s) backoff := time.Second - ephemeralErrorHandler := util.NewEphemeralErrorHandler(10*time.Minute, "is ahead of on-chain nonce", 0, nil) + ephemeralErrorHandler := util.NewEphemeralErrorHandler(10*time.Minute, "is ahead of on-chain nonce", 0) s.CallIteratively(func(ctx context.Context) (returningWait time.Duration) { defer func() { panicErr := recover() diff --git a/util/log.go b/util/log.go index 0d87b55977..dbeed8051d 100644 --- a/util/log.go +++ b/util/log.go @@ -18,18 +18,19 @@ type EphemeralErrorHandler struct { IgnoredErrLogLevel func(string, ...interface{}) // Default IgnoredErrLogLevel is log.Debug } -func NewEphemeralErrorHandler(duration time.Duration, errorString string, ignoreDuration time.Duration, ignoredErrLogLevel func(msg string, ctx ...interface{})) *EphemeralErrorHandler { +func NewEphemeralErrorHandler(duration time.Duration, errorString string, ignoreDuration time.Duration) *EphemeralErrorHandler { return &EphemeralErrorHandler{ Duration: duration, ErrorString: errorString, FirstOccurrence: &time.Time{}, IgnoreDuration: ignoreDuration, - IgnoredErrLogLevel: ignoredErrLogLevel, + IgnoredErrLogLevel: log.Debug, } } -// LogLevel method defaults to returning the input currentLogLevel if the givenerror doesnt contain the errorSubstring, +// LogLevel method defaults to returning the input currentLogLevel if the given error doesnt contain the errorSubstring, // but if it does, then returns one of the corresponding loglevels as follows +// - IgnoredErrLogLevel - if the error has been repeating for less than the IgnoreDuration of time. Defaults to log.Debug // - log.Warn - if the error has been repeating for less than the given duration of time // - log.Error - Otherwise // @@ -44,6 +45,10 @@ func (h *EphemeralErrorHandler) LogLevel(err error, currentLogLevel func(msg str return currentLogLevel } + if *h.FirstOccurrence == (time.Time{}) { + *h.FirstOccurrence = time.Now() + } + if h.IgnoreDuration != 0 && time.Since(*h.FirstOccurrence) < h.IgnoreDuration { if h.IgnoredErrLogLevel != nil { return h.IgnoredErrLogLevel @@ -51,14 +56,10 @@ func (h *EphemeralErrorHandler) LogLevel(err error, currentLogLevel func(msg str return log.Debug } - logLevel := log.Error - if *h.FirstOccurrence == (time.Time{}) { - *h.FirstOccurrence = time.Now() - logLevel = log.Warn - } else if time.Since(*h.FirstOccurrence) < h.Duration { - logLevel = log.Warn + if time.Since(*h.FirstOccurrence) < h.Duration { + return log.Warn } - return logLevel + return log.Error } func (h *EphemeralErrorHandler) Reset() { diff --git a/util/log_test.go b/util/log_test.go new file mode 100644 index 0000000000..f8007373f2 --- /dev/null +++ b/util/log_test.go @@ -0,0 +1,70 @@ +package util + +import ( + "errors" + "reflect" + "testing" + "time" + + "github.com/ethereum/go-ethereum/log" +) + +func compareFunctions(f1, f2 func(msg string, ctx ...interface{})) bool { + return reflect.ValueOf(f1).Pointer() == reflect.ValueOf(f2).Pointer() +} +func TestSimple(t *testing.T) { + allErrHandler := NewEphemeralErrorHandler(2500*time.Millisecond, "", time.Second) + err := errors.New("sample error") + logLevel := allErrHandler.LogLevel(err, log.Error) + if !compareFunctions(log.Debug, logLevel) { + t.Fatalf("incorrect loglevel output. Want: Debug") + } + + time.Sleep(1 * time.Second) + logLevel = allErrHandler.LogLevel(err, log.Error) + if !compareFunctions(log.Warn, logLevel) { + t.Fatalf("incorrect loglevel output. Want: Warn") + } + + time.Sleep(2 * time.Second) + logLevel = allErrHandler.LogLevel(err, log.Error) + if !compareFunctions(log.Error, logLevel) { + t.Fatalf("incorrect loglevel output. Want: Error") + } +} + +func TestComplex(t *testing.T) { + // Simulation: errorA happens continuously for 2 seconds and then errorB happens + errorAHandler := NewEphemeralErrorHandler(time.Second, "errorA", 0) + errorBHandler := NewEphemeralErrorHandler(1500*time.Millisecond, "errorB", 0) + + // Computes result of chaining two ephemeral error handlers for a given recurring error + chainingErrHandlers := func(err error) func(string, ...interface{}) { + logLevel := log.Error + logLevel = errorAHandler.LogLevel(err, logLevel) + logLevel = errorBHandler.LogLevel(err, logLevel) + return logLevel + } + + errA := errors.New("this is a sample errorA") + if !compareFunctions(log.Warn, chainingErrHandlers(errA)) { + t.Fatalf("incorrect loglevel output. Want: Warn") + } + time.Sleep(2 * time.Second) + if !compareFunctions(log.Error, chainingErrHandlers(errA)) { + t.Fatalf("incorrect loglevel output. Want: Error") + } + + errB := errors.New("this is a sample errorB") + if !compareFunctions(log.Warn, chainingErrHandlers(errB)) { + t.Fatalf("incorrect loglevel output. Want: Warn") + } + if !compareFunctions(log.Warn, chainingErrHandlers(errA)) { + t.Fatalf("incorrect loglevel output. Want: Warn") + } + + errC := errors.New("random error") + if !compareFunctions(log.Error, chainingErrHandlers(errC)) { + t.Fatalf("incorrect loglevel output. Want: Error") + } +} From e8ec476514ba6b9286380eb2d8190296c628767a Mon Sep 17 00:00:00 2001 From: Joshua Colvin Date: Thu, 4 Jan 2024 17:55:21 -0700 Subject: [PATCH 13/15] Make clippy happy --- arbitrator/prover/src/machine.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arbitrator/prover/src/machine.rs b/arbitrator/prover/src/machine.rs index 0849312f3d..6ca552d83c 100644 --- a/arbitrator/prover/src/machine.rs +++ b/arbitrator/prover/src/machine.rs @@ -362,7 +362,7 @@ impl Module { bin.memories.len() <= 1, "Multiple memories are not supported" ); - if let Some(limits) = bin.memories.get(0) { + if let Some(limits) = bin.memories.first() { let page_size = Memory::PAGE_SIZE; let initial = limits.initial; // validate() checks this is less than max::u32 let allowed = u32::MAX as u64 / Memory::PAGE_SIZE - 1; // we require the size remain *below* 2^32 From b9e9bcfecc6f857e20bb9b46d18847927af57a35 Mon Sep 17 00:00:00 2001 From: Joshua Colvin Date: Thu, 4 Jan 2024 18:22:06 -0700 Subject: [PATCH 14/15] Make clippy happy --- arbitrator/jit/src/syscall.rs | 6 +++--- arbitrator/wasm-libraries/go-stub/src/lib.rs | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/arbitrator/jit/src/syscall.rs b/arbitrator/jit/src/syscall.rs index c81641a7f8..4f657eeefa 100644 --- a/arbitrator/jit/src/syscall.rs +++ b/arbitrator/jit/src/syscall.rs @@ -337,7 +337,7 @@ pub fn js_value_call(mut env: WasmEnvMut, sp: u32) -> MaybeEscape { let value = match (object, method_name.as_slice()) { (Ref(GO_ID), b"_makeFuncWrapper") => { - let arg = match args.get(0) { + let arg = match args.first() { Some(arg) => arg, None => fail!( "Go trying to call Go._makeFuncWrapper with bad args {:?}", @@ -415,7 +415,7 @@ pub fn js_value_call(mut env: WasmEnvMut, sp: u32) -> MaybeEscape { (Ref(CRYPTO_ID), b"getRandomValues") => { let name = "crypto.getRandomValues"; - let id = match args.get(0) { + let id = match args.first() { Some(Ref(x)) => x, _ => fail!("Go trying to call {name} with bad args {:?}", args), }; @@ -456,7 +456,7 @@ pub fn js_value_new(mut env: WasmEnvMut, sp: u32) { let args_len = sp.read_u64(2); let args = sp.read_value_slice(args_ptr, args_len); match class { - UINT8_ARRAY_ID => match args.get(0) { + UINT8_ARRAY_ID => match args.first() { Some(JsValue::Number(size)) => { let id = pool.insert(DynamicObject::Uint8Array(vec![0; *size as usize])); sp.write_u64(4, GoValue::Object(id).encode()); diff --git a/arbitrator/wasm-libraries/go-stub/src/lib.rs b/arbitrator/wasm-libraries/go-stub/src/lib.rs index df77893fcb..1a5d1963c7 100644 --- a/arbitrator/wasm-libraries/go-stub/src/lib.rs +++ b/arbitrator/wasm-libraries/go-stub/src/lib.rs @@ -218,7 +218,7 @@ pub unsafe extern "C" fn go__syscall_js_valueNew(sp: GoStack) { let args_len = sp.read_u64(2); let args = read_value_slice(args_ptr, args_len); if class == UINT8_ARRAY_ID { - if let Some(InterpValue::Number(size)) = args.get(0) { + if let Some(InterpValue::Number(size)) = args.first() { let id = DynamicObjectPool::singleton() .insert(DynamicObject::Uint8Array(vec![0; *size as usize])); sp.write_u64(4, GoValue::Object(id).encode()); @@ -321,7 +321,7 @@ unsafe fn value_call_impl(sp: &mut GoStack) -> Result { let args_len = sp.read_u64(4); let args = read_value_slice(args_ptr, args_len); if object == InterpValue::Ref(GO_ID) && &method_name == b"_makeFuncWrapper" { - let id = args.get(0).ok_or_else(|| { + let id = args.first().ok_or_else(|| { format!( "Go attempting to call Go._makeFuncWrapper with bad args {:?}", args, @@ -405,7 +405,7 @@ unsafe fn value_call_impl(sp: &mut GoStack) -> Result { )) } } else if object == InterpValue::Ref(CRYPTO_ID) && &method_name == b"getRandomValues" { - let id = match args.get(0) { + let id = match args.first() { Some(InterpValue::Ref(x)) => *x, _ => { return Err(format!( From c02e8401caec041e2d8b856ecc8189f910a21475 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Fri, 5 Jan 2024 10:23:33 +0530 Subject: [PATCH 15/15] address PR comments --- staker/block_validator.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/staker/block_validator.go b/staker/block_validator.go index e649b112b5..352335a5db 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -98,11 +98,6 @@ type BlockValidatorConfig struct { func (c *BlockValidatorConfig) Validate() error { if c.MemoryFreeLimit == "default" { c.memoryFreeLimit = 1073741824 // 1GB - _, err := resourcemanager.NewCgroupsMemoryLimitCheckerIfSupported(c.memoryFreeLimit) - if err != nil { - log.Warn("Cgroups V1 or V2 is unsupported, memory-free-limit feature inside block-validator is disabled") - c.MemoryFreeLimit = "" - } } else if c.MemoryFreeLimit != "" { limit, err := resourcemanager.ParseMemLimit(c.MemoryFreeLimit) if err != nil { @@ -241,9 +236,14 @@ func NewBlockValidator( if config().MemoryFreeLimit != "" { limtchecker, err := resourcemanager.NewCgroupsMemoryLimitCheckerIfSupported(config().memoryFreeLimit) if err != nil { - return nil, fmt.Errorf("failed to create MemoryFreeLimitChecker, Cgroups V1 or V2 is unsupported") + if config().MemoryFreeLimit == "default" { + log.Warn("Cgroups V1 or V2 is unsupported, memory-free-limit feature inside block-validator is disabled") + } else { + return nil, fmt.Errorf("failed to create MemoryFreeLimitChecker, Cgroups V1 or V2 is unsupported") + } + } else { + ret.MemoryFreeLimitChecker = limtchecker } - ret.MemoryFreeLimitChecker = limtchecker } return ret, nil }