Skip to content

Commit

Permalink
Add register function for migration cache (#2486)
Browse files Browse the repository at this point in the history
* Add register function for migration cache

* Cleanup

* Update off new base commit

* Address pr comments
  • Loading branch information
maggie-lou authored Sep 6, 2022
1 parent ac6e720 commit 7c2edc1
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 9 deletions.
22 changes: 13 additions & 9 deletions server/backends/disk_cache/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ const (
)

var (
rootDirectory = flag.String("cache.disk.root_directory", "", "The root directory to store all blobs in, if using disk based storage.")
partitions = flagutil.New("cache.disk.partitions", []disk.Partition{}, "")
partitionMappings = flagutil.New("cache.disk.partition_mappings", []disk.PartitionMapping{}, "")
useV2Layout = flag.Bool("cache.disk.use_v2_layout", false, "If enabled, files will be stored using the v2 layout. See disk_cache.MigrateToV2Layout for a description.")
rootDirectoryFlag = flag.String("cache.disk.root_directory", "", "The root directory to store all blobs in, if using disk based storage.")
partitionsFlag = flagutil.New("cache.disk.partitions", []disk.Partition{}, "")
partitionMappingsFlag = flagutil.New("cache.disk.partition_mappings", []disk.PartitionMapping{}, "")
useV2LayoutFlag = flag.Bool("cache.disk.use_v2_layout", false, "If enabled, files will be stored using the v2 layout. See disk_cache.MigrateToV2Layout for a description.")

migrateDiskCacheToV2AndExit = flag.Bool("migrate_disk_cache_to_v2_and_exit", false, "If true, attempt to migrate disk cache to v2 layout.")
enableLiveUpdates = flag.Bool("cache.disk.enable_live_updates", false, "If set, enable live updates of disk cache adds / removes")
Expand Down Expand Up @@ -162,17 +162,17 @@ type DiskCache struct {
}

func Register(env environment.Env) error {
if *rootDirectory == "" {
if *rootDirectoryFlag == "" {
return nil
}
if env.GetCache() != nil {
log.Warningf("Overriding configured cache with disk_cache.")
}
dc := &Options{
RootDirectory: *rootDirectory,
Partitions: *partitions,
PartitionMappings: *partitionMappings,
UseV2Layout: *useV2Layout,
RootDirectory: *rootDirectoryFlag,
Partitions: *partitionsFlag,
PartitionMappings: *partitionMappingsFlag,
UseV2Layout: *useV2LayoutFlag,
}
c, err := NewDiskCache(env, dc, cache_config.MaxSizeBytes())
if err != nil {
Expand All @@ -183,6 +183,10 @@ func Register(env environment.Env) error {
}

func NewDiskCache(env environment.Env, opts *Options, defaultMaxSizeBytes int64) (*DiskCache, error) {
if opts.RootDirectory == "" {
return nil, status.FailedPreconditionError("Disk cache root directory must be set")
}

if *migrateDiskCacheToV2AndExit {
if err := MigrateToV2Layout(opts.RootDirectory); err != nil {
log.Errorf("Migration failed: %s", err)
Expand Down
23 changes: 23 additions & 0 deletions server/backends/migration_cache/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "migration_cache",
srcs = [
"config.go",
"migration_cache.go",
],
importpath = "github.com/buildbuddy-io/buildbuddy/server/backends/migration_cache",
visibility = ["//visibility:public"],
deps = [
"//enterprise/server/backends/pebble_cache",
"//proto:remote_execution_go_proto",
"//server/backends/disk_cache",
"//server/cache/config",
"//server/environment",
"//server/interfaces",
"//server/util/disk",
"//server/util/flagutil",
"//server/util/log",
"//server/util/status",
],
)
37 changes: 37 additions & 0 deletions server/backends/migration_cache/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package migration_cache

import (
"time"

"github.com/buildbuddy-io/buildbuddy/server/util/disk"
)

type MigrationConfig struct {
Src *CacheConfig `yaml:"src"`
Dest *CacheConfig `yaml:"dest"`
}

type CacheConfig struct {
DiskConfig *DiskCacheConfig `yaml:"disk"`
PebbleConfig *PebbleCacheConfig `yaml:"pebble"`
}

type DiskCacheConfig struct {
RootDirectory string `yaml:"root_directory"`
Partitions []disk.Partition `yaml:"partitions"`
PartitionMappings []disk.PartitionMapping `yaml:"partition_mappings"`
UseV2Layout bool `yaml:"use_v2_layout"`
}

type PebbleCacheConfig struct {
RootDirectory string `yaml:"root_directory"`
Partitions []disk.Partition `yaml:"partitions"`
PartitionMappings []disk.PartitionMapping `yaml:"partition_mappings"`
MaxSizeBytes int64 `yaml:"max_size_bytes"`
BlockCacheSizeBytes int64 `yaml:"block_cache_size_bytes"`
MaxInlineFileSizeBytes int64 `yaml:"max_inline_file_size_bytes"`
AtimeUpdateThreshold *time.Duration `yaml:"atime_update_threshold"`
AtimeWriteBatchSize int `yaml:"atime_write_batch_size"`
AtimeBufferSize *int `yaml:"atime_buffer_size"`
MinEvictionAge *time.Duration `yaml:"min_eviction_age"`
}
166 changes: 166 additions & 0 deletions server/backends/migration_cache/migration_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package migration_cache

import (
"context"
"io"

"github.com/buildbuddy-io/buildbuddy/enterprise/server/backends/pebble_cache"
"github.com/buildbuddy-io/buildbuddy/server/backends/disk_cache"
"github.com/buildbuddy-io/buildbuddy/server/environment"
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
"github.com/buildbuddy-io/buildbuddy/server/util/flagutil"
"github.com/buildbuddy-io/buildbuddy/server/util/log"
"github.com/buildbuddy-io/buildbuddy/server/util/status"

repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
cache_config "github.com/buildbuddy-io/buildbuddy/server/cache/config"
)

var (
cacheMigrationConfig = flagutil.New("cache.migration", MigrationConfig{}, "Config to specify the details of a cache migration")
)

type MigrationCache struct {
Src interfaces.Cache
Dest interfaces.Cache
}

func Register(env environment.Env) error {
if cacheMigrationConfig.Src == nil || cacheMigrationConfig.Dest == nil {
return nil
}
log.Infof("Registering Migration Cache")

srcCache, err := getCacheFromConfig(env, *cacheMigrationConfig.Src)
if err != nil {
return err
}
destCache, err := getCacheFromConfig(env, *cacheMigrationConfig.Dest)
if err != nil {
return err
}
mc := &MigrationCache{
Src: srcCache,
Dest: destCache,
}

if env.GetCache() != nil {
log.Warningf("Overriding configured cache with migration_cache. If running a migration, all cache configs" +
" should be nested under the cache.migration block.")
}
env.SetCache(mc)

return nil
}

func validateCacheConfig(config CacheConfig) error {
if config.PebbleConfig != nil && config.DiskConfig != nil {
return status.FailedPreconditionError("only one cache config can be set")
} else if config.PebbleConfig == nil && config.DiskConfig == nil {
return status.FailedPreconditionError("a cache config must be set")
}

return nil
}

func getCacheFromConfig(env environment.Env, cfg CacheConfig) (interfaces.Cache, error) {
err := validateCacheConfig(cfg)
if err != nil {
return nil, status.FailedPreconditionErrorf("error validating migration cache config: %s", err)
}

if cfg.DiskConfig != nil {
c, err := diskCacheFromConfig(env, cfg.DiskConfig)
if err != nil {
return nil, err
}
return c, nil
} else if cfg.PebbleConfig != nil {
c, err := pebbleCacheFromConfig(env, cfg.PebbleConfig)
if err != nil {
return nil, err
}
return c, nil
}

return nil, status.FailedPreconditionErrorf("error getting cache from migration config: no valid cache types")
}

func diskCacheFromConfig(env environment.Env, cfg *DiskCacheConfig) (*disk_cache.DiskCache, error) {
opts := &disk_cache.Options{
RootDirectory: cfg.RootDirectory,
Partitions: cfg.Partitions,
PartitionMappings: cfg.PartitionMappings,
UseV2Layout: cfg.UseV2Layout,
}
return disk_cache.NewDiskCache(env, opts, cache_config.MaxSizeBytes())
}

func pebbleCacheFromConfig(env environment.Env, cfg *PebbleCacheConfig) (*pebble_cache.PebbleCache, error) {
opts := &pebble_cache.Options{
RootDirectory: cfg.RootDirectory,
Partitions: cfg.Partitions,
PartitionMappings: cfg.PartitionMappings,
MaxSizeBytes: cfg.MaxSizeBytes,
BlockCacheSizeBytes: cfg.BlockCacheSizeBytes,
MaxInlineFileSizeBytes: cfg.MaxInlineFileSizeBytes,
AtimeUpdateThreshold: cfg.AtimeUpdateThreshold,
AtimeWriteBatchSize: cfg.AtimeWriteBatchSize,
AtimeBufferSize: cfg.AtimeBufferSize,
MinEvictionAge: cfg.MinEvictionAge,
}
c, err := pebble_cache.NewPebbleCache(env, opts)
if err != nil {
return nil, err
}

c.Start()
env.GetHealthChecker().RegisterShutdownFunction(func(ctx context.Context) error {
return c.Stop()
})
return c, nil
}

func (mc MigrationCache) WithIsolation(ctx context.Context, cacheType interfaces.CacheType, remoteInstanceName string) (interfaces.Cache, error) {
return nil, status.UnimplementedError("not yet implemented")
}

func (mc MigrationCache) Contains(ctx context.Context, d *repb.Digest) (bool, error) {
return false, status.UnimplementedError("not yet implemented")
}

func (mc MigrationCache) Metadata(ctx context.Context, d *repb.Digest) (*interfaces.CacheMetadata, error) {
return nil, status.UnimplementedError("not yet implemented")
}

func (mc MigrationCache) FindMissing(ctx context.Context, digests []*repb.Digest) ([]*repb.Digest, error) {
return nil, status.UnimplementedError("not yet implemented")
}

func (mc MigrationCache) Get(ctx context.Context, d *repb.Digest) ([]byte, error) {
return nil, status.UnimplementedError("not yet implemented")
}

func (mc MigrationCache) GetMulti(ctx context.Context, digests []*repb.Digest) (map[*repb.Digest][]byte, error) {
return nil, status.UnimplementedError("not yet implemented")
}

func (mc MigrationCache) Set(ctx context.Context, d *repb.Digest, data []byte) error {
return status.UnimplementedError("not yet implemented")
}

func (mc MigrationCache) SetMulti(ctx context.Context, kvs map[*repb.Digest][]byte) error {
return status.UnimplementedError("not yet implemented")
}

func (mc MigrationCache) Delete(ctx context.Context, d *repb.Digest) error {
return status.UnimplementedError("not yet implemented")
}

func (mc MigrationCache) Reader(ctx context.Context, d *repb.Digest, offset, limit int64) (io.ReadCloser, error) {
return nil, status.UnimplementedError("not yet implemented")
}

func (mc MigrationCache) Writer(ctx context.Context, d *repb.Digest) (io.WriteCloser, error) {
return nil, status.UnimplementedError("not yet implemented")
}
1 change: 1 addition & 0 deletions server/libmain/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//server/backends/memory_cache",
"//server/backends/memory_kvstore",
"//server/backends/memory_metrics_collector",
"//server/backends/migration_cache",
"//server/backends/repo_downloader",
"//server/backends/slack",
"//server/build_event_protocol/build_event_handler",
Expand Down
4 changes: 4 additions & 0 deletions server/libmain/libmain.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/backends/memory_cache"
"github.com/buildbuddy-io/buildbuddy/server/backends/memory_kvstore"
"github.com/buildbuddy-io/buildbuddy/server/backends/memory_metrics_collector"
"github.com/buildbuddy-io/buildbuddy/server/backends/migration_cache"
"github.com/buildbuddy-io/buildbuddy/server/backends/repo_downloader"
"github.com/buildbuddy-io/buildbuddy/server/backends/slack"
"github.com/buildbuddy-io/buildbuddy/server/build_event_protocol/build_event_handler"
Expand Down Expand Up @@ -183,6 +184,9 @@ func GetConfiguredEnvironmentOrDie(healthChecker *healthcheck.HealthChecker) *re
if err := disk_cache.Register(realEnv); err != nil {
log.Fatal(err.Error())
}
if err := migration_cache.Register(realEnv); err != nil {
log.Fatal(err.Error())
}
if realEnv.GetCache() != nil {
log.Printf("Cache: BuildBuddy cache API enabled!")
}
Expand Down

0 comments on commit 7c2edc1

Please sign in to comment.