From 3709efae5d65b298c87c1474853f5113c5079ba7 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 22 Aug 2024 18:16:14 -0700 Subject: [PATCH 01/13] Checkpoint. --- lib/dynamo/util.go | 26 +++++++ lib/dynamo/util_test.go | 34 ++++++++++ lib/s3lib/s3lib.go | 4 +- lib/s3lib/s3lib_test.go | 2 +- sources/dynamodb/snapshot/export.go | 102 ++++++++++++++++++++++++++++ 5 files changed, 165 insertions(+), 3 deletions(-) create mode 100644 lib/dynamo/util.go create mode 100644 lib/dynamo/util_test.go create mode 100644 sources/dynamodb/snapshot/export.go diff --git a/lib/dynamo/util.go b/lib/dynamo/util.go new file mode 100644 index 00000000..912c0634 --- /dev/null +++ b/lib/dynamo/util.go @@ -0,0 +1,26 @@ +package dynamo + +import ( + "fmt" + "path/filepath" + "strings" +) + +func GetTableArnFromStreamArn(streamArn string) (string, error) { + parts := strings.Split(streamArn, "/stream/") + if len(parts) != 2 { + return "", fmt.Errorf("invalid stream ARN: %s", streamArn) + } + + return parts[0], nil +} + +func ParseManifestFile(bucket string, manifestFilePath string) (string, error) { + // artie-ddb-export/AWSDynamoDB/01722458674792-8831c8f6/manifest-summary.json + if !strings.HasSuffix(manifestFilePath, "manifest-summary.json") { + return "", fmt.Errorf("invalid manifest filepath: %s", manifestFilePath) + } + + parts := strings.Split(manifestFilePath, "/") + return filepath.Join(bucket, strings.Join(parts[:len(parts)-1], "/")), nil +} diff --git a/lib/dynamo/util_test.go b/lib/dynamo/util_test.go new file mode 100644 index 00000000..cb422cff --- /dev/null +++ b/lib/dynamo/util_test.go @@ -0,0 +1,34 @@ +package dynamo + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestGetTableArnFromStreamArn(t *testing.T) { + { + // Valid stream ARN + tableArn, err := GetTableArnFromStreamArn("arn:aws:dynamodb:us-west-2:123456789012:table/my-table/stream/2021-01-01T00:00:00.000") + assert.NoError(t, err) + assert.Equal(t, "arn:aws:dynamodb:us-west-2:123456789012:table/my-table", tableArn) + } + { + // Invalid stream ARN + _, err := GetTableArnFromStreamArn("arn:aws:dynamodb:us-west-2:123456789012:table/my-table") + assert.ErrorContains(t, err, "invalid stream ARN: arn:aws:dynamodb:us-west-2:123456789012:table/my-table") + } +} + +func TestParseManifestFile(t *testing.T) { + { + // Valid manifest file path + bucket, err := ParseManifestFile("bucket", "artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6/manifest-summary.json") + assert.NoError(t, err) + assert.Equal(t, "bucket/artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6", bucket) + } + { + // Invalid manifest file path + _, err := ParseManifestFile("bucket", "artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6/manifest-summary") + assert.ErrorContains(t, err, "invalid manifest filepath: artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6/manifest-summary") + } +} diff --git a/lib/s3lib/s3lib.go b/lib/s3lib/s3lib.go index d8f7f746..35ca6f64 100644 --- a/lib/s3lib/s3lib.go +++ b/lib/s3lib/s3lib.go @@ -24,7 +24,7 @@ func NewClient(bucketName string, awsCfg aws.Config) *S3Client { } } -func bucketAndPrefixFromFilePath(fp string) (*string, *string, error) { +func BucketAndPrefixFromFilePath(fp string) (*string, *string, error) { // Remove the s3:// prefix if it's there fp = strings.TrimPrefix(fp, "s3://") @@ -43,7 +43,7 @@ type S3File struct { } func (s *S3Client) ListFiles(ctx context.Context, fp string) ([]S3File, error) { - bucket, prefix, err := bucketAndPrefixFromFilePath(fp) + bucket, prefix, err := BucketAndPrefixFromFilePath(fp) if err != nil { return nil, err } diff --git a/lib/s3lib/s3lib_test.go b/lib/s3lib/s3lib_test.go index c722646c..69a1db87 100644 --- a/lib/s3lib/s3lib_test.go +++ b/lib/s3lib/s3lib_test.go @@ -60,7 +60,7 @@ func TestBucketAndPrefixFromFilePath(t *testing.T) { } for _, tc := range tcs { - actualBucket, actualPrefix, actualErr := bucketAndPrefixFromFilePath(tc.fp) + actualBucket, actualPrefix, actualErr := BucketAndPrefixFromFilePath(tc.fp) if tc.expectedErr != "" { assert.ErrorContains(t, actualErr, tc.expectedErr, tc.name) } else { diff --git a/sources/dynamodb/snapshot/export.go b/sources/dynamodb/snapshot/export.go new file mode 100644 index 00000000..4b9ed7de --- /dev/null +++ b/sources/dynamodb/snapshot/export.go @@ -0,0 +1,102 @@ +package snapshot + +import ( + "context" + "fmt" + "github.com/artie-labs/reader/lib/dynamo" + "github.com/artie-labs/reader/lib/s3lib" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "log/slog" + "time" +) + +func (s *Store) findRecentExport(ctx context.Context, s3FilePath string) (*string, *string, error) { + bucketName, prefixName, err := s3lib.BucketAndPrefixFromFilePath(s3FilePath) + if err != nil { + return nil, nil, err + } + + tableARN, err := dynamo.GetTableArnFromStreamArn(s.streamArn) + if err != nil { + return nil, nil, fmt.Errorf("failed to get table ARN from stream ARN: %w", err) + } + + exports, err := s.listExports(ctx, tableARN) + if err != nil { + return nil, nil, fmt.Errorf("failed to list exports: %w", err) + } + + for _, export := range exports { + if export.ExportStatus == types.ExportStatusFailed { + slog.Info("Filtering out failed export", slog.String("exportARN", *export.ExportArn)) + continue + } + + exportDescription, err := s.dynamoDBClient.DescribeExport(ctx, &dynamodb.DescribeExportInput{ExportArn: export.ExportArn}) + if err != nil { + return nil, nil, fmt.Errorf("failed to describe export: %w", err) + } + + if exportDescription.ExportDescription.S3Bucket == bucketName && exportDescription.ExportDescription.S3Prefix == prefixName { + if export.ExportStatus == types.ExportStatusCompleted { + return export.ExportArn, exportDescription.ExportDescription.ExportManifest, nil + } + + return export.ExportArn, nil, nil + } + } + + return nil, nil, fmt.Errorf("no recent export found for %s", s3FilePath) +} + +func (s *Store) listExports(ctx context.Context, tableARN string) ([]types.ExportSummary, error) { + var out []types.ExportSummary + + var nextToken *string + for { + exports, err := s.dynamoDBClient.ListExports(ctx, &dynamodb.ListExportsInput{ + TableArn: aws.String(tableARN), + NextToken: nextToken, + }) + + if err != nil { + return nil, fmt.Errorf("failed to list exports: %w", err) + } + + out = append(out, exports.ExportSummaries...) + if exports.NextToken == nil { + break + } + + nextToken = exports.NextToken + } + + return out, nil +} + +func (s *Store) checkExportStatus(ctx context.Context, exportARN *string) (*string, error) { + for { + describeInput := &dynamodb.DescribeExportInput{ + ExportArn: exportARN, + } + + result, err := s.dynamoDBClient.DescribeExport(ctx, describeInput) + if err != nil { + return nil, fmt.Errorf("failed to describe export: %w", err) + } + + switch result.ExportDescription.ExportStatus { + case types.ExportStatusCompleted: + return result.ExportDescription.ExportManifest, nil + case types.ExportStatusFailed: + return nil, fmt.Errorf("export has failed: %s", *result.ExportDescription.FailureMessage) + case types.ExportStatusInProgress: + slog.Info("Export is still in progress") + time.Sleep(30 * time.Second) + default: + return nil, fmt.Errorf("unknown export status: %s", string(result.ExportDescription.ExportStatus)) + } + } +} From e9bbca46a480705f744ab1ecfc206376cf27d877 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 22 Aug 2024 18:16:44 -0700 Subject: [PATCH 02/13] Clean up. --- lib/dynamo/util.go | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/dynamo/util.go b/lib/dynamo/util.go index 912c0634..3b5717cb 100644 --- a/lib/dynamo/util.go +++ b/lib/dynamo/util.go @@ -16,7 +16,6 @@ func GetTableArnFromStreamArn(streamArn string) (string, error) { } func ParseManifestFile(bucket string, manifestFilePath string) (string, error) { - // artie-ddb-export/AWSDynamoDB/01722458674792-8831c8f6/manifest-summary.json if !strings.HasSuffix(manifestFilePath, "manifest-summary.json") { return "", fmt.Errorf("invalid manifest filepath: %s", manifestFilePath) } From 8f4217510d4198bd27ca2e245749d01a0e36fdce Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 22 Aug 2024 18:21:57 -0700 Subject: [PATCH 03/13] Checkpoint. --- config/dynamodb.go | 7 +----- lib/s3lib/s3lib.go | 10 ++++---- lib/s3lib/s3lib_test.go | 33 +++++++++++++-------------- sources/dynamodb/dynamodb.go | 7 +++++- sources/dynamodb/snapshot/export.go | 5 ---- sources/dynamodb/snapshot/snapshot.go | 20 ++++++++++++---- 6 files changed, 42 insertions(+), 40 deletions(-) diff --git a/config/dynamodb.go b/config/dynamodb.go index 98db5ddd..b4120eea 100644 --- a/config/dynamodb.go +++ b/config/dynamodb.go @@ -39,8 +39,7 @@ func (d *DynamoDB) Validate() error { } type SnapshotSettings struct { - S3Bucket string `yaml:"s3Bucket"` - Folder string `yaml:"folder"` + Folder string `yaml:"folder"` // If the files are not specified, that's okay. // We will scan the folder and then load into `specifiedFiles` SpecifiedFiles []s3lib.S3File `yaml:"specifiedFiles"` @@ -60,9 +59,5 @@ func (s *SnapshotSettings) Validate() error { return fmt.Errorf("folder is empty") } - if s.S3Bucket == "" { - return fmt.Errorf("s3Bucket is empty") - } - return nil } diff --git a/lib/s3lib/s3lib.go b/lib/s3lib/s3lib.go index 35ca6f64..4fb51912 100644 --- a/lib/s3lib/s3lib.go +++ b/lib/s3lib/s3lib.go @@ -24,18 +24,16 @@ func NewClient(bucketName string, awsCfg aws.Config) *S3Client { } } -func BucketAndPrefixFromFilePath(fp string) (*string, *string, error) { +func BucketAndPrefixFromFilePath(fp string) (string, string, error) { // Remove the s3:// prefix if it's there fp = strings.TrimPrefix(fp, "s3://") parts := strings.SplitN(fp, "/", 2) if len(parts) < 2 { - return nil, nil, fmt.Errorf("invalid S3 path, missing prefix") + return "", "", fmt.Errorf("invalid S3 path, missing prefix") } - bucket := parts[0] - prefix := parts[1] - return &bucket, &prefix, nil + return parts[0], parts[1], nil } type S3File struct { @@ -49,7 +47,7 @@ func (s *S3Client) ListFiles(ctx context.Context, fp string) ([]S3File, error) { } var files []S3File - paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{Bucket: bucket, Prefix: prefix}) + paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{Bucket: &bucket, Prefix: &prefix}) for paginator.HasMorePages() { page, err := paginator.NextPage(ctx) if err != nil { diff --git a/lib/s3lib/s3lib_test.go b/lib/s3lib/s3lib_test.go index 69a1db87..fe77db95 100644 --- a/lib/s3lib/s3lib_test.go +++ b/lib/s3lib/s3lib_test.go @@ -3,7 +3,6 @@ package s3lib import ( "testing" - "github.com/artie-labs/transfer/lib/ptr" "github.com/stretchr/testify/assert" ) @@ -11,46 +10,46 @@ func TestBucketAndPrefixFromFilePath(t *testing.T) { tcs := []struct { name string fp string - expectedBucket *string - expectedPrefix *string + expectedBucket string + expectedPrefix string expectedErr string }{ { name: "valid path (w/ S3 prefix)", fp: "s3://bucket/prefix", - expectedBucket: ptr.ToString("bucket"), - expectedPrefix: ptr.ToString("prefix"), + expectedBucket: "bucket", + expectedPrefix: "prefix", }, { name: "valid path (w/ S3 prefix) with trailing slash", fp: "s3://bucket/prefix/", - expectedBucket: ptr.ToString("bucket"), - expectedPrefix: ptr.ToString("prefix/"), + expectedBucket: "bucket", + expectedPrefix: "prefix/", }, { name: "valid path (w/ S3 prefix) with multiple slashes", fp: "s3://bucket/prefix/with/multiple/slashes", - expectedBucket: ptr.ToString("bucket"), - expectedPrefix: ptr.ToString("prefix/with/multiple/slashes"), + expectedBucket: "bucket", + expectedPrefix: "prefix/with/multiple/slashes", }, // Without S3 prefix { name: "valid path (w/o S3 prefix)", fp: "bucket/prefix", - expectedBucket: ptr.ToString("bucket"), - expectedPrefix: ptr.ToString("prefix"), + expectedBucket: "bucket", + expectedPrefix: "prefix", }, { name: "valid path (w/o S3 prefix) with trailing slash", fp: "bucket/prefix/", - expectedBucket: ptr.ToString("bucket"), - expectedPrefix: ptr.ToString("prefix/"), + expectedBucket: "bucket", + expectedPrefix: "prefix/", }, { name: "valid path (w/o S3 prefix) with multiple slashes", fp: "bucket/prefix/with/multiple/slashes", - expectedBucket: ptr.ToString("bucket"), - expectedPrefix: ptr.ToString("prefix/with/multiple/slashes"), + expectedBucket: "bucket", + expectedPrefix: "prefix/with/multiple/slashes", }, { name: "invalid path", @@ -67,8 +66,8 @@ func TestBucketAndPrefixFromFilePath(t *testing.T) { assert.NoError(t, actualErr, tc.name) // Now check the actualBucket and prefix - assert.Equal(t, *tc.expectedBucket, *actualBucket, tc.name) - assert.Equal(t, *tc.expectedPrefix, *actualPrefix, tc.name) + assert.Equal(t, tc.expectedBucket, actualBucket, tc.name) + assert.Equal(t, tc.expectedPrefix, actualPrefix, tc.name) } } } diff --git a/sources/dynamodb/dynamodb.go b/sources/dynamodb/dynamodb.go index 5c643c38..b7b571a7 100644 --- a/sources/dynamodb/dynamodb.go +++ b/sources/dynamodb/dynamodb.go @@ -30,7 +30,12 @@ func Load(ctx context.Context, cfg config.DynamoDB) (sources.Source, bool, error } if cfg.Snapshot { - return snapshot.NewStore(cfg, _awsCfg), false, nil + store, err := snapshot.NewStore(cfg, _awsCfg) + if err != nil { + return nil, false, err + } + + return store, false, nil } else { return stream.NewStore(cfg, _awsCfg), true, nil } diff --git a/sources/dynamodb/snapshot/export.go b/sources/dynamodb/snapshot/export.go index 4b9ed7de..0745ffa1 100644 --- a/sources/dynamodb/snapshot/export.go +++ b/sources/dynamodb/snapshot/export.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "github.com/artie-labs/reader/lib/dynamo" - "github.com/artie-labs/reader/lib/s3lib" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" @@ -13,10 +12,6 @@ import ( ) func (s *Store) findRecentExport(ctx context.Context, s3FilePath string) (*string, *string, error) { - bucketName, prefixName, err := s3lib.BucketAndPrefixFromFilePath(s3FilePath) - if err != nil { - return nil, nil, err - } tableARN, err := dynamo.GetTableArnFromStreamArn(s.streamArn) if err != nil { diff --git a/sources/dynamodb/snapshot/snapshot.go b/sources/dynamodb/snapshot/snapshot.go index 72b3ae60..bc5e757b 100644 --- a/sources/dynamodb/snapshot/snapshot.go +++ b/sources/dynamodb/snapshot/snapshot.go @@ -18,21 +18,31 @@ import ( ) type Store struct { - tableName string - streamArn string + tableName string + streamArn string + s3BucketName string + s3PrefixName string + cfg *config.DynamoDB s3Client *s3lib.S3Client dynamoDBClient *dynamodb.Client } -func NewStore(cfg config.DynamoDB, awsCfg aws.Config) *Store { +func NewStore(cfg config.DynamoDB, awsCfg aws.Config) (*Store, error) { + bucketName, prefixName, err := s3lib.BucketAndPrefixFromFilePath(cfg.SnapshotSettings.Folder) + if err != nil { + return nil, err + } + return &Store{ tableName: cfg.TableName, streamArn: cfg.StreamArn, + s3BucketName: bucketName, + s3PrefixName: prefixName, cfg: &cfg, - s3Client: s3lib.NewClient(cfg.SnapshotSettings.S3Bucket, awsCfg), + s3Client: s3lib.NewClient(bucketName, awsCfg), dynamoDBClient: dynamodb.NewFromConfig(awsCfg), - } + }, nil } func (s *Store) Close() error { From 03ff2701a1e846bd08130c2a3fa70f96a2138c86 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 22 Aug 2024 18:34:24 -0700 Subject: [PATCH 04/13] Clean up. --- config/dynamodb.go | 9 +++--- sources/dynamodb/dynamodb.go | 2 +- sources/dynamodb/snapshot/export.go | 9 ++---- sources/dynamodb/snapshot/snapshot.go | 40 +++++++++++++++++++++++++-- 4 files changed, 44 insertions(+), 16 deletions(-) diff --git a/config/dynamodb.go b/config/dynamodb.go index b4120eea..3344e9cc 100644 --- a/config/dynamodb.go +++ b/config/dynamodb.go @@ -39,11 +39,10 @@ func (d *DynamoDB) Validate() error { } type SnapshotSettings struct { - Folder string `yaml:"folder"` - // If the files are not specified, that's okay. - // We will scan the folder and then load into `specifiedFiles` - SpecifiedFiles []s3lib.S3File `yaml:"specifiedFiles"` - BatchSize int32 `yaml:"batchSize"` + ShouldInitiateExport bool `yaml:"shouldInitiateExport"` // Whether Reader should initiate a DDB export or not. + Folder string `yaml:"folder"` // The folder where the snapshot files will be stored. + SpecifiedFiles []s3lib.S3File `yaml:"specifiedFiles"` // If this is passed in, we'll only process these files + BatchSize int32 `yaml:"batchSize"` } func (s *SnapshotSettings) GetBatchSize() int32 { diff --git a/sources/dynamodb/dynamodb.go b/sources/dynamodb/dynamodb.go index b7b571a7..ee56cc47 100644 --- a/sources/dynamodb/dynamodb.go +++ b/sources/dynamodb/dynamodb.go @@ -30,7 +30,7 @@ func Load(ctx context.Context, cfg config.DynamoDB) (sources.Source, bool, error } if cfg.Snapshot { - store, err := snapshot.NewStore(cfg, _awsCfg) + store, err := snapshot.NewStore(ctx, cfg, _awsCfg) if err != nil { return nil, false, err } diff --git a/sources/dynamodb/snapshot/export.go b/sources/dynamodb/snapshot/export.go index 0745ffa1..55fb3d8a 100644 --- a/sources/dynamodb/snapshot/export.go +++ b/sources/dynamodb/snapshot/export.go @@ -12,7 +12,6 @@ import ( ) func (s *Store) findRecentExport(ctx context.Context, s3FilePath string) (*string, *string, error) { - tableARN, err := dynamo.GetTableArnFromStreamArn(s.streamArn) if err != nil { return nil, nil, fmt.Errorf("failed to get table ARN from stream ARN: %w", err) @@ -34,7 +33,7 @@ func (s *Store) findRecentExport(ctx context.Context, s3FilePath string) (*strin return nil, nil, fmt.Errorf("failed to describe export: %w", err) } - if exportDescription.ExportDescription.S3Bucket == bucketName && exportDescription.ExportDescription.S3Prefix == prefixName { + if *exportDescription.ExportDescription.S3Bucket == s.s3BucketName && *exportDescription.ExportDescription.S3Prefix == s.s3PrefixName { if export.ExportStatus == types.ExportStatusCompleted { return export.ExportArn, exportDescription.ExportDescription.ExportManifest, nil } @@ -73,11 +72,7 @@ func (s *Store) listExports(ctx context.Context, tableARN string) ([]types.Expor func (s *Store) checkExportStatus(ctx context.Context, exportARN *string) (*string, error) { for { - describeInput := &dynamodb.DescribeExportInput{ - ExportArn: exportARN, - } - - result, err := s.dynamoDBClient.DescribeExport(ctx, describeInput) + result, err := s.dynamoDBClient.DescribeExport(ctx, &dynamodb.DescribeExportInput{ExportArn: exportARN}) if err != nil { return nil, fmt.Errorf("failed to describe export: %w", err) } diff --git a/sources/dynamodb/snapshot/snapshot.go b/sources/dynamodb/snapshot/snapshot.go index bc5e757b..32013000 100644 --- a/sources/dynamodb/snapshot/snapshot.go +++ b/sources/dynamodb/snapshot/snapshot.go @@ -28,13 +28,13 @@ type Store struct { dynamoDBClient *dynamodb.Client } -func NewStore(cfg config.DynamoDB, awsCfg aws.Config) (*Store, error) { +func NewStore(ctx context.Context, cfg config.DynamoDB, awsCfg aws.Config) (*Store, error) { bucketName, prefixName, err := s3lib.BucketAndPrefixFromFilePath(cfg.SnapshotSettings.Folder) if err != nil { return nil, err } - return &Store{ + store := &Store{ tableName: cfg.TableName, streamArn: cfg.StreamArn, s3BucketName: bucketName, @@ -42,7 +42,41 @@ func NewStore(cfg config.DynamoDB, awsCfg aws.Config) (*Store, error) { cfg: &cfg, s3Client: s3lib.NewClient(bucketName, awsCfg), dynamoDBClient: dynamodb.NewFromConfig(awsCfg), - }, nil + } + + if cfg.SnapshotSettings.ShouldInitiateExport { + exportARN, manifestFilePath, err := store.findRecentExport(ctx, cfg.SnapshotSettings.Folder) + if err != nil { + return nil, err + } + + if manifestFilePath != nil { + if err = store.loadFolderFromManifest(bucketName, *manifestFilePath); err != nil { + return nil, err + } + } + + manifestFilePath, err = store.checkExportStatus(ctx, exportARN) + if err != nil { + return nil, fmt.Errorf("failed to check export status: %w", err) + } + + if err = store.loadFolderFromManifest(bucketName, *manifestFilePath); err != nil { + return nil, err + } + } + + return store, nil +} + +func (s *Store) loadFolderFromManifest(bucketName string, manifestFilePath string) error { + folder, err := dynamo.ParseManifestFile(bucketName, manifestFilePath) + if err != nil { + return fmt.Errorf("failed to parse manifest: %w", err) + } + + s.cfg.SnapshotSettings.Folder = folder + return nil } func (s *Store) Close() error { From ddb5a8a310770223e22b423a435ad9c6d12f1dc8 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 22 Aug 2024 18:38:02 -0700 Subject: [PATCH 05/13] Clean up. --- sources/dynamodb/snapshot/export.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sources/dynamodb/snapshot/export.go b/sources/dynamodb/snapshot/export.go index 55fb3d8a..7435d424 100644 --- a/sources/dynamodb/snapshot/export.go +++ b/sources/dynamodb/snapshot/export.go @@ -11,6 +11,8 @@ import ( "time" ) +// findRecentExport - This will check against the DynamoDB table to see if there is a recent export for the given S3 file path. +// It will then return the exportARN, manifestFilePath and error if any. func (s *Store) findRecentExport(ctx context.Context, s3FilePath string) (*string, *string, error) { tableARN, err := dynamo.GetTableArnFromStreamArn(s.streamArn) if err != nil { From e9d5d16a55ce23b0a1ec63dda7bb36d27055e1a1 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 22 Aug 2024 19:55:12 -0700 Subject: [PATCH 06/13] Inline. --- lib/s3lib/s3lib.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/s3lib/s3lib.go b/lib/s3lib/s3lib.go index 4fb51912..e990c81e 100644 --- a/lib/s3lib/s3lib.go +++ b/lib/s3lib/s3lib.go @@ -26,9 +26,7 @@ func NewClient(bucketName string, awsCfg aws.Config) *S3Client { func BucketAndPrefixFromFilePath(fp string) (string, string, error) { // Remove the s3:// prefix if it's there - fp = strings.TrimPrefix(fp, "s3://") - - parts := strings.SplitN(fp, "/", 2) + parts := strings.SplitN(strings.TrimPrefix(fp, "s3://"), "/", 2) if len(parts) < 2 { return "", "", fmt.Errorf("invalid S3 path, missing prefix") } From 7a8df9e5d446ca2e8e85c28f284f7700be16f04d Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 22 Aug 2024 20:27:38 -0700 Subject: [PATCH 07/13] rm new line. --- sources/dynamodb/snapshot/export.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sources/dynamodb/snapshot/export.go b/sources/dynamodb/snapshot/export.go index 7435d424..e7b8bf6f 100644 --- a/sources/dynamodb/snapshot/export.go +++ b/sources/dynamodb/snapshot/export.go @@ -49,7 +49,6 @@ func (s *Store) findRecentExport(ctx context.Context, s3FilePath string) (*strin func (s *Store) listExports(ctx context.Context, tableARN string) ([]types.ExportSummary, error) { var out []types.ExportSummary - var nextToken *string for { exports, err := s.dynamoDBClient.ListExports(ctx, &dynamodb.ListExportsInput{ From 3754b8305f40797e6bb876a531b46e1959dd58cf Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 22 Aug 2024 20:28:02 -0700 Subject: [PATCH 08/13] inline. --- sources/dynamodb/snapshot/export.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sources/dynamodb/snapshot/export.go b/sources/dynamodb/snapshot/export.go index e7b8bf6f..10dc9fc0 100644 --- a/sources/dynamodb/snapshot/export.go +++ b/sources/dynamodb/snapshot/export.go @@ -51,11 +51,7 @@ func (s *Store) listExports(ctx context.Context, tableARN string) ([]types.Expor var out []types.ExportSummary var nextToken *string for { - exports, err := s.dynamoDBClient.ListExports(ctx, &dynamodb.ListExportsInput{ - TableArn: aws.String(tableARN), - NextToken: nextToken, - }) - + exports, err := s.dynamoDBClient.ListExports(ctx, &dynamodb.ListExportsInput{TableArn: aws.String(tableARN), NextToken: nextToken}) if err != nil { return nil, fmt.Errorf("failed to list exports: %w", err) } From f7d7f332490d5ef21de98f17921a8a81c3a257f6 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 2 Oct 2024 15:14:05 -0700 Subject: [PATCH 09/13] Clean up. --- sources/dynamodb/snapshot/export.go | 16 ++++++++++++++-- sources/dynamodb/snapshot/snapshot.go | 5 +++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/sources/dynamodb/snapshot/export.go b/sources/dynamodb/snapshot/export.go index 10dc9fc0..e4b33078 100644 --- a/sources/dynamodb/snapshot/export.go +++ b/sources/dynamodb/snapshot/export.go @@ -13,7 +13,7 @@ import ( // findRecentExport - This will check against the DynamoDB table to see if there is a recent export for the given S3 file path. // It will then return the exportARN, manifestFilePath and error if any. -func (s *Store) findRecentExport(ctx context.Context, s3FilePath string) (*string, *string, error) { +func (s *Store) findRecentExport(ctx context.Context, bucket string, prefix string) (*string, *string, error) { tableARN, err := dynamo.GetTableArnFromStreamArn(s.streamArn) if err != nil { return nil, nil, fmt.Errorf("failed to get table ARN from stream ARN: %w", err) @@ -44,7 +44,19 @@ func (s *Store) findRecentExport(ctx context.Context, s3FilePath string) (*strin } } - return nil, nil, fmt.Errorf("no recent export found for %s", s3FilePath) + // Not found, so let's initiate one + result, err := s.dynamoDBClient.ExportTableToPointInTime(ctx, &dynamodb.ExportTableToPointInTimeInput{ + TableArn: aws.String(tableARN), + S3Bucket: aws.String(bucket), + S3Prefix: aws.String(prefix), + ExportFormat: types.ExportFormatDynamodbJson, + }) + + if err != nil { + return nil, nil, err + } + + return result.ExportDescription.ExportArn, nil, nil } func (s *Store) listExports(ctx context.Context, tableARN string) ([]types.ExportSummary, error) { diff --git a/sources/dynamodb/snapshot/snapshot.go b/sources/dynamodb/snapshot/snapshot.go index 32013000..76bcdaf1 100644 --- a/sources/dynamodb/snapshot/snapshot.go +++ b/sources/dynamodb/snapshot/snapshot.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "path/filepath" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -45,7 +46,7 @@ func NewStore(ctx context.Context, cfg config.DynamoDB, awsCfg aws.Config) (*Sto } if cfg.SnapshotSettings.ShouldInitiateExport { - exportARN, manifestFilePath, err := store.findRecentExport(ctx, cfg.SnapshotSettings.Folder) + exportARN, manifestFilePath, err := store.findRecentExport(ctx, bucketName, prefixName) if err != nil { return nil, err } @@ -75,7 +76,7 @@ func (s *Store) loadFolderFromManifest(bucketName string, manifestFilePath strin return fmt.Errorf("failed to parse manifest: %w", err) } - s.cfg.SnapshotSettings.Folder = folder + s.cfg.SnapshotSettings.Folder = filepath.Join(folder, "data") return nil } From 74f50874e197f379fa5214a6a52b16c1fea01569 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 2 Oct 2024 15:38:19 -0700 Subject: [PATCH 10/13] Clean up. --- lib/dynamo/util.go | 8 ++++++-- lib/dynamo/util_test.go | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/lib/dynamo/util.go b/lib/dynamo/util.go index 3b5717cb..f12575bf 100644 --- a/lib/dynamo/util.go +++ b/lib/dynamo/util.go @@ -9,7 +9,7 @@ import ( func GetTableArnFromStreamArn(streamArn string) (string, error) { parts := strings.Split(streamArn, "/stream/") if len(parts) != 2 { - return "", fmt.Errorf("invalid stream ARN: %s", streamArn) + return "", fmt.Errorf("invalid stream ARN: %q", streamArn) } return parts[0], nil @@ -17,9 +17,13 @@ func GetTableArnFromStreamArn(streamArn string) (string, error) { func ParseManifestFile(bucket string, manifestFilePath string) (string, error) { if !strings.HasSuffix(manifestFilePath, "manifest-summary.json") { - return "", fmt.Errorf("invalid manifest filepath: %s", manifestFilePath) + return "", fmt.Errorf("invalid manifest filepath: %q", manifestFilePath) } parts := strings.Split(manifestFilePath, "/") + if len(parts) == 0 { + return "", fmt.Errorf("invalid manifest filepath: %q", manifestFilePath) + } + return filepath.Join(bucket, strings.Join(parts[:len(parts)-1], "/")), nil } diff --git a/lib/dynamo/util_test.go b/lib/dynamo/util_test.go index cb422cff..f77157d0 100644 --- a/lib/dynamo/util_test.go +++ b/lib/dynamo/util_test.go @@ -15,7 +15,7 @@ func TestGetTableArnFromStreamArn(t *testing.T) { { // Invalid stream ARN _, err := GetTableArnFromStreamArn("arn:aws:dynamodb:us-west-2:123456789012:table/my-table") - assert.ErrorContains(t, err, "invalid stream ARN: arn:aws:dynamodb:us-west-2:123456789012:table/my-table") + assert.ErrorContains(t, err, `invalid stream ARN: "arn:aws:dynamodb:us-west-2:123456789012:table/my-table"`) } } @@ -29,6 +29,6 @@ func TestParseManifestFile(t *testing.T) { { // Invalid manifest file path _, err := ParseManifestFile("bucket", "artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6/manifest-summary") - assert.ErrorContains(t, err, "invalid manifest filepath: artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6/manifest-summary") + assert.ErrorContains(t, err, `invalid manifest filepath: "artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6/manifest-summary"`) } } From d28fbb9980fa6a5f3f83e8ac38ddbb1d93348701 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 2 Oct 2024 15:41:01 -0700 Subject: [PATCH 11/13] Clean up. --- sources/dynamodb/snapshot/export.go | 2 +- sources/dynamodb/snapshot/snapshot.go | 9 ++------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/sources/dynamodb/snapshot/export.go b/sources/dynamodb/snapshot/export.go index e4b33078..fd1bb415 100644 --- a/sources/dynamodb/snapshot/export.go +++ b/sources/dynamodb/snapshot/export.go @@ -35,7 +35,7 @@ func (s *Store) findRecentExport(ctx context.Context, bucket string, prefix stri return nil, nil, fmt.Errorf("failed to describe export: %w", err) } - if *exportDescription.ExportDescription.S3Bucket == s.s3BucketName && *exportDescription.ExportDescription.S3Prefix == s.s3PrefixName { + if *exportDescription.ExportDescription.S3Bucket == bucket && *exportDescription.ExportDescription.S3Prefix == prefix { if export.ExportStatus == types.ExportStatusCompleted { return export.ExportArn, exportDescription.ExportDescription.ExportManifest, nil } diff --git a/sources/dynamodb/snapshot/snapshot.go b/sources/dynamodb/snapshot/snapshot.go index 76bcdaf1..ba071d6e 100644 --- a/sources/dynamodb/snapshot/snapshot.go +++ b/sources/dynamodb/snapshot/snapshot.go @@ -19,11 +19,8 @@ import ( ) type Store struct { - tableName string - streamArn string - s3BucketName string - s3PrefixName string - + tableName string + streamArn string cfg *config.DynamoDB s3Client *s3lib.S3Client dynamoDBClient *dynamodb.Client @@ -38,8 +35,6 @@ func NewStore(ctx context.Context, cfg config.DynamoDB, awsCfg aws.Config) (*Sto store := &Store{ tableName: cfg.TableName, streamArn: cfg.StreamArn, - s3BucketName: bucketName, - s3PrefixName: prefixName, cfg: &cfg, s3Client: s3lib.NewClient(bucketName, awsCfg), dynamoDBClient: dynamodb.NewFromConfig(awsCfg), From b8c9ba87968e2b679e1bc4ad9c7e1962421ef16f Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 2 Oct 2024 15:41:58 -0700 Subject: [PATCH 12/13] Clean up. --- lib/s3lib/s3lib_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lib/s3lib/s3lib_test.go b/lib/s3lib/s3lib_test.go index 79fd7e72..c6775a61 100644 --- a/lib/s3lib/s3lib_test.go +++ b/lib/s3lib/s3lib_test.go @@ -39,5 +39,12 @@ func TestBucketAndPrefixFromFilePath(t *testing.T) { assert.Equal(t, "bucket", bucket) assert.Equal(t, "prefix", prefix) } + { + // S3 prefix long + bucket, prefix, err := BucketAndPrefixFromFilePath("s3://bucket/prefix/long") + assert.NoError(t, err) + assert.Equal(t, "bucket", bucket) + assert.Equal(t, "prefix/long", prefix) + } } } From 43fa058e2515335bd23e29c4f81066106c32a751 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 2 Oct 2024 15:45:49 -0700 Subject: [PATCH 13/13] Clean up. --- sources/dynamodb/snapshot/export.go | 52 +++++++++++++-------------- sources/dynamodb/snapshot/snapshot.go | 13 +++---- 2 files changed, 31 insertions(+), 34 deletions(-) diff --git a/sources/dynamodb/snapshot/export.go b/sources/dynamodb/snapshot/export.go index fd1bb415..b19f025c 100644 --- a/sources/dynamodb/snapshot/export.go +++ b/sources/dynamodb/snapshot/export.go @@ -3,16 +3,36 @@ package snapshot import ( "context" "fmt" - "github.com/artie-labs/reader/lib/dynamo" + "log/slog" + "time" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - "log/slog" - "time" + + "github.com/artie-labs/reader/lib/dynamo" ) -// findRecentExport - This will check against the DynamoDB table to see if there is a recent export for the given S3 file path. -// It will then return the exportARN, manifestFilePath and error if any. +func (s *Store) listExports(ctx context.Context, tableARN string) ([]types.ExportSummary, error) { + var out []types.ExportSummary + var nextToken *string + for { + exports, err := s.dynamoDBClient.ListExports(ctx, &dynamodb.ListExportsInput{TableArn: aws.String(tableARN), NextToken: nextToken}) + if err != nil { + return nil, fmt.Errorf("failed to list exports: %w", err) + } + + out = append(out, exports.ExportSummaries...) + if exports.NextToken == nil { + break + } + + nextToken = exports.NextToken + } + + return out, nil +} + func (s *Store) findRecentExport(ctx context.Context, bucket string, prefix string) (*string, *string, error) { tableARN, err := dynamo.GetTableArnFromStreamArn(s.streamArn) if err != nil { @@ -26,7 +46,7 @@ func (s *Store) findRecentExport(ctx context.Context, bucket string, prefix stri for _, export := range exports { if export.ExportStatus == types.ExportStatusFailed { - slog.Info("Filtering out failed export", slog.String("exportARN", *export.ExportArn)) + slog.Info("Filtering out failed exports", slog.String("exportARN", *export.ExportArn)) continue } @@ -59,26 +79,6 @@ func (s *Store) findRecentExport(ctx context.Context, bucket string, prefix stri return result.ExportDescription.ExportArn, nil, nil } -func (s *Store) listExports(ctx context.Context, tableARN string) ([]types.ExportSummary, error) { - var out []types.ExportSummary - var nextToken *string - for { - exports, err := s.dynamoDBClient.ListExports(ctx, &dynamodb.ListExportsInput{TableArn: aws.String(tableARN), NextToken: nextToken}) - if err != nil { - return nil, fmt.Errorf("failed to list exports: %w", err) - } - - out = append(out, exports.ExportSummaries...) - if exports.NextToken == nil { - break - } - - nextToken = exports.NextToken - } - - return out, nil -} - func (s *Store) checkExportStatus(ctx context.Context, exportARN *string) (*string, error) { for { result, err := s.dynamoDBClient.DescribeExport(ctx, &dynamodb.DescribeExportInput{ExportArn: exportARN}) diff --git a/sources/dynamodb/snapshot/snapshot.go b/sources/dynamodb/snapshot/snapshot.go index ba071d6e..5873c9c4 100644 --- a/sources/dynamodb/snapshot/snapshot.go +++ b/sources/dynamodb/snapshot/snapshot.go @@ -46,17 +46,14 @@ func NewStore(ctx context.Context, cfg config.DynamoDB, awsCfg aws.Config) (*Sto return nil, err } - if manifestFilePath != nil { - if err = store.loadFolderFromManifest(bucketName, *manifestFilePath); err != nil { - return nil, err + if manifestFilePath == nil { + // This means that the export is not done yet, so let's wait. + manifestFilePath, err = store.checkExportStatus(ctx, exportARN) + if err != nil { + return nil, fmt.Errorf("failed to check export status: %w", err) } } - manifestFilePath, err = store.checkExportStatus(ctx, exportARN) - if err != nil { - return nil, fmt.Errorf("failed to check export status: %w", err) - } - if err = store.loadFolderFromManifest(bucketName, *manifestFilePath); err != nil { return nil, err }