Skip to content

Commit

Permalink
[DynamoDB] Increase concurrency limit (#457)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Aug 5, 2024
1 parent 155c9fa commit bf52cac
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
1 change: 1 addition & 0 deletions config/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type DynamoDB struct {
AwsSecretAccessKey string `yaml:"awsSecretAccessKey"`
StreamArn string `yaml:"streamArn"`
TableName string `yaml:"tableName"`
MaxConcurrency int64 `yaml:"__maxConcurrency"`

Snapshot bool `yaml:"snapshot"`
SnapshotSettings *SnapshotSettings `yaml:"snapshotSettings"`
Expand Down
11 changes: 6 additions & 5 deletions sources/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dynamodb

import (
"cmp"
"fmt"
"time"

Expand All @@ -19,10 +20,9 @@ import (
)

const (
jitterSleepBaseMs = 100
shardScannerInterval = 5 * time.Minute
// concurrencyLimit is the maximum number of shards we should be processing at once
concurrencyLimit = 30
jitterSleepBaseMs = 100
shardScannerInterval = 5 * time.Minute
defaultConcurrencyLimit int64 = 100
)

func Load(cfg config.DynamoDB) (sources.Source, bool, error) {
Expand All @@ -43,7 +43,8 @@ func Load(cfg config.DynamoDB) (sources.Source, bool, error) {
s3Client: s3lib.NewClient(cfg.SnapshotSettings.S3Bucket, sess),
}, false, nil
} else {
_throttler, err := throttler.NewThrottler(concurrencyLimit)
// TODO: Should we be throttling based on go-routines? Or should we be using buffered channels?
_throttler, err := throttler.NewThrottler(cmp.Or(cfg.MaxConcurrency, defaultConcurrencyLimit))
if err != nil {
return nil, false, fmt.Errorf("failed to create throttler: %w", err)
}
Expand Down

0 comments on commit bf52cac

Please sign in to comment.