diff --git a/sources/dynamodb/shard.go b/sources/dynamodb/shard.go index 1a9e554f..be10ce18 100644 --- a/sources/dynamodb/shard.go +++ b/sources/dynamodb/shard.go @@ -23,7 +23,7 @@ func (s *StreamStore) ListenToChannel(ctx context.Context, writer writers.Writer } } -func (s *StreamStore) processShard(ctx context.Context, shard *types.Shard, writer writers.Writer) { +func (s *StreamStore) processShard(ctx context.Context, shard types.Shard, writer writers.Writer) { // Is there another go-routine processing this shard? if s.storage.GetShardProcessing(*shard.ShardId) { return diff --git a/sources/dynamodb/stream.go b/sources/dynamodb/stream.go index 14f85b52..9f38790e 100644 --- a/sources/dynamodb/stream.go +++ b/sources/dynamodb/stream.go @@ -21,7 +21,7 @@ type StreamStore struct { streams *dynamodbstreams.Client storage *offsets.OffsetStorage - shardChan chan *types.Shard + shardChan chan types.Shard } func (s *StreamStore) Close() error { @@ -72,7 +72,7 @@ func (s *StreamStore) scanForNewShards(ctx context.Context) error { } for _, shard := range result.StreamDescription.Shards { - s.shardChan <- &shard + s.shardChan <- shard } if result.StreamDescription.LastEvaluatedShardId == nil {