diff --git a/cache_proxy/BUILD b/cache_proxy/BUILD index 2f7fe44..5ae5f68 100644 --- a/cache_proxy/BUILD +++ b/cache_proxy/BUILD @@ -7,8 +7,16 @@ go_library( visibility = ["//visibility:public"], deps = [ "@com_github_buildbuddy_io_buildbuddy//proto:remote_execution_go_proto", + "@com_github_buildbuddy_io_buildbuddy//server/environment:go_default_library", "@com_github_buildbuddy_io_buildbuddy//server/interfaces:go_default_library", + "@com_github_buildbuddy_io_buildbuddy//server/remote_cache/byte_stream_server:go_default_library", + "@com_github_buildbuddy_io_buildbuddy//server/remote_cache/cachetools:go_default_library", + "@com_github_buildbuddy_io_buildbuddy//server/remote_cache/content_addressable_storage_server:go_default_library", + "@com_github_buildbuddy_io_buildbuddy//server/remote_cache/digest:go_default_library", + "@com_github_buildbuddy_io_buildbuddy//server/util/prefix:go_default_library", + "@com_github_buildbuddy_io_buildbuddy//server/util/status:go_default_library", "@go_googleapis//google/bytestream:bytestream_go_proto", "@org_golang_google_grpc//:go_default_library", + "@org_golang_google_grpc//test/bufconn:go_default_library", ], ) diff --git a/cache_proxy/cache_proxy.go b/cache_proxy/cache_proxy.go index 3a4034b..d80e687 100644 --- a/cache_proxy/cache_proxy.go +++ b/cache_proxy/cache_proxy.go @@ -2,32 +2,114 @@ package cache_proxy import ( "context" + "flag" + "fmt" "io" + "io/ioutil" + "log" + "net" + "os" + "time" + "github.com/buildbuddy-io/buildbuddy/server/environment" + "github.com/buildbuddy-io/buildbuddy/server/interfaces" + "github.com/buildbuddy-io/buildbuddy/server/remote_cache/byte_stream_server" + "github.com/buildbuddy-io/buildbuddy/server/remote_cache/cachetools" + "github.com/buildbuddy-io/buildbuddy/server/remote_cache/content_addressable_storage_server" + "github.com/buildbuddy-io/buildbuddy/server/remote_cache/digest" + "github.com/buildbuddy-io/buildbuddy/server/util/status" "google.golang.org/grpc" + "google.golang.org/grpc/test/bufconn" repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution" bspb "google.golang.org/genproto/googleapis/bytestream" ) +var ( + readThrough = flag.Bool("read_through", true, "If true, cache remote reads locally") + writeThrough = flag.Bool("write_through", true, "If true, upload writes to remote cache too") +) + const ( - // Keep under the limit of ~4MB (1024 * 1024 * 4). - readBufSizeBytes = (1024 * 1024 * 4) - 100 + queueBufferSize = 100 ) +// CacheProxy implements a local GRPC cache that proxies a remote GRPC cache. +// It implements both read-through and write-through functionality by: +// - Checking existence first against the local cache, then, if any keys are +// still not found, consulting the remote cache. +// - Reading first from the local cache, then, if a key is not found, reading +// from the remote cache and writing the fetched object to the local cache. +// - Writing to the local cache and returning success immediately to the +// client, then enqueueing a job to upload this key to the remote cache. type CacheProxy struct { acClient repb.ActionCacheClient bsClient bspb.ByteStreamClient casClient repb.ContentAddressableStorageClient cpbClient repb.CapabilitiesClient + + env environment.Env + cache interfaces.Cache + localBSS *byte_stream_server.ByteStreamServer + localCAS *content_addressable_storage_server.ContentAddressableStorageServer + localBSSClient bspb.ByteStreamClient + + qWorker *queueWorker +} + +// startServerLocally registers the localBSS and serves it over a bufconn +// (in-memory connection) that is returned to the caller. The server is shutdown +// when the provided context is cancelled. +func startServerLocally(ctx context.Context, localBSS bspb.ByteStreamServer) (*grpc.ClientConn, error) { + buffer := 1024 * 1024 * 10 + listener := bufconn.Listen(buffer) + + localGRPCServer := grpc.NewServer() + bspb.RegisterByteStreamServer(localGRPCServer, localBSS) + go func() { + if err := localGRPCServer.Serve(listener); err != nil { + log.Printf("error serving locally: %s", err.Error()) + } + listener.Close() + localGRPCServer.Stop() + }() + + conn, err := grpc.DialContext(ctx, "", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return listener.Dial() + }), grpc.WithInsecure()) + if err != nil { + return nil, err + } + return conn, nil } -func NewCacheProxy(conn *grpc.ClientConn) (*CacheProxy, error) { +func NewCacheProxy(ctx context.Context, env environment.Env, conn *grpc.ClientConn) (*CacheProxy, error) { + if env.GetCache() == nil { + return nil, status.FailedPreconditionError("CacheProxy requires a local cache to run.") + } + cache := env.GetCache() + localBSS, err := byte_stream_server.NewByteStreamServer(env) + if err != nil { + return nil, status.InternalErrorf("CacheProxy: error starting local bytestream server: %s", err.Error()) + } + localCAS, err := content_addressable_storage_server.NewContentAddressableStorageServer(env) + if err != nil { + return nil, status.InternalErrorf("CacheProxy: error starting local CAS server: %s", err.Error()) + } + localConn, err := startServerLocally(ctx, localBSS) + localBSSClient := bspb.NewByteStreamClient(localConn) + remoteBSSClient := bspb.NewByteStreamClient(conn) return &CacheProxy{ - acClient: repb.NewActionCacheClient(conn), - bsClient: bspb.NewByteStreamClient(conn), - casClient: repb.NewContentAddressableStorageClient(conn), - cpbClient: repb.NewCapabilitiesClient(conn), + acClient: repb.NewActionCacheClient(conn), + bsClient: remoteBSSClient, + casClient: repb.NewContentAddressableStorageClient(conn), + cpbClient: repb.NewCapabilitiesClient(conn), + env: env, + cache: cache, + localBSS: localBSS, + localCAS: localCAS, + localBSSClient: localBSSClient, + qWorker: NewQueueWorker(ctx, localBSSClient, remoteBSSClient), }, nil } @@ -72,18 +154,62 @@ func (p *CacheProxy) GetTree(req *repb.GetTreeRequest, stream repb.ContentAddres } func (p *CacheProxy) FindMissingBlobs(ctx context.Context, req *repb.FindMissingBlobsRequest) (*repb.FindMissingBlobsResponse, error) { - return p.casClient.FindMissingBlobs(ctx, req) + localMissing, err := p.localCAS.FindMissingBlobs(ctx, req) + if err == nil && len(localMissing.GetMissingBlobDigests()) == 0 { + return localMissing, nil + } + remainingReq := &repb.FindMissingBlobsRequest{ + InstanceName: req.GetInstanceName(), + } + if err == nil { + remainingReq.BlobDigests = localMissing.GetMissingBlobDigests() + } else { + remainingReq.BlobDigests = req.GetBlobDigests() + } + return p.casClient.FindMissingBlobs(ctx, remainingReq) +} + +func (p *CacheProxy) hasBlobLocally(ctx context.Context, instanceName string, d *repb.Digest) bool { + rsp, err := p.localCAS.FindMissingBlobs(ctx, &repb.FindMissingBlobsRequest{ + InstanceName: instanceName, + BlobDigests: []*repb.Digest{d}, + }) + + if err == nil && len(rsp.GetMissingBlobDigests()) == 0 { + return true + } + return false } func (p *CacheProxy) Read(req *bspb.ReadRequest, stream bspb.ByteStream_ReadServer) error { - clientStream, err := p.bsClient.Read(stream.Context(), req) + ctx := stream.Context() + instanceName, d, err := digest.ExtractDigestFromDownloadResourceName(req.GetResourceName()) + if err == nil && p.hasBlobLocally(ctx, instanceName, d) { + return p.localBSS.Read(req, stream) + } + clientStream, err := p.bsClient.Read(ctx, req) if err != nil { return err } + + var localFile *os.File + if *readThrough { + tmpFile, err := ioutil.TempFile(os.TempDir(), fmt.Sprintf("%s%s-", instanceName, d.GetHash())) + if err == nil { + localFile = tmpFile + defer os.Remove(tmpFile.Name()) + } + } for { msg, err := clientStream.Recv() if err != nil { if err == io.EOF { + if localFile != nil { + ind := digest.NewInstanceNameDigest(d, instanceName) + if _, err := cachetools.UploadFromReader(ctx, p.localBSSClient, ind, localFile); err != nil { + log.Printf("error uploading from reader: %s", err.Error()) + } + } break } return err @@ -91,12 +217,18 @@ func (p *CacheProxy) Read(req *bspb.ReadRequest, stream bspb.ByteStream_ReadServ if sendErr := stream.Send(msg); sendErr != nil { return sendErr } + if localFile != nil { + if _, err := localFile.Write(msg.GetData()); err != nil { + log.Printf("error writing data to local file: %s", err.Error()) + } + } } return nil } func (p *CacheProxy) Write(stream bspb.ByteStream_WriteServer) error { - clientStream, err := p.bsClient.Write(stream.Context()) + var wreq *bspb.WriteRequest + clientStream, err := p.localBSSClient.Write(stream.Context()) if err != nil { return err } @@ -105,6 +237,9 @@ func (p *CacheProxy) Write(stream bspb.ByteStream_WriteServer) error { if err != nil { return err } + if wreq == nil { + wreq = rsp + } if err := clientStream.Send(rsp); err != nil { return err } @@ -113,18 +248,88 @@ func (p *CacheProxy) Write(stream bspb.ByteStream_WriteServer) error { if err != nil { return err } + if *writeThrough { + if err := p.qWorker.EnqueueRemoteWrite(wreq); err != nil { + log.Printf("Error enqueueing write request to remote: %s", err.Error()) + } + } return stream.SendAndClose(lastRsp) } - } return nil } func (p *CacheProxy) QueryWriteStatus(ctx context.Context, req *bspb.QueryWriteStatusRequest) (*bspb.QueryWriteStatusResponse, error) { - // For now, just tell the client that the entire write failed and let - // them retry it. - return &bspb.QueryWriteStatusResponse{ - CommittedSize: 0, - Complete: false, - }, nil + return p.localBSS.QueryWriteStatus(ctx, req) +} + +type queueReq struct { + writeResourceName string +} +type queueWorker struct { + ctx context.Context + workQ chan queueReq + localClient bspb.ByteStreamClient + remoteClient bspb.ByteStreamClient +} + +func NewQueueWorker(ctx context.Context, localClient, remoteClient bspb.ByteStreamClient) *queueWorker { + qw := &queueWorker{ + ctx: ctx, + workQ: make(chan queueReq, queueBufferSize), + localClient: localClient, + remoteClient: remoteClient, + } + qw.Start() + return qw +} +func (qw *queueWorker) Start() { + go func() { + for { + select { + case <-qw.ctx.Done(): + break + case req := <-qw.workQ: + if err := qw.handleWriteRequest(req); err != nil { + log.Printf("Error handling write request: %s", err.Error()) + } + } + } + }() +} +func (qw *queueWorker) handleWriteRequest(qreq queueReq) error { + start := time.Now() + instanceName, d, err := digest.ExtractDigestFromUploadResourceName(qreq.writeResourceName) + if err != nil { + return err + } + tmpFile, err := ioutil.TempFile(os.TempDir(), fmt.Sprintf("%s%s-", instanceName, d.GetHash())) + if err != nil { + return err + } + defer os.Remove(tmpFile.Name()) + ind := digest.NewInstanceNameDigest(d, instanceName) + if err := cachetools.GetBlob(qw.ctx, qw.localClient, ind, tmpFile); err != nil { + return err + } + if _, err := tmpFile.Seek(0, io.SeekStart); err != nil { + return err + } + if _, err := cachetools.UploadFromReader(qw.ctx, qw.remoteClient, ind, tmpFile); err != nil { + return err + } + log.Printf("Handled write request: %s in %s", qreq.writeResourceName, time.Since(start)) + return nil +} + +func (qw *queueWorker) EnqueueRemoteWrite(wreq *bspb.WriteRequest) error { + req := queueReq{ + writeResourceName: wreq.GetResourceName(), + } + select { + case qw.workQ <- req: + return nil + default: + return status.ResourceExhaustedError("Queue was at capacity.") + } } diff --git a/cmd/sidecar/BUILD b/cmd/sidecar/BUILD index 5ff81f8..82ff683 100644 --- a/cmd/sidecar/BUILD +++ b/cmd/sidecar/BUILD @@ -10,6 +10,7 @@ go_library( "//devnull:go_default_library", "@com_github_buildbuddy_io_buildbuddy//proto:publish_build_event_go_proto", "@com_github_buildbuddy_io_buildbuddy//proto:remote_execution_go_proto", + "@com_github_buildbuddy_io_buildbuddy//server/backends/disk_cache:go_default_library", "@com_github_buildbuddy_io_buildbuddy//server/build_event_protocol/build_event_proxy:go_default_library", "@com_github_buildbuddy_io_buildbuddy//server/build_event_protocol/build_event_server:go_default_library", "@com_github_buildbuddy_io_buildbuddy//server/config:go_default_library", diff --git a/cmd/sidecar/sidecar.go b/cmd/sidecar/sidecar.go index 6f12e4a..ad29701 100644 --- a/cmd/sidecar/sidecar.go +++ b/cmd/sidecar/sidecar.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "log" "net" @@ -8,6 +9,7 @@ import ( "github.com/buildbuddy-io/buildbuddy-cli/cache_proxy" "github.com/buildbuddy-io/buildbuddy-cli/devnull" + "github.com/buildbuddy-io/buildbuddy/server/backends/disk_cache" "github.com/buildbuddy-io/buildbuddy/server/build_event_protocol/build_event_proxy" "github.com/buildbuddy-io/buildbuddy/server/build_event_protocol/build_event_server" "github.com/buildbuddy-io/buildbuddy/server/config" @@ -32,20 +34,22 @@ var ( listenAddr = flag.String("listen_addr", "localhost:1991", "Local address to listen on.") besBackend = flag.String("bes_backend", "grpcs://cloud.buildbuddy.io:443", "Server address to proxy build events to.") remoteCache = flag.String("remote_cache", "grpcs://cloud.buildbuddy.io:443", "Server address to cache events to.") + + cacheDir = flag.String("cache_directory", "", "Root directory to use for local cache") + cacheMaxSizeBytes = flag.Int64("cache_max_size_bytes", 0, "Max cache size, in bytes") ) -func main() { - flag.Parse() - configurator, err := config.NewConfigurator("") - if err != nil { - log.Fatalf("Error initializing Configurator: %s", err.Error()) - } +func initializeEnv(configurator *config.Configurator) *real_environment.RealEnv { healthChecker := healthcheck.NewHealthChecker(*serverType) env := real_environment.NewRealEnv(configurator, healthChecker) env.SetAuthenticator(&nullauth.NullAuthenticator{}) env.SetBuildEventHandler(&devnull.BuildEventHandler{}) + return env +} +func initializeGRPCServer(env *real_environment.RealEnv) (*grpc.Server, net.Listener) { var lis net.Listener + var err error if strings.HasPrefix(*listenAddr, "unix://") { sockPath := strings.TrimPrefix(*listenAddr, "unix://") lis, err = net.Listen("unix", sockPath) @@ -63,37 +67,71 @@ func main() { } grpcServer := grpc.NewServer(grpcOptions...) reflection.Register(grpcServer) - env.GetHealthChecker().RegisterShutdownFunction(grpc_server.GRPCShutdownFunc(grpcServer)) + return grpcServer, lis +} - if *besBackend != "" { - buildEventProxyClients := make([]pepb.PublishBuildEventClient, 0) - buildEventProxyClients = append(buildEventProxyClients, build_event_proxy.NewBuildEventProxyClient(*besBackend)) - log.Printf("Proxy: forwarding build events to: %q", *besBackend) - env.SetBuildEventProxyClients(buildEventProxyClients) +func registerBESProxy(env *real_environment.RealEnv, grpcServer *grpc.Server) { + buildEventProxyClients := make([]pepb.PublishBuildEventClient, 0) + buildEventProxyClients = append(buildEventProxyClients, build_event_proxy.NewBuildEventProxyClient(*besBackend)) + log.Printf("Proxy: forwarding build events to: %q", *besBackend) + env.SetBuildEventProxyClients(buildEventProxyClients) - // Register to handle build event protocol messages. - buildEventServer, err := build_event_server.NewBuildEventProtocolServer(env) - if err != nil { - log.Fatalf("Error initializing BuildEventProtocolServer: %s", err.Error()) - } - pepb.RegisterPublishBuildEventServer(grpcServer, buildEventServer) + // Register to handle build event protocol messages. + buildEventServer, err := build_event_server.NewBuildEventProtocolServer(env) + if err != nil { + log.Fatalf("Error initializing BuildEventProtocolServer: %s", err.Error()) } + pepb.RegisterPublishBuildEventServer(grpcServer, buildEventServer) +} - if *remoteCache != "" { - conn, err := grpc_client.DialTarget(*remoteCache) - if err != nil { - log.Fatalf("Error dialing remote cache: %s", err.Error()) - } - cacheProxy, err := cache_proxy.NewCacheProxy(conn) - if err != nil { - log.Fatalf("Error initializing cache proxy: %s", err.Error()) - } - bspb.RegisterByteStreamServer(grpcServer, cacheProxy) - repb.RegisterActionCacheServer(grpcServer, cacheProxy) - repb.RegisterContentAddressableStorageServer(grpcServer, cacheProxy) - repb.RegisterCapabilitiesServer(grpcServer, cacheProxy) +func registerCacheProxy(ctx context.Context, env *real_environment.RealEnv, grpcServer *grpc.Server) { + conn, err := grpc_client.DialTarget(*remoteCache) + if err != nil { + log.Fatalf("Error dialing remote cache: %s", err.Error()) + } + cacheProxy, err := cache_proxy.NewCacheProxy(ctx, env, conn) + if err != nil { + log.Fatalf("Error initializing cache proxy: %s", err.Error()) + } + bspb.RegisterByteStreamServer(grpcServer, cacheProxy) + repb.RegisterActionCacheServer(grpcServer, cacheProxy) + repb.RegisterContentAddressableStorageServer(grpcServer, cacheProxy) + repb.RegisterCapabilitiesServer(grpcServer, cacheProxy) +} + +func initializeDiskCache(env *real_environment.RealEnv) { + maxSizeBytes := int64(1e9) // 1 GB + if *cacheMaxSizeBytes != 0 { + maxSizeBytes = *cacheMaxSizeBytes + } + c, err := disk_cache.NewDiskCache(*cacheDir, maxSizeBytes) + if err != nil { + log.Fatalf("Error configuring cache: %s", err) + } + c.Start() + env.SetCache(c) +} + +func main() { + flag.Parse() + configurator, err := config.NewConfigurator("") + if err != nil { + log.Fatalf("Error initializing Configurator: %s", err.Error()) } + ctx := context.Background() + env := initializeEnv(configurator) + grpcServer, lis := initializeGRPCServer(env) + env.GetHealthChecker().RegisterShutdownFunction(grpc_server.GRPCShutdownFunc(grpcServer)) + if *cacheDir != "" { + initializeDiskCache(env) + } + if *besBackend != "" { + registerBESProxy(env, grpcServer) + } + if *remoteCache != "" { + registerCacheProxy(ctx, env, grpcServer) + } if *besBackend != "" || *remoteCache != "" { grpcServer.Serve(lis) } else {