This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Proxy cache too, using external APIs.
tylerwilliams committed Nov 30, 2020
1 parent e31895f commit 2547701
Showing 4 changed files with 300 additions and 48 deletions.
8 changes: 8 additions & 0 deletions cache_proxy/BUILD
@@ -7,8 +7,16 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"@com_github_buildbuddy_io_buildbuddy//proto:remote_execution_go_proto",
"@com_github_buildbuddy_io_buildbuddy//server/environment:go_default_library",
"@com_github_buildbuddy_io_buildbuddy//server/interfaces:go_default_library",
"@com_github_buildbuddy_io_buildbuddy//server/remote_cache/byte_stream_server:go_default_library",
"@com_github_buildbuddy_io_buildbuddy//server/remote_cache/cachetools:go_default_library",
"@com_github_buildbuddy_io_buildbuddy//server/remote_cache/content_addressable_storage_server:go_default_library",
"@com_github_buildbuddy_io_buildbuddy//server/remote_cache/digest:go_default_library",
"@com_github_buildbuddy_io_buildbuddy//server/util/prefix:go_default_library",
"@com_github_buildbuddy_io_buildbuddy//server/util/status:go_default_library",
"@go_googleapis//google/bytestream:bytestream_go_proto",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//test/bufconn:go_default_library",
],
)
239 changes: 222 additions & 17 deletions cache_proxy/cache_proxy.go
@@ -2,32 +2,114 @@ package cache_proxy

import (
"context"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
"time"

"github.com/buildbuddy-io/buildbuddy/server/environment"
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/byte_stream_server"
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/cachetools"
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/content_addressable_storage_server"
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/digest"
"github.com/buildbuddy-io/buildbuddy/server/util/status"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"

repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
bspb "google.golang.org/genproto/googleapis/bytestream"
)

var (
readThrough = flag.Bool("read_through", true, "If true, cache remote reads locally")
writeThrough = flag.Bool("write_through", true, "If true, upload writes to remote cache too")
)

const (
// Keep under the limit of ~4MB (1024 * 1024 * 4).
readBufSizeBytes = (1024 * 1024 * 4) - 100
queueBufferSize = 100
)

// CacheProxy implements a local GRPC cache that proxies a remote GRPC cache.
// It implements both read-through and write-through functionality by:
// - Checking existence first against the local cache, then, if any keys are
// still not found, consulting the remote cache.
// - Reading first from the local cache, then, if a key is not found, reading
// from the remote cache and writing the fetched object to the local cache.
// - Writing to the local cache and returning success immediately to the
// client, then enqueueing a job to upload this key to the remote cache.
type CacheProxy struct {
acClient repb.ActionCacheClient
bsClient bspb.ByteStreamClient
casClient repb.ContentAddressableStorageClient
cpbClient repb.CapabilitiesClient

env environment.Env
cache interfaces.Cache
localBSS *byte_stream_server.ByteStreamServer
localCAS *content_addressable_storage_server.ContentAddressableStorageServer
localBSSClient bspb.ByteStreamClient

qWorker *queueWorker
}

// startServerLocally registers the localBSS on an in-memory gRPC server backed
// by a bufconn listener and returns a client connection to it. The provided
// context is used when dialing the in-memory connection.
func startServerLocally(ctx context.Context, localBSS bspb.ByteStreamServer) (*grpc.ClientConn, error) {
buffer := 1024 * 1024 * 10
listener := bufconn.Listen(buffer)

localGRPCServer := grpc.NewServer()
bspb.RegisterByteStreamServer(localGRPCServer, localBSS)
go func() {
if err := localGRPCServer.Serve(listener); err != nil {
log.Printf("error serving locally: %s", err.Error())
}
listener.Close()
localGRPCServer.Stop()
}()

conn, err := grpc.DialContext(ctx, "", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return listener.Dial()
}), grpc.WithInsecure())
if err != nil {
return nil, err
}
return conn, nil
}

func NewCacheProxy(conn *grpc.ClientConn) (*CacheProxy, error) {
func NewCacheProxy(ctx context.Context, env environment.Env, conn *grpc.ClientConn) (*CacheProxy, error) {
if env.GetCache() == nil {
return nil, status.FailedPreconditionError("CacheProxy requires a local cache to run.")
}
cache := env.GetCache()
localBSS, err := byte_stream_server.NewByteStreamServer(env)
if err != nil {
return nil, status.InternalErrorf("CacheProxy: error starting local bytestream server: %s", err.Error())
}
localCAS, err := content_addressable_storage_server.NewContentAddressableStorageServer(env)
if err != nil {
return nil, status.InternalErrorf("CacheProxy: error starting local CAS server: %s", err.Error())
}
	localConn, err := startServerLocally(ctx, localBSS)
	if err != nil {
		return nil, status.InternalErrorf("CacheProxy: error starting in-memory bytestream server: %s", err.Error())
	}
localBSSClient := bspb.NewByteStreamClient(localConn)
remoteBSSClient := bspb.NewByteStreamClient(conn)
return &CacheProxy{
acClient: repb.NewActionCacheClient(conn),
bsClient: bspb.NewByteStreamClient(conn),
casClient: repb.NewContentAddressableStorageClient(conn),
cpbClient: repb.NewCapabilitiesClient(conn),
acClient: repb.NewActionCacheClient(conn),
bsClient: remoteBSSClient,
casClient: repb.NewContentAddressableStorageClient(conn),
cpbClient: repb.NewCapabilitiesClient(conn),
env: env,
cache: cache,
localBSS: localBSS,
localCAS: localCAS,
localBSSClient: localBSSClient,
qWorker: NewQueueWorker(ctx, localBSSClient, remoteBSSClient),
}, nil
}

@@ -72,31 +154,81 @@ func (p *CacheProxy) GetTree(req *repb.GetTreeRequest, stream repb.ContentAddres
}

func (p *CacheProxy) FindMissingBlobs(ctx context.Context, req *repb.FindMissingBlobsRequest) (*repb.FindMissingBlobsResponse, error) {
return p.casClient.FindMissingBlobs(ctx, req)
localMissing, err := p.localCAS.FindMissingBlobs(ctx, req)
if err == nil && len(localMissing.GetMissingBlobDigests()) == 0 {
return localMissing, nil
}
remainingReq := &repb.FindMissingBlobsRequest{
InstanceName: req.GetInstanceName(),
}
if err == nil {
remainingReq.BlobDigests = localMissing.GetMissingBlobDigests()
} else {
remainingReq.BlobDigests = req.GetBlobDigests()
}
return p.casClient.FindMissingBlobs(ctx, remainingReq)
}

func (p *CacheProxy) hasBlobLocally(ctx context.Context, instanceName string, d *repb.Digest) bool {
rsp, err := p.localCAS.FindMissingBlobs(ctx, &repb.FindMissingBlobsRequest{
InstanceName: instanceName,
BlobDigests: []*repb.Digest{d},
})

if err == nil && len(rsp.GetMissingBlobDigests()) == 0 {
return true
}
return false
}

func (p *CacheProxy) Read(req *bspb.ReadRequest, stream bspb.ByteStream_ReadServer) error {
clientStream, err := p.bsClient.Read(stream.Context(), req)
ctx := stream.Context()
instanceName, d, err := digest.ExtractDigestFromDownloadResourceName(req.GetResourceName())
if err == nil && p.hasBlobLocally(ctx, instanceName, d) {
return p.localBSS.Read(req, stream)
}
clientStream, err := p.bsClient.Read(ctx, req)
if err != nil {
return err
}

var localFile *os.File
if *readThrough {
tmpFile, err := ioutil.TempFile(os.TempDir(), fmt.Sprintf("%s%s-", instanceName, d.GetHash()))
if err == nil {
localFile = tmpFile
defer os.Remove(tmpFile.Name())
}
}
for {
msg, err := clientStream.Recv()
if err != nil {
if err == io.EOF {
				if localFile != nil {
					ind := digest.NewInstanceNameDigest(d, instanceName)
					// Rewind the temp file so the local upload re-reads it from the start.
					if _, err := localFile.Seek(0, io.SeekStart); err != nil {
						log.Printf("error seeking in local file: %s", err.Error())
					} else if _, err := cachetools.UploadFromReader(ctx, p.localBSSClient, ind, localFile); err != nil {
						log.Printf("error uploading from reader: %s", err.Error())
					}
				}
break
}
return err
}
if sendErr := stream.Send(msg); sendErr != nil {
return sendErr
}
if localFile != nil {
if _, err := localFile.Write(msg.GetData()); err != nil {
log.Printf("error writing data to local file: %s", err.Error())
}
}
}
return nil
}

func (p *CacheProxy) Write(stream bspb.ByteStream_WriteServer) error {
clientStream, err := p.bsClient.Write(stream.Context())
var wreq *bspb.WriteRequest
clientStream, err := p.localBSSClient.Write(stream.Context())
if err != nil {
return err
}
Expand All @@ -105,6 +237,9 @@ func (p *CacheProxy) Write(stream bspb.ByteStream_WriteServer) error {
if err != nil {
return err
}
if wreq == nil {
wreq = rsp
}
if err := clientStream.Send(rsp); err != nil {
return err
}
Expand All @@ -113,18 +248,88 @@ func (p *CacheProxy) Write(stream bspb.ByteStream_WriteServer) error {
if err != nil {
return err
}
if *writeThrough {
if err := p.qWorker.EnqueueRemoteWrite(wreq); err != nil {
log.Printf("Error enqueueing write request to remote: %s", err.Error())
}
}
return stream.SendAndClose(lastRsp)
}

}
return nil
}

func (p *CacheProxy) QueryWriteStatus(ctx context.Context, req *bspb.QueryWriteStatusRequest) (*bspb.QueryWriteStatusResponse, error) {
// For now, just tell the client that the entire write failed and let
// them retry it.
return &bspb.QueryWriteStatusResponse{
CommittedSize: 0,
Complete: false,
}, nil
return p.localBSS.QueryWriteStatus(ctx, req)
}

type queueReq struct {
writeResourceName string
}
type queueWorker struct {
ctx context.Context
workQ chan queueReq
localClient bspb.ByteStreamClient
remoteClient bspb.ByteStreamClient
}

func NewQueueWorker(ctx context.Context, localClient, remoteClient bspb.ByteStreamClient) *queueWorker {
qw := &queueWorker{
ctx: ctx,
workQ: make(chan queueReq, queueBufferSize),
localClient: localClient,
remoteClient: remoteClient,
}
qw.Start()
return qw
}
func (qw *queueWorker) Start() {
go func() {
for {
select {
			case <-qw.ctx.Done():
				// A bare break would only exit the select; return so the worker
				// goroutine actually stops when the context is cancelled.
				return
case req := <-qw.workQ:
if err := qw.handleWriteRequest(req); err != nil {
log.Printf("Error handling write request: %s", err.Error())
}
}
}
}()
}
func (qw *queueWorker) handleWriteRequest(qreq queueReq) error {
start := time.Now()
instanceName, d, err := digest.ExtractDigestFromUploadResourceName(qreq.writeResourceName)
if err != nil {
return err
}
tmpFile, err := ioutil.TempFile(os.TempDir(), fmt.Sprintf("%s%s-", instanceName, d.GetHash()))
if err != nil {
return err
}
defer os.Remove(tmpFile.Name())
ind := digest.NewInstanceNameDigest(d, instanceName)
if err := cachetools.GetBlob(qw.ctx, qw.localClient, ind, tmpFile); err != nil {
return err
}
if _, err := tmpFile.Seek(0, io.SeekStart); err != nil {
return err
}
if _, err := cachetools.UploadFromReader(qw.ctx, qw.remoteClient, ind, tmpFile); err != nil {
return err
}
log.Printf("Handled write request: %s in %s", qreq.writeResourceName, time.Since(start))
return nil
}

func (qw *queueWorker) EnqueueRemoteWrite(wreq *bspb.WriteRequest) error {
req := queueReq{
writeResourceName: wreq.GetResourceName(),
}
select {
case qw.workQ <- req:
return nil
default:
return status.ResourceExhaustedError("Queue was at capacity.")
}
}
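
For context, here is a minimal sketch of how this proxy might be wired into a sidecar binary. It is not part of the commit: the helper name serveProxy, the remoteTarget and listenAddr parameters, and the example Bazel flag are illustrative, and it assumes CacheProxy also implements the ActionCache, ContentAddressableStorage, and Capabilities server interfaces (the portions of cache_proxy.go not shown in this diff proxy those RPCs) and that the caller supplies an environment.Env whose GetCache() returns a configured local cache. It is written as if it lived in this package, so it reuses the imports already shown above.

// Illustrative sketch only; not part of this commit.
func serveProxy(ctx context.Context, env environment.Env, remoteTarget, listenAddr string) error {
	// Dial the remote (authoritative) cache that this proxy reads through
	// from and writes through to.
	conn, err := grpc.DialContext(ctx, remoteTarget, grpc.WithInsecure())
	if err != nil {
		return err
	}
	proxy, err := NewCacheProxy(ctx, env, conn)
	if err != nil {
		return err
	}
	srv := grpc.NewServer()
	// Assumes CacheProxy implements these server interfaces in full; only the
	// ByteStream and CAS methods are visible in this diff.
	bspb.RegisterByteStreamServer(srv, proxy)
	repb.RegisterContentAddressableStorageServer(srv, proxy)
	repb.RegisterActionCacheServer(srv, proxy)
	repb.RegisterCapabilitiesServer(srv, proxy)
	lis, err := net.Listen("tcp", listenAddr)
	if err != nil {
		return err
	}
	// Point Bazel at this address, e.g. --remote_cache=grpc://localhost:1987.
	return srv.Serve(lis)
}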
1 change: 1 addition & 0 deletions cmd/sidecar/BUILD
@@ -10,6 +10,7 @@ go_library(
"//devnull:go_default_library",
"@com_github_buildbuddy_io_buildbuddy//proto:publish_build_event_go_proto",
"@com_github_buildbuddy_io_buildbuddy//proto:remote_execution_go_proto",
"@com_github_buildbuddy_io_buildbuddy//server/backends/disk_cache:go_default_library",
"@com_github_buildbuddy_io_buildbuddy//server/build_event_protocol/build_event_proxy:go_default_library",
"@com_github_buildbuddy_io_buildbuddy//server/build_event_protocol/build_event_server:go_default_library",
"@com_github_buildbuddy_io_buildbuddy//server/config:go_default_library",
