Skip to content

Commit

Permalink
Fixed memory limits rather than percentage based
Browse files Browse the repository at this point in the history
  • Loading branch information
Tristan-Wilson committed Sep 22, 2023
1 parent affaf25 commit 8dcbb84
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 27 deletions.
94 changes: 70 additions & 24 deletions arbnode/resourcemanager/resource_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"regexp"
"strconv"
"time"
"unicode"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
Expand All @@ -31,37 +32,83 @@ var (
// prior to RPC request handling.
//
// Must be run before the go-ethereum stack is set up (ethereum/go-ethereum/node.New).
func Init(conf *Config) {
if conf.MemLimitPercent > 0 {
node.WrapHTTPHandler = func(srv http.Handler) (http.Handler, error) {
var c limitChecker
c, err := newCgroupsMemoryLimitCheckerIfSupported(conf)
if errors.Is(err, errNotSupported) {
log.Error("No method for determining memory usage and limits was discovered, disabled memory limit RPC throttling")
c = &trivialLimitChecker{}
}
func Init(conf *Config) error {
if conf.MemFreeLimit == "" {
return nil
}

limit, err := parseMemLimit(conf.MemFreeLimit)
if err != nil {
return err
}

return newHttpServer(srv, c), nil
node.WrapHTTPHandler = func(srv http.Handler) (http.Handler, error) {
var c limitChecker
c, err := newCgroupsMemoryLimitCheckerIfSupported(limit)
if errors.Is(err, errNotSupported) {
log.Error("No method for determining memory usage and limits was discovered, disabled memory limit RPC throttling")
c = &trivialLimitChecker{}
}

return newHttpServer(srv, c), nil
}
return nil
}

func parseMemLimit(limitStr_ string) (int, error) {
limitStr := limitStr_
limitAccumulator := 1
done := false
for !done {
switch unicode.ToUpper(rune(limitStr[len(limitStr)-1])) {
case 'B':
limitStr = limitStr[:len(limitStr)-1]
continue
case 'K':
limitStr = limitStr[:len(limitStr)-1]
limitAccumulator *= 1024
done = true
case 'M':
limitStr = limitStr[:len(limitStr)-1]
limitAccumulator *= 1024 * 1024
done = true
case 'G':
limitStr = limitStr[:len(limitStr)-1]
limitAccumulator *= 1024 * 1024 * 1024
done = true
case 'T':
limitStr = limitStr[:len(limitStr)-1]
limitAccumulator *= 1024 * 1024 * 1024 * 1024
done = true
default:
done = true
}
}

limitInUnits, err := strconv.Atoi(limitStr)
if err != nil {
return 0, err
}

return limitAccumulator * limitInUnits, nil
}

// Config contains the configuration for resourcemanager functionality.
// Currently only a memory limit is supported, other limits may be added
// in the future.
type Config struct {
MemLimitPercent int `koanf:"mem-limit-percent" reload:"hot"`
MemFreeLimit string `koanf:"mem-free-limit" reload:"hot"`
}

// DefaultConfig has the defaul resourcemanager configuration,
// all limits are disabled.
var DefaultConfig = Config{
MemLimitPercent: 0,
MemFreeLimit: "",
}

// ConfigAddOptions adds the configuration options for resourcemanager.
func ConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Int(prefix+".mem-limit-percent", DefaultConfig.MemLimitPercent, "RPC calls are throttled if system memory utilization exceeds this percent value, zero (default) is disabled")
f.String(prefix+".mem-free-limit", DefaultConfig.MemFreeLimit, "RPC calls are throttled if free system memory is below this amount, expressed in bytes or multiples of bytes with suffix B, K, M, G")
}

// httpServer implements http.Handler and wraps calls to inner with a resource
Expand Down Expand Up @@ -105,14 +152,14 @@ func isSupported(c limitChecker) bool {

// newCgroupsMemoryLimitCheckerIfSupported attempts to auto-discover whether
// Cgroups V1 or V2 is supported for checking system memory limits.
func newCgroupsMemoryLimitCheckerIfSupported(conf *Config) (*cgroupsMemoryLimitChecker, error) {
c := newCgroupsMemoryLimitChecker(cgroupsV1MemoryFiles, conf.MemLimitPercent)
func newCgroupsMemoryLimitCheckerIfSupported(memLimitBytes int) (*cgroupsMemoryLimitChecker, error) {
c := newCgroupsMemoryLimitChecker(cgroupsV1MemoryFiles, memLimitBytes)
if isSupported(c) {
log.Info("Cgroups v1 detected, enabling memory limit RPC throttling")
return c, nil
}

c = newCgroupsMemoryLimitChecker(cgroupsV2MemoryFiles, conf.MemLimitPercent)
c = newCgroupsMemoryLimitChecker(cgroupsV2MemoryFiles, memLimitBytes)
if isSupported(c) {
log.Info("Cgroups v2 detected, enabling memory limit RPC throttling")
return c, nil
Expand Down Expand Up @@ -152,19 +199,18 @@ var cgroupsV2MemoryFiles = cgroupsMemoryFiles{
}

type cgroupsMemoryLimitChecker struct {
files cgroupsMemoryFiles
memoryLimitPercent int
files cgroupsMemoryFiles
memLimitBytes int
}

func newCgroupsMemoryLimitChecker(files cgroupsMemoryFiles, memoryLimitPercent int) *cgroupsMemoryLimitChecker {
func newCgroupsMemoryLimitChecker(files cgroupsMemoryFiles, memLimitBytes int) *cgroupsMemoryLimitChecker {
return &cgroupsMemoryLimitChecker{
files: files,
memoryLimitPercent: memoryLimitPercent,
files: files,
memLimitBytes: memLimitBytes,
}
}

// isLimitExceeded checks if the system memory used exceeds the limit
// scaled by the configured memoryLimitPercent.
// isLimitExceeded checks if the system memory free is less than the limit.
//
// See the following page for details of calculating the memory used,
// which is reported as container_memory_working_set_bytes in prometheus:
Expand All @@ -181,7 +227,7 @@ func (c *cgroupsMemoryLimitChecker) isLimitExceeded() (bool, error) {
if inactive, err = readInactive(c.files.statsFile, c.files.inactiveRe); err != nil {
return false, err
}
return usage-inactive >= ((limit * c.memoryLimitPercent) / 100), nil
return limit-(usage-inactive) <= c.memLimitBytes, nil
}

func (c cgroupsMemoryLimitChecker) String() string {
Expand Down
51 changes: 48 additions & 3 deletions arbnode/resourcemanager/resource_management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func makeCgroupsTestDir(cgroupDir string) cgroupsMemoryFiles {

func TestCgroupsFailIfCantOpen(t *testing.T) {
testFiles := makeCgroupsTestDir(t.TempDir())
c := newCgroupsMemoryLimitChecker(testFiles, 95)
c := newCgroupsMemoryLimitChecker(testFiles, 1024*1024*512)
if _, err := c.isLimitExceeded(); err == nil {
t.Fatal("Should fail open if can't read files")
}
Expand All @@ -59,24 +59,69 @@ func TestCgroupsFailIfCantOpen(t *testing.T) {
func TestCgroupsMemoryLimit(t *testing.T) {
for _, tc := range []struct {
desc string
sysLimit int
inactive int
usage int
memLimit string
want bool
}{
{
desc: "limit should be exceeded",
sysLimit: 1000,
inactive: 50,
usage: 1000,
memLimit: "50B",
want: true,
},
{
desc: "limit should not be exceeded",
sysLimit: 1000,
inactive: 51,
usage: 1000,
memLimit: "50b",
want: false,
},
{
desc: "limit (MB) should be exceeded",
sysLimit: 1000 * 1024 * 1024,
inactive: 50 * 1024 * 1024,
usage: 1000 * 1024 * 1024,
memLimit: "50MB",
want: true,
},
{
desc: "limit (MB) should not be exceeded",
sysLimit: 1000 * 1024 * 1024,
inactive: 1 + 50*1024*1024,
usage: 1000 * 1024 * 1024,
memLimit: "50m",
want: false,
},
{
desc: "limit (GB) should be exceeded",
sysLimit: 1000 * 1024 * 1024 * 1024,
inactive: 50 * 1024 * 1024 * 1024,
usage: 1000 * 1024 * 1024 * 1024,
memLimit: "50G",
want: true,
},
{
desc: "limit (GB) should not be exceeded",
sysLimit: 1000 * 1024 * 1024 * 1024,
inactive: 1 + 50*1024*1024*1024,
usage: 1000 * 1024 * 1024 * 1024,
memLimit: "50gb",
want: false,
},
} {
t.Run(tc.desc, func(t *testing.T) {
testFiles := makeCgroupsTestDir(t.TempDir())
c := newCgroupsMemoryLimitChecker(testFiles, 95)
if err := updateFakeCgroupFiles(c, 1000, 1000, tc.inactive); err != nil {
memLimit, err := parseMemLimit(tc.memLimit)
if err != nil {
t.Fatalf("Parsing memory limit failed: %v", err)
}
c := newCgroupsMemoryLimitChecker(testFiles, memLimit)
if err := updateFakeCgroupFiles(c, tc.sysLimit, tc.usage, tc.inactive); err != nil {
t.Fatalf("Updating cgroup files: %v", err)
}
exceeded, err := c.isLimitExceeded()
Expand Down

0 comments on commit 8dcbb84

Please sign in to comment.