diff --git a/config/dynamodb.go b/config/dynamodb.go index 98db5ddd..3344e9cc 100644 --- a/config/dynamodb.go +++ b/config/dynamodb.go @@ -39,12 +39,10 @@ func (d *DynamoDB) Validate() error { } type SnapshotSettings struct { - S3Bucket string `yaml:"s3Bucket"` - Folder string `yaml:"folder"` - // If the files are not specified, that's okay. - // We will scan the folder and then load into `specifiedFiles` - SpecifiedFiles []s3lib.S3File `yaml:"specifiedFiles"` - BatchSize int32 `yaml:"batchSize"` + ShouldInitiateExport bool `yaml:"shouldInitiateExport"` // Whether Reader should initiate a DDB export or not. + Folder string `yaml:"folder"` // The folder where the snapshot files will be stored. + SpecifiedFiles []s3lib.S3File `yaml:"specifiedFiles"` // If this is passed in, we'll only process these files + BatchSize int32 `yaml:"batchSize"` } func (s *SnapshotSettings) GetBatchSize() int32 { @@ -60,9 +58,5 @@ func (s *SnapshotSettings) Validate() error { return fmt.Errorf("folder is empty") } - if s.S3Bucket == "" { - return fmt.Errorf("s3Bucket is empty") - } - return nil } diff --git a/lib/dynamo/util.go b/lib/dynamo/util.go new file mode 100644 index 00000000..f12575bf --- /dev/null +++ b/lib/dynamo/util.go @@ -0,0 +1,29 @@ +package dynamo + +import ( + "fmt" + "path/filepath" + "strings" +) + +func GetTableArnFromStreamArn(streamArn string) (string, error) { + parts := strings.Split(streamArn, "/stream/") + if len(parts) != 2 { + return "", fmt.Errorf("invalid stream ARN: %q", streamArn) + } + + return parts[0], nil +} + +func ParseManifestFile(bucket string, manifestFilePath string) (string, error) { + if !strings.HasSuffix(manifestFilePath, "manifest-summary.json") { + return "", fmt.Errorf("invalid manifest filepath: %q", manifestFilePath) + } + + parts := strings.Split(manifestFilePath, "/") + if len(parts) == 0 { + return "", fmt.Errorf("invalid manifest filepath: %q", manifestFilePath) + } + + return filepath.Join(bucket, strings.Join(parts[:len(parts)-1], "/")), nil +} diff --git a/lib/dynamo/util_test.go b/lib/dynamo/util_test.go new file mode 100644 index 00000000..f77157d0 --- /dev/null +++ b/lib/dynamo/util_test.go @@ -0,0 +1,34 @@ +package dynamo + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestGetTableArnFromStreamArn(t *testing.T) { + { + // Valid stream ARN + tableArn, err := GetTableArnFromStreamArn("arn:aws:dynamodb:us-west-2:123456789012:table/my-table/stream/2021-01-01T00:00:00.000") + assert.NoError(t, err) + assert.Equal(t, "arn:aws:dynamodb:us-west-2:123456789012:table/my-table", tableArn) + } + { + // Invalid stream ARN + _, err := GetTableArnFromStreamArn("arn:aws:dynamodb:us-west-2:123456789012:table/my-table") + assert.ErrorContains(t, err, `invalid stream ARN: "arn:aws:dynamodb:us-west-2:123456789012:table/my-table"`) + } +} + +func TestParseManifestFile(t *testing.T) { + { + // Valid manifest file path + bucket, err := ParseManifestFile("bucket", "artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6/manifest-summary.json") + assert.NoError(t, err) + assert.Equal(t, "bucket/artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6", bucket) + } + { + // Invalid manifest file path + _, err := ParseManifestFile("bucket", "artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6/manifest-summary") + assert.ErrorContains(t, err, `invalid manifest filepath: "artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6/manifest-summary"`) + } +} diff --git a/lib/s3lib/s3lib.go b/lib/s3lib/s3lib.go index d8f7f746..e990c81e 100644 --- a/lib/s3lib/s3lib.go +++ b/lib/s3lib/s3lib.go @@ -24,18 +24,14 @@ func NewClient(bucketName string, awsCfg aws.Config) *S3Client { } } -func bucketAndPrefixFromFilePath(fp string) (*string, *string, error) { +func BucketAndPrefixFromFilePath(fp string) (string, string, error) { // Remove the s3:// prefix if it's there - fp = strings.TrimPrefix(fp, "s3://") - - parts := strings.SplitN(fp, "/", 2) + parts := strings.SplitN(strings.TrimPrefix(fp, "s3://"), "/", 2) if len(parts) < 2 { - return nil, nil, fmt.Errorf("invalid S3 path, missing prefix") + return "", "", fmt.Errorf("invalid S3 path, missing prefix") } - bucket := parts[0] - prefix := parts[1] - return &bucket, &prefix, nil + return parts[0], parts[1], nil } type S3File struct { @@ -43,13 +39,13 @@ type S3File struct { } func (s *S3Client) ListFiles(ctx context.Context, fp string) ([]S3File, error) { - bucket, prefix, err := bucketAndPrefixFromFilePath(fp) + bucket, prefix, err := BucketAndPrefixFromFilePath(fp) if err != nil { return nil, err } var files []S3File - paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{Bucket: bucket, Prefix: prefix}) + paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{Bucket: &bucket, Prefix: &prefix}) for paginator.HasMorePages() { page, err := paginator.NextPage(ctx) if err != nil { diff --git a/lib/s3lib/s3lib_test.go b/lib/s3lib/s3lib_test.go index caf7e21a..c6775a61 100644 --- a/lib/s3lib/s3lib_test.go +++ b/lib/s3lib/s3lib_test.go @@ -1,73 +1,50 @@ package s3lib import ( - "github.com/artie-labs/transfer/lib/typing" "github.com/stretchr/testify/assert" "testing" ) func TestBucketAndPrefixFromFilePath(t *testing.T) { - tcs := []struct { - name string - fp string - expectedBucket *string - expectedPrefix *string - expectedErr string - }{ + { + // Invalid { - name: "valid path (w/ S3 prefix)", - fp: "s3://bucket/prefix", - expectedBucket: typing.ToPtr("bucket"), - expectedPrefix: typing.ToPtr("prefix"), - }, - { - name: "valid path (w/ S3 prefix) with trailing slash", - fp: "s3://bucket/prefix/", - expectedBucket: typing.ToPtr("bucket"), - expectedPrefix: typing.ToPtr("prefix/"), - }, - { - name: "valid path (w/ S3 prefix) with multiple slashes", - fp: "s3://bucket/prefix/with/multiple/slashes", - expectedBucket: typing.ToPtr("bucket"), - expectedPrefix: typing.ToPtr("prefix/with/multiple/slashes"), - }, - // Without S3 prefix + // Empty string + bucket, prefix, err := BucketAndPrefixFromFilePath("") + assert.ErrorContains(t, err, "invalid S3 path, missing prefix") + assert.Empty(t, bucket) + assert.Empty(t, prefix) + } { - name: "valid path (w/o S3 prefix)", - fp: "bucket/prefix", - expectedBucket: typing.ToPtr("bucket"), - expectedPrefix: typing.ToPtr("prefix"), - }, + // Bucket only, no prefix + bucket, prefix, err := BucketAndPrefixFromFilePath("bucket") + assert.ErrorContains(t, err, "invalid S3 path, missing prefix") + assert.Empty(t, bucket) + assert.Empty(t, prefix) + } + } + { + // Valid { - name: "valid path (w/o S3 prefix) with trailing slash", - fp: "bucket/prefix/", - expectedBucket: typing.ToPtr("bucket"), - expectedPrefix: typing.ToPtr("prefix/"), - }, + // No S3 prefix + bucket, prefix, err := BucketAndPrefixFromFilePath("bucket/prefix") + assert.NoError(t, err) + assert.Equal(t, "bucket", bucket) + assert.Equal(t, "prefix", prefix) + } { - name: "valid path (w/o S3 prefix) with multiple slashes", - fp: "bucket/prefix/with/multiple/slashes", - expectedBucket: typing.ToPtr("bucket"), - expectedPrefix: typing.ToPtr("prefix/with/multiple/slashes"), - }, + // S3 prefix + bucket, prefix, err := BucketAndPrefixFromFilePath("s3://bucket/prefix") + assert.NoError(t, err) + assert.Equal(t, "bucket", bucket) + assert.Equal(t, "prefix", prefix) + } { - name: "invalid path", - fp: "s3://bucket", - expectedErr: "invalid S3 path, missing prefix", - }, - } - - for _, tc := range tcs { - actualBucket, actualPrefix, actualErr := bucketAndPrefixFromFilePath(tc.fp) - if tc.expectedErr != "" { - assert.ErrorContains(t, actualErr, tc.expectedErr, tc.name) - } else { - assert.NoError(t, actualErr, tc.name) - - // Now check the actualBucket and prefix - assert.Equal(t, *tc.expectedBucket, *actualBucket, tc.name) - assert.Equal(t, *tc.expectedPrefix, *actualPrefix, tc.name) + // S3 prefix long + bucket, prefix, err := BucketAndPrefixFromFilePath("s3://bucket/prefix/long") + assert.NoError(t, err) + assert.Equal(t, "bucket", bucket) + assert.Equal(t, "prefix/long", prefix) } } } diff --git a/sources/dynamodb/dynamodb.go b/sources/dynamodb/dynamodb.go index 5c643c38..ee56cc47 100644 --- a/sources/dynamodb/dynamodb.go +++ b/sources/dynamodb/dynamodb.go @@ -30,7 +30,12 @@ func Load(ctx context.Context, cfg config.DynamoDB) (sources.Source, bool, error } if cfg.Snapshot { - return snapshot.NewStore(cfg, _awsCfg), false, nil + store, err := snapshot.NewStore(ctx, cfg, _awsCfg) + if err != nil { + return nil, false, err + } + + return store, false, nil } else { return stream.NewStore(cfg, _awsCfg), true, nil } diff --git a/sources/dynamodb/snapshot/export.go b/sources/dynamodb/snapshot/export.go new file mode 100644 index 00000000..b19f025c --- /dev/null +++ b/sources/dynamodb/snapshot/export.go @@ -0,0 +1,101 @@ +package snapshot + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + + "github.com/artie-labs/reader/lib/dynamo" +) + +func (s *Store) listExports(ctx context.Context, tableARN string) ([]types.ExportSummary, error) { + var out []types.ExportSummary + var nextToken *string + for { + exports, err := s.dynamoDBClient.ListExports(ctx, &dynamodb.ListExportsInput{TableArn: aws.String(tableARN), NextToken: nextToken}) + if err != nil { + return nil, fmt.Errorf("failed to list exports: %w", err) + } + + out = append(out, exports.ExportSummaries...) + if exports.NextToken == nil { + break + } + + nextToken = exports.NextToken + } + + return out, nil +} + +func (s *Store) findRecentExport(ctx context.Context, bucket string, prefix string) (*string, *string, error) { + tableARN, err := dynamo.GetTableArnFromStreamArn(s.streamArn) + if err != nil { + return nil, nil, fmt.Errorf("failed to get table ARN from stream ARN: %w", err) + } + + exports, err := s.listExports(ctx, tableARN) + if err != nil { + return nil, nil, fmt.Errorf("failed to list exports: %w", err) + } + + for _, export := range exports { + if export.ExportStatus == types.ExportStatusFailed { + slog.Info("Filtering out failed exports", slog.String("exportARN", *export.ExportArn)) + continue + } + + exportDescription, err := s.dynamoDBClient.DescribeExport(ctx, &dynamodb.DescribeExportInput{ExportArn: export.ExportArn}) + if err != nil { + return nil, nil, fmt.Errorf("failed to describe export: %w", err) + } + + if *exportDescription.ExportDescription.S3Bucket == bucket && *exportDescription.ExportDescription.S3Prefix == prefix { + if export.ExportStatus == types.ExportStatusCompleted { + return export.ExportArn, exportDescription.ExportDescription.ExportManifest, nil + } + + return export.ExportArn, nil, nil + } + } + + // Not found, so let's initiate one + result, err := s.dynamoDBClient.ExportTableToPointInTime(ctx, &dynamodb.ExportTableToPointInTimeInput{ + TableArn: aws.String(tableARN), + S3Bucket: aws.String(bucket), + S3Prefix: aws.String(prefix), + ExportFormat: types.ExportFormatDynamodbJson, + }) + + if err != nil { + return nil, nil, err + } + + return result.ExportDescription.ExportArn, nil, nil +} + +func (s *Store) checkExportStatus(ctx context.Context, exportARN *string) (*string, error) { + for { + result, err := s.dynamoDBClient.DescribeExport(ctx, &dynamodb.DescribeExportInput{ExportArn: exportARN}) + if err != nil { + return nil, fmt.Errorf("failed to describe export: %w", err) + } + + switch result.ExportDescription.ExportStatus { + case types.ExportStatusCompleted: + return result.ExportDescription.ExportManifest, nil + case types.ExportStatusFailed: + return nil, fmt.Errorf("export has failed: %s", *result.ExportDescription.FailureMessage) + case types.ExportStatusInProgress: + slog.Info("Export is still in progress") + time.Sleep(30 * time.Second) + default: + return nil, fmt.Errorf("unknown export status: %s", string(result.ExportDescription.ExportStatus)) + } + } +} diff --git a/sources/dynamodb/snapshot/snapshot.go b/sources/dynamodb/snapshot/snapshot.go index 72b3ae60..5873c9c4 100644 --- a/sources/dynamodb/snapshot/snapshot.go +++ b/sources/dynamodb/snapshot/snapshot.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "path/filepath" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -25,14 +26,50 @@ type Store struct { dynamoDBClient *dynamodb.Client } -func NewStore(cfg config.DynamoDB, awsCfg aws.Config) *Store { - return &Store{ +func NewStore(ctx context.Context, cfg config.DynamoDB, awsCfg aws.Config) (*Store, error) { + bucketName, prefixName, err := s3lib.BucketAndPrefixFromFilePath(cfg.SnapshotSettings.Folder) + if err != nil { + return nil, err + } + + store := &Store{ tableName: cfg.TableName, streamArn: cfg.StreamArn, cfg: &cfg, - s3Client: s3lib.NewClient(cfg.SnapshotSettings.S3Bucket, awsCfg), + s3Client: s3lib.NewClient(bucketName, awsCfg), dynamoDBClient: dynamodb.NewFromConfig(awsCfg), } + + if cfg.SnapshotSettings.ShouldInitiateExport { + exportARN, manifestFilePath, err := store.findRecentExport(ctx, bucketName, prefixName) + if err != nil { + return nil, err + } + + if manifestFilePath == nil { + // This means that the export is not done yet, so let's wait. + manifestFilePath, err = store.checkExportStatus(ctx, exportARN) + if err != nil { + return nil, fmt.Errorf("failed to check export status: %w", err) + } + } + + if err = store.loadFolderFromManifest(bucketName, *manifestFilePath); err != nil { + return nil, err + } + } + + return store, nil +} + +func (s *Store) loadFolderFromManifest(bucketName string, manifestFilePath string) error { + folder, err := dynamo.ParseManifestFile(bucketName, manifestFilePath) + if err != nil { + return fmt.Errorf("failed to parse manifest: %w", err) + } + + s.cfg.SnapshotSettings.Folder = filepath.Join(folder, "data") + return nil } func (s *Store) Close() error {