Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory balloon #8084

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions deps.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6541,10 +6541,11 @@ def install_static_dependencies(workspace_name = "buildbuddy"):
sha256 = "a836972b8a7c34970fb9ecc44768ece172f184c5f7e2972c80033fcdcf8c1870",
urls = ["https://github.com/bazelbuild/bazelisk/releases/download/v1.17.0/bazelisk-linux-arm64"],
)
# BALLOON: Uses our current Kernel config (intended for v5.15) with CONFIG_BALLOON_COMPACTION=y
http_file(
name = "org_kernel_git_linux_kernel-vmlinux",
sha256 = "3fd19c602f2b11969ad563d4d4855c9147cf13c34238537c1e434097a11aa6b7",
urls = ["https://storage.googleapis.com/buildbuddy-tools/binaries/linux/vmlinux-v5.15-3fd19c602f2b11969ad563d4d4855c9147cf13c34238537c1e434097a11aa6b7"],
sha256 = "7ce5fd0a6cb1c6864111419cbe4917ce49f69163d3f08cc9a2aed050e4d40612",
urls = ["https://storage.googleapis.com/buildbuddy-tools/binaries/linux/vmlinux-x86_64-v6.2-7ce5fd0a6cb1c6864111419cbe4917ce49f69163d3f08cc9a2aed050e4d40612"],
executable = True,
)
http_file(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var (
// by SIGKILL and may be retried.
ErrSIGKILL = status.UnavailableErrorf("command was terminated by SIGKILL, likely due to executor shutdown or OOM")

DebugStreamCommandOutputs = flag.Bool("debug_stream_command_outputs", false, "If true, stream command outputs to the terminal. Intended for debugging purposes only and should not be used in production.")
DebugStreamCommandOutputs = flag.Bool("debug_stream_command_outputs", true, "If true, stream command outputs to the terminal. Intended for debugging purposes only and should not be used in production.")
)

var (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ func (c *FirecrackerContainer) pauseVM(ctx context.Context) error {
return nil
}

func (c *FirecrackerContainer) saveSnapshot(ctx context.Context, snapshotDetails *snapshotDetails) error {
func (c *FirecrackerContainer) saveSnapshot(ctx context.Context, snapshotDetails *snapshotDetails, removedAddresses map[int64]string) (*snaploader.CacheCowStats, error) {
ctx, span := tracing.StartSpan(ctx)
defer span.End()

Expand All @@ -862,7 +862,7 @@ func (c *FirecrackerContainer) saveSnapshot(ctx context.Context, snapshotDetails
if snapshotDetails.snapshotType == diffSnapshotType {
mergeStart := time.Now()
if err := MergeDiffSnapshot(ctx, baseMemSnapshotPath, c.memoryStore, memSnapshotPath, mergeDiffSnapshotConcurrency, mergeDiffSnapshotBlockSize); err != nil {
return status.UnknownErrorf("merge diff snapshot failed: %s", err)
return nil, status.UnknownErrorf("merge diff snapshot failed: %s", err)
}
log.CtxDebugf(ctx, "VMM merge diff snapshot took %s", time.Since(mergeStart))
// Use the merged memory snapshot.
Expand All @@ -876,7 +876,7 @@ func (c *FirecrackerContainer) saveSnapshot(ctx context.Context, snapshotDetails
memChunkDir := filepath.Join(c.getChroot(), memoryChunkDirName)
memoryStore, err := c.convertToCOW(ctx, memSnapshotPath, memChunkDir)
if err != nil {
return status.WrapError(err, "convert memory snapshot to COWStore")
return nil, status.WrapError(err, "convert memory snapshot to COWStore")
}
c.memoryStore = memoryStore
}
Expand Down Expand Up @@ -911,14 +911,16 @@ func (c *FirecrackerContainer) saveSnapshot(ctx context.Context, snapshotDetails
} else {
opts.MemSnapshotPath = memSnapshotPath
}
opts.RemovedAddresses = removedAddresses

snaploaderStart := time.Now()
if err := c.loader.CacheSnapshot(ctx, c.snapshotKeySet.GetWriteKey(), opts); err != nil {
return status.WrapError(err, "add snapshot to cache")
memoryStats, err := c.loader.CacheSnapshot(ctx, c.snapshotKeySet.GetWriteKey(), opts)
if err != nil {
return nil, status.WrapError(err, "add snapshot to cache")
}
log.CtxDebugf(ctx, "snaploader.CacheSnapshot took %s", time.Since(snaploaderStart))

return nil
return memoryStats, nil
}

func (c *FirecrackerContainer) getVMMetadata() *fcpb.VMMetadata {
Expand Down Expand Up @@ -1065,7 +1067,7 @@ func (c *FirecrackerContainer) LoadSnapshot(ctx context.Context) error {
return status.WrapError(err, "failed to init virtual block devices")
}

if err := c.setupUFFDHandler(ctx); err != nil {
if err := c.setupUFFDHandler(ctx, snap.GetRemovedAddresses()); err != nil {
return err
}

Expand Down Expand Up @@ -1706,15 +1708,15 @@ func (c *FirecrackerContainer) setupNetworking(ctx context.Context) error {
return nil
}

func (c *FirecrackerContainer) setupUFFDHandler(ctx context.Context) error {
func (c *FirecrackerContainer) setupUFFDHandler(ctx context.Context, removedAddresses map[int64]string) error {
if c.memoryStore == nil {
// No memory file to serve over UFFD; do nothing.
return nil
}
if c.uffdHandler != nil {
return status.InternalErrorf("uffd handler is already running")
}
h, err := uffd.NewHandler()
h, err := uffd.NewHandler(removedAddresses)
if err != nil {
return status.WrapError(err, "create uffd handler")
}
Expand Down Expand Up @@ -1964,8 +1966,14 @@ func (c *FirecrackerContainer) create(ctx context.Context) error {
return status.WrapError(err, "failed to init VBD mounts")
}

// TODO: Add support for free page reporting so that we no longer have to
// control the balloon manually.
balloon := fcclient.NewCreateBalloonHandler(1, true, 5)
machineOpts := []fcclient.Opt{
fcclient.WithLogger(getLogrusLogger()),
func(m *fcclient.Machine) {
m.Handlers.FcInit = m.Handlers.FcInit.AppendAfter(fcclient.CreateMachineHandlerName, balloon)
},
}

m, err := fcclient.NewMachine(vmCtx, *fcCfg, machineOpts...)
Expand Down Expand Up @@ -2081,6 +2089,7 @@ func (c *FirecrackerContainer) dialVMExecServer(ctx context.Context) (*grpc.Clie
// Intentionally not returning DeadlineExceededError here since it
// is not a Bazel-retryable error, but this particular timeout
// should be retryable.
log.Warningf("VM logs are %s", string(c.vmLog.Tail()))
return nil, status.UnavailableErrorf("failed to connect to VM: %s", err)
}
return nil, status.UnavailableErrorf("failed to connect to VM: %s", err)
Expand Down Expand Up @@ -2284,6 +2293,19 @@ func (c *FirecrackerContainer) Exec(ctx context.Context, cmd *repb.Command, stdi
return result
}

// UpdateBalloon calls c.machine.UpdateBalloon with the requested size (in MB)
// and then waits a fixed 10-second period before returning.
//
// The update is only attempted when a UFFD handler is attached
// (c.uffdHandler != nil); otherwise this is a no-op that returns nil. An error
// from the machine call is returned unchanged. If ctx is done before the wait
// elapses, ctx.Err() is returned immediately instead of blocking for the
// remainder of the period.
func (c *FirecrackerContainer) UpdateBalloon(ctx context.Context, sizeInMB int64) error {
	if c.uffdHandler == nil {
		return nil
	}

	log.CtxInfof(ctx, "Updating balloon VM: %dMB", sizeInMB)
	if err := c.machine.UpdateBalloon(ctx, sizeInMB); err != nil {
		return err
	}

	// TODO: Do we need the wait at all?
	// NOTE(review): presumably this gives the guest time to act on the new
	// balloon target before the caller proceeds — confirm.
	settle := time.NewTimer(10 * time.Second)
	defer settle.Stop()
	select {
	case <-settle.C:
		return nil
	case <-ctx.Done():
		// Don't hold the caller for the full period once the context has
		// been cancelled or has timed out.
		return ctx.Err()
	}
}

func (c *FirecrackerContainer) Signal(ctx context.Context, sig syscall.Signal) error {
// TODO: forward the signal as a message on any currently running vmexec
// stream.
Expand Down Expand Up @@ -2497,7 +2519,7 @@ func (c *FirecrackerContainer) Pause(ctx context.Context) error {
defer span.End()

start := time.Now()
err := c.pause(ctx)
_, err := c.pause(ctx)

pauseTime := time.Since(start)
log.CtxDebugf(ctx, "Pause took %s", pauseTime)
Expand All @@ -2506,38 +2528,44 @@ func (c *FirecrackerContainer) Pause(ctx context.Context) error {
return err
}

func (c *FirecrackerContainer) pause(ctx context.Context) error {
// PauseWithMemoryStats runs the internal pause routine and returns the
// *snaploader.CacheCowStats it reports, along with any error, both passed
// through unchanged.
func (c *FirecrackerContainer) PauseWithMemoryStats(ctx context.Context) (*snaploader.CacheCowStats, error) {
	memoryStats, err := c.pause(ctx)
	return memoryStats, err
}

func (c *FirecrackerContainer) pause(ctx context.Context) (*snaploader.CacheCowStats, error) {
ctx, cancel := c.monitorVMContext(ctx)
defer cancel()

log.CtxInfof(ctx, "Pausing VM")

snapDetails, err := c.snapshotDetails(ctx)
if err != nil {
return err
return nil, err
}

if err = c.pauseVM(ctx); err != nil {
return err
return nil, err
}

// If an older snapshot is present -- nuke it since we're writing a new one.
if err = c.cleanupOldSnapshots(snapDetails); err != nil {
return err
return nil, err
}

if err = c.createSnapshot(ctx, snapDetails); err != nil {
return err
return nil, err
}

// Stop the VM, UFFD page fault handler, and VBD servers to ensure nothing
// is modifying the snapshot files as we save them
if err := c.stopMachine(ctx); err != nil {
return err
return nil, err
}
var removedAddresses map[int64]string
if c.uffdHandler != nil {
removedAddresses = c.uffdHandler.RemovedAddresses()
if err := c.uffdHandler.Stop(); err != nil {
return status.WrapError(err, "stop uffd handler")
return nil, status.WrapError(err, "stop uffd handler")
}
c.uffdHandler = nil
}
Expand All @@ -2546,19 +2574,20 @@ func (c *FirecrackerContainer) pause(ctx context.Context) error {
// Don't log errors here because it may succeed the second try, especially
// as we are extending the context for that cleanup.
if err := c.unmountAllVBDs(ctx, false /*logErrors*/); err != nil {
return status.WrapError(err, "unmount vbds")
return nil, status.WrapError(err, "unmount vbds")
}

if err = c.saveSnapshot(ctx, snapDetails); err != nil {
return err
memoryStats, err := c.saveSnapshot(ctx, snapDetails, removedAddresses)
if err != nil {
return nil, err
}

// Finish cleaning up VM resources
if err = c.Remove(ctx); err != nil {
return err
return nil, err
}

return nil
return memoryStats, nil
}

type snapshotDetails struct {
Expand Down
Loading