Skip to content

Commit

Permalink
Using channels
Browse files Browse the repository at this point in the history
  • Loading branch information
or-shachar committed Nov 19, 2023
1 parent 2916100 commit 230040c
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 27 deletions.
80 changes: 53 additions & 27 deletions cachers/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"io"
"log"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/aws/smithy-go"
Expand All @@ -32,14 +30,19 @@ type S3Cache struct {

s3Client *s3.Client

bytesDownloaded atomic.Int64
bytesUploaded atomic.Int64
bytesDownloaded int64
bytesUploaded int64
downloadCount int64
uploadCount int64
avgBytesDownloadSpeed float64
avgBytesUploadSpeed float64
downloadsStatsMutex *sync.Mutex
uploadStatsMutex *sync.Mutex
uploadStatsChan chan *Stats
downloadStatsChan chan *Stats
}

type Stats struct {
Bytes int64
Speed float64
}

func NewS3Cache(bucketName string, cfg *aws.Config, cacheKey string, disk *DiskCache, verbose bool) *S3Cache {
Expand All @@ -49,15 +52,17 @@ func NewS3Cache(bucketName string, cfg *aws.Config, cacheKey string, disk *DiskC
os := runtime.GOOS
prefix := fmt.Sprintf("cache/%s/%s/%s", cacheKey, arc, os)
log.Printf("S3Cache: configured to s3://%s/%s", bucketName, prefix)
return &S3Cache{
Bucket: bucketName,
cfg: cfg,
diskCache: disk,
prefix: prefix,
verbose: verbose,
downloadsStatsMutex: new(sync.Mutex),
uploadStatsMutex: new(sync.Mutex),
cache := &S3Cache{
Bucket: bucketName,
cfg: cfg,
diskCache: disk,
prefix: prefix,
verbose: verbose,
uploadStatsChan: make(chan *Stats, 10),
downloadStatsChan: make(chan *Stats, 10),
}
cache.StartStatsGathering()
return cache
}

func (c *S3Cache) client() (*s3.Client, error) {
Expand Down Expand Up @@ -146,16 +151,17 @@ func (c *S3Cache) Get(ctx context.Context, actionID string) (outputID, diskPath
if err != nil {
return err
}
c.bytesDownloaded.Add(outputResult.ContentLength)
return nil
}
if c.verbose {
speed, err := DoAndMeasureSpeed(av.Size, downloadFunc)
if err == nil {
c.downloadsStatsMutex.Lock()
c.avgBytesDownloadSpeed = newAverage(c.avgBytesDownloadSpeed, c.downloadCount, speed)
c.downloadCount++
c.downloadsStatsMutex.Unlock()
c.downloadStatsChan <- &Stats{
Bytes: outputResult.ContentLength,
Speed: speed,
}
} else {
log.Printf("error downloading %s: %v", outputKey, err)
}
} else {
err = downloadFunc()
Expand Down Expand Up @@ -212,8 +218,6 @@ func (c *S3Cache) Put(ctx context.Context, actionID, outputID string, size int64
if size > 0 && err == nil {
c.uploadOutput(ctx, outputID, client, readerForS3, size)
}
c.bytesUploaded.Add(size)

return
}

Expand All @@ -231,22 +235,22 @@ func (c *S3Cache) uploadOutput(ctx context.Context, outputID string, client *s3.
if c.verbose {
speed, err := DoAndMeasureSpeed(size, putObjectFunc)
if err == nil {
c.uploadStatsMutex.Lock()
c.avgBytesUploadSpeed = newAverage(c.avgBytesUploadSpeed, c.uploadCount, speed)
c.uploadCount++
c.uploadStatsMutex.Unlock()
c.uploadStatsChan <- &Stats{
Bytes: size,
Speed: speed,
}
}
} else {
_ = putObjectFunc()
}
}

func (c *S3Cache) BytesDownloaded() int64 {
return c.bytesDownloaded.Load()
return c.bytesDownloaded
}

func (c *S3Cache) BytesUploaded() int64 {
return c.bytesUploaded.Load()
return c.bytesUploaded
}

func (c *S3Cache) AvgBytesDownloadSpeed() float64 {
Expand All @@ -268,3 +272,25 @@ func DoAndMeasureSpeed(dataSize int64, functionOnData func() error) (float64, er
speed := float64(dataSize) / elapsed.Seconds()
return speed, err
}

func (c *S3Cache) StartStatsGathering() {
go func() {
for s := range c.uploadStatsChan {
c.bytesUploaded += s.Bytes
c.avgBytesUploadSpeed = newAverage(c.avgBytesUploadSpeed, c.uploadCount, s.Speed)
c.uploadCount++
}
}()
go func() {
for s := range c.downloadStatsChan {
c.bytesDownloaded += s.Bytes
c.avgBytesDownloadSpeed = newAverage(c.avgBytesDownloadSpeed, c.downloadCount, s.Speed)
c.downloadCount++
}
}()
}

func (c *S3Cache) Close() {
close(c.uploadStatsChan)
close(c.downloadStatsChan)
}
4 changes: 4 additions & 0 deletions cmd/go-cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ func main() {
p.BytesUploaded = s3Cache.BytesUploaded
p.AvgBytesDownloadSpeed = s3Cache.AvgBytesDownloadSpeed
p.AvgBytesUploadSpeed = s3Cache.AvgBytesUploadSpeed
p.Close = func() error {
s3Cache.Close()
return p.Close()
}
p.RemoteCacheEnabled = true
}

Expand Down

0 comments on commit 230040c

Please sign in to comment.