Skip to content

Commit

Permalink
Merge branch 'master' into bold
Browse files Browse the repository at this point in the history
  • Loading branch information
amsanghi authored Jan 9, 2024
2 parents a58cbfe + e065123 commit b7fb566
Show file tree
Hide file tree
Showing 18 changed files with 316 additions and 92 deletions.
4 changes: 2 additions & 2 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Parameters
Licensor: Offchain Labs

Licensed Work: Arbitrum Nitro
The Licensed Work is (c) 2021-2023 Offchain Labs
The Licensed Work is (c) 2021-2024 Offchain Labs

Additional Use Grant: You may use the Licensed Work in a production environment solely
to provide a point of interface to permit end users or applications
Expand All @@ -30,7 +30,7 @@ Additional Use Grant: You may use the Licensed Work in a production environment



Change Date: Dec 31, 2027
Change Date: Dec 31, 2028

Change License: Apache License Version 2.0

Expand Down
3 changes: 2 additions & 1 deletion arbitrator/jit/src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl WasmEnv {
Ok(env)
}

pub fn send_results(&mut self, error: Option<String>) {
pub fn send_results(&mut self, error: Option<String>, memory_used: u64) {
let writer = match &mut self.process.socket {
Some((writer, _)) => writer,
None => return,
Expand All @@ -307,6 +307,7 @@ impl WasmEnv {
check!(socket::write_u64(writer, self.small_globals[1]));
check!(socket::write_bytes32(writer, &self.large_globals[0]));
check!(socket::write_bytes32(writer, &self.large_globals[1]));
check!(socket::write_u64(writer, memory_used));
check!(writer.flush());
}
}
Expand Down
3 changes: 2 additions & 1 deletion arbitrator/jit/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,9 @@ fn main() {
true => None,
false => Some(message),
};
let memory_used = memory.size().0 as u64 * 65_536;

env.send_results(error);
env.send_results(error, memory_used);
}

// require a usize be at least 32 bits wide
Expand Down
6 changes: 3 additions & 3 deletions arbitrator/jit/src/syscall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ pub fn js_value_call(mut env: WasmEnvMut, sp: u32) -> MaybeEscape {

let value = match (object, method_name.as_slice()) {
(Ref(GO_ID), b"_makeFuncWrapper") => {
let arg = match args.get(0) {
let arg = match args.first() {
Some(arg) => arg,
None => fail!(
"Go trying to call Go._makeFuncWrapper with bad args {:?}",
Expand Down Expand Up @@ -415,7 +415,7 @@ pub fn js_value_call(mut env: WasmEnvMut, sp: u32) -> MaybeEscape {
(Ref(CRYPTO_ID), b"getRandomValues") => {
let name = "crypto.getRandomValues";

let id = match args.get(0) {
let id = match args.first() {
Some(Ref(x)) => x,
_ => fail!("Go trying to call {name} with bad args {:?}", args),
};
Expand Down Expand Up @@ -456,7 +456,7 @@ pub fn js_value_new(mut env: WasmEnvMut, sp: u32) {
let args_len = sp.read_u64(2);
let args = sp.read_value_slice(args_ptr, args_len);
match class {
UINT8_ARRAY_ID => match args.get(0) {
UINT8_ARRAY_ID => match args.first() {
Some(JsValue::Number(size)) => {
let id = pool.insert(DynamicObject::Uint8Array(vec![0; *size as usize]));
sp.write_u64(4, GoValue::Object(id).encode());
Expand Down
2 changes: 1 addition & 1 deletion arbitrator/prover/src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ impl Module {
bin.memories.len() <= 1,
"Multiple memories are not supported"
);
if let Some(limits) = bin.memories.get(0) {
if let Some(limits) = bin.memories.first() {
let page_size = Memory::PAGE_SIZE;
let initial = limits.initial; // validate() checks this is less than max::u32
let allowed = u32::MAX as u64 / Memory::PAGE_SIZE - 1; // we require the size remain *below* 2^32
Expand Down
6 changes: 3 additions & 3 deletions arbitrator/wasm-libraries/go-stub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ pub unsafe extern "C" fn go__syscall_js_valueNew(sp: GoStack) {
let args_len = sp.read_u64(2);
let args = read_value_slice(args_ptr, args_len);
if class == UINT8_ARRAY_ID {
if let Some(InterpValue::Number(size)) = args.get(0) {
if let Some(InterpValue::Number(size)) = args.first() {
let id = DynamicObjectPool::singleton()
.insert(DynamicObject::Uint8Array(vec![0; *size as usize]));
sp.write_u64(4, GoValue::Object(id).encode());
Expand Down Expand Up @@ -321,7 +321,7 @@ unsafe fn value_call_impl(sp: &mut GoStack) -> Result<GoValue, String> {
let args_len = sp.read_u64(4);
let args = read_value_slice(args_ptr, args_len);
if object == InterpValue::Ref(GO_ID) && &method_name == b"_makeFuncWrapper" {
let id = args.get(0).ok_or_else(|| {
let id = args.first().ok_or_else(|| {
format!(
"Go attempting to call Go._makeFuncWrapper with bad args {:?}",
args,
Expand Down Expand Up @@ -405,7 +405,7 @@ unsafe fn value_call_impl(sp: &mut GoStack) -> Result<GoValue, String> {
))
}
} else if object == InterpValue::Ref(CRYPTO_ID) && &method_name == b"getRandomValues" {
let id = match args.get(0) {
let id = match args.first() {
Some(InterpValue::Ref(x)) => *x,
_ => {
return Err(format!(
Expand Down
74 changes: 37 additions & 37 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/util"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/redisutil"
Expand All @@ -58,23 +59,22 @@ type batchPosterPosition struct {

type BatchPoster struct {
stopwaiter.StopWaiter
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
bridge *bridgegen.Bridge
syncMonitor *SyncMonitor
seqInboxABI *abi.ABI
seqInboxAddr common.Address
bridgeAddr common.Address
gasRefunderAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred
messagesPerBatch *arbmath.MovingAverage[uint64]
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
bridge *bridgegen.Bridge
syncMonitor *SyncMonitor
seqInboxABI *abi.ABI
seqInboxAddr common.Address
bridgeAddr common.Address
gasRefunderAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
messagesPerBatch *arbmath.MovingAverage[uint64]
// This is an atomic variable that should only be accessed atomically.
// An estimate of the number of batches we want to post but haven't yet.
// This doesn't include batches which we don't want to post yet due to the L1 bounds.
Expand Down Expand Up @@ -1145,6 +1145,18 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
b.redisLock.Start(ctxIn)
b.StopWaiter.Start(ctxIn, b)
b.LaunchThread(b.pollForReverts)
commonEphemeralErrorHandler := util.NewEphemeralErrorHandler(time.Minute, "", 0)
exceedMaxMempoolSizeEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, dataposter.ErrExceedsMaxMempoolSize.Error(), time.Minute)
storageRaceEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, storage.ErrStorageRace.Error(), time.Minute)
normalGasEstimationFailedEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, ErrNormalGasEstimationFailed.Error(), time.Minute)
accumulatorNotFoundEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, AccumulatorNotFoundErr.Error(), time.Minute)
resetAllEphemeralErrs := func() {
commonEphemeralErrorHandler.Reset()
exceedMaxMempoolSizeEphemeralErrorHandler.Reset()
storageRaceEphemeralErrorHandler.Reset()
normalGasEstimationFailedEphemeralErrorHandler.Reset()
accumulatorNotFoundEphemeralErrorHandler.Reset()
}
b.CallIteratively(func(ctx context.Context) time.Duration {
var err error
if common.HexToAddress(b.config().GasRefunderAddress) != (common.Address{}) {
Expand Down Expand Up @@ -1172,12 +1184,12 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
if !couldLock {
log.Debug("Not posting batches right now because another batch poster has the lock or this node is behind")
b.building = nil
b.firstEphemeralError = time.Time{}
resetAllEphemeralErrs()
return b.config().PollInterval
}
posted, err := b.maybePostSequencerBatch(ctx)
if err == nil {
b.firstEphemeralError = time.Time{}
resetAllEphemeralErrs()
}
if err != nil {
if ctx.Err() != nil {
Expand All @@ -1186,28 +1198,16 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
}
b.building = nil
logLevel := log.Error
if b.firstEphemeralError == (time.Time{}) {
b.firstEphemeralError = time.Now()
}
// Likely the inbox tracker just isn't caught up, or there's some other ephemeral error.
// Likely the inbox tracker just isn't caught up.
// Let's see if this error disappears naturally.
sinceFirstEphemeralError := time.Since(b.firstEphemeralError)
logLevel = commonEphemeralErrorHandler.LogLevel(err, logLevel)
// If the error matches one of these, it's only logged at debug for the first minute,
// then at warn for the next 4 minutes, then at error. If the error isn't one of these,
// it'll be logged at warn for the first minute, then at error.
ignoreAtFirst := errors.Is(err, dataposter.ErrExceedsMaxMempoolSize) ||
errors.Is(err, storage.ErrStorageRace) ||
errors.Is(err, ErrNormalGasEstimationFailed) ||
errors.Is(err, AccumulatorNotFoundErr)
if sinceFirstEphemeralError < time.Minute {
if ignoreAtFirst {
logLevel = log.Debug
} else {
logLevel = log.Warn
}
} else if sinceFirstEphemeralError < time.Minute*5 && ignoreAtFirst {
logLevel = log.Warn
}
logLevel = exceedMaxMempoolSizeEphemeralErrorHandler.LogLevel(err, logLevel)
logLevel = storageRaceEphemeralErrorHandler.LogLevel(err, logLevel)
logLevel = normalGasEstimationFailedEphemeralErrorHandler.LogLevel(err, logLevel)
logLevel = accumulatorNotFoundEphemeralErrorHandler.LogLevel(err, logLevel)
logLevel("error posting batch", "err", err)
return b.config().ErrorDelay
} else if posted {
Expand Down
8 changes: 4 additions & 4 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,9 +862,6 @@ func (n *Node) Start(ctx context.Context) error {
}

func (n *Node) StopAndWait() {
if n.Execution != nil {
n.Execution.StopAndWait()
}
if n.MaintenanceRunner != nil && n.MaintenanceRunner.Started() {
n.MaintenanceRunner.StopAndWait()
}
Expand Down Expand Up @@ -917,7 +914,10 @@ func (n *Node) StopAndWait() {
if n.DASLifecycleManager != nil {
n.DASLifecycleManager.StopAndWaitUntil(2 * time.Second)
}
if n.Execution != nil {
n.Execution.StopAndWait()
}
if err := n.Stack.Close(); err != nil {
log.Error("error on stak close", "err", err)
log.Error("error on stack close", "err", err)
}
}
32 changes: 16 additions & 16 deletions arbnode/resourcemanager/resource_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func Init(conf *Config) error {
return nil
}

limit, err := parseMemLimit(conf.MemFreeLimit)
limit, err := ParseMemLimit(conf.MemFreeLimit)
if err != nil {
return err
}

node.WrapHTTPHandler = func(srv http.Handler) (http.Handler, error) {
var c limitChecker
c, err := newCgroupsMemoryLimitCheckerIfSupported(limit)
var c LimitChecker
c, err := NewCgroupsMemoryLimitCheckerIfSupported(limit)
if errors.Is(err, errNotSupported) {
log.Error("No method for determining memory usage and limits was discovered, disabled memory limit RPC throttling")
c = &trivialLimitChecker{}
Expand All @@ -57,7 +57,7 @@ func Init(conf *Config) error {
return nil
}

func parseMemLimit(limitStr string) (int, error) {
func ParseMemLimit(limitStr string) (int, error) {
var (
limit int = 1
s string
Expand Down Expand Up @@ -105,18 +105,18 @@ func ConfigAddOptions(prefix string, f *pflag.FlagSet) {
// limit check.
type httpServer struct {
inner http.Handler
c limitChecker
c LimitChecker
}

func newHttpServer(inner http.Handler, c limitChecker) *httpServer {
func newHttpServer(inner http.Handler, c LimitChecker) *httpServer {
return &httpServer{inner: inner, c: c}
}

// ServeHTTP passes req to inner unless any configured system resource
// limit is exceeded, in which case it returns a HTTP 429 error.
func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
start := time.Now()
exceeded, err := s.c.isLimitExceeded()
exceeded, err := s.c.IsLimitExceeded()
limitCheckDurationHistogram.Update(time.Since(start).Nanoseconds())
if err != nil {
log.Error("Error checking memory limit", "err", err, "checker", s.c.String())
Expand All @@ -130,19 +130,19 @@ func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.inner.ServeHTTP(w, req)
}

type limitChecker interface {
isLimitExceeded() (bool, error)
type LimitChecker interface {
IsLimitExceeded() (bool, error)
String() string
}

func isSupported(c limitChecker) bool {
_, err := c.isLimitExceeded()
func isSupported(c LimitChecker) bool {
_, err := c.IsLimitExceeded()
return err == nil
}

// newCgroupsMemoryLimitCheckerIfSupported attempts to auto-discover whether
// NewCgroupsMemoryLimitCheckerIfSupported attempts to auto-discover whether
// Cgroups V1 or V2 is supported for checking system memory limits.
func newCgroupsMemoryLimitCheckerIfSupported(memLimitBytes int) (*cgroupsMemoryLimitChecker, error) {
func NewCgroupsMemoryLimitCheckerIfSupported(memLimitBytes int) (*cgroupsMemoryLimitChecker, error) {
c := newCgroupsMemoryLimitChecker(cgroupsV1MemoryFiles, memLimitBytes)
if isSupported(c) {
log.Info("Cgroups v1 detected, enabling memory limit RPC throttling")
Expand All @@ -161,7 +161,7 @@ func newCgroupsMemoryLimitCheckerIfSupported(memLimitBytes int) (*cgroupsMemoryL
// trivialLimitChecker checks no limits, so its limits are never exceeded.
type trivialLimitChecker struct{}

func (_ trivialLimitChecker) isLimitExceeded() (bool, error) {
func (_ trivialLimitChecker) IsLimitExceeded() (bool, error) {
return false, nil
}

Expand Down Expand Up @@ -202,7 +202,7 @@ func newCgroupsMemoryLimitChecker(files cgroupsMemoryFiles, memLimitBytes int) *
}
}

// isLimitExceeded checks if the system memory free is less than the limit.
// IsLimitExceeded checks if the system memory free is less than the limit.
// It returns true if the limit is exceeded.
//
// container_memory_working_set_bytes in prometheus is calculated as
Expand All @@ -223,7 +223,7 @@ func newCgroupsMemoryLimitChecker(files cgroupsMemoryFiles, memLimitBytes int) *
// free memory for the page cache, to avoid cache thrashing on chain state
// access. How much "reasonable" is will depend on access patterns, state
// size, and your application's tolerance for latency.
func (c *cgroupsMemoryLimitChecker) isLimitExceeded() (bool, error) {
func (c *cgroupsMemoryLimitChecker) IsLimitExceeded() (bool, error) {
var limit, usage, active, inactive int
var err error
if limit, err = readIntFromFile(c.files.limitFile); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions arbnode/resourcemanager/resource_management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func makeCgroupsTestDir(cgroupDir string) cgroupsMemoryFiles {
func TestCgroupsFailIfCantOpen(t *testing.T) {
testFiles := makeCgroupsTestDir(t.TempDir())
c := newCgroupsMemoryLimitChecker(testFiles, 1024*1024*512)
if _, err := c.isLimitExceeded(); err == nil {
if _, err := c.IsLimitExceeded(); err == nil {
t.Fatal("Should fail open if can't read files")
}
}
Expand Down Expand Up @@ -124,20 +124,20 @@ func TestCgroupsMemoryLimit(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
testFiles := makeCgroupsTestDir(t.TempDir())
memLimit, err := parseMemLimit(tc.memLimit)
memLimit, err := ParseMemLimit(tc.memLimit)
if err != nil {
t.Fatalf("Parsing memory limit failed: %v", err)
}
c := newCgroupsMemoryLimitChecker(testFiles, memLimit)
if err := updateFakeCgroupFiles(c, tc.sysLimit, tc.usage, tc.inactive, tc.active); err != nil {
t.Fatalf("Updating cgroup files: %v", err)
}
exceeded, err := c.isLimitExceeded()
exceeded, err := c.IsLimitExceeded()
if err != nil {
t.Fatalf("Checking if limit exceeded: %v", err)
}
if exceeded != tc.want {
t.Errorf("isLimitExceeded() = %t, want %t", exceeded, tc.want)
t.Errorf("IsLimitExceeded() = %t, want %t", exceeded, tc.want)
}
},
)
Expand Down
4 changes: 2 additions & 2 deletions cmd/genericconf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,6 @@ func (c *RpcConfig) Apply(stackConf *node.Config) {
}

func RpcConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".max-batch-response-size", DefaultRpcConfig.MaxBatchResponseSize, "the maximum response size for a JSON-RPC request measured in bytes (-1 means no limit)")
f.Int(prefix+".batch-request-limit", DefaultRpcConfig.BatchRequestLimit, "the maximum number of requests in a batch")
f.Int(prefix+".max-batch-response-size", DefaultRpcConfig.MaxBatchResponseSize, "the maximum response size for a JSON-RPC request measured in bytes (0 means no limit)")
f.Int(prefix+".batch-request-limit", DefaultRpcConfig.BatchRequestLimit, "the maximum number of requests in a batch (0 means no limit)")
}
Loading

0 comments on commit b7fb566

Please sign in to comment.