From e9d697006e71c90e1538b0f61cc098a3be7caa2f Mon Sep 17 00:00:00 2001 From: Iain Macdonald Date: Wed, 28 Aug 2024 11:38:13 -0700 Subject: [PATCH] Serve GetTree requests out of the local cache via BatchReadBlobs (#7319) --- .../BUILD | 2 + ...ontent_addressable_storage_server_proxy.go | 49 +++++++--- ...t_addressable_storage_server_proxy_test.go | 90 +++++++++++++++++-- .../server/remote_execution/dirtools/BUILD | 1 + .../remote_execution/dirtools/dirtools.go | 7 +- server/remote_cache/cachetools/cachetools.go | 5 +- .../content_addressable_storage_server/BUILD | 1 + .../content_addressable_storage_server.go | 8 +- server/util/rpcutil/rpcutil.go | 2 + 9 files changed, 131 insertions(+), 34 deletions(-) diff --git a/enterprise/server/content_addressable_storage_server_proxy/BUILD b/enterprise/server/content_addressable_storage_server_proxy/BUILD index b99163e5112..ebd8735ff11 100644 --- a/enterprise/server/content_addressable_storage_server_proxy/BUILD +++ b/enterprise/server/content_addressable_storage_server_proxy/BUILD @@ -13,6 +13,8 @@ go_library( "//server/real_environment", "//server/remote_cache/digest", "//server/util/log", + "//server/util/proto", + "//server/util/rpcutil", "//server/util/status", "@org_golang_google_grpc//codes", ], diff --git a/enterprise/server/content_addressable_storage_server_proxy/content_addressable_storage_server_proxy.go b/enterprise/server/content_addressable_storage_server_proxy/content_addressable_storage_server_proxy.go index 8b32025d10e..64efd626f5e 100644 --- a/enterprise/server/content_addressable_storage_server_proxy/content_addressable_storage_server_proxy.go +++ b/enterprise/server/content_addressable_storage_server_proxy/content_addressable_storage_server_proxy.go @@ -3,12 +3,13 @@ package content_addressable_storage_server_proxy import ( "context" "fmt" - "io" "github.com/buildbuddy-io/buildbuddy/server/environment" "github.com/buildbuddy-io/buildbuddy/server/real_environment" 
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/digest" "github.com/buildbuddy-io/buildbuddy/server/util/log" + "github.com/buildbuddy-io/buildbuddy/server/util/proto" + "github.com/buildbuddy-io/buildbuddy/server/util/rpcutil" "github.com/buildbuddy-io/buildbuddy/server/util/status" "google.golang.org/grpc/codes" @@ -156,22 +157,44 @@ func (s *CASServerProxy) batchReadBlobsRemote(ctx context.Context, readReq *repb } func (s *CASServerProxy) GetTree(req *repb.GetTreeRequest, stream repb.ContentAddressableStorage_GetTreeServer) error { - // TODO(iain): cache these - remoteStream, err := s.remote.GetTree(stream.Context(), req) - if err != nil { - return err - } - for { - rsp, err := remoteStream.Recv() - if err == io.EOF { - break + ctx := stream.Context() + resp := repb.GetTreeResponse{} + respSizeBytes := 0 + for dirsToGet := []*repb.Digest{req.RootDigest}; len(dirsToGet) > 0; { + brbreq := repb.BatchReadBlobsRequest{ + InstanceName: req.InstanceName, + Digests: dirsToGet, + DigestFunction: req.DigestFunction, } + brbresps, err := s.BatchReadBlobs(ctx, &brbreq) if err != nil { return err } - if err = stream.Send(rsp); err != nil { - return err + + dirsToGet = []*repb.Digest{} + for _, brbresp := range brbresps.Responses { + dir := &repb.Directory{} + if err := proto.Unmarshal(brbresp.Data, dir); err != nil { + return err + } + + // Flush to the stream if adding the dir will make resp bigger than + // the maximum gRPC frame size. 
+ dirSizeBytes := proto.Size(dir) + if int64(respSizeBytes+dirSizeBytes) > rpcutil.GRPCMaxSizeBytes { + if err := stream.Send(&resp); err != nil { + return err + } + resp = repb.GetTreeResponse{} + respSizeBytes = 0 + } + + resp.Directories = append(resp.Directories, dir) + respSizeBytes += dirSizeBytes + for _, subDir := range dir.Directories { + dirsToGet = append(dirsToGet, subDir.Digest) + } } } - return nil + return stream.Send(&resp) } diff --git a/enterprise/server/content_addressable_storage_server_proxy/content_addressable_storage_server_proxy_test.go b/enterprise/server/content_addressable_storage_server_proxy/content_addressable_storage_server_proxy_test.go index daa6b66f2fc..f16c426efaa 100644 --- a/enterprise/server/content_addressable_storage_server_proxy/content_addressable_storage_server_proxy_test.go +++ b/enterprise/server/content_addressable_storage_server_proxy/content_addressable_storage_server_proxy_test.go @@ -276,39 +276,111 @@ func makeTree(ctx context.Context, client bspb.ByteStreamClient, t *testing.T) ( func TestGetTree(t *testing.T) { ctx := context.Background() - conn, _, requestCounter := runRemoteCASS(ctx, testenv.GetTestEnv(t), t) + conn, unaryRequests, streamRequests := runRemoteCASS(ctx, testenv.GetTestEnv(t), t) casClient := repb.NewContentAddressableStorageClient(conn) bsClient := bspb.NewByteStreamClient(conn) proxyConn := runCASProxy(ctx, conn, testenv.GetTestEnv(t), t) casProxy := repb.NewContentAddressableStorageClient(proxyConn) bsProxy := bspb.NewByteStreamClient(proxyConn) - // Read some random digest that's not there + // Unknown tree. 
digest := &repb.Digest{ Hash: strings.Repeat("a", 64), SizeBytes: 15, } - _, err := casClient.GetTree(ctx, &repb.GetTreeRequest{RootDigest: digest}) require.NoError(t, err) _, err = casProxy.GetTree(ctx, &repb.GetTreeRequest{RootDigest: digest}) require.NoError(t, err) - require.Equal(t, int32(1), requestCounter.Load()) + require.Equal(t, int32(0), unaryRequests.Load()) + require.Equal(t, int32(1), streamRequests.Load()) + // Full tree written to the remote. rootDigest, files := makeTree(ctx, bsClient, t) treeFiles := cas.ReadTree(ctx, t, casClient, "", rootDigest) require.ElementsMatch(t, files, treeFiles) - requestCounter.Store(0) + unaryRequests.Store(0) + streamRequests.Store(0) + treeFiles = cas.ReadTree(ctx, t, casProxy, "", rootDigest) + require.ElementsMatch(t, files, treeFiles) + // The tree has 4 levels, so expect 4 unary requests. + require.Equal(t, int32(4), unaryRequests.Load()) + require.Equal(t, int32(0), streamRequests.Load()) + unaryRequests.Store(0) + streamRequests.Store(0) treeFiles = cas.ReadTree(ctx, t, casProxy, "", rootDigest) require.ElementsMatch(t, files, treeFiles) - require.Equal(t, int32(1), requestCounter.Load()) + require.Equal(t, int32(0), unaryRequests.Load()) + require.Equal(t, int32(0), streamRequests.Load()) + // Full tree written to the proxy. rootDigest, files = makeTree(ctx, bsProxy, t) treeFiles = cas.ReadTree(ctx, t, casClient, "", rootDigest) require.ElementsMatch(t, files, treeFiles) - requestCounter.Store(0) + unaryRequests.Store(0) + streamRequests.Store(0) + treeFiles = cas.ReadTree(ctx, t, casProxy, "", rootDigest) + require.ElementsMatch(t, files, treeFiles) + require.Equal(t, int32(0), unaryRequests.Load()) + require.Equal(t, int32(0), streamRequests.Load()) + + // Write two subtrees to the proxy and a root node to the remote. 
+ firstTreeRoot, firstTreeFiles := makeTree(ctx, bsProxy, t) + secondTreeRoot, secondTreeFiles := makeTree(ctx, bsProxy, t) + root := &repb.Directory{ + Directories: []*repb.DirectoryNode{ + &repb.DirectoryNode{ + Name: "first", + Digest: firstTreeRoot, + }, + &repb.DirectoryNode{ + Name: "second", + Digest: secondTreeRoot, + }, + }, + } + rootDigest, err = cachetools.UploadProto(ctx, bsClient, "", repb.DigestFunction_SHA256, root) + files = []string{"first", "second"} + files = append(files, firstTreeFiles...) + files = append(files, secondTreeFiles...) + require.NoError(t, err) + treeFiles = cas.ReadTree(ctx, t, casClient, "", rootDigest) + require.ElementsMatch(t, files, treeFiles) + unaryRequests.Store(0) + streamRequests.Store(0) + treeFiles = cas.ReadTree(ctx, t, casProxy, "", rootDigest) + require.ElementsMatch(t, files, treeFiles) + // Only the root node should be read from the remote. + require.Equal(t, int32(1), unaryRequests.Load()) + require.Equal(t, int32(0), streamRequests.Load()) + + // Write two subtrees to the remote and a root node to the proxy. + firstTreeRoot, firstTreeFiles = makeTree(ctx, bsClient, t) + secondTreeRoot, secondTreeFiles = makeTree(ctx, bsClient, t) + root = &repb.Directory{ + Directories: []*repb.DirectoryNode{ + &repb.DirectoryNode{ + Name: "first", + Digest: firstTreeRoot, + }, + &repb.DirectoryNode{ + Name: "second", + Digest: secondTreeRoot, + }, + }, + } + rootDigest, err = cachetools.UploadProto(ctx, bsProxy, "", repb.DigestFunction_SHA256, root) + files = []string{"first", "second"} + files = append(files, firstTreeFiles...) + files = append(files, secondTreeFiles...) 
+ require.NoError(t, err) + treeFiles = cas.ReadTree(ctx, t, casClient, "", rootDigest) + require.ElementsMatch(t, files, treeFiles) + unaryRequests.Store(0) + streamRequests.Store(0) treeFiles = cas.ReadTree(ctx, t, casProxy, "", rootDigest) require.ElementsMatch(t, files, treeFiles) - // TODO(iain): change this to 0 once tree caching support is added - require.Equal(t, int32(1), requestCounter.Load()) + // The subtrees but not root should be read from the remote. + require.Equal(t, int32(4), unaryRequests.Load()) + require.Equal(t, int32(0), streamRequests.Load()) } diff --git a/enterprise/server/remote_execution/dirtools/BUILD b/enterprise/server/remote_execution/dirtools/BUILD index 6fe6aec7bff..d1aa08defbe 100644 --- a/enterprise/server/remote_execution/dirtools/BUILD +++ b/enterprise/server/remote_execution/dirtools/BUILD @@ -22,6 +22,7 @@ go_library( "//server/util/fastcopy", "//server/util/log", "//server/util/proto", + "//server/util/rpcutil", "//server/util/status", "//third_party/singleflight", "@org_golang_google_genproto_googleapis_bytestream//:bytestream", diff --git a/enterprise/server/remote_execution/dirtools/dirtools.go b/enterprise/server/remote_execution/dirtools/dirtools.go index ca99a9542e5..9b32c420357 100644 --- a/enterprise/server/remote_execution/dirtools/dirtools.go +++ b/enterprise/server/remote_execution/dirtools/dirtools.go @@ -20,6 +20,7 @@ import ( "github.com/buildbuddy-io/buildbuddy/server/util/fastcopy" "github.com/buildbuddy-io/buildbuddy/server/util/log" "github.com/buildbuddy-io/buildbuddy/server/util/proto" + "github.com/buildbuddy-io/buildbuddy/server/util/rpcutil" "github.com/buildbuddy-io/buildbuddy/server/util/status" "github.com/buildbuddy-io/buildbuddy/third_party/singleflight" "golang.org/x/sync/errgroup" @@ -30,8 +31,6 @@ import ( bspb "google.golang.org/genproto/googleapis/bytestream" ) -const gRPCMaxSize = int64(4000000) - var ( enableDownloadCompresssion = flag.Bool("cache.client.enable_download_compression", true, 
"If true, enable compression of downloads from remote caches") ) @@ -778,7 +777,7 @@ func (ff *BatchFileFetcher) FetchFiles(filesToFetch FileMap, opts *DownloadTreeO // fit in the batch call, so we'll have to bytestream // it. size := f.d.GetSizeBytes() - if size > gRPCMaxSize || ff.env.GetContentAddressableStorageClient() == nil { + if size > rpcutil.GRPCMaxSizeBytes || ff.env.GetContentAddressableStorageClient() == nil { eg.Go(func() error { return ff.bytestreamReadFiles(ctx, ff.instanceName, f.d, f.fps, opts) }) @@ -788,7 +787,7 @@ func (ff *BatchFileFetcher) FetchFiles(filesToFetch FileMap, opts *DownloadTreeO // If the digest would push our current batch request // size over the gRPC max, dispatch the request and // start a new one. - if currentBatchRequestSize+size > gRPCMaxSize { + if currentBatchRequestSize+size > rpcutil.GRPCMaxSizeBytes { reqCopy := req eg.Go(func() error { return ff.batchDownloadFiles(ctx, reqCopy, filesToFetch, opts) diff --git a/server/remote_cache/cachetools/cachetools.go b/server/remote_cache/cachetools/cachetools.go index 7ad4ed7c00a..13a244fd31b 100644 --- a/server/remote_cache/cachetools/cachetools.go +++ b/server/remote_cache/cachetools/cachetools.go @@ -33,7 +33,6 @@ import ( const ( uploadBufSizeBytes = 1000000 // 1MB - gRPCMaxSize = int64(4000000) maxCompressionBufSize = int64(4000000) ) @@ -610,7 +609,7 @@ func (ul *BatchCASUploader) Upload(d *repb.Digest, rsc io.ReadSeekCloser) error compressor = repb.Compressor_ZSTD } - if d.GetSizeBytes() > gRPCMaxSize { + if d.GetSizeBytes() > rpcutil.GRPCMaxSizeBytes { resourceName := digest.NewResourceName(d, ul.instanceName, rspb.CacheType_CAS, ul.digestFunction) resourceName.SetCompressor(compressor) @@ -637,7 +636,7 @@ func (ul *BatchCASUploader) Upload(d *repb.Digest, rsc io.ReadSeekCloser) error b = compression.CompressZstd(nil, b) } additionalSize := int64(len(b)) - if ul.unsentBatchSize+additionalSize > gRPCMaxSize { + if ul.unsentBatchSize+additionalSize > 
rpcutil.GRPCMaxSizeBytes { ul.flushCurrentBatch() } ul.unsentBatchReq.Requests = append(ul.unsentBatchReq.Requests, &repb.BatchUpdateBlobsRequest_Request{ diff --git a/server/remote_cache/content_addressable_storage_server/BUILD b/server/remote_cache/content_addressable_storage_server/BUILD index 579c649ae53..e3a8dd3634b 100644 --- a/server/remote_cache/content_addressable_storage_server/BUILD +++ b/server/remote_cache/content_addressable_storage_server/BUILD @@ -24,6 +24,7 @@ go_library( "//server/util/log", "//server/util/prefix", "//server/util/proto", + "//server/util/rpcutil", "//server/util/status", "@com_github_prometheus_client_golang//prometheus", "@org_golang_google_genproto_googleapis_rpc//status", diff --git a/server/remote_cache/content_addressable_storage_server/content_addressable_storage_server.go b/server/remote_cache/content_addressable_storage_server/content_addressable_storage_server.go index e22a040b128..9f5ada3003e 100644 --- a/server/remote_cache/content_addressable_storage_server/content_addressable_storage_server.go +++ b/server/remote_cache/content_addressable_storage_server/content_addressable_storage_server.go @@ -23,6 +23,7 @@ import ( "github.com/buildbuddy-io/buildbuddy/server/util/log" "github.com/buildbuddy-io/buildbuddy/server/util/prefix" "github.com/buildbuddy-io/buildbuddy/server/util/proto" + "github.com/buildbuddy-io/buildbuddy/server/util/rpcutil" "github.com/buildbuddy-io/buildbuddy/server/util/status" "github.com/prometheus/client_golang/prometheus" @@ -38,10 +39,7 @@ import ( gstatus "google.golang.org/grpc/status" ) -const ( - gRPCMaxSize = int64(4194304 - 2000) - TreeCacheRemoteInstanceName = "_bb_treecache_" -) +const TreeCacheRemoteInstanceName = "_bb_treecache_" var ( enableTreeCaching = flag.Bool("cache.enable_tree_caching", true, "If true, cache GetTree responses (full and partial)") @@ -532,7 +530,7 @@ func (s *ContentAddressableStorageServer) GetTree(req *repb.GetTreeRequest, stre rn := 
digest.ResourceNameFromProto(dirWithDigest.ResourceName) d := rn.GetDigest() - if rspSizeBytes+d.GetSizeBytes() > gRPCMaxSize { + if rspSizeBytes+d.GetSizeBytes() > rpcutil.GRPCMaxSizeBytes { if err := stream.Send(rsp); err != nil { return err } diff --git a/server/util/rpcutil/rpcutil.go b/server/util/rpcutil/rpcutil.go index a4c6515fac4..7d9edb31739 100644 --- a/server/util/rpcutil/rpcutil.go +++ b/server/util/rpcutil/rpcutil.go @@ -7,6 +7,8 @@ import ( "github.com/buildbuddy-io/buildbuddy/server/util/proto" ) +const GRPCMaxSizeBytes = int64(4 * 1000 * 1000) + type StreamMsg[T proto.Message] struct { Data T Error error