From a01041f9ecce23edd8738d04e55cd27ec6e20f0a Mon Sep 17 00:00:00 2001 From: Petu Eusebiu Date: Wed, 4 Oct 2023 17:02:16 +0300 Subject: [PATCH] fix(cache): make dynamoDB aware of orignal/deduped blobs Signed-off-by: Petu Eusebiu --- pkg/storage/cache/boltdb_test.go | 60 +++++++++++++++ pkg/storage/cache/dynamodb.go | 119 ++++++++++++++++++++++------- pkg/storage/cache/dynamodb_test.go | 57 ++++++++++++++ 3 files changed, 207 insertions(+), 29 deletions(-) diff --git a/pkg/storage/cache/boltdb_test.go b/pkg/storage/cache/boltdb_test.go index 8759b781db..0e75ccbf31 100644 --- a/pkg/storage/cache/boltdb_test.go +++ b/pkg/storage/cache/boltdb_test.go @@ -60,5 +60,65 @@ func TestBoltDBCache(t *testing.T) { err = cacheDriver.PutBlob("key", "") So(err, ShouldNotBeNil) So(err, ShouldEqual, errors.ErrEmptyValue) + + cacheDriver, _ = storage.Create("boltdb", cache.BoltDBDriverParameters{t.TempDir(), "cache_test", false}, log) + So(cacheDriver, ShouldNotBeNil) + + err = cacheDriver.PutBlob("key1", "originalBlobPath") + So(err, ShouldBeNil) + + err = cacheDriver.PutBlob("key1", "duplicateBlobPath") + So(err, ShouldBeNil) + + val, err = cacheDriver.GetBlob("key1") + So(val, ShouldEqual, "originalBlobPath") + So(err, ShouldBeNil) + + err = cacheDriver.DeleteBlob("key1", "duplicateBlobPath") + So(err, ShouldBeNil) + + val, err = cacheDriver.GetBlob("key1") + So(val, ShouldEqual, "originalBlobPath") + So(err, ShouldBeNil) + + err = cacheDriver.PutBlob("key1", "duplicateBlobPath") + So(err, ShouldBeNil) + + err = cacheDriver.DeleteBlob("key1", "originalBlobPath") + So(err, ShouldBeNil) + + val, err = cacheDriver.GetBlob("key1") + So(val, ShouldEqual, "duplicateBlobPath") + So(err, ShouldBeNil) + + err = cacheDriver.DeleteBlob("key1", "duplicateBlobPath") + So(err, ShouldBeNil) + + // should be empty + val, err = cacheDriver.GetBlob("key1") + So(err, ShouldNotBeNil) + So(val, ShouldBeEmpty) + + // try to add three same values + err = cacheDriver.PutBlob("key2", "duplicate") + So(err, ShouldBeNil) + + err = cacheDriver.PutBlob("key2", "duplicate") + So(err, ShouldBeNil) + + err = cacheDriver.PutBlob("key2", "duplicate") + So(err, ShouldBeNil) + + val, err = cacheDriver.GetBlob("key2") + So(val, ShouldEqual, "duplicate") + So(err, ShouldBeNil) + + err = cacheDriver.DeleteBlob("key2", "duplicate") + So(err, ShouldBeNil) + + // should be empty + val, err = cacheDriver.GetBlob("key2") + So(err, ShouldNotBeNil) + So(val, ShouldBeEmpty) }) } diff --git a/pkg/storage/cache/dynamodb.go b/pkg/storage/cache/dynamodb.go index 4aafed0900..9d63e018e5 100644 --- a/pkg/storage/cache/dynamodb.go +++ b/pkg/storage/cache/dynamodb.go @@ -26,11 +26,11 @@ type DynamoDBDriverParameters struct { } type Blob struct { - Digest string `dynamodbav:"Digest,string"` - BlobPath []string `dynamodbav:"BlobPath,stringset"` + Digest string `dynamodbav:"Digest,string"` + DuplicateBlobPath []string `dynamodbav:"DuplicateBlobPath,stringset"` + OriginalBlobPath string `dynamodbav:"OriginalBlobPath,string"` } -// Use ONLY for tests. func (d *DynamoDBDriver) NewTable(tableName string) error { //nolint:gomnd _, err := d.client.CreateTable(context.TODO(), &dynamodb.CreateTableInput{ @@ -107,7 +107,7 @@ func (d *DynamoDBDriver) Name() string { return "dynamodb" } -// Returns the first path of the blob if it exists. +// Returns the original blob. func (d *DynamoDBDriver) GetBlob(digest godigest.Digest) (string, error) { resp, err := d.client.GetItem(context.TODO(), &dynamodb.GetItemInput{ TableName: aws.String(d.tableName), @@ -129,11 +129,7 @@ func (d *DynamoDBDriver) GetBlob(digest godigest.Digest) (string, error) { _ = attributevalue.UnmarshalMap(resp.Item, &out) - if len(out.BlobPath) == 0 { - return "", nil - } - - return out.BlobPath[0], nil + return out.OriginalBlobPath, nil } func (d *DynamoDBDriver) PutBlob(digest godigest.Digest, path string) error { @@ -143,17 +139,18 @@ func (d *DynamoDBDriver) PutBlob(digest godigest.Digest, path string) error { return zerr.ErrEmptyValue } - marshaledKey, _ := attributevalue.MarshalMap(map[string]interface{}{"Digest": digest.String()}) - expression := "ADD BlobPath :i" + if originBlob, _ := d.GetBlob(digest); originBlob == "" { + // first entry, so add original blob + if err := d.putOriginBlob(digest, path); err != nil { + return err + } + } + + expression := "ADD DuplicateBlobPath :i" attrPath := types.AttributeValueMemberSS{Value: []string{path}} - if _, err := d.client.UpdateItem(context.TODO(), &dynamodb.UpdateItemInput{ - Key: marshaledKey, - TableName: &d.tableName, - UpdateExpression: &expression, - ExpressionAttributeValues: map[string]types.AttributeValue{":i": &attrPath}, - }); err != nil { - d.log.Error().Err(err) + if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":i": &attrPath}); err != nil { + d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("unable to put blob") return err } @@ -184,7 +181,11 @@ func (d *DynamoDBDriver) HasBlob(digest godigest.Digest, path string) bool { _ = attributevalue.UnmarshalMap(resp.Item, &out) - for _, item := range out.BlobPath { + if out.OriginalBlobPath == path { + return true + } + + for _, item := range out.DuplicateBlobPath { if item == path { return true } @@ -198,24 +199,28 @@ func (d *DynamoDBDriver) HasBlob(digest godigest.Digest, path string) bool { func (d *DynamoDBDriver) DeleteBlob(digest godigest.Digest, path string) error { marshaledKey, _ := attributevalue.MarshalMap(map[string]interface{}{"Digest": digest.String()}) - expression := "DELETE BlobPath :i" + expression := "DELETE DuplicateBlobPath :i" attrPath := types.AttributeValueMemberSS{Value: []string{path}} - _, err := d.client.UpdateItem(context.TODO(), &dynamodb.UpdateItemInput{ - Key: marshaledKey, - TableName: &d.tableName, - UpdateExpression: &expression, - ExpressionAttributeValues: map[string]types.AttributeValue{":i": &attrPath}, - }) - if err != nil { + if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":i": &attrPath}); err != nil { d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("unable to delete") return err } - result, _ := d.GetBlob(digest) + originBlob, _ := d.GetBlob(digest) + // if original blob is the one deleted + if originBlob == path { + // move duplicate blob to original, storage will move content here + originBlob, _ = d.getDuplicateBlob(digest) + if originBlob != "" { + if err := d.putOriginBlob(digest, originBlob); err != nil { + return err + } + } + } - if result == "" { + if originBlob == "" { d.log.Debug().Str("digest", digest.String()).Str("path", path).Msg("deleting empty bucket") _, _ = d.client.DeleteItem(context.TODO(), &dynamodb.DeleteItemInput{ @@ -226,3 +231,59 @@ func (d *DynamoDBDriver) DeleteBlob(digest godigest.Digest, path string) error { return nil } + +func (d *DynamoDBDriver) getDuplicateBlob(digest godigest.Digest) (string, error) { + resp, err := d.client.GetItem(context.TODO(), &dynamodb.GetItemInput{ + TableName: aws.String(d.tableName), + Key: map[string]types.AttributeValue{ + "Digest": &types.AttributeValueMemberS{Value: digest.String()}, + }, + }) + if err != nil { + d.log.Error().Err(err).Str("tableName", d.tableName).Msg("failed to get blob") + + return "", err + } + + out := Blob{} + + if resp.Item == nil { + return "", zerr.ErrCacheMiss + } + + _ = attributevalue.UnmarshalMap(resp.Item, &out) + + if len(out.DuplicateBlobPath) == 0 { + return "", nil + } + + return out.DuplicateBlobPath[0], nil +} + +func (d *DynamoDBDriver) putOriginBlob(digest godigest.Digest, path string) error { + expression := "SET OriginalBlobPath = :s" + attrPath := types.AttributeValueMemberS{Value: path} + + if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":s": &attrPath}); err != nil { + d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("unable to put original blob") + + return err + } + + return nil +} + +func (d *DynamoDBDriver) updateItem(digest godigest.Digest, expression string, + expressionAttVals map[string]types.AttributeValue, +) error { + marshaledKey, _ := attributevalue.MarshalMap(map[string]interface{}{"Digest": digest.String()}) + + _, err := d.client.UpdateItem(context.TODO(), &dynamodb.UpdateItemInput{ + Key: marshaledKey, + TableName: &d.tableName, + UpdateExpression: &expression, + ExpressionAttributeValues: expressionAttVals, + }) + + return err +} diff --git a/pkg/storage/cache/dynamodb_test.go b/pkg/storage/cache/dynamodb_test.go index 64a67f4dd9..bb8e86b787 100644 --- a/pkg/storage/cache/dynamodb_test.go +++ b/pkg/storage/cache/dynamodb_test.go @@ -107,5 +107,62 @@ func TestDynamoDB(t *testing.T) { err = cacheDriver.DeleteBlob(keyDigest, path.Join(dir, "value2")) So(err, ShouldBeNil) + + err = cacheDriver.PutBlob("key1", "originalBlobPath") + So(err, ShouldBeNil) + + err = cacheDriver.PutBlob("key1", "duplicateBlobPath") + So(err, ShouldBeNil) + + val, err = cacheDriver.GetBlob("key1") + So(val, ShouldEqual, "originalBlobPath") + So(err, ShouldBeNil) + + err = cacheDriver.DeleteBlob("key1", "duplicateBlobPath") + So(err, ShouldBeNil) + + val, err = cacheDriver.GetBlob("key1") + So(val, ShouldEqual, "originalBlobPath") + So(err, ShouldBeNil) + + err = cacheDriver.PutBlob("key1", "duplicateBlobPath") + So(err, ShouldBeNil) + + err = cacheDriver.DeleteBlob("key1", "originalBlobPath") + So(err, ShouldBeNil) + + val, err = cacheDriver.GetBlob("key1") + So(val, ShouldEqual, "duplicateBlobPath") + So(err, ShouldBeNil) + + err = cacheDriver.DeleteBlob("key1", "duplicateBlobPath") + So(err, ShouldBeNil) + + // should be empty + val, err = cacheDriver.GetBlob("key1") + So(err, ShouldNotBeNil) + So(val, ShouldBeEmpty) + + // try to add three same values + err = cacheDriver.PutBlob("key2", "duplicate") + So(err, ShouldBeNil) + + err = cacheDriver.PutBlob("key2", "duplicate") + So(err, ShouldBeNil) + + err = cacheDriver.PutBlob("key2", "duplicate") + So(err, ShouldBeNil) + + val, err = cacheDriver.GetBlob("key2") + So(val, ShouldEqual, "duplicate") + So(err, ShouldBeNil) + + err = cacheDriver.DeleteBlob("key2", "duplicate") + So(err, ShouldBeNil) + + // should be empty + val, err = cacheDriver.GetBlob("key2") + So(err, ShouldNotBeNil) + So(val, ShouldBeEmpty) }) }