From d21ccd3d1a1063000b2ca160bbe1f778b580399c Mon Sep 17 00:00:00 2001 From: or-shachar Date: Sun, 19 Nov 2023 11:21:47 +0200 Subject: [PATCH] Print download/upload speed Signed-off-by: or-shachar --- cacheproc/cacheproc.go | 6 ++- cachers/s3.go | 107 ++++++++++++++++++++++++++++++++-------- cmd/go-cacher/cacher.go | 30 ++++++----- 3 files changed, 109 insertions(+), 34 deletions(-) diff --git a/cacheproc/cacheproc.go b/cacheproc/cacheproc.go index dbd8a20..5a973fd 100644 --- a/cacheproc/cacheproc.go +++ b/cacheproc/cacheproc.go @@ -58,8 +58,10 @@ type Process struct { Puts atomic.Int64 PutErrors atomic.Int64 RemoteCacheEnabled bool - BytesDownloaded func() int64 - BytesUploaded func() int64 + KBDownloaded func() int64 + KBUploaded func() int64 + AvgKBDownloadSpeed func() float64 + AvgKBUploadSpeed func() float64 } func (p *Process) Run() error { diff --git a/cachers/s3.go b/cachers/s3.go index 2d1a730..b6b0db0 100644 --- a/cachers/s3.go +++ b/cachers/s3.go @@ -10,7 +10,9 @@ import ( "log" "runtime" "strings" + "sync" "sync/atomic" + "time" "github.com/aws/smithy-go" @@ -31,8 +33,14 @@ type S3Cache struct { s3Client *s3.Client - bytesDownloaded atomic.Int64 - bytesUploaded atomic.Int64 + bytesDownloaded atomic.Int64 + bytesUploaded atomic.Int64 + downloadCount int64 + uploadCount int64 + avgBytesDownloadSpeed float64 + avgBytesUploadSpeed float64 + downloadsStatsMutex *sync.Mutex + uploadStatsMutex *sync.Mutex } func NewS3Cache(bucketName string, cfg *aws.Config, cacheKey string, disk *DiskCache, verbose bool) *S3Cache { @@ -45,11 +53,13 @@ func NewS3Cache(bucketName string, cfg *aws.Config, cacheKey string, disk *DiskC prefix := fmt.Sprintf("cache/%s/%s/%s/%s", cacheKey, arc, os, ver) log.Printf("S3Cache: configured to s3://%s/%s", bucketName, prefix) return &S3Cache{ - Bucket: bucketName, - cfg: cfg, - diskCache: disk, - prefix: prefix, - verbose: verbose, + Bucket: bucketName, + cfg: cfg, + diskCache: disk, + prefix: prefix, + verbose: verbose, + downloadsStatsMutex: new(sync.Mutex), + uploadStatsMutex: new(sync.Mutex), } } @@ -116,6 +126,7 @@ func (c *S3Cache) Get(ctx context.Context, actionID string) (outputID, diskPath var putBody io.Reader if av.Size == 0 { putBody = bytes.NewReader(nil) + diskPath, err = c.diskCache.Put(ctx, actionID, outputID, av.Size, putBody) } else { outputKey := c.outputKey(outputID) outputResult, getOutputErr := client.GetObject(ctx, &s3.GetObjectInput{ @@ -131,14 +142,31 @@ func (c *S3Cache) Get(ctx context.Context, actionID string) (outputID, diskPath } return "", "", fmt.Errorf("unexpected S3 get for %s: %v", outputKey, getOutputErr) } - defer outputResult.Body.Close() - - putBody = outputResult.Body + downloadFunc := func() error { + defer outputResult.Body.Close() + putBody = outputResult.Body + diskPath, err = c.diskCache.Put(ctx, actionID, outputID, av.Size, putBody) + if err != nil { + return err + } + c.bytesDownloaded.Add(av.Size) + return nil + } + if c.verbose { + speed, err := DoAndMeasureSpeed(av.Size, downloadFunc) + if err == nil { + c.downloadsStatsMutex.Lock() + c.avgBytesDownloadSpeed = newAverage(c.avgBytesDownloadSpeed, c.downloadCount, speed) + c.downloadCount++ + c.downloadsStatsMutex.Unlock() + } + } else { + err = downloadFunc() + } } - diskPath, err = c.diskCache.Put(ctx, actionID, outputID, av.Size, putBody) - c.bytesDownloaded.Add(av.Size) return outputID, diskPath, err } + func (c *S3Cache) actionKey(actionID string) string { return fmt.Sprintf("%s/actions/%s", c.prefix, actionID) } @@ -185,22 +213,61 @@ func (c *S3Cache) Put(ctx context.Context, actionID, outputID string, size int64 }) } if size > 0 && err == nil { - outputKey := c.outputKey(outputID) - _, err = client.PutObject(ctx, &s3.PutObjectInput{ + c.uploadOutput(ctx, outputID, client, readerForS3, size) + } + c.bytesUploaded.Add(size) + + return +} + +func (c *S3Cache) uploadOutput(ctx context.Context, outputID string, client *s3.Client, readerForS3 bytes.Buffer, size int64) { + outputKey := c.outputKey(outputID) + putObjectFunc := func() error { + _, err := client.PutObject(ctx, &s3.PutObjectInput{ Bucket: &c.Bucket, Key: &outputKey, Body: &readerForS3, ContentLength: size, }) + return err } - c.bytesUploaded.Add(size) - return + if c.verbose { + speed, err := DoAndMeasureSpeed(size, putObjectFunc) + if err == nil { + c.uploadStatsMutex.Lock() + c.avgBytesUploadSpeed = newAverage(c.avgBytesUploadSpeed, c.uploadCount, speed) + c.uploadCount++ + c.uploadStatsMutex.Unlock() + } + } else { + _ = putObjectFunc() + } +} + +func (c *S3Cache) KBDownloaded() int64 { + return c.bytesDownloaded.Load() / 1024 +} + +func (c *S3Cache) KBUploaded() int64 { + return c.bytesUploaded.Load() / 1024 +} + +func (c *S3Cache) AvgKBDownloadSpeed() float64 { + return c.avgBytesDownloadSpeed / 1024 +} + +func (c *S3Cache) AvgKBUploadSpeed() float64 { + return c.avgBytesUploadSpeed / 1024 } -func (c *S3Cache) BytesDownloaded() int64 { - return c.bytesDownloaded.Load() +func newAverage(oldAverage float64, count int64, newValue float64) float64 { + return (oldAverage*float64(count) + newValue) / float64(count+1) } -func (c *S3Cache) BytesUploaded() int64 { - return c.bytesUploaded.Load() +func DoAndMeasureSpeed(dataSize int64, functionOnData func() error) (float64, error) { + start := time.Now() + err := functionOnData() + elapsed := time.Since(start) + speed := float64(dataSize) / elapsed.Seconds() + return speed, err } diff --git a/cmd/go-cacher/cacher.go b/cmd/go-cacher/cacher.go index bc0c34c..c3148e1 100644 --- a/cmd/go-cacher/cacher.go +++ b/cmd/go-cacher/cacher.go @@ -8,7 +8,6 @@ package main import ( "context" "flag" - "fmt" "log" "os" "path/filepath" @@ -98,25 +97,30 @@ func main() { } dc := &cachers.DiskCache{Dir: dir} - + zeroIntFunc := func() int64 { + return 0 + } + zeroFloatFunc := func() float64 { + return 0 + } var p *cacheproc.Process p = &cacheproc.Process{ Close: func() error { if *verbose { - summary := fmt.Sprintf("cacher: closing; %d gets (%d hits, %d misses, %d errors); %d puts (%d errors)", + log.Printf("cacher: closing; %d gets (%d hits, %d misses, %d errors); %d puts (%d errors)", p.Gets.Load(), p.GetHits.Load(), p.GetMisses.Load(), p.GetErrors.Load(), p.Puts.Load(), p.PutErrors.Load()) if p.RemoteCacheEnabled { - summary += fmt.Sprintf("; %d KB downloaded, %d KB uploaded", p.BytesDownloaded()/1024, p.BytesUploaded()/1024) + log.Printf("%d KB downloaded (%.2f/s); %d KB uploaded (%.2f/s)", p.KBDownloaded(), p.AvgKBDownloadSpeed(), p.KBUploaded(), p.AvgKBUploadSpeed()) } - log.Print(summary) } return nil }, - Get: dc.Get, - Put: dc.Put, - BytesDownloaded: func() int64 { - return 0 - }, + Get: dc.Get, + Put: dc.Put, + KBDownloaded: zeroIntFunc, + KBUploaded: zeroIntFunc, + AvgKBDownloadSpeed: zeroFloatFunc, + AvgKBUploadSpeed: zeroFloatFunc, } if *serverBase != "" { @@ -136,8 +140,10 @@ func main() { if s3Cache != nil { p.Get = s3Cache.Get p.Put = s3Cache.Put - p.BytesDownloaded = s3Cache.BytesDownloaded - p.BytesUploaded = s3Cache.BytesUploaded + p.KBDownloaded = s3Cache.KBDownloaded + p.KBUploaded = s3Cache.KBUploaded + p.AvgKBDownloadSpeed = s3Cache.AvgKBDownloadSpeed + p.AvgKBUploadSpeed = s3Cache.AvgKBUploadSpeed p.RemoteCacheEnabled = true }