Skip to content

Commit

Permalink
Merge branch 'master' into blockvalidator-pause-validation
Browse files Browse the repository at this point in the history
  • Loading branch information
ganeshvanahalli authored Jan 8, 2024
2 parents c02e840 + 521fe53 commit 0e75be2
Show file tree
Hide file tree
Showing 15 changed files with 239 additions and 72 deletions.
4 changes: 2 additions & 2 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Parameters
Licensor: Offchain Labs

Licensed Work: Arbitrum Nitro
The Licensed Work is (c) 2021-2023 Offchain Labs
The Licensed Work is (c) 2021-2024 Offchain Labs

Additional Use Grant: You may use the Licensed Work in a production environment solely
to provide a point of interface to permit end users or applications
Expand All @@ -30,7 +30,7 @@ Additional Use Grant: You may use the Licensed Work in a production environment



Change Date: Dec 31, 2027
Change Date: Dec 31, 2028

Change License: Apache License Version 2.0

Expand Down
3 changes: 2 additions & 1 deletion arbitrator/jit/src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl WasmEnv {
Ok(env)
}

pub fn send_results(&mut self, error: Option<String>) {
pub fn send_results(&mut self, error: Option<String>, memory_used: u64) {
let writer = match &mut self.process.socket {
Some((writer, _)) => writer,
None => return,
Expand All @@ -307,6 +307,7 @@ impl WasmEnv {
check!(socket::write_u64(writer, self.small_globals[1]));
check!(socket::write_bytes32(writer, &self.large_globals[0]));
check!(socket::write_bytes32(writer, &self.large_globals[1]));
check!(socket::write_u64(writer, memory_used));
check!(writer.flush());
}
}
Expand Down
3 changes: 2 additions & 1 deletion arbitrator/jit/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,9 @@ fn main() {
true => None,
false => Some(message),
};
let memory_used = memory.size().0 as u64 * 65_536;

env.send_results(error);
env.send_results(error, memory_used);
}

// require a usize be at least 32 bits wide
Expand Down
6 changes: 3 additions & 3 deletions arbitrator/jit/src/syscall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ pub fn js_value_call(mut env: WasmEnvMut, sp: u32) -> MaybeEscape {

let value = match (object, method_name.as_slice()) {
(Ref(GO_ID), b"_makeFuncWrapper") => {
let arg = match args.get(0) {
let arg = match args.first() {
Some(arg) => arg,
None => fail!(
"Go trying to call Go._makeFuncWrapper with bad args {:?}",
Expand Down Expand Up @@ -415,7 +415,7 @@ pub fn js_value_call(mut env: WasmEnvMut, sp: u32) -> MaybeEscape {
(Ref(CRYPTO_ID), b"getRandomValues") => {
let name = "crypto.getRandomValues";

let id = match args.get(0) {
let id = match args.first() {
Some(Ref(x)) => x,
_ => fail!("Go trying to call {name} with bad args {:?}", args),
};
Expand Down Expand Up @@ -456,7 +456,7 @@ pub fn js_value_new(mut env: WasmEnvMut, sp: u32) {
let args_len = sp.read_u64(2);
let args = sp.read_value_slice(args_ptr, args_len);
match class {
UINT8_ARRAY_ID => match args.get(0) {
UINT8_ARRAY_ID => match args.first() {
Some(JsValue::Number(size)) => {
let id = pool.insert(DynamicObject::Uint8Array(vec![0; *size as usize]));
sp.write_u64(4, GoValue::Object(id).encode());
Expand Down
2 changes: 1 addition & 1 deletion arbitrator/prover/src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ impl Module {
bin.memories.len() <= 1,
"Multiple memories are not supported"
);
if let Some(limits) = bin.memories.get(0) {
if let Some(limits) = bin.memories.first() {
let page_size = Memory::PAGE_SIZE;
let initial = limits.initial; // validate() checks this is less than max::u32
let allowed = u32::MAX as u64 / Memory::PAGE_SIZE - 1; // we require the size remain *below* 2^32
Expand Down
6 changes: 3 additions & 3 deletions arbitrator/wasm-libraries/go-stub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ pub unsafe extern "C" fn go__syscall_js_valueNew(sp: GoStack) {
let args_len = sp.read_u64(2);
let args = read_value_slice(args_ptr, args_len);
if class == UINT8_ARRAY_ID {
if let Some(InterpValue::Number(size)) = args.get(0) {
if let Some(InterpValue::Number(size)) = args.first() {
let id = DynamicObjectPool::singleton()
.insert(DynamicObject::Uint8Array(vec![0; *size as usize]));
sp.write_u64(4, GoValue::Object(id).encode());
Expand Down Expand Up @@ -321,7 +321,7 @@ unsafe fn value_call_impl(sp: &mut GoStack) -> Result<GoValue, String> {
let args_len = sp.read_u64(4);
let args = read_value_slice(args_ptr, args_len);
if object == InterpValue::Ref(GO_ID) && &method_name == b"_makeFuncWrapper" {
let id = args.get(0).ok_or_else(|| {
let id = args.first().ok_or_else(|| {
format!(
"Go attempting to call Go._makeFuncWrapper with bad args {:?}",
args,
Expand Down Expand Up @@ -405,7 +405,7 @@ unsafe fn value_call_impl(sp: &mut GoStack) -> Result<GoValue, String> {
))
}
} else if object == InterpValue::Ref(CRYPTO_ID) && &method_name == b"getRandomValues" {
let id = match args.get(0) {
let id = match args.first() {
Some(InterpValue::Ref(x)) => *x,
_ => {
return Err(format!(
Expand Down
74 changes: 37 additions & 37 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/util"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/redisutil"
Expand All @@ -58,23 +59,22 @@ type batchPosterPosition struct {

type BatchPoster struct {
stopwaiter.StopWaiter
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
bridge *bridgegen.Bridge
syncMonitor *SyncMonitor
seqInboxABI *abi.ABI
seqInboxAddr common.Address
bridgeAddr common.Address
gasRefunderAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred
messagesPerBatch *arbmath.MovingAverage[uint64]
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
bridge *bridgegen.Bridge
syncMonitor *SyncMonitor
seqInboxABI *abi.ABI
seqInboxAddr common.Address
bridgeAddr common.Address
gasRefunderAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
messagesPerBatch *arbmath.MovingAverage[uint64]
// This is an atomic variable that should only be accessed atomically.
// An estimate of the number of batches we want to post but haven't yet.
// This doesn't include batches which we don't want to post yet due to the L1 bounds.
Expand Down Expand Up @@ -1145,6 +1145,18 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
b.redisLock.Start(ctxIn)
b.StopWaiter.Start(ctxIn, b)
b.LaunchThread(b.pollForReverts)
commonEphemeralErrorHandler := util.NewEphemeralErrorHandler(time.Minute, "", 0)
exceedMaxMempoolSizeEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, dataposter.ErrExceedsMaxMempoolSize.Error(), time.Minute)
storageRaceEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, storage.ErrStorageRace.Error(), time.Minute)
normalGasEstimationFailedEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, ErrNormalGasEstimationFailed.Error(), time.Minute)
accumulatorNotFoundEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, AccumulatorNotFoundErr.Error(), time.Minute)
resetAllEphemeralErrs := func() {
commonEphemeralErrorHandler.Reset()
exceedMaxMempoolSizeEphemeralErrorHandler.Reset()
storageRaceEphemeralErrorHandler.Reset()
normalGasEstimationFailedEphemeralErrorHandler.Reset()
accumulatorNotFoundEphemeralErrorHandler.Reset()
}
b.CallIteratively(func(ctx context.Context) time.Duration {
var err error
if common.HexToAddress(b.config().GasRefunderAddress) != (common.Address{}) {
Expand Down Expand Up @@ -1172,12 +1184,12 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
if !couldLock {
log.Debug("Not posting batches right now because another batch poster has the lock or this node is behind")
b.building = nil
b.firstEphemeralError = time.Time{}
resetAllEphemeralErrs()
return b.config().PollInterval
}
posted, err := b.maybePostSequencerBatch(ctx)
if err == nil {
b.firstEphemeralError = time.Time{}
resetAllEphemeralErrs()
}
if err != nil {
if ctx.Err() != nil {
Expand All @@ -1186,28 +1198,16 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
}
b.building = nil
logLevel := log.Error
if b.firstEphemeralError == (time.Time{}) {
b.firstEphemeralError = time.Now()
}
// Likely the inbox tracker just isn't caught up, or there's some other ephemeral error.
// Likely the inbox tracker just isn't caught up.
// Let's see if this error disappears naturally.
sinceFirstEphemeralError := time.Since(b.firstEphemeralError)
logLevel = commonEphemeralErrorHandler.LogLevel(err, logLevel)
// If the error matches one of these, it's only logged at debug for the first minute,
// then at warn for the next 4 minutes, then at error. If the error isn't one of these,
// it'll be logged at warn for the first minute, then at error.
ignoreAtFirst := errors.Is(err, dataposter.ErrExceedsMaxMempoolSize) ||
errors.Is(err, storage.ErrStorageRace) ||
errors.Is(err, ErrNormalGasEstimationFailed) ||
errors.Is(err, AccumulatorNotFoundErr)
if sinceFirstEphemeralError < time.Minute {
if ignoreAtFirst {
logLevel = log.Debug
} else {
logLevel = log.Warn
}
} else if sinceFirstEphemeralError < time.Minute*5 && ignoreAtFirst {
logLevel = log.Warn
}
logLevel = exceedMaxMempoolSizeEphemeralErrorHandler.LogLevel(err, logLevel)
logLevel = storageRaceEphemeralErrorHandler.LogLevel(err, logLevel)
logLevel = normalGasEstimationFailedEphemeralErrorHandler.LogLevel(err, logLevel)
logLevel = accumulatorNotFoundEphemeralErrorHandler.LogLevel(err, logLevel)
logLevel("error posting batch", "err", err)
return b.config().ErrorDelay
} else if posted {
Expand Down
8 changes: 4 additions & 4 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,9 +862,6 @@ func (n *Node) Start(ctx context.Context) error {
}

func (n *Node) StopAndWait() {
if n.Execution != nil {
n.Execution.StopAndWait()
}
if n.MaintenanceRunner != nil && n.MaintenanceRunner.Started() {
n.MaintenanceRunner.StopAndWait()
}
Expand Down Expand Up @@ -917,7 +914,10 @@ func (n *Node) StopAndWait() {
if n.DASLifecycleManager != nil {
n.DASLifecycleManager.StopAndWaitUntil(2 * time.Second)
}
if n.Execution != nil {
n.Execution.StopAndWait()
}
if err := n.Stack.Close(); err != nil {
log.Error("error on stak close", "err", err)
log.Error("error on stack close", "err", err)
}
}
4 changes: 2 additions & 2 deletions cmd/genericconf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,6 @@ func (c *RpcConfig) Apply(stackConf *node.Config) {
}

func RpcConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".max-batch-response-size", DefaultRpcConfig.MaxBatchResponseSize, "the maximum response size for a JSON-RPC request measured in bytes (-1 means no limit)")
f.Int(prefix+".batch-request-limit", DefaultRpcConfig.BatchRequestLimit, "the maximum number of requests in a batch")
f.Int(prefix+".max-batch-response-size", DefaultRpcConfig.MaxBatchResponseSize, "the maximum response size for a JSON-RPC request measured in bytes (0 means no limit)")
f.Int(prefix+".batch-request-limit", DefaultRpcConfig.BatchRequestLimit, "the maximum number of requests in a batch (0 means no limit)")
}
9 changes: 7 additions & 2 deletions staker/staker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/staker/txbuilder"
"github.com/offchainlabs/nitro/util"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/stopwaiter"
Expand Down Expand Up @@ -401,6 +402,7 @@ func (s *Staker) Start(ctxIn context.Context) {
}
s.StopWaiter.Start(ctxIn, s)
backoff := time.Second
ephemeralErrorHandler := util.NewEphemeralErrorHandler(10*time.Minute, "is ahead of on-chain nonce", 0)
s.CallIteratively(func(ctx context.Context) (returningWait time.Duration) {
defer func() {
panicErr := recover()
Expand Down Expand Up @@ -433,6 +435,7 @@ func (s *Staker) Start(ctxIn context.Context) {
}
}
if err == nil {
ephemeralErrorHandler.Reset()
backoff = time.Second
stakerLastSuccessfulActionGauge.Update(time.Now().Unix())
stakerActionSuccessCounter.Inc(1)
Expand All @@ -444,12 +447,14 @@ func (s *Staker) Start(ctxIn context.Context) {
}
stakerActionFailureCounter.Inc(1)
backoff *= 2
logLevel := log.Error
if backoff > time.Minute {
backoff = time.Minute
log.Error("error acting as staker", "err", err)
} else {
log.Warn("error acting as staker", "err", err)
logLevel = log.Warn
}
logLevel = ephemeralErrorHandler.LogLevel(err, logLevel)
logLevel("error acting as staker", "err", err)
return backoff
})
s.CallIteratively(func(ctx context.Context) time.Duration {
Expand Down
67 changes: 67 additions & 0 deletions util/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package util

import (
"strings"
"time"

"github.com/ethereum/go-ethereum/log"
)

// EphemeralErrorHandler handles errors that are ephemeral in nature i.h these are errors
// that we would like to log as a warning unless they repeat for more than a certain duration of time.
type EphemeralErrorHandler struct {
Duration time.Duration
ErrorString string
FirstOccurrence *time.Time

IgnoreDuration time.Duration
IgnoredErrLogLevel func(string, ...interface{}) // Default IgnoredErrLogLevel is log.Debug
}

func NewEphemeralErrorHandler(duration time.Duration, errorString string, ignoreDuration time.Duration) *EphemeralErrorHandler {
return &EphemeralErrorHandler{
Duration: duration,
ErrorString: errorString,
FirstOccurrence: &time.Time{},
IgnoreDuration: ignoreDuration,
IgnoredErrLogLevel: log.Debug,
}
}

// LogLevel method defaults to returning the input currentLogLevel if the given error doesnt contain the errorSubstring,
// but if it does, then returns one of the corresponding loglevels as follows
// - IgnoredErrLogLevel - if the error has been repeating for less than the IgnoreDuration of time. Defaults to log.Debug
// - log.Warn - if the error has been repeating for less than the given duration of time
// - log.Error - Otherwise
//
// # Usage Examples
//
// ephemeralErrorHandler.Loglevel(err, log.Error)("msg")
// ephemeralErrorHandler.Loglevel(err, log.Error)("msg", "key1", val1, "key2", val2)
// ephemeralErrorHandler.Loglevel(err, log.Error)("msg", "key1", val1)
func (h *EphemeralErrorHandler) LogLevel(err error, currentLogLevel func(msg string, ctx ...interface{})) func(string, ...interface{}) {
if h.ErrorString != "" && !strings.Contains(err.Error(), h.ErrorString) {
h.Reset()
return currentLogLevel
}

if *h.FirstOccurrence == (time.Time{}) {
*h.FirstOccurrence = time.Now()
}

if h.IgnoreDuration != 0 && time.Since(*h.FirstOccurrence) < h.IgnoreDuration {
if h.IgnoredErrLogLevel != nil {
return h.IgnoredErrLogLevel
}
return log.Debug
}

if time.Since(*h.FirstOccurrence) < h.Duration {
return log.Warn
}
return log.Error
}

func (h *EphemeralErrorHandler) Reset() {
*h.FirstOccurrence = time.Time{}
}
Loading

0 comments on commit 0e75be2

Please sign in to comment.