Skip to content

Commit

Permalink
[DynamoDB] Support backfills (#475)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Oct 2, 2024
1 parent 5a7f358 commit 839311e
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 81 deletions.
14 changes: 4 additions & 10 deletions config/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,10 @@ func (d *DynamoDB) Validate() error {
}

type SnapshotSettings struct {
S3Bucket string `yaml:"s3Bucket"`
Folder string `yaml:"folder"`
// If the files are not specified, that's okay.
// We will scan the folder and then load into `specifiedFiles`
SpecifiedFiles []s3lib.S3File `yaml:"specifiedFiles"`
BatchSize int32 `yaml:"batchSize"`
ShouldInitiateExport bool `yaml:"shouldInitiateExport"` // Whether Reader should initiate a DDB export or not.
Folder string `yaml:"folder"` // The folder where the snapshot files will be stored.
SpecifiedFiles []s3lib.S3File `yaml:"specifiedFiles"` // If this is passed in, we'll only process these files
BatchSize int32 `yaml:"batchSize"`
}

func (s *SnapshotSettings) GetBatchSize() int32 {
Expand All @@ -60,9 +58,5 @@ func (s *SnapshotSettings) Validate() error {
return fmt.Errorf("folder is empty")
}

if s.S3Bucket == "" {
return fmt.Errorf("s3Bucket is empty")
}

return nil
}
29 changes: 29 additions & 0 deletions lib/dynamo/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package dynamo

import (
"fmt"
"path/filepath"
"strings"
)

// GetTableArnFromStreamArn derives the table ARN from a DynamoDB stream ARN by
// returning everything that precedes the "/stream/" segment.
//
// An error is returned unless the input contains exactly one "/stream/" segment.
func GetTableArnFromStreamArn(streamArn string) (string, error) {
	const separator = "/stream/"

	// Exactly one separator is required: zero means this is not a stream ARN,
	// more than one means the ARN is malformed.
	if strings.Count(streamArn, separator) != 1 {
		return "", fmt.Errorf("invalid stream ARN: %q", streamArn)
	}

	tableArn, _, _ := strings.Cut(streamArn, separator)
	return tableArn, nil
}

// ParseManifestFile returns the S3 location (bucket joined with the manifest's
// parent folder) that holds the export described by manifestFilePath.
//
// manifestFilePath must end in "manifest-summary.json", otherwise an error is
// returned. The result is always slash-separated, regardless of the host OS,
// because it describes an S3 location rather than a local path.
func ParseManifestFile(bucket string, manifestFilePath string) (string, error) {
	if !strings.HasSuffix(manifestFilePath, "manifest-summary.json") {
		return "", fmt.Errorf("invalid manifest filepath: %q", manifestFilePath)
	}

	// Everything before the final "/" is the folder; a bare file name has none.
	var folder string
	if idx := strings.LastIndex(manifestFilePath, "/"); idx >= 0 {
		folder = manifestFilePath[:idx]
	}

	// filepath.Join uses the OS-specific separator, so normalise back to "/"
	// to keep the result usable as an S3 path on every platform.
	return filepath.ToSlash(filepath.Join(bucket, folder)), nil
}
34 changes: 34 additions & 0 deletions lib/dynamo/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package dynamo

import (
"github.com/stretchr/testify/assert"
"testing"
)

// TestGetTableArnFromStreamArn checks that a well-formed stream ARN yields its
// table ARN and that an ARN without a "/stream/" segment is rejected.
func TestGetTableArnFromStreamArn(t *testing.T) {
	cases := []struct {
		streamArn   string
		wantTable   string
		wantErrText string
	}{
		{
			// Valid stream ARN
			streamArn: "arn:aws:dynamodb:us-west-2:123456789012:table/my-table/stream/2021-01-01T00:00:00.000",
			wantTable: "arn:aws:dynamodb:us-west-2:123456789012:table/my-table",
		},
		{
			// Invalid stream ARN
			streamArn:   "arn:aws:dynamodb:us-west-2:123456789012:table/my-table",
			wantErrText: `invalid stream ARN: "arn:aws:dynamodb:us-west-2:123456789012:table/my-table"`,
		},
	}

	for _, tc := range cases {
		actual, err := GetTableArnFromStreamArn(tc.streamArn)
		if tc.wantErrText != "" {
			assert.ErrorContains(t, err, tc.wantErrText)
			continue
		}

		assert.NoError(t, err)
		assert.Equal(t, tc.wantTable, actual)
	}
}

// TestParseManifestFile checks that a manifest path resolves to the bucket
// plus its parent folder, and that a path without the expected file name
// suffix is rejected.
func TestParseManifestFile(t *testing.T) {
	cases := []struct {
		manifestPath string
		wantLocation string
		wantErrText  string
	}{
		{
			// Valid manifest file path
			manifestPath: "artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6/manifest-summary.json",
			wantLocation: "bucket/artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6",
		},
		{
			// Invalid manifest file path
			manifestPath: "artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6/manifest-summary",
			wantErrText:  `invalid manifest filepath: "artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6/manifest-summary"`,
		},
	}

	for _, tc := range cases {
		location, err := ParseManifestFile("bucket", tc.manifestPath)
		if tc.wantErrText != "" {
			assert.ErrorContains(t, err, tc.wantErrText)
			continue
		}

		assert.NoError(t, err)
		assert.Equal(t, tc.wantLocation, location)
	}
}
16 changes: 6 additions & 10 deletions lib/s3lib/s3lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,28 @@ func NewClient(bucketName string, awsCfg aws.Config) *S3Client {
}
}

func bucketAndPrefixFromFilePath(fp string) (*string, *string, error) {
func BucketAndPrefixFromFilePath(fp string) (string, string, error) {
// Remove the s3:// prefix if it's there
fp = strings.TrimPrefix(fp, "s3://")

parts := strings.SplitN(fp, "/", 2)
parts := strings.SplitN(strings.TrimPrefix(fp, "s3://"), "/", 2)
if len(parts) < 2 {
return nil, nil, fmt.Errorf("invalid S3 path, missing prefix")
return "", "", fmt.Errorf("invalid S3 path, missing prefix")
}

bucket := parts[0]
prefix := parts[1]
return &bucket, &prefix, nil
return parts[0], parts[1], nil
}

// S3File describes a single file in S3; it is the element type returned by ListFiles.
type S3File struct {
// Key is read from / written to the `key` field in YAML.
// NOTE(review): presumably the S3 object key - confirm against where ListFiles populates it.
Key *string `yaml:"key"`
}

func (s *S3Client) ListFiles(ctx context.Context, fp string) ([]S3File, error) {
bucket, prefix, err := bucketAndPrefixFromFilePath(fp)
bucket, prefix, err := BucketAndPrefixFromFilePath(fp)
if err != nil {
return nil, err
}

var files []S3File
paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{Bucket: bucket, Prefix: prefix})
paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{Bucket: &bucket, Prefix: &prefix})
for paginator.HasMorePages() {
page, err := paginator.NextPage(ctx)
if err != nil {
Expand Down
91 changes: 34 additions & 57 deletions lib/s3lib/s3lib_test.go
Original file line number Diff line number Diff line change
@@ -1,73 +1,50 @@
package s3lib

import (
"github.com/artie-labs/transfer/lib/typing"
"github.com/stretchr/testify/assert"
"testing"
)

func TestBucketAndPrefixFromFilePath(t *testing.T) {
tcs := []struct {
name string
fp string
expectedBucket *string
expectedPrefix *string
expectedErr string
}{
{
// Invalid
{
name: "valid path (w/ S3 prefix)",
fp: "s3://bucket/prefix",
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix"),
},
{
name: "valid path (w/ S3 prefix) with trailing slash",
fp: "s3://bucket/prefix/",
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix/"),
},
{
name: "valid path (w/ S3 prefix) with multiple slashes",
fp: "s3://bucket/prefix/with/multiple/slashes",
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix/with/multiple/slashes"),
},
// Without S3 prefix
// Empty string
bucket, prefix, err := BucketAndPrefixFromFilePath("")
assert.ErrorContains(t, err, "invalid S3 path, missing prefix")
assert.Empty(t, bucket)
assert.Empty(t, prefix)
}
{
name: "valid path (w/o S3 prefix)",
fp: "bucket/prefix",
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix"),
},
// Bucket only, no prefix
bucket, prefix, err := BucketAndPrefixFromFilePath("bucket")
assert.ErrorContains(t, err, "invalid S3 path, missing prefix")
assert.Empty(t, bucket)
assert.Empty(t, prefix)
}
}
{
// Valid
{
name: "valid path (w/o S3 prefix) with trailing slash",
fp: "bucket/prefix/",
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix/"),
},
// No S3 prefix
bucket, prefix, err := BucketAndPrefixFromFilePath("bucket/prefix")
assert.NoError(t, err)
assert.Equal(t, "bucket", bucket)
assert.Equal(t, "prefix", prefix)
}
{
name: "valid path (w/o S3 prefix) with multiple slashes",
fp: "bucket/prefix/with/multiple/slashes",
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix/with/multiple/slashes"),
},
// S3 prefix
bucket, prefix, err := BucketAndPrefixFromFilePath("s3://bucket/prefix")
assert.NoError(t, err)
assert.Equal(t, "bucket", bucket)
assert.Equal(t, "prefix", prefix)
}
{
name: "invalid path",
fp: "s3://bucket",
expectedErr: "invalid S3 path, missing prefix",
},
}

for _, tc := range tcs {
actualBucket, actualPrefix, actualErr := bucketAndPrefixFromFilePath(tc.fp)
if tc.expectedErr != "" {
assert.ErrorContains(t, actualErr, tc.expectedErr, tc.name)
} else {
assert.NoError(t, actualErr, tc.name)

// Now check the actualBucket and prefix
assert.Equal(t, *tc.expectedBucket, *actualBucket, tc.name)
assert.Equal(t, *tc.expectedPrefix, *actualPrefix, tc.name)
// S3 prefix long
bucket, prefix, err := BucketAndPrefixFromFilePath("s3://bucket/prefix/long")
assert.NoError(t, err)
assert.Equal(t, "bucket", bucket)
assert.Equal(t, "prefix/long", prefix)
}
}
}
7 changes: 6 additions & 1 deletion sources/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ func Load(ctx context.Context, cfg config.DynamoDB) (sources.Source, bool, error
}

if cfg.Snapshot {
return snapshot.NewStore(cfg, _awsCfg), false, nil
store, err := snapshot.NewStore(ctx, cfg, _awsCfg)
if err != nil {
return nil, false, err
}

return store, false, nil
} else {
return stream.NewStore(cfg, _awsCfg), true, nil
}
Expand Down
101 changes: 101 additions & 0 deletions sources/dynamodb/snapshot/export.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package snapshot

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"

"github.com/artie-labs/reader/lib/dynamo"
)

// listExports returns every export summary for the given table ARN, following
// NextToken until the ListExports API reports no further pages.
func (s *Store) listExports(ctx context.Context, tableARN string) ([]types.ExportSummary, error) {
	var (
		summaries []types.ExportSummary
		pageToken *string
	)

	for {
		input := &dynamodb.ListExportsInput{
			TableArn:  aws.String(tableARN),
			NextToken: pageToken,
		}

		page, err := s.dynamoDBClient.ListExports(ctx, input)
		if err != nil {
			return nil, fmt.Errorf("failed to list exports: %w", err)
		}

		summaries = append(summaries, page.ExportSummaries...)

		// A nil token marks the final page.
		if page.NextToken == nil {
			return summaries, nil
		}

		pageToken = page.NextToken
	}
}

// findRecentExport looks for an existing, non-failed export of the table
// (derived from s.streamArn) that targets the given bucket and prefix, and
// initiates a new export when none is found.
//
// It returns (exportARN, manifest, err):
//   - matching export that has completed: its ARN and its manifest;
//   - matching export in any other non-failed state: its ARN and a nil manifest;
//   - no matching export: the ARN of a freshly initiated export and a nil manifest.
func (s *Store) findRecentExport(ctx context.Context, bucket string, prefix string) (*string, *string, error) {
	tableARN, err := dynamo.GetTableArnFromStreamArn(s.streamArn)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to get table ARN from stream ARN: %w", err)
	}

	exports, err := s.listExports(ctx, tableARN)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to list exports: %w", err)
	}

	for _, export := range exports {
		if export.ExportStatus == types.ExportStatusFailed {
			slog.Info("Filtering out failed exports", slog.String("exportARN", aws.ToString(export.ExportArn)))
			continue
		}

		described, err := s.dynamoDBClient.DescribeExport(ctx, &dynamodb.DescribeExportInput{ExportArn: export.ExportArn})
		if err != nil {
			return nil, nil, fmt.Errorf("failed to describe export: %w", err)
		}

		description := described.ExportDescription
		if description == nil {
			// Nothing to compare against, so this export cannot be a match.
			continue
		}

		// S3Bucket and S3Prefix are pointers and S3Prefix is optional on an export,
		// so compare through aws.ToString instead of dereferencing (nil becomes "").
		if aws.ToString(description.S3Bucket) != bucket || aws.ToString(description.S3Prefix) != prefix {
			continue
		}

		if export.ExportStatus == types.ExportStatusCompleted {
			return export.ExportArn, description.ExportManifest, nil
		}

		// Matching export exists but has not completed; the caller only gets the ARN.
		return export.ExportArn, nil, nil
	}

	// Not found, so let's initiate one
	result, err := s.dynamoDBClient.ExportTableToPointInTime(ctx, &dynamodb.ExportTableToPointInTimeInput{
		TableArn:     aws.String(tableARN),
		S3Bucket:     aws.String(bucket),
		S3Prefix:     aws.String(prefix),
		ExportFormat: types.ExportFormatDynamodbJson,
	})
	if err != nil {
		return nil, nil, fmt.Errorf("failed to export table to point in time: %w", err)
	}

	return result.ExportDescription.ExportArn, nil, nil
}

// checkExportStatus polls DescribeExport for exportARN until the export leaves
// the in-progress state.
//
// It returns the export manifest once the export has completed. It returns an
// error if describing the export fails, the export has failed, the status is
// not one of the known values, or ctx is cancelled while waiting between polls.
func (s *Store) checkExportStatus(ctx context.Context, exportARN *string) (*string, error) {
	const pollInterval = 30 * time.Second

	for {
		result, err := s.dynamoDBClient.DescribeExport(ctx, &dynamodb.DescribeExportInput{ExportArn: exportARN})
		if err != nil {
			return nil, fmt.Errorf("failed to describe export: %w", err)
		}

		switch result.ExportDescription.ExportStatus {
		case types.ExportStatusCompleted:
			return result.ExportDescription.ExportManifest, nil
		case types.ExportStatusFailed:
			// FailureMessage is a pointer; guard against nil rather than panicking.
			var failureMessage string
			if msg := result.ExportDescription.FailureMessage; msg != nil {
				failureMessage = *msg
			}

			return nil, fmt.Errorf("export has failed: %s", failureMessage)
		case types.ExportStatusInProgress:
			slog.Info("Export is still in progress")

			// Wait for the next poll, but stop promptly if the caller cancels.
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-time.After(pollInterval):
			}
		default:
			return nil, fmt.Errorf("unknown export status: %s", string(result.ExportDescription.ExportStatus))
		}
	}
}
Loading

0 comments on commit 839311e

Please sign in to comment.