Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DynamoDB] Support backfills #475

Merged
merged 15 commits into from
Oct 2, 2024
14 changes: 4 additions & 10 deletions config/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,10 @@ func (d *DynamoDB) Validate() error {
}

type SnapshotSettings struct {
S3Bucket string `yaml:"s3Bucket"`
Folder string `yaml:"folder"`
// If the files are not specified, that's okay.
// We will scan the folder and then load into `specifiedFiles`
SpecifiedFiles []s3lib.S3File `yaml:"specifiedFiles"`
BatchSize int32 `yaml:"batchSize"`
ShouldInitiateExport bool `yaml:"shouldInitiateExport"` // Whether Reader should initiate a DDB export or not.
Folder string `yaml:"folder"` // The folder where the snapshot files will be stored.
SpecifiedFiles []s3lib.S3File `yaml:"specifiedFiles"` // If this is passed in, we'll only process these files
BatchSize int32 `yaml:"batchSize"`
}

func (s *SnapshotSettings) GetBatchSize() int32 {
Expand All @@ -60,9 +58,5 @@ func (s *SnapshotSettings) Validate() error {
return fmt.Errorf("folder is empty")
}

if s.S3Bucket == "" {
return fmt.Errorf("s3Bucket is empty")
}

return nil
}
29 changes: 29 additions & 0 deletions lib/dynamo/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package dynamo

import (
"fmt"
"path/filepath"
"strings"
)

// GetTableArnFromStreamArn returns the table ARN that a DynamoDB stream ARN belongs to,
// i.e. everything that precedes the trailing "/stream/<label>" segment.
//
// The last "/stream/" occurrence is used as the separator so that a table which is itself
// named "stream" ("...:table/stream/stream/<label>") is not truncated to "...:table".
// An error is returned when streamArn contains no "/stream/" segment.
func GetTableArnFromStreamArn(streamArn string) (string, error) {
	const separator = "/stream/"

	idx := strings.LastIndex(streamArn, separator)
	if idx < 0 {
		return "", fmt.Errorf("invalid stream ARN: %q", streamArn)
	}

	return streamArn[:idx], nil
}

func ParseManifestFile(bucket string, manifestFilePath string) (string, error) {
if !strings.HasSuffix(manifestFilePath, "manifest-summary.json") {
return "", fmt.Errorf("invalid manifest filepath: %q", manifestFilePath)
}

parts := strings.Split(manifestFilePath, "/")
if len(parts) == 0 {
return "", fmt.Errorf("invalid manifest filepath: %q", manifestFilePath)
}

return filepath.Join(bucket, strings.Join(parts[:len(parts)-1], "/")), nil
}
34 changes: 34 additions & 0 deletions lib/dynamo/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package dynamo

import (
"github.com/stretchr/testify/assert"
"testing"
)

// TestGetTableArnFromStreamArn covers one well-formed stream ARN and one ARN that lacks
// the "/stream/" segment.
func TestGetTableArnFromStreamArn(t *testing.T) {
	const tableARN = "arn:aws:dynamodb:us-west-2:123456789012:table/my-table"

	// A stream ARN resolves to the ARN of the table it belongs to.
	actual, err := GetTableArnFromStreamArn(tableARN + "/stream/2021-01-01T00:00:00.000")
	assert.NoError(t, err)
	assert.Equal(t, tableARN, actual)

	// A plain table ARN is not a stream ARN and must be rejected.
	_, err = GetTableArnFromStreamArn(tableARN)
	assert.ErrorContains(t, err, `invalid stream ARN: "arn:aws:dynamodb:us-west-2:123456789012:table/my-table"`)
}

// TestParseManifestFile covers a path ending in "manifest-summary.json" and a path that
// is missing the ".json" extension.
func TestParseManifestFile(t *testing.T) {
	const exportDir = "artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6"

	// The manifest's directory is appended to the bucket.
	location, err := ParseManifestFile("bucket", exportDir+"/manifest-summary.json")
	assert.NoError(t, err)
	assert.Equal(t, "bucket/artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6", location)

	// Without the expected file name suffix the path is rejected.
	_, err = ParseManifestFile("bucket", exportDir+"/manifest-summary")
	assert.ErrorContains(t, err, `invalid manifest filepath: "artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6/manifest-summary"`)
}
16 changes: 6 additions & 10 deletions lib/s3lib/s3lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,28 @@ func NewClient(bucketName string, awsCfg aws.Config) *S3Client {
}
}

func bucketAndPrefixFromFilePath(fp string) (*string, *string, error) {
func BucketAndPrefixFromFilePath(fp string) (string, string, error) {
// Remove the s3:// prefix if it's there
fp = strings.TrimPrefix(fp, "s3://")

parts := strings.SplitN(fp, "/", 2)
parts := strings.SplitN(strings.TrimPrefix(fp, "s3://"), "/", 2)
if len(parts) < 2 {
return nil, nil, fmt.Errorf("invalid S3 path, missing prefix")
return "", "", fmt.Errorf("invalid S3 path, missing prefix")
}

bucket := parts[0]
prefix := parts[1]
return &bucket, &prefix, nil
return parts[0], parts[1], nil
}

type S3File struct {
Key *string `yaml:"key"`
}

func (s *S3Client) ListFiles(ctx context.Context, fp string) ([]S3File, error) {
bucket, prefix, err := bucketAndPrefixFromFilePath(fp)
bucket, prefix, err := BucketAndPrefixFromFilePath(fp)
if err != nil {
return nil, err
}

var files []S3File
paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{Bucket: bucket, Prefix: prefix})
paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{Bucket: &bucket, Prefix: &prefix})
for paginator.HasMorePages() {
page, err := paginator.NextPage(ctx)
if err != nil {
Expand Down
91 changes: 34 additions & 57 deletions lib/s3lib/s3lib_test.go
Original file line number Diff line number Diff line change
@@ -1,73 +1,50 @@
package s3lib

import (
"github.com/artie-labs/transfer/lib/typing"
"github.com/stretchr/testify/assert"
"testing"
)

func TestBucketAndPrefixFromFilePath(t *testing.T) {
tcs := []struct {
name string
fp string
expectedBucket *string
expectedPrefix *string
expectedErr string
}{
{
// Invalid
{
name: "valid path (w/ S3 prefix)",
fp: "s3://bucket/prefix",
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix"),
},
{
name: "valid path (w/ S3 prefix) with trailing slash",
fp: "s3://bucket/prefix/",
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix/"),
},
{
name: "valid path (w/ S3 prefix) with multiple slashes",
fp: "s3://bucket/prefix/with/multiple/slashes",
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix/with/multiple/slashes"),
},
// Without S3 prefix
// Empty string
bucket, prefix, err := BucketAndPrefixFromFilePath("")
assert.ErrorContains(t, err, "invalid S3 path, missing prefix")
assert.Empty(t, bucket)
assert.Empty(t, prefix)
}
{
name: "valid path (w/o S3 prefix)",
fp: "bucket/prefix",
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix"),
},
// Bucket only, no prefix
bucket, prefix, err := BucketAndPrefixFromFilePath("bucket")
assert.ErrorContains(t, err, "invalid S3 path, missing prefix")
assert.Empty(t, bucket)
assert.Empty(t, prefix)
}
}
{
// Valid
{
name: "valid path (w/o S3 prefix) with trailing slash",
fp: "bucket/prefix/",
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix/"),
},
// No S3 prefix
bucket, prefix, err := BucketAndPrefixFromFilePath("bucket/prefix")
assert.NoError(t, err)
assert.Equal(t, "bucket", bucket)
assert.Equal(t, "prefix", prefix)
}
{
name: "valid path (w/o S3 prefix) with multiple slashes",
fp: "bucket/prefix/with/multiple/slashes",
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix/with/multiple/slashes"),
},
// S3 prefix
bucket, prefix, err := BucketAndPrefixFromFilePath("s3://bucket/prefix")
assert.NoError(t, err)
assert.Equal(t, "bucket", bucket)
assert.Equal(t, "prefix", prefix)
}
{
name: "invalid path",
fp: "s3://bucket",
expectedErr: "invalid S3 path, missing prefix",
},
}

for _, tc := range tcs {
actualBucket, actualPrefix, actualErr := bucketAndPrefixFromFilePath(tc.fp)
if tc.expectedErr != "" {
assert.ErrorContains(t, actualErr, tc.expectedErr, tc.name)
} else {
assert.NoError(t, actualErr, tc.name)

// Now check the actualBucket and prefix
assert.Equal(t, *tc.expectedBucket, *actualBucket, tc.name)
assert.Equal(t, *tc.expectedPrefix, *actualPrefix, tc.name)
// S3 prefix long
bucket, prefix, err := BucketAndPrefixFromFilePath("s3://bucket/prefix/long")
assert.NoError(t, err)
assert.Equal(t, "bucket", bucket)
assert.Equal(t, "prefix/long", prefix)
}
}
}
7 changes: 6 additions & 1 deletion sources/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ func Load(ctx context.Context, cfg config.DynamoDB) (sources.Source, bool, error
}

if cfg.Snapshot {
return snapshot.NewStore(cfg, _awsCfg), false, nil
store, err := snapshot.NewStore(ctx, cfg, _awsCfg)
if err != nil {
return nil, false, err
}

return store, false, nil
} else {
return stream.NewStore(cfg, _awsCfg), true, nil
}
Expand Down
101 changes: 101 additions & 0 deletions sources/dynamodb/snapshot/export.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package snapshot

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"

"github.com/artie-labs/reader/lib/dynamo"
)

// listExports returns every export summary for the given table ARN, following the
// NextToken returned by ListExports until no further page is reported.
func (s *Store) listExports(ctx context.Context, tableARN string) ([]types.ExportSummary, error) {
	var (
		summaries []types.ExportSummary
		pageToken *string
	)

	for {
		page, err := s.dynamoDBClient.ListExports(ctx, &dynamodb.ListExportsInput{
			TableArn:  aws.String(tableARN),
			NextToken: pageToken,
		})
		if err != nil {
			return nil, fmt.Errorf("failed to list exports: %w", err)
		}

		summaries = append(summaries, page.ExportSummaries...)

		// A nil token marks the last page.
		if page.NextToken == nil {
			return summaries, nil
		}

		pageToken = page.NextToken
	}
}

// findRecentExport looks for an existing, non-failed export of the table behind
// s.streamArn whose S3 destination matches bucket and prefix. If none is found, it
// initiates a new point-in-time export in DynamoDB JSON format to that destination.
//
// It returns (exportARN, exportManifest, err). exportManifest is only non-nil when a
// matching export was found with status COMPLETED; in every other successful case it is
// nil and only the export ARN is returned.
func (s *Store) findRecentExport(ctx context.Context, bucket string, prefix string) (*string, *string, error) {
	tableARN, err := dynamo.GetTableArnFromStreamArn(s.streamArn)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to get table ARN from stream ARN: %w", err)
	}

	exports, err := s.listExports(ctx, tableARN)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to list exports: %w", err)
	}

	for _, export := range exports {
		if export.ExportStatus == types.ExportStatusFailed {
			slog.Info("Filtering out failed exports", slog.String("exportARN", aws.ToString(export.ExportArn)))
			continue
		}

		described, err := s.dynamoDBClient.DescribeExport(ctx, &dynamodb.DescribeExportInput{ExportArn: export.ExportArn})
		if err != nil {
			return nil, nil, fmt.Errorf("failed to describe export: %w", err)
		}

		description := described.ExportDescription
		if description == nil {
			// Nothing to compare against; treat it as a non-match instead of panicking.
			continue
		}

		// S3Bucket and S3Prefix are pointer fields and may be nil (e.g. an export created
		// without a prefix); aws.ToString maps nil to "" so the comparison cannot panic.
		if aws.ToString(description.S3Bucket) != bucket || aws.ToString(description.S3Prefix) != prefix {
			continue
		}

		if export.ExportStatus == types.ExportStatusCompleted {
			return export.ExportArn, description.ExportManifest, nil
		}

		// Matching export exists but has not completed, so there is no manifest yet.
		return export.ExportArn, nil, nil
	}

	// Not found, so let's initiate one
	result, err := s.dynamoDBClient.ExportTableToPointInTime(ctx, &dynamodb.ExportTableToPointInTimeInput{
		TableArn:     aws.String(tableARN),
		S3Bucket:     aws.String(bucket),
		S3Prefix:     aws.String(prefix),
		ExportFormat: types.ExportFormatDynamodbJson,
	})
	if err != nil {
		return nil, nil, fmt.Errorf("failed to export table to point in time: %w", err)
	}

	return result.ExportDescription.ExportArn, nil, nil
}

// checkExportStatus polls DescribeExport for exportARN until the export leaves the
// IN_PROGRESS state.
//
// It returns the export manifest once the status is COMPLETED. It returns an error when
// the export has FAILED, when an unknown status is reported, when DescribeExport fails,
// or when ctx is cancelled while waiting between polls (in which case ctx.Err() is returned).
func (s *Store) checkExportStatus(ctx context.Context, exportARN *string) (*string, error) {
	const pollInterval = 30 * time.Second

	for {
		result, err := s.dynamoDBClient.DescribeExport(ctx, &dynamodb.DescribeExportInput{ExportArn: exportARN})
		if err != nil {
			return nil, fmt.Errorf("failed to describe export: %w", err)
		}

		switch result.ExportDescription.ExportStatus {
		case types.ExportStatusCompleted:
			return result.ExportDescription.ExportManifest, nil
		case types.ExportStatusFailed:
			// FailureMessage is a pointer and may be nil; aws.ToString avoids a nil dereference.
			return nil, fmt.Errorf("export has failed: %s", aws.ToString(result.ExportDescription.FailureMessage))
		case types.ExportStatusInProgress:
			slog.Info("Export is still in progress")
			// Wait before polling again, but stop promptly if the caller cancels.
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-time.After(pollInterval):
			}
		default:
			return nil, fmt.Errorf("unknown export status: %s", string(result.ExportDescription.ExportStatus))
		}
	}
}
Loading