Skip to content

Commit

Permalink
Merge branch 'master' into upgrade-s3-sdk-only
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Tang <[email protected]>
  • Loading branch information
Tang8330 authored Aug 6, 2024
2 parents 7759d05 + dea17ce commit f979ca0
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 13 deletions.
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ Artie Reader reads from databases to perform historical snapshots and also reads

## Supports:

| | Snapshot | Streaming |
|------------|----------|-----------------|
| DynamoDB || |
| MongoDB || 🚧 Experimental |
| MySQL || |
| PostgreSQL || |
| SQL Server || |
| | Snapshot | Streaming |
|------------|----------|-----------|
| DynamoDB |||
| MongoDB || |
| MySQL |||
| PostgreSQL |||
| SQL Server |||


## Running
Expand Down
1 change: 1 addition & 0 deletions config/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type DynamoDB struct {
AwsSecretAccessKey string `yaml:"awsSecretAccessKey"`
StreamArn string `yaml:"streamArn"`
TableName string `yaml:"tableName"`
MaxConcurrency int64 `yaml:"__maxConcurrency"`

Snapshot bool `yaml:"snapshot"`
SnapshotSettings *SnapshotSettings `yaml:"snapshotSettings"`
Expand Down
1 change: 1 addition & 0 deletions config/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type MongoDB struct {
Database string `yaml:"database"`
Collections []Collection `yaml:"collections"`
StreamingSettings StreamingSettings `yaml:"streamingSettings,omitempty"`
DisableTLS bool `yaml:"disableTLS"`
}

type Collection struct {
Expand Down
11 changes: 6 additions & 5 deletions sources/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dynamodb

import (
"context"
"cmp"
"fmt"
"time"

Expand All @@ -22,10 +23,9 @@ import (
)

const (
jitterSleepBaseMs = 100
shardScannerInterval = 5 * time.Minute
// concurrencyLimit is the maximum number of shards we should be processing at once
concurrencyLimit = 30
jitterSleepBaseMs = 100
shardScannerInterval = 5 * time.Minute
defaultConcurrencyLimit int64 = 100
)

func Load(ctx context.Context, cfg config.DynamoDB) (sources.Source, bool, error) {
Expand Down Expand Up @@ -56,7 +56,8 @@ func Load(ctx context.Context, cfg config.DynamoDB) (sources.Source, bool, error
s3Client: s3lib.NewClient(cfg.SnapshotSettings.S3Bucket, _awsCfg),
}, false, nil
} else {
_throttler, err := throttler.NewThrottler(concurrencyLimit)
// TODO: Should we be throttling based on go-routines? Or should we be using buffered channels?
_throttler, err := throttler.NewThrottler(cmp.Or(cfg.MaxConcurrency, defaultConcurrencyLimit))
if err != nil {
return nil, false, fmt.Errorf("failed to create throttler: %w", err)
}
Expand Down
6 changes: 5 additions & 1 deletion sources/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ func Load(cfg config.MongoDB) (*Source, bool, error) {
Password: cfg.Password,
}

opts := options.Client().ApplyURI(cfg.Host).SetAuth(creds).SetTLSConfig(&tls.Config{})
opts := options.Client().ApplyURI(cfg.Host).SetAuth(creds)
if !cfg.DisableTLS {
opts = opts.SetTLSConfig(&tls.Config{})
}

ctx := context.Background()
client, err := mongo.Connect(ctx, opts)
if err != nil {
Expand Down

0 comments on commit f979ca0

Please sign in to comment.