Skip to content

Commit

Permalink
Remodeled with interfaces + parallel puts
Browse files Browse the repository at this point in the history
Signed-off-by: or-shachar <[email protected]>
  • Loading branch information
or-shachar committed Nov 22, 2023
1 parent 755bc7a commit f09b126
Show file tree
Hide file tree
Showing 11 changed files with 611 additions and 472 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
*~
bin/
90 changes: 15 additions & 75 deletions cacheproc/cacheproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,50 +18,22 @@ import (
"log"
"os"
"sync"
"sync/atomic"

"github.com/bradfitz/go-tool-cache/cachers"

"github.com/bradfitz/go-tool-cache/wire"
)

// Process implements the cmd/go JSON protocol over stdin & stdout via three
// funcs that callers can optionally implement.
type Process struct {
// Get optionally specifies a func to look up something from the cache. If
// nil, all gets are treated as cache misses.
//
// The actionID is a lowercase hex string of unspecified format or length.
//
// The returned outputID must be the same outputID provided to Put earlier;
// it will be a lowercase hex string of unspecified hash function or length.
//
// On cache miss, return all zero values (no error). On cache hit, diskPath
// must be the absolute path to a regular file; its size and modtime are
// returned to cmd/go.
//
// If the returned diskPath doesn't exist, it's treated as a cache miss.
Get func(ctx context.Context, actionID string) (outputID, diskPath string, _ error)

// Put optionally specifies a func to add something to the cache.
// The actionID and objectID are lowercase hex strings of unspecified format or length.
// On success, diskPath must be the absolute path to a regular file.
// If nil, cmd/go may write to disk elsewhere as needed.
Put func(ctx context.Context, actionID, objectID string, size int64, r io.Reader) (diskPath string, _ error)

// Close optionally specifies a func to run when the cmd/go tool is
// shutting down.
Close func() error
cache cachers.LocalCache
}

Gets atomic.Int64
GetHits atomic.Int64
GetMisses atomic.Int64
GetErrors atomic.Int64
Puts atomic.Int64
PutErrors atomic.Int64
RemoteCacheEnabled bool
BytesDownloaded func() int64
BytesUploaded func() int64
AvgBytesDownloadSpeed func() float64
AvgBytesUploadSpeed func() float64
// NewCacheProc returns a Process whose requests are served by the given
// LocalCache.
func NewCacheProc(cache cachers.LocalCache) *Process {
return &Process{
cache: cache,
}
}

func (p *Process) Run() error {
Expand All @@ -70,17 +42,10 @@ func (p *Process) Run() error {

bw := bufio.NewWriter(os.Stdout)
je := json.NewEncoder(bw)

var caps []wire.Cmd
if p.Get != nil {
caps = append(caps, "get")
}
if p.Put != nil {
caps = append(caps, "put")
}
if p.Close != nil {
caps = append(caps, "close")
if err := p.cache.Start(); err != nil {
return err
}
caps := []wire.Cmd{"get", "put", "close"}
je.Encode(&wire.Response{KnownCommands: caps})
if err := bw.Flush(); err != nil {
return err
Expand Down Expand Up @@ -130,10 +95,7 @@ func (p *Process) handleRequest(ctx context.Context, req *wire.Request, res *wir
default:
return errors.New("unknown command")
case "close":
if p.Close != nil {
return p.Close()
}
return nil
return p.cache.Close()
case "get":
return p.handleGet(ctx, req, res)
case "put":
Expand All @@ -142,21 +104,7 @@ func (p *Process) handleRequest(ctx context.Context, req *wire.Request, res *wir
}

func (p *Process) handleGet(ctx context.Context, req *wire.Request, res *wire.Response) (retErr error) {
p.Gets.Add(1)
defer func() {
if retErr != nil {
p.GetErrors.Add(1)
} else if res.Miss {
p.GetMisses.Add(1)
} else {
p.GetHits.Add(1)
}
}()
if p.Get == nil {
res.Miss = true
return nil
}
outputID, diskPath, err := p.Get(ctx, fmt.Sprintf("%x", req.ActionID))
outputID, diskPath, err := p.cache.Get(ctx, fmt.Sprintf("%x", req.ActionID))
if err != nil {
return err
}
Expand Down Expand Up @@ -190,24 +138,16 @@ func (p *Process) handleGet(ctx context.Context, req *wire.Request, res *wire.Re

func (p *Process) handlePut(ctx context.Context, req *wire.Request, res *wire.Response) (retErr error) {
actionID, objectID := fmt.Sprintf("%x", req.ActionID), fmt.Sprintf("%x", req.ObjectID)
p.Puts.Add(1)
defer func() {
if retErr != nil {
p.PutErrors.Add(1)
log.Printf("put(action %s, obj %s, %v bytes): %v", actionID, objectID, req.BodySize, retErr)
}
}()
if p.Put == nil {
if req.Body != nil {
io.Copy(io.Discard, req.Body)
}
return nil
}
var body io.Reader = req.Body
var body = req.Body
if body == nil {
body = bytes.NewReader(nil)
}
diskPath, err := p.Put(ctx, actionID, objectID, req.BodySize, body)
diskPath, err := p.cache.Put(ctx, actionID, objectID, req.BodySize, body)
if err != nil {
return err
}
Expand Down
28 changes: 28 additions & 0 deletions cachers/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package cachers

import (
"context"
"io"
)

// Cache is the interface implemented by all caches.
type Cache interface {
// Start is called before the cache is used and reports any failure to
// get the cache ready.
Start() error
// Close is called when the cache is no longer needed and reports any
// failure to shut it down.
Close() error
// Kind returns a short name identifying the cache implementation
// (for example, CombinedCache returns "combined").
Kind() string
}

// LocalCache is the basic interface for a local cache.
// It is supposed to write to disk, which is why its method signatures
// include a diskPath.
type LocalCache interface {
Cache
// Get looks up actionID and returns the stored outputID together with the
// path of the cached file on disk.
Get(ctx context.Context, actionID string) (outputID, diskPath string, err error)
// Put stores body (size bytes long) under actionID/outputID and returns
// the path of the resulting file on disk.
Put(ctx context.Context, actionID, outputID string, size int64, body io.Reader) (diskPath string, err error)
}

// RemoteCache is the basic interface for a remote cache.
// Unlike LocalCache it hands data back as a stream rather than as a path on
// disk.
type RemoteCache interface {
Cache
// Get looks up actionID and returns the stored outputID, the size of the
// output, and a stream of its contents. CombinedCache treats an empty
// outputID as a miss and closes output once it has been consumed.
Get(ctx context.Context, actionID string) (outputID string, size int64, output io.ReadCloser, err error)
// Put uploads body (size bytes long) under actionID/outputID.
Put(ctx context.Context, actionID, outputID string, size int64, body io.Reader) (err error)
}
162 changes: 162 additions & 0 deletions cachers/combined.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package cachers

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log"
)

// CombinedCache is a LocalCache that wraps a LocalCache and a RemoteCache.
// It also keeps timing metrics for the remote cache downloads and uploads.
type CombinedCache struct {
// verbose enables the transfer summary that Close logs.
verbose bool
// localCache is consulted first by Get and receives a copy of every Put.
localCache LocalCache
// remoteCache is consulted by Get when the local lookup does not produce
// a hit, and receives a copy of every Put.
remoteCache RemoteCache
// putsMetrics measures the remoteCache.Put calls made by Put (uploads).
putsMetrics *timeKeeper
// getsMetrics measures storing a remote hit into the local cache, i.e.
// the consumption of the remote download stream in Get.
getsMetrics *timeKeeper
}

// Kind reports the name of this cache implementation, "combined".
func (l *CombinedCache) Kind() string {
return "combined"
}

// NewCombinedCache builds a LocalCache backed by localCache with remoteCache
// behind it.
//
// When verbose is false the plain *CombinedCache is returned. When verbose is
// true, the local cache, the remote cache and the combined cache itself are
// each wrapped (via NewLocalCacheStates / NewRemoteCacheStats — presumably
// statistics-collecting decorators; confirm against their definitions) and
// the wrapped combined cache is returned.
func NewCombinedCache(localCache LocalCache, remoteCache RemoteCache, verbose bool) LocalCache {
	uploads, downloads := newTimeKeeper(), newTimeKeeper()
	if !verbose {
		return &CombinedCache{
			verbose:     false,
			localCache:  localCache,
			remoteCache: remoteCache,
			putsMetrics: uploads,
			getsMetrics: downloads,
		}
	}
	combined := &CombinedCache{
		verbose:     true,
		localCache:  NewLocalCacheStates(localCache),
		remoteCache: NewRemoteCacheStats(remoteCache),
		putsMetrics: uploads,
		getsMetrics: downloads,
	}
	return NewLocalCacheStates(combined)
}

// Start starts the local cache, then the remote cache, and finally both
// transfer timers. It stops at the first failure and returns that error
// wrapped with the name of the cache that failed; in that case nothing after
// the failing step is started.
func (l *CombinedCache) Start() error {
	if localErr := l.localCache.Start(); localErr != nil {
		return fmt.Errorf("local cache start failed: %w", localErr)
	}
	if remoteErr := l.remoteCache.Start(); remoteErr != nil {
		return fmt.Errorf("remote cache start failed: %w", remoteErr)
	}
	for _, timer := range []*timeKeeper{l.putsMetrics, l.getsMetrics} {
		timer.Start()
	}
	return nil
}

// Close closes the local cache and then the remote cache, stops both transfer
// timers and, when verbose is set, logs a summary of downloads and uploads.
//
// Both caches are always closed, even if the first one fails. The returned
// error joins every close failure (each wrapped with the name of the cache
// that failed), and is nil when both closes succeed.
func (l *CombinedCache) Close() error {
	var errs []error
	if err := l.localCache.Close(); err != nil {
		errs = append(errs, fmt.Errorf("local cache stop failed: %w", err))
	}
	// Use a fresh variable here: reusing a single err would discard the
	// local cache's failure whenever the remote close is attempted.
	if err := l.remoteCache.Close(); err != nil {
		errs = append(errs, fmt.Errorf("remote cache stop failed: %w", err))
	}
	l.putsMetrics.Stop()
	l.getsMetrics.Stop()
	if l.verbose {
		log.Printf("[%s]\tDownloads: %s, Uploads %s", l.remoteCache.Kind(), l.getsMetrics.Summary(), l.putsMetrics.Summary())
	}
	// errors.Join returns nil when errs is empty.
	return errors.Join(errs...)
}

// Get resolves actionID, preferring the local cache.
//
// A local hit (no error and a non-empty outputID) is returned directly. On a
// local miss or a local error the remote cache is queried instead: a remote
// error is returned as is, an empty remote outputID is reported as a miss
// (all zero values), and a remote hit is streamed into the local cache so the
// returned diskPath points at the freshly stored local copy. The remote body
// is closed once the local store has finished, and the store is timed by
// getsMetrics.
func (l *CombinedCache) Get(ctx context.Context, actionID string) (outputID, diskPath string, err error) {
	if id, path, localErr := l.localCache.Get(ctx, actionID); localErr == nil && id != "" {
		return id, path, nil
	}

	remoteID, size, output, remoteErr := l.remoteCache.Get(ctx, actionID)
	switch {
	case remoteErr != nil:
		return "", "", remoteErr
	case remoteID == "":
		return "", "", nil
	}

	storeLocally := func() (string, error) {
		defer output.Close()
		return l.localCache.Put(ctx, actionID, remoteID, size, output)
	}
	storedPath, storeErr := l.getsMetrics.DoWithMeasure(size, storeLocally)
	if storeErr != nil {
		return "", "", storeErr
	}
	return remoteID, storedPath, nil
}
// Put stores body under actionID/outputID in both caches at once and returns
// the local disk path.
//
// The body is read exactly once: the remote upload reads it through a
// TeeReader that copies every chunk into a pipe, and a goroutine feeds the
// other end of that pipe to the local cache. The upload is timed by
// putsMetrics.
//
// If the remote upload fails its error is returned; otherwise a local cache
// failure is logged and returned. Put always waits for the local write to
// finish before returning, so no disk write outlives the call.
func (l *CombinedCache) Put(ctx context.Context, actionID, outputID string, size int64, body io.Reader) (diskPath string, err error) {
	type diskResult struct {
		path string
		err  error
	}

	pr, pw := io.Pipe()
	diskPutCh := make(chan diskResult, 1)
	go func() {
		var localBody io.Reader = pr
		if size == 0 {
			localBody = bytes.NewReader(nil)
		}
		path, putErr := l.localCache.Put(ctx, actionID, outputID, size, localBody)
		// Writes to an io.Pipe block until they are read. If the local cache
		// returned without consuming everything (e.g. a disk error), keep
		// draining so the remote upload's TeeReader cannot block forever.
		// The drain ends when the writer side is closed below; its result is
		// irrelevant.
		_, _ = io.Copy(io.Discard, pr)
		diskPutCh <- diskResult{path: path, err: putErr}
	}()

	var remoteBody io.Reader
	if size == 0 {
		// Special case the empty file so NewRequest sets "Content-Length: 0",
		// as opposed to thinking we didn't set it and not being able to sniff
		// its size from the type.
		remoteBody = bytes.NewReader(nil)
	} else {
		remoteBody = io.TeeReader(body, pw)
	}
	_, remoteErr := l.putsMetrics.DoWithMeasure(size, func() (string, error) {
		return "", l.remoteCache.Put(ctx, actionID, outputID, size, remoteBody)
	})
	// Propagate an upload failure to the local reader so it does not mistake
	// a truncated stream for a complete one; with a nil error this is a plain
	// Close and the reader sees EOF.
	pw.CloseWithError(remoteErr)

	local := <-diskPutCh
	if remoteErr != nil {
		return "", remoteErr
	}
	if local.err != nil {
		log.Printf("CombinedCache.Put local disk error: %v", local.err)
		return "", local.err
	}
	return local.path, nil
}

// TODO: DELETEME
// func (l *CombinedCache) putOld(ctx context.Context, actionID, outputID string, size int64, body io.Reader) (diskPath string, err error) {
// var localError, remoteError error
// var bytesReaderForDisk io.Reader
// var bytesBufferRemote bytes.Buffer
// if size == 0 {
// bytesReaderForDisk = bytes.NewReader(nil)
// bytesBufferRemote = bytes.Buffer{}
// } else {
// bytesReaderForDisk = io.TeeReader(body, &bytesBufferRemote)
// }
// // TODO or-shachar: Can we stream the data in parallel to both caches?
// diskPath, localError = l.localCache.Put(ctx, actionID, outputID, size, bytesReaderForDisk)
// if localError != nil {
// return "", localError
// }
// _, remoteError = l.putsMetrics.DoWithMeasure(size, func() (string, error) {
// e := l.remoteCache.Put(ctx, actionID, outputID, size, &bytesBufferRemote)
// return "", e
// })
// if remoteError != nil {
// return "", remoteError
// }
// return diskPath, nil
// }

// Compile-time check that *CombinedCache implements LocalCache.
var _ LocalCache = (*CombinedCache)(nil)
Loading

0 comments on commit f09b126

Please sign in to comment.