Skip to content

Commit

Permalink
Checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Aug 23, 2024
1 parent e9bbca4 commit 8f42175
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 40 deletions.
7 changes: 1 addition & 6 deletions config/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ func (d *DynamoDB) Validate() error {
}

type SnapshotSettings struct {
S3Bucket string `yaml:"s3Bucket"`
Folder string `yaml:"folder"`
Folder string `yaml:"folder"`
// If the files are not specified, that's okay.
// We will scan the folder and then load into `specifiedFiles`
SpecifiedFiles []s3lib.S3File `yaml:"specifiedFiles"`
Expand All @@ -60,9 +59,5 @@ func (s *SnapshotSettings) Validate() error {
return fmt.Errorf("folder is empty")
}

if s.S3Bucket == "" {
return fmt.Errorf("s3Bucket is empty")
}

return nil
}
10 changes: 4 additions & 6 deletions lib/s3lib/s3lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,16 @@ func NewClient(bucketName string, awsCfg aws.Config) *S3Client {
}
}

// BucketAndPrefixFromFilePath splits an S3 path into a bucket name and a key prefix.
// A leading "s3://" is optional. The bucket is the text before the first "/" and the
// prefix is everything after it; an error is returned when the path contains no "/".
//
// NOTE(review): this is a diff rendering. The pointer-returning signature and return
// statements are the removed lines; the string-returning ones directly below each of
// them are their replacements.
func BucketAndPrefixFromFilePath(fp string) (*string, *string, error) {
func BucketAndPrefixFromFilePath(fp string) (string, string, error) {
// Remove the s3:// prefix if it's there
fp = strings.TrimPrefix(fp, "s3://")

// Limit the split to two parts so only the first "/" separates bucket from prefix;
// any later slashes stay inside the prefix.
parts := strings.SplitN(fp, "/", 2)
if len(parts) < 2 {
return nil, nil, fmt.Errorf("invalid S3 path, missing prefix")
return "", "", fmt.Errorf("invalid S3 path, missing prefix")
}

bucket := parts[0]
prefix := parts[1]
return &bucket, &prefix, nil
return parts[0], parts[1], nil
}

type S3File struct {
Expand All @@ -49,7 +47,7 @@ func (s *S3Client) ListFiles(ctx context.Context, fp string) ([]S3File, error) {
}

var files []S3File
paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{Bucket: bucket, Prefix: prefix})
paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{Bucket: &bucket, Prefix: &prefix})
for paginator.HasMorePages() {
page, err := paginator.NextPage(ctx)
if err != nil {
Expand Down
33 changes: 16 additions & 17 deletions lib/s3lib/s3lib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,53 @@ package s3lib
import (
"testing"

"github.com/artie-labs/transfer/lib/ptr"
"github.com/stretchr/testify/assert"
)

func TestBucketAndPrefixFromFilePath(t *testing.T) {
tcs := []struct {
name string
fp string
expectedBucket *string
expectedPrefix *string
expectedBucket string
expectedPrefix string
expectedErr string
}{
{
name: "valid path (w/ S3 prefix)",
fp: "s3://bucket/prefix",
expectedBucket: ptr.ToString("bucket"),
expectedPrefix: ptr.ToString("prefix"),
expectedBucket: "bucket",
expectedPrefix: "prefix",
},
{
name: "valid path (w/ S3 prefix) with trailing slash",
fp: "s3://bucket/prefix/",
expectedBucket: ptr.ToString("bucket"),
expectedPrefix: ptr.ToString("prefix/"),
expectedBucket: "bucket",
expectedPrefix: "prefix/",
},
{
name: "valid path (w/ S3 prefix) with multiple slashes",
fp: "s3://bucket/prefix/with/multiple/slashes",
expectedBucket: ptr.ToString("bucket"),
expectedPrefix: ptr.ToString("prefix/with/multiple/slashes"),
expectedBucket: "bucket",
expectedPrefix: "prefix/with/multiple/slashes",
},
// Without S3 prefix
{
name: "valid path (w/o S3 prefix)",
fp: "bucket/prefix",
expectedBucket: ptr.ToString("bucket"),
expectedPrefix: ptr.ToString("prefix"),
expectedBucket: "bucket",
expectedPrefix: "prefix",
},
{
name: "valid path (w/o S3 prefix) with trailing slash",
fp: "bucket/prefix/",
expectedBucket: ptr.ToString("bucket"),
expectedPrefix: ptr.ToString("prefix/"),
expectedBucket: "bucket",
expectedPrefix: "prefix/",
},
{
name: "valid path (w/o S3 prefix) with multiple slashes",
fp: "bucket/prefix/with/multiple/slashes",
expectedBucket: ptr.ToString("bucket"),
expectedPrefix: ptr.ToString("prefix/with/multiple/slashes"),
expectedBucket: "bucket",
expectedPrefix: "prefix/with/multiple/slashes",
},
{
name: "invalid path",
Expand All @@ -67,8 +66,8 @@ func TestBucketAndPrefixFromFilePath(t *testing.T) {
assert.NoError(t, actualErr, tc.name)

// Now check the actualBucket and prefix
assert.Equal(t, *tc.expectedBucket, *actualBucket, tc.name)
assert.Equal(t, *tc.expectedPrefix, *actualPrefix, tc.name)
assert.Equal(t, tc.expectedBucket, actualBucket, tc.name)
assert.Equal(t, tc.expectedPrefix, actualPrefix, tc.name)
}
}
}
7 changes: 6 additions & 1 deletion sources/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ func Load(ctx context.Context, cfg config.DynamoDB) (sources.Source, bool, error
}

if cfg.Snapshot {
return snapshot.NewStore(cfg, _awsCfg), false, nil
store, err := snapshot.NewStore(cfg, _awsCfg)
if err != nil {
return nil, false, err
}

return store, false, nil
} else {
return stream.NewStore(cfg, _awsCfg), true, nil
}
Expand Down
5 changes: 0 additions & 5 deletions sources/dynamodb/snapshot/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"github.com/artie-labs/reader/lib/dynamo"
"github.com/artie-labs/reader/lib/s3lib"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
Expand All @@ -13,10 +12,6 @@ import (
)

func (s *Store) findRecentExport(ctx context.Context, s3FilePath string) (*string, *string, error) {
bucketName, prefixName, err := s3lib.BucketAndPrefixFromFilePath(s3FilePath)
if err != nil {
return nil, nil, err
}

tableARN, err := dynamo.GetTableArnFromStreamArn(s.streamArn)
if err != nil {
Expand Down
20 changes: 15 additions & 5 deletions sources/dynamodb/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,31 @@ import (
)

// Store holds the table identifiers, S3 location, config and AWS clients used by
// the DynamoDB snapshot source.
//
// NOTE(review): this is a diff rendering. The first tableName/streamArn pair is the
// removed version of the second pair — presumably only re-aligned by gofmt once the
// longer field names were added; confirm against the raw diff.
type Store struct {
tableName string
streamArn string
tableName string
streamArn string
// Bucket and key prefix; NewStore fills these from cfg.SnapshotSettings.Folder.
s3BucketName string
s3PrefixName string

cfg *config.DynamoDB
s3Client *s3lib.S3Client
dynamoDBClient *dynamodb.Client
}

// NewStore builds a snapshot Store from cfg and awsCfg.
//
// The S3 bucket and prefix are parsed from cfg.SnapshotSettings.Folder with
// s3lib.BucketAndPrefixFromFilePath. If that parse fails, the error is returned
// unchanged and no Store is created.
//
// NOTE(review): this is a diff rendering. The *Store-only signature, the
// cfg.SnapshotSettings.S3Bucket client line and the bare closing "}" are the removed
// lines; the (*Store, error) signature, the bucketName client line and "}, nil" are
// their replacements.
func NewStore(cfg config.DynamoDB, awsCfg aws.Config) *Store {
func NewStore(cfg config.DynamoDB, awsCfg aws.Config) (*Store, error) {
bucketName, prefixName, err := s3lib.BucketAndPrefixFromFilePath(cfg.SnapshotSettings.Folder)
if err != nil {
return nil, err
}

return &Store{
tableName: cfg.TableName,
streamArn: cfg.StreamArn,
s3BucketName: bucketName,
s3PrefixName: prefixName,
cfg: &cfg,
// The S3 client's bucket now comes from the parsed folder path rather than a separate setting.
s3Client: s3lib.NewClient(cfg.SnapshotSettings.S3Bucket, awsCfg),
s3Client: s3lib.NewClient(bucketName, awsCfg),
dynamoDBClient: dynamodb.NewFromConfig(awsCfg),
}
}, nil
}

func (s *Store) Close() error {
Expand Down

0 comments on commit 8f42175

Please sign in to comment.