Skip to content

Commit

Permalink
Checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Aug 23, 2024
1 parent e0fa8ec commit 3709efa
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 3 deletions.
26 changes: 26 additions & 0 deletions lib/dynamo/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package dynamo

import (
"fmt"
"path/filepath"
"strings"
)

// GetTableArnFromStreamArn extracts the table ARN from a DynamoDB stream ARN.
//
// A stream ARN is expected to look like "<table ARN>/stream/<label>". The
// portion before "/stream/" is returned. An error is returned when the
// separator is missing or appears more than once.
func GetTableArnFromStreamArn(streamArn string) (string, error) {
	const separator = "/stream/"

	tableArn, remainder, found := strings.Cut(streamArn, separator)
	// Exactly one separator is allowed: it must be present, and nothing after
	// it may contain another one.
	if !found || strings.Contains(remainder, separator) {
		return "", fmt.Errorf("invalid stream ARN: %s", streamArn)
	}

	return tableArn, nil
}

// ParseManifestFile returns the location of the directory that contains the
// given export manifest, formatted as "<bucket>/<directory of the manifest>".
//
// manifestFilePath must end in "manifest-summary.json", for example
// "artie-ddb-export/AWSDynamoDB/01722458674792-8831c8f6/manifest-summary.json";
// otherwise an error is returned. When the manifest path has no directory
// component, only the bucket is returned.
func ParseManifestFile(bucket string, manifestFilePath string) (string, error) {
	if !strings.HasSuffix(manifestFilePath, "manifest-summary.json") {
		return "", fmt.Errorf("invalid manifest filepath: %s", manifestFilePath)
	}

	// Everything before the final "/" is the export directory.
	var exportDir string
	if idx := strings.LastIndex(manifestFilePath, "/"); idx >= 0 {
		exportDir = manifestFilePath[:idx]
	}

	// The result is a slash-separated object-store location, not a local file
	// path. filepath.Join uses the OS-specific separator, so normalise back to
	// forward slashes to get the same result on every platform.
	return filepath.ToSlash(filepath.Join(bucket, exportDir)), nil
}
34 changes: 34 additions & 0 deletions lib/dynamo/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package dynamo

import (
"github.com/stretchr/testify/assert"
"testing"
)

// TestGetTableArnFromStreamArn covers one well-formed stream ARN and one ARN
// that lacks the "/stream/" segment.
func TestGetTableArnFromStreamArn(t *testing.T) {
	// A well-formed stream ARN yields the table ARN that precedes "/stream/".
	actualTableArn, validErr := GetTableArnFromStreamArn("arn:aws:dynamodb:us-west-2:123456789012:table/my-table/stream/2021-01-01T00:00:00.000")
	assert.NoError(t, validErr)
	assert.Equal(t, "arn:aws:dynamodb:us-west-2:123456789012:table/my-table", actualTableArn)

	// A plain table ARN has no stream segment and must be rejected.
	_, invalidErr := GetTableArnFromStreamArn("arn:aws:dynamodb:us-west-2:123456789012:table/my-table")
	assert.ErrorContains(t, invalidErr, "invalid stream ARN: arn:aws:dynamodb:us-west-2:123456789012:table/my-table")
}

// TestParseManifestFile covers one manifest path with the expected file name
// and one path that is missing the ".json" suffix.
func TestParseManifestFile(t *testing.T) {
	// A path ending in manifest-summary.json resolves to "<bucket>/<directory>".
	location, validErr := ParseManifestFile("bucket", "artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6/manifest-summary.json")
	assert.NoError(t, validErr)
	assert.Equal(t, "bucket/artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6", location)

	// Any other file name is rejected.
	_, invalidErr := ParseManifestFile("bucket", "artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6/manifest-summary")
	assert.ErrorContains(t, invalidErr, "invalid manifest filepath: artie-ddb-export/AWSDynamoDB/abcdef-8831c8f6/manifest-summary")
}
4 changes: 2 additions & 2 deletions lib/s3lib/s3lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewClient(bucketName string, awsCfg aws.Config) *S3Client {
}
}

func bucketAndPrefixFromFilePath(fp string) (*string, *string, error) {
func BucketAndPrefixFromFilePath(fp string) (*string, *string, error) {
// Remove the s3:// prefix if it's there
fp = strings.TrimPrefix(fp, "s3://")

Expand All @@ -43,7 +43,7 @@ type S3File struct {
}

func (s *S3Client) ListFiles(ctx context.Context, fp string) ([]S3File, error) {
bucket, prefix, err := bucketAndPrefixFromFilePath(fp)
bucket, prefix, err := BucketAndPrefixFromFilePath(fp)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion lib/s3lib/s3lib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestBucketAndPrefixFromFilePath(t *testing.T) {
}

for _, tc := range tcs {
actualBucket, actualPrefix, actualErr := bucketAndPrefixFromFilePath(tc.fp)
actualBucket, actualPrefix, actualErr := BucketAndPrefixFromFilePath(tc.fp)
if tc.expectedErr != "" {
assert.ErrorContains(t, actualErr, tc.expectedErr, tc.name)
} else {
Expand Down
102 changes: 102 additions & 0 deletions sources/dynamodb/snapshot/export.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package snapshot

import (
"context"
"fmt"
"github.com/artie-labs/reader/lib/dynamo"
"github.com/artie-labs/reader/lib/s3lib"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"log/slog"
"time"
)

func (s *Store) findRecentExport(ctx context.Context, s3FilePath string) (*string, *string, error) {

Check failure on line 15 in sources/dynamodb/snapshot/export.go

View workflow job for this annotation

GitHub Actions / test

func (*Store).findRecentExport is unused (U1000)
bucketName, prefixName, err := s3lib.BucketAndPrefixFromFilePath(s3FilePath)
if err != nil {
return nil, nil, err
}

tableARN, err := dynamo.GetTableArnFromStreamArn(s.streamArn)
if err != nil {
return nil, nil, fmt.Errorf("failed to get table ARN from stream ARN: %w", err)
}

exports, err := s.listExports(ctx, tableARN)
if err != nil {
return nil, nil, fmt.Errorf("failed to list exports: %w", err)
}

for _, export := range exports {
if export.ExportStatus == types.ExportStatusFailed {
slog.Info("Filtering out failed export", slog.String("exportARN", *export.ExportArn))
continue
}

exportDescription, err := s.dynamoDBClient.DescribeExport(ctx, &dynamodb.DescribeExportInput{ExportArn: export.ExportArn})
if err != nil {
return nil, nil, fmt.Errorf("failed to describe export: %w", err)
}

if exportDescription.ExportDescription.S3Bucket == bucketName && exportDescription.ExportDescription.S3Prefix == prefixName {
if export.ExportStatus == types.ExportStatusCompleted {
return export.ExportArn, exportDescription.ExportDescription.ExportManifest, nil
}

return export.ExportArn, nil, nil
}
}

return nil, nil, fmt.Errorf("no recent export found for %s", s3FilePath)
}

func (s *Store) listExports(ctx context.Context, tableARN string) ([]types.ExportSummary, error) {

Check failure on line 54 in sources/dynamodb/snapshot/export.go

View workflow job for this annotation

GitHub Actions / test

func (*Store).listExports is unused (U1000)
var out []types.ExportSummary

var nextToken *string
for {
exports, err := s.dynamoDBClient.ListExports(ctx, &dynamodb.ListExportsInput{
TableArn: aws.String(tableARN),
NextToken: nextToken,
})

if err != nil {
return nil, fmt.Errorf("failed to list exports: %w", err)
}

out = append(out, exports.ExportSummaries...)
if exports.NextToken == nil {
break
}

nextToken = exports.NextToken
}

return out, nil
}

func (s *Store) checkExportStatus(ctx context.Context, exportARN *string) (*string, error) {

Check failure on line 79 in sources/dynamodb/snapshot/export.go

View workflow job for this annotation

GitHub Actions / test

func (*Store).checkExportStatus is unused (U1000)
for {
describeInput := &dynamodb.DescribeExportInput{
ExportArn: exportARN,
}

result, err := s.dynamoDBClient.DescribeExport(ctx, describeInput)
if err != nil {
return nil, fmt.Errorf("failed to describe export: %w", err)
}

switch result.ExportDescription.ExportStatus {
case types.ExportStatusCompleted:
return result.ExportDescription.ExportManifest, nil
case types.ExportStatusFailed:
return nil, fmt.Errorf("export has failed: %s", *result.ExportDescription.FailureMessage)
case types.ExportStatusInProgress:
slog.Info("Export is still in progress")
time.Sleep(30 * time.Second)
default:
return nil, fmt.Errorf("unknown export status: %s", string(result.ExportDescription.ExportStatus))
}
}
}

0 comments on commit 3709efa

Please sign in to comment.