diff --git a/LICENSE b/LICENSE index 0739c2d8bf..c96c4d6cb2 100644 --- a/LICENSE +++ b/LICENSE @@ -10,7 +10,7 @@ Parameters Licensor: Offchain Labs Licensed Work: Arbitrum Nitro - The Licensed Work is (c) 2021-2023 Offchain Labs + The Licensed Work is (c) 2021-2024 Offchain Labs Additional Use Grant: You may use the Licensed Work in a production environment solely to provide a point of interface to permit end users or applications @@ -30,7 +30,7 @@ Additional Use Grant: You may use the Licensed Work in a production environment -Change Date: Dec 31, 2027 +Change Date: Dec 31, 2028 Change License: Apache License Version 2.0 diff --git a/arbitrator/jit/src/machine.rs b/arbitrator/jit/src/machine.rs index ed22e12ef9..c9119dd16e 100644 --- a/arbitrator/jit/src/machine.rs +++ b/arbitrator/jit/src/machine.rs @@ -280,7 +280,7 @@ impl WasmEnv { Ok(env) } - pub fn send_results(&mut self, error: Option) { + pub fn send_results(&mut self, error: Option, memory_used: u64) { let writer = match &mut self.process.socket { Some((writer, _)) => writer, None => return, @@ -307,6 +307,7 @@ impl WasmEnv { check!(socket::write_u64(writer, self.small_globals[1])); check!(socket::write_bytes32(writer, &self.large_globals[0])); check!(socket::write_bytes32(writer, &self.large_globals[1])); + check!(socket::write_u64(writer, memory_used)); check!(writer.flush()); } } diff --git a/arbitrator/jit/src/main.rs b/arbitrator/jit/src/main.rs index 513cd067c4..968da2a978 100644 --- a/arbitrator/jit/src/main.rs +++ b/arbitrator/jit/src/main.rs @@ -114,8 +114,9 @@ fn main() { true => None, false => Some(message), }; + let memory_used = memory.size().0 as u64 * 65_536; - env.send_results(error); + env.send_results(error, memory_used); } // require a usize be at least 32 bits wide diff --git a/arbitrator/jit/src/syscall.rs b/arbitrator/jit/src/syscall.rs index c81641a7f8..4f657eeefa 100644 --- a/arbitrator/jit/src/syscall.rs +++ b/arbitrator/jit/src/syscall.rs @@ -337,7 +337,7 @@ pub fn js_value_call(mut env: WasmEnvMut, sp: u32) -> MaybeEscape { let value = match (object, method_name.as_slice()) { (Ref(GO_ID), b"_makeFuncWrapper") => { - let arg = match args.get(0) { + let arg = match args.first() { Some(arg) => arg, None => fail!( "Go trying to call Go._makeFuncWrapper with bad args {:?}", @@ -415,7 +415,7 @@ pub fn js_value_call(mut env: WasmEnvMut, sp: u32) -> MaybeEscape { (Ref(CRYPTO_ID), b"getRandomValues") => { let name = "crypto.getRandomValues"; - let id = match args.get(0) { + let id = match args.first() { Some(Ref(x)) => x, _ => fail!("Go trying to call {name} with bad args {:?}", args), }; @@ -456,7 +456,7 @@ pub fn js_value_new(mut env: WasmEnvMut, sp: u32) { let args_len = sp.read_u64(2); let args = sp.read_value_slice(args_ptr, args_len); match class { - UINT8_ARRAY_ID => match args.get(0) { + UINT8_ARRAY_ID => match args.first() { Some(JsValue::Number(size)) => { let id = pool.insert(DynamicObject::Uint8Array(vec![0; *size as usize])); sp.write_u64(4, GoValue::Object(id).encode()); diff --git a/arbitrator/prover/src/machine.rs b/arbitrator/prover/src/machine.rs index 0849312f3d..6ca552d83c 100644 --- a/arbitrator/prover/src/machine.rs +++ b/arbitrator/prover/src/machine.rs @@ -362,7 +362,7 @@ impl Module { bin.memories.len() <= 1, "Multiple memories are not supported" ); - if let Some(limits) = bin.memories.get(0) { + if let Some(limits) = bin.memories.first() { let page_size = Memory::PAGE_SIZE; let initial = limits.initial; // validate() checks this is less than max::u32 let allowed = u32::MAX as u64 / Memory::PAGE_SIZE - 1; // we require the size remain *below* 2^32 diff --git a/arbitrator/wasm-libraries/go-stub/src/lib.rs b/arbitrator/wasm-libraries/go-stub/src/lib.rs index df77893fcb..1a5d1963c7 100644 --- a/arbitrator/wasm-libraries/go-stub/src/lib.rs +++ b/arbitrator/wasm-libraries/go-stub/src/lib.rs @@ -218,7 +218,7 @@ pub unsafe extern "C" fn go__syscall_js_valueNew(sp: GoStack) { let args_len = sp.read_u64(2); let args = read_value_slice(args_ptr, args_len); if class == UINT8_ARRAY_ID { - if let Some(InterpValue::Number(size)) = args.get(0) { + if let Some(InterpValue::Number(size)) = args.first() { let id = DynamicObjectPool::singleton() .insert(DynamicObject::Uint8Array(vec![0; *size as usize])); sp.write_u64(4, GoValue::Object(id).encode()); @@ -321,7 +321,7 @@ unsafe fn value_call_impl(sp: &mut GoStack) -> Result { let args_len = sp.read_u64(4); let args = read_value_slice(args_ptr, args_len); if object == InterpValue::Ref(GO_ID) && &method_name == b"_makeFuncWrapper" { - let id = args.get(0).ok_or_else(|| { + let id = args.first().ok_or_else(|| { format!( "Go attempting to call Go._makeFuncWrapper with bad args {:?}", args, @@ -405,7 +405,7 @@ unsafe fn value_call_impl(sp: &mut GoStack) -> Result { )) } } else if object == InterpValue::Ref(CRYPTO_ID) && &method_name == b"getRandomValues" { - let id = match args.get(0) { + let id = match args.first() { Some(InterpValue::Ref(x)) => *x, _ => { return Err(format!( diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 8c0fdd332a..4bd0e2490a 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -38,6 +38,7 @@ import ( "github.com/offchainlabs/nitro/cmd/genericconf" "github.com/offchainlabs/nitro/das" "github.com/offchainlabs/nitro/solgen/go/bridgegen" + "github.com/offchainlabs/nitro/util" "github.com/offchainlabs/nitro/util/arbmath" "github.com/offchainlabs/nitro/util/headerreader" "github.com/offchainlabs/nitro/util/redisutil" @@ -58,23 +59,22 @@ type batchPosterPosition struct { type BatchPoster struct { stopwaiter.StopWaiter - l1Reader *headerreader.HeaderReader - inbox *InboxTracker - streamer *TransactionStreamer - config BatchPosterConfigFetcher - seqInbox *bridgegen.SequencerInbox - bridge *bridgegen.Bridge - syncMonitor *SyncMonitor - seqInboxABI *abi.ABI - seqInboxAddr common.Address - bridgeAddr common.Address - gasRefunderAddr common.Address - building *buildingBatch - daWriter das.DataAvailabilityServiceWriter - dataPoster *dataposter.DataPoster - redisLock *redislock.Simple - firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred - messagesPerBatch *arbmath.MovingAverage[uint64] + l1Reader *headerreader.HeaderReader + inbox *InboxTracker + streamer *TransactionStreamer + config BatchPosterConfigFetcher + seqInbox *bridgegen.SequencerInbox + bridge *bridgegen.Bridge + syncMonitor *SyncMonitor + seqInboxABI *abi.ABI + seqInboxAddr common.Address + bridgeAddr common.Address + gasRefunderAddr common.Address + building *buildingBatch + daWriter das.DataAvailabilityServiceWriter + dataPoster *dataposter.DataPoster + redisLock *redislock.Simple + messagesPerBatch *arbmath.MovingAverage[uint64] // This is an atomic variable that should only be accessed atomically. // An estimate of the number of batches we want to post but haven't yet. // This doesn't include batches which we don't want to post yet due to the L1 bounds. @@ -1145,6 +1145,18 @@ func (b *BatchPoster) Start(ctxIn context.Context) { b.redisLock.Start(ctxIn) b.StopWaiter.Start(ctxIn, b) b.LaunchThread(b.pollForReverts) + commonEphemeralErrorHandler := util.NewEphemeralErrorHandler(time.Minute, "", 0) + exceedMaxMempoolSizeEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, dataposter.ErrExceedsMaxMempoolSize.Error(), time.Minute) + storageRaceEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, storage.ErrStorageRace.Error(), time.Minute) + normalGasEstimationFailedEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, ErrNormalGasEstimationFailed.Error(), time.Minute) + accumulatorNotFoundEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, AccumulatorNotFoundErr.Error(), time.Minute) + resetAllEphemeralErrs := func() { + commonEphemeralErrorHandler.Reset() + exceedMaxMempoolSizeEphemeralErrorHandler.Reset() + storageRaceEphemeralErrorHandler.Reset() + normalGasEstimationFailedEphemeralErrorHandler.Reset() + accumulatorNotFoundEphemeralErrorHandler.Reset() + } b.CallIteratively(func(ctx context.Context) time.Duration { var err error if common.HexToAddress(b.config().GasRefunderAddress) != (common.Address{}) { @@ -1172,12 +1184,12 @@ func (b *BatchPoster) Start(ctxIn context.Context) { if !couldLock { log.Debug("Not posting batches right now because another batch poster has the lock or this node is behind") b.building = nil - b.firstEphemeralError = time.Time{} + resetAllEphemeralErrs() return b.config().PollInterval } posted, err := b.maybePostSequencerBatch(ctx) if err == nil { - b.firstEphemeralError = time.Time{} + resetAllEphemeralErrs() } if err != nil { if ctx.Err() != nil { @@ -1186,28 +1198,16 @@ func (b *BatchPoster) Start(ctxIn context.Context) { } b.building = nil logLevel := log.Error - if b.firstEphemeralError == (time.Time{}) { - b.firstEphemeralError = time.Now() - } - // Likely the inbox tracker just isn't caught up, or there's some other ephemeral error. + // Likely the inbox tracker just isn't caught up. // Let's see if this error disappears naturally. - sinceFirstEphemeralError := time.Since(b.firstEphemeralError) + logLevel = commonEphemeralErrorHandler.LogLevel(err, logLevel) // If the error matches one of these, it's only logged at debug for the first minute, // then at warn for the next 4 minutes, then at error. If the error isn't one of these, // it'll be logged at warn for the first minute, then at error. - ignoreAtFirst := errors.Is(err, dataposter.ErrExceedsMaxMempoolSize) || - errors.Is(err, storage.ErrStorageRace) || - errors.Is(err, ErrNormalGasEstimationFailed) || - errors.Is(err, AccumulatorNotFoundErr) - if sinceFirstEphemeralError < time.Minute { - if ignoreAtFirst { - logLevel = log.Debug - } else { - logLevel = log.Warn - } - } else if sinceFirstEphemeralError < time.Minute*5 && ignoreAtFirst { - logLevel = log.Warn - } + logLevel = exceedMaxMempoolSizeEphemeralErrorHandler.LogLevel(err, logLevel) + logLevel = storageRaceEphemeralErrorHandler.LogLevel(err, logLevel) + logLevel = normalGasEstimationFailedEphemeralErrorHandler.LogLevel(err, logLevel) + logLevel = accumulatorNotFoundEphemeralErrorHandler.LogLevel(err, logLevel) logLevel("error posting batch", "err", err) return b.config().ErrorDelay } else if posted { diff --git a/arbnode/node.go b/arbnode/node.go index f5063715a3..78c6c816e6 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -862,9 +862,6 @@ func (n *Node) Start(ctx context.Context) error { } func (n *Node) StopAndWait() { - if n.Execution != nil { - n.Execution.StopAndWait() - } if n.MaintenanceRunner != nil && n.MaintenanceRunner.Started() { n.MaintenanceRunner.StopAndWait() } @@ -917,7 +914,10 @@ func (n *Node) StopAndWait() { if n.DASLifecycleManager != nil { n.DASLifecycleManager.StopAndWaitUntil(2 * time.Second) } + if n.Execution != nil { + n.Execution.StopAndWait() + } if err := n.Stack.Close(); err != nil { - log.Error("error on stak close", "err", err) + log.Error("error on stack close", "err", err) } } diff --git a/arbnode/resourcemanager/resource_management.go b/arbnode/resourcemanager/resource_management.go index cb1ae9d6ea..aba823cc25 100644 --- a/arbnode/resourcemanager/resource_management.go +++ b/arbnode/resourcemanager/resource_management.go @@ -39,14 +39,14 @@ func Init(conf *Config) error { return nil } - limit, err := parseMemLimit(conf.MemFreeLimit) + limit, err := ParseMemLimit(conf.MemFreeLimit) if err != nil { return err } node.WrapHTTPHandler = func(srv http.Handler) (http.Handler, error) { - var c limitChecker - c, err := newCgroupsMemoryLimitCheckerIfSupported(limit) + var c LimitChecker + c, err := NewCgroupsMemoryLimitCheckerIfSupported(limit) if errors.Is(err, errNotSupported) { log.Error("No method for determining memory usage and limits was discovered, disabled memory limit RPC throttling") c = &trivialLimitChecker{} @@ -57,7 +57,7 @@ func Init(conf *Config) error { return nil } -func parseMemLimit(limitStr string) (int, error) { +func ParseMemLimit(limitStr string) (int, error) { var ( limit int = 1 s string @@ -105,10 +105,10 @@ func ConfigAddOptions(prefix string, f *pflag.FlagSet) { // limit check. type httpServer struct { inner http.Handler - c limitChecker + c LimitChecker } -func newHttpServer(inner http.Handler, c limitChecker) *httpServer { +func newHttpServer(inner http.Handler, c LimitChecker) *httpServer { return &httpServer{inner: inner, c: c} } @@ -116,7 +116,7 @@ func newHttpServer(inner http.Handler, c limitChecker) *httpServer { // limit is exceeded, in which case it returns a HTTP 429 error. func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { start := time.Now() - exceeded, err := s.c.isLimitExceeded() + exceeded, err := s.c.IsLimitExceeded() limitCheckDurationHistogram.Update(time.Since(start).Nanoseconds()) if err != nil { log.Error("Error checking memory limit", "err", err, "checker", s.c.String()) @@ -130,19 +130,19 @@ func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { s.inner.ServeHTTP(w, req) } -type limitChecker interface { - isLimitExceeded() (bool, error) +type LimitChecker interface { + IsLimitExceeded() (bool, error) String() string } -func isSupported(c limitChecker) bool { - _, err := c.isLimitExceeded() +func isSupported(c LimitChecker) bool { + _, err := c.IsLimitExceeded() return err == nil } -// newCgroupsMemoryLimitCheckerIfSupported attempts to auto-discover whether +// NewCgroupsMemoryLimitCheckerIfSupported attempts to auto-discover whether // Cgroups V1 or V2 is supported for checking system memory limits. -func newCgroupsMemoryLimitCheckerIfSupported(memLimitBytes int) (*cgroupsMemoryLimitChecker, error) { +func NewCgroupsMemoryLimitCheckerIfSupported(memLimitBytes int) (*cgroupsMemoryLimitChecker, error) { c := newCgroupsMemoryLimitChecker(cgroupsV1MemoryFiles, memLimitBytes) if isSupported(c) { log.Info("Cgroups v1 detected, enabling memory limit RPC throttling") @@ -161,7 +161,7 @@ func newCgroupsMemoryLimitCheckerIfSupported(memLimitBytes int) (*cgroupsMemoryL // trivialLimitChecker checks no limits, so its limits are never exceeded. type trivialLimitChecker struct{} -func (_ trivialLimitChecker) isLimitExceeded() (bool, error) { +func (_ trivialLimitChecker) IsLimitExceeded() (bool, error) { return false, nil } @@ -202,7 +202,7 @@ func newCgroupsMemoryLimitChecker(files cgroupsMemoryFiles, memLimitBytes int) * } } -// isLimitExceeded checks if the system memory free is less than the limit. +// IsLimitExceeded checks if the system memory free is less than the limit. // It returns true if the limit is exceeded. // // container_memory_working_set_bytes in prometheus is calculated as @@ -223,7 +223,7 @@ func newCgroupsMemoryLimitChecker(files cgroupsMemoryFiles, memLimitBytes int) * // free memory for the page cache, to avoid cache thrashing on chain state // access. How much "reasonable" is will depend on access patterns, state // size, and your application's tolerance for latency. -func (c *cgroupsMemoryLimitChecker) isLimitExceeded() (bool, error) { +func (c *cgroupsMemoryLimitChecker) IsLimitExceeded() (bool, error) { var limit, usage, active, inactive int var err error if limit, err = readIntFromFile(c.files.limitFile); err != nil { diff --git a/arbnode/resourcemanager/resource_management_test.go b/arbnode/resourcemanager/resource_management_test.go index 4f52ad017e..4495396063 100644 --- a/arbnode/resourcemanager/resource_management_test.go +++ b/arbnode/resourcemanager/resource_management_test.go @@ -52,7 +52,7 @@ func makeCgroupsTestDir(cgroupDir string) cgroupsMemoryFiles { func TestCgroupsFailIfCantOpen(t *testing.T) { testFiles := makeCgroupsTestDir(t.TempDir()) c := newCgroupsMemoryLimitChecker(testFiles, 1024*1024*512) - if _, err := c.isLimitExceeded(); err == nil { + if _, err := c.IsLimitExceeded(); err == nil { t.Fatal("Should fail open if can't read files") } } @@ -124,7 +124,7 @@ func TestCgroupsMemoryLimit(t *testing.T) { } { t.Run(tc.desc, func(t *testing.T) { testFiles := makeCgroupsTestDir(t.TempDir()) - memLimit, err := parseMemLimit(tc.memLimit) + memLimit, err := ParseMemLimit(tc.memLimit) if err != nil { t.Fatalf("Parsing memory limit failed: %v", err) } @@ -132,12 +132,12 @@ func TestCgroupsMemoryLimit(t *testing.T) { if err := updateFakeCgroupFiles(c, tc.sysLimit, tc.usage, tc.inactive, tc.active); err != nil { t.Fatalf("Updating cgroup files: %v", err) } - exceeded, err := c.isLimitExceeded() + exceeded, err := c.IsLimitExceeded() if err != nil { t.Fatalf("Checking if limit exceeded: %v", err) } if exceeded != tc.want { - t.Errorf("isLimitExceeded() = %t, want %t", exceeded, tc.want) + t.Errorf("IsLimitExceeded() = %t, want %t", exceeded, tc.want) } }, ) diff --git a/cmd/genericconf/config.go b/cmd/genericconf/config.go index 6b44ace974..50aafbe223 100644 --- a/cmd/genericconf/config.go +++ b/cmd/genericconf/config.go @@ -121,6 +121,6 @@ func (c *RpcConfig) Apply(stackConf *node.Config) { } func RpcConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Int(prefix+".max-batch-response-size", DefaultRpcConfig.MaxBatchResponseSize, "the maximum response size for a JSON-RPC request measured in bytes (-1 means no limit)") - f.Int(prefix+".batch-request-limit", DefaultRpcConfig.BatchRequestLimit, "the maximum number of requests in a batch") + f.Int(prefix+".max-batch-response-size", DefaultRpcConfig.MaxBatchResponseSize, "the maximum response size for a JSON-RPC request measured in bytes (0 means no limit)") + f.Int(prefix+".batch-request-limit", DefaultRpcConfig.BatchRequestLimit, "the maximum number of requests in a batch (0 means no limit)") } diff --git a/staker/block_validator.go b/staker/block_validator.go index 61e5ed519b..352335a5db 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -20,6 +20,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" + "github.com/offchainlabs/nitro/arbnode/resourcemanager" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/util/containers" "github.com/offchainlabs/nitro/util/rpcclient" @@ -75,6 +76,8 @@ type BlockValidator struct { testingProgressMadeChan chan struct{} fatalErr chan<- error + + MemoryFreeLimitChecker resourcemanager.LimitChecker } type BlockValidatorConfig struct { @@ -87,9 +90,21 @@ type BlockValidatorConfig struct { PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"` Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"` + MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"` + + memoryFreeLimit int } func (c *BlockValidatorConfig) Validate() error { + if c.MemoryFreeLimit == "default" { + c.memoryFreeLimit = 1073741824 // 1GB + } else if c.MemoryFreeLimit != "" { + limit, err := resourcemanager.ParseMemLimit(c.MemoryFreeLimit) + if err != nil { + return fmt.Errorf("failed to parse block-validator config memory-free-limit string: %w", err) + } + c.memoryFreeLimit = limit + } return c.ValidationServer.Validate() } @@ -109,6 +124,7 @@ func BlockValidatorConfigAddOptions(prefix string, f *flag.FlagSet) { f.String(prefix+".pending-upgrade-module-root", DefaultBlockValidatorConfig.PendingUpgradeModuleRoot, "pending upgrade wasm module root to additionally validate (hash, 'latest' or empty)") f.Bool(prefix+".failure-is-fatal", DefaultBlockValidatorConfig.FailureIsFatal, "failing a validation is treated as a fatal error") BlockValidatorDangerousConfigAddOptions(prefix+".dangerous", f) + f.String(prefix+".memory-free-limit", DefaultBlockValidatorConfig.MemoryFreeLimit, "minimum free-memory limit after reaching which the blockvalidator pauses validation. Enabled by default as 1GB, to disable provide empty string") } func BlockValidatorDangerousConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -125,6 +141,7 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{ PendingUpgradeModuleRoot: "latest", FailureIsFatal: true, Dangerous: DefaultBlockValidatorDangerousConfig, + MemoryFreeLimit: "default", } var TestBlockValidatorConfig = BlockValidatorConfig{ @@ -137,6 +154,7 @@ var TestBlockValidatorConfig = BlockValidatorConfig{ PendingUpgradeModuleRoot: "latest", FailureIsFatal: true, Dangerous: DefaultBlockValidatorDangerousConfig, + MemoryFreeLimit: "default", } var DefaultBlockValidatorDangerousConfig = BlockValidatorDangerousConfig{ @@ -215,6 +233,18 @@ func NewBlockValidator( } streamer.SetBlockValidator(ret) inbox.SetBlockValidator(ret) + if config().MemoryFreeLimit != "" { + limtchecker, err := resourcemanager.NewCgroupsMemoryLimitCheckerIfSupported(config().memoryFreeLimit) + if err != nil { + if config().MemoryFreeLimit == "default" { + log.Warn("Cgroups V1 or V2 is unsupported, memory-free-limit feature inside block-validator is disabled") + } else { + return nil, fmt.Errorf("failed to create MemoryFreeLimitChecker, Cgroups V1 or V2 is unsupported") + } + } else { + ret.MemoryFreeLimitChecker = limtchecker + } + } return ret, nil } @@ -521,6 +551,15 @@ func (v *BlockValidator) iterativeValidationEntryCreator(ctx context.Context, ig } func (v *BlockValidator) sendNextRecordRequests(ctx context.Context) (bool, error) { + if v.MemoryFreeLimitChecker != nil { + exceeded, err := v.MemoryFreeLimitChecker.IsLimitExceeded() + if err != nil { + log.Error("error checking if free-memory limit exceeded using MemoryFreeLimitChecker", "err", err) + } + if exceeded { + return false, nil + } + } v.reorgMutex.RLock() pos := v.recordSent() created := v.created() @@ -550,6 +589,15 @@ func (v *BlockValidator) sendNextRecordRequests(ctx context.Context) (bool, erro return true, nil } for pos <= recordUntil { + if v.MemoryFreeLimitChecker != nil { + exceeded, err := v.MemoryFreeLimitChecker.IsLimitExceeded() + if err != nil { + log.Error("error checking if free-memory limit exceeded using MemoryFreeLimitChecker", "err", err) + } + if exceeded { + return false, nil + } + } validationStatus, found := v.validations.Load(pos) if !found { return false, fmt.Errorf("not found entry for pos %d", pos) @@ -699,6 +747,15 @@ validationsLoop: log.Trace("advanceValidations: no more room", "pos", pos) return nil, nil } + if v.MemoryFreeLimitChecker != nil { + exceeded, err := v.MemoryFreeLimitChecker.IsLimitExceeded() + if err != nil { + log.Error("error checking if free-memory limit exceeded using MemoryFreeLimitChecker", "err", err) + } + if exceeded { + return nil, nil + } + } if currentStatus == Prepared { input, err := validationStatus.Entry.ToInput() if err != nil && ctx.Err() == nil { diff --git a/staker/staker.go b/staker/staker.go index 356584bbc7..33e4e49f2c 100644 --- a/staker/staker.go +++ b/staker/staker.go @@ -26,6 +26,7 @@ import ( "github.com/offchainlabs/nitro/solgen/go/bridgegen" "github.com/offchainlabs/nitro/solgen/go/rollupgen" "github.com/offchainlabs/nitro/staker/txbuilder" + "github.com/offchainlabs/nitro/util" "github.com/offchainlabs/nitro/util/arbmath" "github.com/offchainlabs/nitro/util/headerreader" "github.com/offchainlabs/nitro/util/stopwaiter" @@ -416,6 +417,7 @@ func (s *Staker) Start(ctxIn context.Context) { } s.StopWaiter.Start(ctxIn, s) backoff := time.Second + ephemeralErrorHandler := util.NewEphemeralErrorHandler(10*time.Minute, "is ahead of on-chain nonce", 0) s.CallIteratively(func(ctx context.Context) (returningWait time.Duration) { defer func() { panicErr := recover() @@ -455,6 +457,7 @@ func (s *Staker) Start(ctxIn context.Context) { } } if err == nil { + ephemeralErrorHandler.Reset() backoff = time.Second stakerLastSuccessfulActionGauge.Update(time.Now().Unix()) stakerActionSuccessCounter.Inc(1) @@ -466,12 +469,14 @@ func (s *Staker) Start(ctxIn context.Context) { } stakerActionFailureCounter.Inc(1) backoff *= 2 + logLevel := log.Error if backoff > time.Minute { backoff = time.Minute - log.Error("error acting as staker", "err", err) } else { - log.Warn("error acting as staker", "err", err) + logLevel = log.Warn } + logLevel = ephemeralErrorHandler.LogLevel(err, logLevel) + logLevel("error acting as staker", "err", err) return backoff }) s.CallIteratively(func(ctx context.Context) time.Duration { diff --git a/util/log.go b/util/log.go new file mode 100644 index 0000000000..dbeed8051d --- /dev/null +++ b/util/log.go @@ -0,0 +1,67 @@ +package util + +import ( + "strings" + "time" + + "github.com/ethereum/go-ethereum/log" +) + +// EphemeralErrorHandler handles errors that are ephemeral in nature i.h these are errors +// that we would like to log as a warning unless they repeat for more than a certain duration of time. +type EphemeralErrorHandler struct { + Duration time.Duration + ErrorString string + FirstOccurrence *time.Time + + IgnoreDuration time.Duration + IgnoredErrLogLevel func(string, ...interface{}) // Default IgnoredErrLogLevel is log.Debug +} + +func NewEphemeralErrorHandler(duration time.Duration, errorString string, ignoreDuration time.Duration) *EphemeralErrorHandler { + return &EphemeralErrorHandler{ + Duration: duration, + ErrorString: errorString, + FirstOccurrence: &time.Time{}, + IgnoreDuration: ignoreDuration, + IgnoredErrLogLevel: log.Debug, + } +} + +// LogLevel method defaults to returning the input currentLogLevel if the given error doesnt contain the errorSubstring, +// but if it does, then returns one of the corresponding loglevels as follows +// - IgnoredErrLogLevel - if the error has been repeating for less than the IgnoreDuration of time. Defaults to log.Debug +// - log.Warn - if the error has been repeating for less than the given duration of time +// - log.Error - Otherwise +// +// # Usage Examples +// +// ephemeralErrorHandler.Loglevel(err, log.Error)("msg") +// ephemeralErrorHandler.Loglevel(err, log.Error)("msg", "key1", val1, "key2", val2) +// ephemeralErrorHandler.Loglevel(err, log.Error)("msg", "key1", val1) +func (h *EphemeralErrorHandler) LogLevel(err error, currentLogLevel func(msg string, ctx ...interface{})) func(string, ...interface{}) { + if h.ErrorString != "" && !strings.Contains(err.Error(), h.ErrorString) { + h.Reset() + return currentLogLevel + } + + if *h.FirstOccurrence == (time.Time{}) { + *h.FirstOccurrence = time.Now() + } + + if h.IgnoreDuration != 0 && time.Since(*h.FirstOccurrence) < h.IgnoreDuration { + if h.IgnoredErrLogLevel != nil { + return h.IgnoredErrLogLevel + } + return log.Debug + } + + if time.Since(*h.FirstOccurrence) < h.Duration { + return log.Warn + } + return log.Error +} + +func (h *EphemeralErrorHandler) Reset() { + *h.FirstOccurrence = time.Time{} +} diff --git a/util/log_test.go b/util/log_test.go new file mode 100644 index 0000000000..f8007373f2 --- /dev/null +++ b/util/log_test.go @@ -0,0 +1,70 @@ +package util + +import ( + "errors" + "reflect" + "testing" + "time" + + "github.com/ethereum/go-ethereum/log" +) + +func compareFunctions(f1, f2 func(msg string, ctx ...interface{})) bool { + return reflect.ValueOf(f1).Pointer() == reflect.ValueOf(f2).Pointer() +} +func TestSimple(t *testing.T) { + allErrHandler := NewEphemeralErrorHandler(2500*time.Millisecond, "", time.Second) + err := errors.New("sample error") + logLevel := allErrHandler.LogLevel(err, log.Error) + if !compareFunctions(log.Debug, logLevel) { + t.Fatalf("incorrect loglevel output. Want: Debug") + } + + time.Sleep(1 * time.Second) + logLevel = allErrHandler.LogLevel(err, log.Error) + if !compareFunctions(log.Warn, logLevel) { + t.Fatalf("incorrect loglevel output. Want: Warn") + } + + time.Sleep(2 * time.Second) + logLevel = allErrHandler.LogLevel(err, log.Error) + if !compareFunctions(log.Error, logLevel) { + t.Fatalf("incorrect loglevel output. Want: Error") + } +} + +func TestComplex(t *testing.T) { + // Simulation: errorA happens continuously for 2 seconds and then errorB happens + errorAHandler := NewEphemeralErrorHandler(time.Second, "errorA", 0) + errorBHandler := NewEphemeralErrorHandler(1500*time.Millisecond, "errorB", 0) + + // Computes result of chaining two ephemeral error handlers for a given recurring error + chainingErrHandlers := func(err error) func(string, ...interface{}) { + logLevel := log.Error + logLevel = errorAHandler.LogLevel(err, logLevel) + logLevel = errorBHandler.LogLevel(err, logLevel) + return logLevel + } + + errA := errors.New("this is a sample errorA") + if !compareFunctions(log.Warn, chainingErrHandlers(errA)) { + t.Fatalf("incorrect loglevel output. Want: Warn") + } + time.Sleep(2 * time.Second) + if !compareFunctions(log.Error, chainingErrHandlers(errA)) { + t.Fatalf("incorrect loglevel output. Want: Error") + } + + errB := errors.New("this is a sample errorB") + if !compareFunctions(log.Warn, chainingErrHandlers(errB)) { + t.Fatalf("incorrect loglevel output. Want: Warn") + } + if !compareFunctions(log.Warn, chainingErrHandlers(errA)) { + t.Fatalf("incorrect loglevel output. Want: Warn") + } + + errC := errors.New("random error") + if !compareFunctions(log.Error, chainingErrHandlers(errC)) { + t.Fatalf("incorrect loglevel output. Want: Error") + } +} diff --git a/validator/server_jit/jit_machine.go b/validator/server_jit/jit_machine.go index f763ce3ea0..a41e249cdb 100644 --- a/validator/server_jit/jit_machine.go +++ b/validator/server_jit/jit_machine.go @@ -16,17 +16,21 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/offchainlabs/nitro/util/arbmath" "github.com/offchainlabs/nitro/validator" ) +var jitWasmMemoryUsage = metrics.NewRegisteredHistogram("jit/wasm/memoryusage", nil, metrics.NewBoundedHistogramSample()) + type JitMachine struct { - binary string - process *exec.Cmd - stdin io.WriteCloser + binary string + process *exec.Cmd + stdin io.WriteCloser + wasmMemoryUsageLimit int } -func createJitMachine(jitBinary string, binaryPath string, cranelift bool, moduleRoot common.Hash, fatalErrChan chan error) (*JitMachine, error) { +func createJitMachine(jitBinary string, binaryPath string, cranelift bool, wasmMemoryUsageLimit int, moduleRoot common.Hash, fatalErrChan chan error) (*JitMachine, error) { invocation := []string{"--binary", binaryPath, "--forks"} if cranelift { invocation = append(invocation, "--cranelift") @@ -45,9 +49,10 @@ func createJitMachine(jitBinary string, binaryPath string, cranelift bool, modul }() machine := &JitMachine{ - binary: binaryPath, - process: process, - stdin: stdin, + binary: binaryPath, + process: process, + stdin: stdin, + wasmMemoryUsageLimit: wasmMemoryUsageLimit, } return machine, nil } @@ -258,8 +263,18 @@ func (machine *JitMachine) prove( if state.BlockHash, err = readHash(); err != nil { return state, err } - state.SendRoot, err = readHash() - return state, err + if state.SendRoot, err = readHash(); err != nil { + return state, err + } + memoryUsed, err := readUint64() + if err != nil { + return state, fmt.Errorf("failed to read memory usage from Jit machine: %w", err) + } + if memoryUsed > uint64(machine.wasmMemoryUsageLimit) { + log.Warn("memory used by jit wasm exceeds the wasm memory usage limit", "limit", machine.wasmMemoryUsageLimit, "memoryUsed", memoryUsed) + } + jitWasmMemoryUsage.Update(int64(memoryUsed)) + return state, nil default: message := "inter-process communication failure" log.Error("Jit Machine Failure", "message", message) diff --git a/validator/server_jit/machine_loader.go b/validator/server_jit/machine_loader.go index 5705a9a387..3a831928b7 100644 --- a/validator/server_jit/machine_loader.go +++ b/validator/server_jit/machine_loader.go @@ -13,13 +13,15 @@ import ( ) type JitMachineConfig struct { - ProverBinPath string - JitCranelift bool + ProverBinPath string + JitCranelift bool + WasmMemoryUsageLimit int } var DefaultJitMachineConfig = JitMachineConfig{ - JitCranelift: true, - ProverBinPath: "replay.wasm", + JitCranelift: true, + ProverBinPath: "replay.wasm", + WasmMemoryUsageLimit: 4294967296, } func getJitPath() (string, error) { @@ -57,7 +59,7 @@ func NewJitMachineLoader(config *JitMachineConfig, locator *server_common.Machin } createMachineThreadFunc := func(ctx context.Context, moduleRoot common.Hash) (*JitMachine, error) { binPath := filepath.Join(locator.GetMachinePath(moduleRoot), config.ProverBinPath) - return createJitMachine(jitPath, binPath, config.JitCranelift, moduleRoot, fatalErrChan) + return createJitMachine(jitPath, binPath, config.JitCranelift, config.WasmMemoryUsageLimit, moduleRoot, fatalErrChan) } return &JitMachineLoader{ MachineLoader: *server_common.NewMachineLoader[JitMachine](locator, createMachineThreadFunc), diff --git a/validator/server_jit/spawner.go b/validator/server_jit/spawner.go index ff1749506a..6489821b5b 100644 --- a/validator/server_jit/spawner.go +++ b/validator/server_jit/spawner.go @@ -18,18 +18,23 @@ import ( type JitSpawnerConfig struct { Workers int `koanf:"workers" reload:"hot"` Cranelift bool `koanf:"cranelift"` + + // TODO: change WasmMemoryUsageLimit to a string and use resourcemanager.ParseMemLimit + WasmMemoryUsageLimit int `koanf:"wasm-memory-usage-limit"` } type JitSpawnerConfigFecher func() *JitSpawnerConfig var DefaultJitSpawnerConfig = JitSpawnerConfig{ - Workers: 0, - Cranelift: true, + Workers: 0, + Cranelift: true, + WasmMemoryUsageLimit: 4294967296, // 2^32 WASM memeory limit } func JitSpawnerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".workers", DefaultJitSpawnerConfig.Workers, "number of concurrent validation threads") f.Bool(prefix+".cranelift", DefaultJitSpawnerConfig.Cranelift, "use Cranelift instead of LLVM when validating blocks using the jit-accelerated block validator") + f.Int(prefix+".wasm-memory-usage-limit", DefaultJitSpawnerConfig.WasmMemoryUsageLimit, "if memory used by a jit wasm exceeds this limit, a warning is logged") } type JitSpawner struct { @@ -44,6 +49,7 @@ func NewJitSpawner(locator *server_common.MachineLocator, config JitSpawnerConfi // TODO - preload machines machineConfig := DefaultJitMachineConfig machineConfig.JitCranelift = config().Cranelift + machineConfig.WasmMemoryUsageLimit = config().WasmMemoryUsageLimit loader, err := NewJitMachineLoader(&machineConfig, locator, fatalErrChan) if err != nil { return nil, err