# Patch summary. Text placed before the first "diff --git" header is commentary
# and is not part of any hunk.
#
# config/dynamodb.go
#   - Adds a MaxConcurrency int64 field to the DynamoDB config struct, read from
#     the YAML key "__maxConcurrency".
#
# sources/dynamodb/dynamodb.go
#   - Imports the standard-library "cmp" package.
#   - Removes the untyped constant concurrencyLimit = 30 (previously documented as
#     the maximum number of shards processed at once) and adds
#     defaultConcurrencyLimit int64 = 100.
#   - In the else branch of Load, the throttler is now created with
#     cmp.Or(cfg.MaxConcurrency, defaultConcurrencyLimit): the configured value is
#     used when it is non-zero, otherwise defaultConcurrencyLimit applies.
#   - Adds a TODO asking whether throttling should be goroutine-based or use
#     buffered channels.
#
# NOTE(review): cmp.Or only substitutes the default for the zero value, so a
# negative MaxConcurrency is handed to throttler.NewThrottler unchanged -- confirm
# that NewThrottler rejects non-positive limits.
# NOTE(review): for configs that do not set the new key, the effective limit goes
# from 30 to 100 -- confirm this increase is intended.
# NOTE(review): this patch appears to have lost its line breaks (every header and
# hunk sits on one line); presumably it must be regenerated before it can be applied.
diff --git a/config/dynamodb.go b/config/dynamodb.go index 9988df3f..a5daea6c 100644 --- a/config/dynamodb.go +++ b/config/dynamodb.go @@ -15,6 +15,7 @@ type DynamoDB struct { AwsSecretAccessKey string `yaml:"awsSecretAccessKey"` StreamArn string `yaml:"streamArn"` TableName string `yaml:"tableName"` + MaxConcurrency int64 `yaml:"__maxConcurrency"` Snapshot bool `yaml:"snapshot"` SnapshotSettings *SnapshotSettings `yaml:"snapshotSettings"` diff --git a/sources/dynamodb/dynamodb.go b/sources/dynamodb/dynamodb.go index 3980d3bd..1826775b 100644 --- a/sources/dynamodb/dynamodb.go +++ b/sources/dynamodb/dynamodb.go @@ -1,6 +1,7 @@ package dynamodb import ( + "cmp" "fmt" "time" @@ -19,10 +20,9 @@ import ( ) const ( - jitterSleepBaseMs = 100 - shardScannerInterval = 5 * time.Minute - // concurrencyLimit is the maximum number of shards we should be processing at once - concurrencyLimit = 30 + jitterSleepBaseMs = 100 + shardScannerInterval = 5 * time.Minute + defaultConcurrencyLimit int64 = 100 ) func Load(cfg config.DynamoDB) (sources.Source, bool, error) { @@ -43,7 +43,8 @@ func Load(cfg config.DynamoDB) (sources.Source, bool, error) { s3Client: s3lib.NewClient(cfg.SnapshotSettings.S3Bucket, sess), }, false, nil } else { - _throttler, err := throttler.NewThrottler(concurrencyLimit) + // TODO: Should we be throttling based on go-routines? Or should we be using buffered channels? + _throttler, err := throttler.NewThrottler(cmp.Or(cfg.MaxConcurrency, defaultConcurrencyLimit)) if err != nil { return nil, false, fmt.Errorf("failed to create throttler: %w", err) }