Skip to content

Commit

Permalink
Print download/upload speed
Browse files Browse the repository at this point in the history
Signed-off-by: or-shachar <[email protected]>
  • Loading branch information
or-shachar committed Nov 19, 2023
1 parent 08b4000 commit d21ccd3
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 34 deletions.
6 changes: 4 additions & 2 deletions cacheproc/cacheproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ type Process struct {
Puts atomic.Int64
PutErrors atomic.Int64
RemoteCacheEnabled bool
BytesDownloaded func() int64
BytesUploaded func() int64
KBDownloaded func() int64
KBUploaded func() int64
AvgKBDownloadSpeed func() float64
AvgKBUploadSpeed func() float64
}

func (p *Process) Run() error {
Expand Down
107 changes: 87 additions & 20 deletions cachers/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"log"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/aws/smithy-go"

Expand All @@ -31,8 +33,14 @@ type S3Cache struct {

s3Client *s3.Client

bytesDownloaded atomic.Int64
bytesUploaded atomic.Int64
bytesDownloaded atomic.Int64
bytesUploaded atomic.Int64
downloadCount int64
uploadCount int64
avgBytesDownloadSpeed float64
avgBytesUploadSpeed float64
downloadsStatsMutex *sync.Mutex
uploadStatsMutex *sync.Mutex
}

func NewS3Cache(bucketName string, cfg *aws.Config, cacheKey string, disk *DiskCache, verbose bool) *S3Cache {
Expand All @@ -45,11 +53,13 @@ func NewS3Cache(bucketName string, cfg *aws.Config, cacheKey string, disk *DiskC
prefix := fmt.Sprintf("cache/%s/%s/%s/%s", cacheKey, arc, os, ver)
log.Printf("S3Cache: configured to s3://%s/%s", bucketName, prefix)
return &S3Cache{
Bucket: bucketName,
cfg: cfg,
diskCache: disk,
prefix: prefix,
verbose: verbose,
Bucket: bucketName,
cfg: cfg,
diskCache: disk,
prefix: prefix,
verbose: verbose,
downloadsStatsMutex: new(sync.Mutex),
uploadStatsMutex: new(sync.Mutex),
}
}

Expand Down Expand Up @@ -116,6 +126,7 @@ func (c *S3Cache) Get(ctx context.Context, actionID string) (outputID, diskPath
var putBody io.Reader
if av.Size == 0 {
putBody = bytes.NewReader(nil)
diskPath, err = c.diskCache.Put(ctx, actionID, outputID, av.Size, putBody)
} else {
outputKey := c.outputKey(outputID)
outputResult, getOutputErr := client.GetObject(ctx, &s3.GetObjectInput{
Expand All @@ -131,14 +142,31 @@ func (c *S3Cache) Get(ctx context.Context, actionID string) (outputID, diskPath
}
return "", "", fmt.Errorf("unexpected S3 get for %s: %v", outputKey, getOutputErr)
}
defer outputResult.Body.Close()

putBody = outputResult.Body
downloadFunc := func() error {
defer outputResult.Body.Close()
putBody = outputResult.Body
diskPath, err = c.diskCache.Put(ctx, actionID, outputID, av.Size, putBody)
if err != nil {
return err
}
c.bytesDownloaded.Add(av.Size)
return nil
}
if c.verbose {
speed, err := DoAndMeasureSpeed(av.Size, downloadFunc)
if err == nil {
c.downloadsStatsMutex.Lock()
c.avgBytesDownloadSpeed = newAverage(c.avgBytesDownloadSpeed, c.downloadCount, speed)
c.downloadCount++
c.downloadsStatsMutex.Unlock()
}
} else {
err = downloadFunc()
}
}
diskPath, err = c.diskCache.Put(ctx, actionID, outputID, av.Size, putBody)
c.bytesDownloaded.Add(av.Size)
return outputID, diskPath, err
}

func (c *S3Cache) actionKey(actionID string) string {
return fmt.Sprintf("%s/actions/%s", c.prefix, actionID)
}
Expand Down Expand Up @@ -185,22 +213,61 @@ func (c *S3Cache) Put(ctx context.Context, actionID, outputID string, size int64
})
}
if size > 0 && err == nil {
outputKey := c.outputKey(outputID)
_, err = client.PutObject(ctx, &s3.PutObjectInput{
c.uploadOutput(ctx, outputID, client, readerForS3, size)
}
c.bytesUploaded.Add(size)

return
}

func (c *S3Cache) uploadOutput(ctx context.Context, outputID string, client *s3.Client, readerForS3 bytes.Buffer, size int64) {
outputKey := c.outputKey(outputID)
putObjectFunc := func() error {
_, err := client.PutObject(ctx, &s3.PutObjectInput{
Bucket: &c.Bucket,
Key: &outputKey,
Body: &readerForS3,
ContentLength: size,
})
return err
}
c.bytesUploaded.Add(size)
return
if c.verbose {
speed, err := DoAndMeasureSpeed(size, putObjectFunc)
if err == nil {
c.uploadStatsMutex.Lock()
c.avgBytesUploadSpeed = newAverage(c.avgBytesUploadSpeed, c.uploadCount, speed)
c.uploadCount++
c.uploadStatsMutex.Unlock()
}
} else {
_ = putObjectFunc()
}
}

func (c *S3Cache) KBDownloaded() int64 {
return c.bytesDownloaded.Load() / 1024
}

func (c *S3Cache) KBUploaded() int64 {
return c.bytesUploaded.Load() / 1024
}

func (c *S3Cache) AvgKBDownloadSpeed() float64 {
return c.avgBytesDownloadSpeed / 1024
}

func (c *S3Cache) AvgKBUploadSpeed() float64 {
return c.avgBytesUploadSpeed / 1024
}

func (c *S3Cache) BytesDownloaded() int64 {
return c.bytesDownloaded.Load()
func newAverage(oldAverage float64, count int64, newValue float64) float64 {
return (oldAverage*float64(count) + newValue) / float64(count+1)
}

func (c *S3Cache) BytesUploaded() int64 {
return c.bytesUploaded.Load()
func DoAndMeasureSpeed(dataSize int64, functionOnData func() error) (float64, error) {
start := time.Now()
err := functionOnData()
elapsed := time.Since(start)
speed := float64(dataSize) / elapsed.Seconds()
return speed, err
}
30 changes: 18 additions & 12 deletions cmd/go-cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"path/filepath"
Expand Down Expand Up @@ -98,25 +97,30 @@ func main() {
}

dc := &cachers.DiskCache{Dir: dir}

zeroIntFunc := func() int64 {
return 0
}
zeroFloatFunc := func() float64 {
return 0
}
var p *cacheproc.Process
p = &cacheproc.Process{
Close: func() error {
if *verbose {
summary := fmt.Sprintf("cacher: closing; %d gets (%d hits, %d misses, %d errors); %d puts (%d errors)",
log.Printf("cacher: closing; %d gets (%d hits, %d misses, %d errors); %d puts (%d errors)",
p.Gets.Load(), p.GetHits.Load(), p.GetMisses.Load(), p.GetErrors.Load(), p.Puts.Load(), p.PutErrors.Load())
if p.RemoteCacheEnabled {
summary += fmt.Sprintf("; %d KB downloaded, %d KB uploaded", p.BytesDownloaded()/1024, p.BytesUploaded()/1024)
log.Printf("%d KB downloaded (%.2f/s); %d KB uploaded (%.2f/s)", p.KBDownloaded(), p.AvgKBDownloadSpeed(), p.KBUploaded(), p.AvgKBUploadSpeed())
}
log.Print(summary)
}
return nil
},
Get: dc.Get,
Put: dc.Put,
BytesDownloaded: func() int64 {
return 0
},
Get: dc.Get,
Put: dc.Put,
KBDownloaded: zeroIntFunc,
KBUploaded: zeroIntFunc,
AvgKBDownloadSpeed: zeroFloatFunc,
AvgKBUploadSpeed: zeroFloatFunc,
}

if *serverBase != "" {
Expand All @@ -136,8 +140,10 @@ func main() {
if s3Cache != nil {
p.Get = s3Cache.Get
p.Put = s3Cache.Put
p.BytesDownloaded = s3Cache.BytesDownloaded
p.BytesUploaded = s3Cache.BytesUploaded
p.KBDownloaded = s3Cache.KBDownloaded
p.KBUploaded = s3Cache.KBUploaded
p.AvgKBDownloadSpeed = s3Cache.AvgKBDownloadSpeed
p.AvgKBUploadSpeed = s3Cache.AvgKBUploadSpeed
p.RemoteCacheEnabled = true
}

Expand Down

0 comments on commit d21ccd3

Please sign in to comment.