Skip to content

Commit

Permalink
fix(cache): make dynamoDB aware of orignal/deduped blobs
Browse files Browse the repository at this point in the history
Signed-off-by: Petu Eusebiu <[email protected]>
  • Loading branch information
eusebiu-constantin-petu-dbk committed Oct 5, 2023
1 parent e6902b9 commit a036efa
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 29 deletions.
60 changes: 60 additions & 0 deletions pkg/storage/cache/boltdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,65 @@ func TestBoltDBCache(t *testing.T) {
err = cacheDriver.PutBlob("key", "")
So(err, ShouldNotBeNil)
So(err, ShouldEqual, errors.ErrEmptyValue)

cacheDriver, _ = storage.Create("boltdb", cache.BoltDBDriverParameters{t.TempDir(), "cache_test", false}, log)
So(cacheDriver, ShouldNotBeNil)

err = cacheDriver.PutBlob("key1", "originalBlobPath")
So(err, ShouldBeNil)

err = cacheDriver.PutBlob("key1", "duplicateBlobPath")
So(err, ShouldBeNil)

val, err = cacheDriver.GetBlob("key1")
So(val, ShouldEqual, "originalBlobPath")
So(err, ShouldBeNil)

err = cacheDriver.DeleteBlob("key1", "duplicateBlobPath")
So(err, ShouldBeNil)

val, err = cacheDriver.GetBlob("key1")
So(val, ShouldEqual, "originalBlobPath")
So(err, ShouldBeNil)

err = cacheDriver.PutBlob("key1", "duplicateBlobPath")
So(err, ShouldBeNil)

err = cacheDriver.DeleteBlob("key1", "originalBlobPath")
So(err, ShouldBeNil)

val, err = cacheDriver.GetBlob("key1")
So(val, ShouldEqual, "duplicateBlobPath")
So(err, ShouldBeNil)

err = cacheDriver.DeleteBlob("key1", "duplicateBlobPath")
So(err, ShouldBeNil)

// should be empty
val, err = cacheDriver.GetBlob("key1")
So(err, ShouldNotBeNil)
So(val, ShouldBeEmpty)

// try to add three same values
err = cacheDriver.PutBlob("key2", "duplicate")
So(err, ShouldBeNil)

err = cacheDriver.PutBlob("key2", "duplicate")
So(err, ShouldBeNil)

err = cacheDriver.PutBlob("key2", "duplicate")
So(err, ShouldBeNil)

val, err = cacheDriver.GetBlob("key2")
So(val, ShouldEqual, "duplicate")
So(err, ShouldBeNil)

err = cacheDriver.DeleteBlob("key2", "duplicate")
So(err, ShouldBeNil)

// should be empty
val, err = cacheDriver.GetBlob("key2")
So(err, ShouldNotBeNil)
So(val, ShouldBeEmpty)
})
}
119 changes: 90 additions & 29 deletions pkg/storage/cache/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ type DynamoDBDriverParameters struct {
}

type Blob struct {
Digest string `dynamodbav:"Digest,string"`
BlobPath []string `dynamodbav:"BlobPath,stringset"`
Digest string `dynamodbav:"Digest,string"`
DuplicateBlobPath []string `dynamodbav:"DuplicateBlobPath,stringset"`
OriginalBlobPath string `dynamodbav:"OriginalBlobPath,string"`
}

// Use ONLY for tests.
func (d *DynamoDBDriver) NewTable(tableName string) error {
//nolint:gomnd
_, err := d.client.CreateTable(context.TODO(), &dynamodb.CreateTableInput{
Expand Down Expand Up @@ -107,7 +107,7 @@ func (d *DynamoDBDriver) Name() string {
return "dynamodb"
}

// Returns the first path of the blob if it exists.
// Returns the original blob.
func (d *DynamoDBDriver) GetBlob(digest godigest.Digest) (string, error) {
resp, err := d.client.GetItem(context.TODO(), &dynamodb.GetItemInput{
TableName: aws.String(d.tableName),
Expand All @@ -129,11 +129,7 @@ func (d *DynamoDBDriver) GetBlob(digest godigest.Digest) (string, error) {

_ = attributevalue.UnmarshalMap(resp.Item, &out)

if len(out.BlobPath) == 0 {
return "", nil
}

return out.BlobPath[0], nil
return out.OriginalBlobPath, nil
}

func (d *DynamoDBDriver) PutBlob(digest godigest.Digest, path string) error {
Expand All @@ -143,17 +139,18 @@ func (d *DynamoDBDriver) PutBlob(digest godigest.Digest, path string) error {
return zerr.ErrEmptyValue
}

marshaledKey, _ := attributevalue.MarshalMap(map[string]interface{}{"Digest": digest.String()})
expression := "ADD BlobPath :i"
if originBlob, _ := d.GetBlob(digest); originBlob == "" {
// first entry, so add original blob
if err := d.putOriginBlob(digest, path); err != nil {
return err
}
}

expression := "ADD DuplicateBlobPath :i"
attrPath := types.AttributeValueMemberSS{Value: []string{path}}

if _, err := d.client.UpdateItem(context.TODO(), &dynamodb.UpdateItemInput{
Key: marshaledKey,
TableName: &d.tableName,
UpdateExpression: &expression,
ExpressionAttributeValues: map[string]types.AttributeValue{":i": &attrPath},
}); err != nil {
d.log.Error().Err(err)
if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":i": &attrPath}); err != nil {
d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("unable to put blob")

Check warning on line 153 in pkg/storage/cache/dynamodb.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/dynamodb.go#L153

Added line #L153 was not covered by tests

return err
}
Expand Down Expand Up @@ -184,7 +181,11 @@ func (d *DynamoDBDriver) HasBlob(digest godigest.Digest, path string) bool {

_ = attributevalue.UnmarshalMap(resp.Item, &out)

for _, item := range out.BlobPath {
if out.OriginalBlobPath == path {
return true
}

for _, item := range out.DuplicateBlobPath {
if item == path {
return true
}
Expand All @@ -198,24 +199,28 @@ func (d *DynamoDBDriver) HasBlob(digest godigest.Digest, path string) bool {
func (d *DynamoDBDriver) DeleteBlob(digest godigest.Digest, path string) error {
marshaledKey, _ := attributevalue.MarshalMap(map[string]interface{}{"Digest": digest.String()})

expression := "DELETE BlobPath :i"
expression := "DELETE DuplicateBlobPath :i"
attrPath := types.AttributeValueMemberSS{Value: []string{path}}

_, err := d.client.UpdateItem(context.TODO(), &dynamodb.UpdateItemInput{
Key: marshaledKey,
TableName: &d.tableName,
UpdateExpression: &expression,
ExpressionAttributeValues: map[string]types.AttributeValue{":i": &attrPath},
})
if err != nil {
if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":i": &attrPath}); err != nil {
d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("unable to delete")

return err
}

result, _ := d.GetBlob(digest)
originBlob, _ := d.GetBlob(digest)
// if original blob is the one deleted
if originBlob == path {
// move duplicate blob to original, storage will move content here
originBlob, _ = d.getDuplicateBlob(digest)
if originBlob != "" {
if err := d.putOriginBlob(digest, originBlob); err != nil {
return err
}

Check warning on line 219 in pkg/storage/cache/dynamodb.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/dynamodb.go#L218-L219

Added lines #L218 - L219 were not covered by tests
}
}

if result == "" {
if originBlob == "" {
d.log.Debug().Str("digest", digest.String()).Str("path", path).Msg("deleting empty bucket")

_, _ = d.client.DeleteItem(context.TODO(), &dynamodb.DeleteItemInput{
Expand All @@ -226,3 +231,59 @@ func (d *DynamoDBDriver) DeleteBlob(digest godigest.Digest, path string) error {

return nil
}

func (d *DynamoDBDriver) getDuplicateBlob(digest godigest.Digest) (string, error) {
resp, err := d.client.GetItem(context.TODO(), &dynamodb.GetItemInput{
TableName: aws.String(d.tableName),
Key: map[string]types.AttributeValue{
"Digest": &types.AttributeValueMemberS{Value: digest.String()},
},
})
if err != nil {
d.log.Error().Err(err).Str("tableName", d.tableName).Msg("failed to get blob")

return "", err
}

Check warning on line 246 in pkg/storage/cache/dynamodb.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/dynamodb.go#L243-L246

Added lines #L243 - L246 were not covered by tests

out := Blob{}

if resp.Item == nil {
return "", zerr.ErrCacheMiss
}

Check warning on line 252 in pkg/storage/cache/dynamodb.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/dynamodb.go#L251-L252

Added lines #L251 - L252 were not covered by tests

_ = attributevalue.UnmarshalMap(resp.Item, &out)

if len(out.DuplicateBlobPath) == 0 {
return "", nil
}

return out.DuplicateBlobPath[0], nil
}

func (d *DynamoDBDriver) putOriginBlob(digest godigest.Digest, path string) error {
expression := "SET OriginalBlobPath = :s"
attrPath := types.AttributeValueMemberS{Value: path}

if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":s": &attrPath}); err != nil {
d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("unable to put original blob")

return err
}

return nil
}

func (d *DynamoDBDriver) updateItem(digest godigest.Digest, expression string,
expressionAttVals map[string]types.AttributeValue,
) error {
marshaledKey, _ := attributevalue.MarshalMap(map[string]interface{}{"Digest": digest.String()})

_, err := d.client.UpdateItem(context.TODO(), &dynamodb.UpdateItemInput{
Key: marshaledKey,
TableName: &d.tableName,
UpdateExpression: &expression,
ExpressionAttributeValues: expressionAttVals,
})

return err
}
57 changes: 57 additions & 0 deletions pkg/storage/cache/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,62 @@ func TestDynamoDB(t *testing.T) {

err = cacheDriver.DeleteBlob(keyDigest, path.Join(dir, "value2"))
So(err, ShouldBeNil)

err = cacheDriver.PutBlob("key1", "originalBlobPath")
So(err, ShouldBeNil)

err = cacheDriver.PutBlob("key1", "duplicateBlobPath")
So(err, ShouldBeNil)

val, err = cacheDriver.GetBlob("key1")
So(val, ShouldEqual, "originalBlobPath")
So(err, ShouldBeNil)

err = cacheDriver.DeleteBlob("key1", "duplicateBlobPath")
So(err, ShouldBeNil)

val, err = cacheDriver.GetBlob("key1")
So(val, ShouldEqual, "originalBlobPath")
So(err, ShouldBeNil)

err = cacheDriver.PutBlob("key1", "duplicateBlobPath")
So(err, ShouldBeNil)

err = cacheDriver.DeleteBlob("key1", "originalBlobPath")
So(err, ShouldBeNil)

val, err = cacheDriver.GetBlob("key1")
So(val, ShouldEqual, "duplicateBlobPath")
So(err, ShouldBeNil)

err = cacheDriver.DeleteBlob("key1", "duplicateBlobPath")
So(err, ShouldBeNil)

// should be empty
val, err = cacheDriver.GetBlob("key1")
So(err, ShouldNotBeNil)
So(val, ShouldBeEmpty)

// try to add three same values
err = cacheDriver.PutBlob("key2", "duplicate")
So(err, ShouldBeNil)

err = cacheDriver.PutBlob("key2", "duplicate")
So(err, ShouldBeNil)

err = cacheDriver.PutBlob("key2", "duplicate")
So(err, ShouldBeNil)

val, err = cacheDriver.GetBlob("key2")
So(val, ShouldEqual, "duplicate")
So(err, ShouldBeNil)

err = cacheDriver.DeleteBlob("key2", "duplicate")
So(err, ShouldBeNil)

// should be empty
val, err = cacheDriver.GetBlob("key2")
So(err, ShouldNotBeNil)
So(val, ShouldBeEmpty)
})
}

0 comments on commit a036efa

Please sign in to comment.