Skip to content

Commit

Permalink
Merge pull request #2017 from OffchainLabs/blockvalidator-pause-valid…
Browse files Browse the repository at this point in the history
…ation

Pause nitro node block validator validation when memory is running low
  • Loading branch information
ganeshvanahalli authored Jan 8, 2024
2 parents 521fe53 + 0e75be2 commit e065123
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 20 deletions.
32 changes: 16 additions & 16 deletions arbnode/resourcemanager/resource_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func Init(conf *Config) error {
return nil
}

limit, err := parseMemLimit(conf.MemFreeLimit)
limit, err := ParseMemLimit(conf.MemFreeLimit)
if err != nil {
return err
}

node.WrapHTTPHandler = func(srv http.Handler) (http.Handler, error) {
var c limitChecker
c, err := newCgroupsMemoryLimitCheckerIfSupported(limit)
var c LimitChecker
c, err := NewCgroupsMemoryLimitCheckerIfSupported(limit)
if errors.Is(err, errNotSupported) {
log.Error("No method for determining memory usage and limits was discovered, disabled memory limit RPC throttling")
c = &trivialLimitChecker{}
Expand All @@ -57,7 +57,7 @@ func Init(conf *Config) error {
return nil
}

func parseMemLimit(limitStr string) (int, error) {
func ParseMemLimit(limitStr string) (int, error) {
var (
limit int = 1
s string
Expand Down Expand Up @@ -105,18 +105,18 @@ func ConfigAddOptions(prefix string, f *pflag.FlagSet) {
// limit check.
type httpServer struct {
inner http.Handler
c limitChecker
c LimitChecker
}

func newHttpServer(inner http.Handler, c limitChecker) *httpServer {
func newHttpServer(inner http.Handler, c LimitChecker) *httpServer {
return &httpServer{inner: inner, c: c}
}

// ServeHTTP passes req to inner unless any configured system resource
// limit is exceeded, in which case it returns a HTTP 429 error.
func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
start := time.Now()
exceeded, err := s.c.isLimitExceeded()
exceeded, err := s.c.IsLimitExceeded()
limitCheckDurationHistogram.Update(time.Since(start).Nanoseconds())
if err != nil {
log.Error("Error checking memory limit", "err", err, "checker", s.c.String())
Expand All @@ -130,19 +130,19 @@ func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.inner.ServeHTTP(w, req)
}

type limitChecker interface {
isLimitExceeded() (bool, error)
type LimitChecker interface {
IsLimitExceeded() (bool, error)
String() string
}

func isSupported(c limitChecker) bool {
_, err := c.isLimitExceeded()
func isSupported(c LimitChecker) bool {
_, err := c.IsLimitExceeded()
return err == nil
}

// newCgroupsMemoryLimitCheckerIfSupported attempts to auto-discover whether
// NewCgroupsMemoryLimitCheckerIfSupported attempts to auto-discover whether
// Cgroups V1 or V2 is supported for checking system memory limits.
func newCgroupsMemoryLimitCheckerIfSupported(memLimitBytes int) (*cgroupsMemoryLimitChecker, error) {
func NewCgroupsMemoryLimitCheckerIfSupported(memLimitBytes int) (*cgroupsMemoryLimitChecker, error) {
c := newCgroupsMemoryLimitChecker(cgroupsV1MemoryFiles, memLimitBytes)
if isSupported(c) {
log.Info("Cgroups v1 detected, enabling memory limit RPC throttling")
Expand All @@ -161,7 +161,7 @@ func newCgroupsMemoryLimitCheckerIfSupported(memLimitBytes int) (*cgroupsMemoryL
// trivialLimitChecker checks no limits, so its limits are never exceeded.
type trivialLimitChecker struct{}

func (_ trivialLimitChecker) isLimitExceeded() (bool, error) {
func (_ trivialLimitChecker) IsLimitExceeded() (bool, error) {
return false, nil
}

Expand Down Expand Up @@ -202,7 +202,7 @@ func newCgroupsMemoryLimitChecker(files cgroupsMemoryFiles, memLimitBytes int) *
}
}

// isLimitExceeded checks if the system memory free is less than the limit.
// IsLimitExceeded checks if the system memory free is less than the limit.
// It returns true if the limit is exceeded.
//
// container_memory_working_set_bytes in prometheus is calculated as
Expand All @@ -223,7 +223,7 @@ func newCgroupsMemoryLimitChecker(files cgroupsMemoryFiles, memLimitBytes int) *
// free memory for the page cache, to avoid cache thrashing on chain state
// access. How much "reasonable" is will depend on access patterns, state
// size, and your application's tolerance for latency.
func (c *cgroupsMemoryLimitChecker) isLimitExceeded() (bool, error) {
func (c *cgroupsMemoryLimitChecker) IsLimitExceeded() (bool, error) {
var limit, usage, active, inactive int
var err error
if limit, err = readIntFromFile(c.files.limitFile); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions arbnode/resourcemanager/resource_management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func makeCgroupsTestDir(cgroupDir string) cgroupsMemoryFiles {
func TestCgroupsFailIfCantOpen(t *testing.T) {
testFiles := makeCgroupsTestDir(t.TempDir())
c := newCgroupsMemoryLimitChecker(testFiles, 1024*1024*512)
if _, err := c.isLimitExceeded(); err == nil {
if _, err := c.IsLimitExceeded(); err == nil {
t.Fatal("Should fail open if can't read files")
}
}
Expand Down Expand Up @@ -124,20 +124,20 @@ func TestCgroupsMemoryLimit(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
testFiles := makeCgroupsTestDir(t.TempDir())
memLimit, err := parseMemLimit(tc.memLimit)
memLimit, err := ParseMemLimit(tc.memLimit)
if err != nil {
t.Fatalf("Parsing memory limit failed: %v", err)
}
c := newCgroupsMemoryLimitChecker(testFiles, memLimit)
if err := updateFakeCgroupFiles(c, tc.sysLimit, tc.usage, tc.inactive, tc.active); err != nil {
t.Fatalf("Updating cgroup files: %v", err)
}
exceeded, err := c.isLimitExceeded()
exceeded, err := c.IsLimitExceeded()
if err != nil {
t.Fatalf("Checking if limit exceeded: %v", err)
}
if exceeded != tc.want {
t.Errorf("isLimitExceeded() = %t, want %t", exceeded, tc.want)
t.Errorf("IsLimitExceeded() = %t, want %t", exceeded, tc.want)
}
},
)
Expand Down
57 changes: 57 additions & 0 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/offchainlabs/nitro/arbnode/resourcemanager"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/rpcclient"
Expand Down Expand Up @@ -75,6 +76,8 @@ type BlockValidator struct {
testingProgressMadeChan chan struct{}

fatalErr chan<- error

MemoryFreeLimitChecker resourcemanager.LimitChecker
}

type BlockValidatorConfig struct {
Expand All @@ -87,9 +90,21 @@ type BlockValidatorConfig struct {
PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"`
MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"`

memoryFreeLimit int
}

func (c *BlockValidatorConfig) Validate() error {
if c.MemoryFreeLimit == "default" {
c.memoryFreeLimit = 1073741824 // 1GB
} else if c.MemoryFreeLimit != "" {
limit, err := resourcemanager.ParseMemLimit(c.MemoryFreeLimit)
if err != nil {
return fmt.Errorf("failed to parse block-validator config memory-free-limit string: %w", err)
}
c.memoryFreeLimit = limit
}
return c.ValidationServer.Validate()
}

Expand All @@ -109,6 +124,7 @@ func BlockValidatorConfigAddOptions(prefix string, f *flag.FlagSet) {
f.String(prefix+".pending-upgrade-module-root", DefaultBlockValidatorConfig.PendingUpgradeModuleRoot, "pending upgrade wasm module root to additionally validate (hash, 'latest' or empty)")
f.Bool(prefix+".failure-is-fatal", DefaultBlockValidatorConfig.FailureIsFatal, "failing a validation is treated as a fatal error")
BlockValidatorDangerousConfigAddOptions(prefix+".dangerous", f)
f.String(prefix+".memory-free-limit", DefaultBlockValidatorConfig.MemoryFreeLimit, "minimum free-memory limit after reaching which the blockvalidator pauses validation. Enabled by default as 1GB, to disable provide empty string")
}

func BlockValidatorDangerousConfigAddOptions(prefix string, f *flag.FlagSet) {
Expand All @@ -125,6 +141,7 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Dangerous: DefaultBlockValidatorDangerousConfig,
MemoryFreeLimit: "default",
}

var TestBlockValidatorConfig = BlockValidatorConfig{
Expand All @@ -137,6 +154,7 @@ var TestBlockValidatorConfig = BlockValidatorConfig{
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Dangerous: DefaultBlockValidatorDangerousConfig,
MemoryFreeLimit: "default",
}

var DefaultBlockValidatorDangerousConfig = BlockValidatorDangerousConfig{
Expand Down Expand Up @@ -215,6 +233,18 @@ func NewBlockValidator(
}
streamer.SetBlockValidator(ret)
inbox.SetBlockValidator(ret)
if config().MemoryFreeLimit != "" {
limtchecker, err := resourcemanager.NewCgroupsMemoryLimitCheckerIfSupported(config().memoryFreeLimit)
if err != nil {
if config().MemoryFreeLimit == "default" {
log.Warn("Cgroups V1 or V2 is unsupported, memory-free-limit feature inside block-validator is disabled")
} else {
return nil, fmt.Errorf("failed to create MemoryFreeLimitChecker, Cgroups V1 or V2 is unsupported")
}
} else {
ret.MemoryFreeLimitChecker = limtchecker
}
}
return ret, nil
}

Expand Down Expand Up @@ -521,6 +551,15 @@ func (v *BlockValidator) iterativeValidationEntryCreator(ctx context.Context, ig
}

func (v *BlockValidator) sendNextRecordRequests(ctx context.Context) (bool, error) {
if v.MemoryFreeLimitChecker != nil {
exceeded, err := v.MemoryFreeLimitChecker.IsLimitExceeded()
if err != nil {
log.Error("error checking if free-memory limit exceeded using MemoryFreeLimitChecker", "err", err)
}
if exceeded {
return false, nil
}
}
v.reorgMutex.RLock()
pos := v.recordSent()
created := v.created()
Expand Down Expand Up @@ -550,6 +589,15 @@ func (v *BlockValidator) sendNextRecordRequests(ctx context.Context) (bool, erro
return true, nil
}
for pos <= recordUntil {
if v.MemoryFreeLimitChecker != nil {
exceeded, err := v.MemoryFreeLimitChecker.IsLimitExceeded()
if err != nil {
log.Error("error checking if free-memory limit exceeded using MemoryFreeLimitChecker", "err", err)
}
if exceeded {
return false, nil
}
}
validationStatus, found := v.validations.Load(pos)
if !found {
return false, fmt.Errorf("not found entry for pos %d", pos)
Expand Down Expand Up @@ -699,6 +747,15 @@ validationsLoop:
log.Trace("advanceValidations: no more room", "pos", pos)
return nil, nil
}
if v.MemoryFreeLimitChecker != nil {
exceeded, err := v.MemoryFreeLimitChecker.IsLimitExceeded()
if err != nil {
log.Error("error checking if free-memory limit exceeded using MemoryFreeLimitChecker", "err", err)
}
if exceeded {
return nil, nil
}
}
if currentStatus == Prepared {
input, err := validationStatus.Entry.ToInput()
if err != nil && ctx.Err() == nil {
Expand Down

0 comments on commit e065123

Please sign in to comment.