diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index 01d9ca99c0..f9fa6b355f 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -34,8 +34,6 @@ public class ShardConsumer implements Runnable { // Idle Time between Reads private static final int GET_RECORD_INTERVAL_MILLS = 200; - private static final int MAX_GET_RECORD_BACKOFF = 3; - private static final int DEFAULT_WAIT_FOR_EXPORT_INTERVAL_MILLS = 60_000; private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 2 * 60_000; @@ -125,8 +123,6 @@ public void run() { long lastCheckpointTime = System.currentTimeMillis(); String sequenceNumber = ""; - int backoff = 0; - while (!shouldStop) { if (shardIterator == null) { // End of Shard @@ -159,17 +155,7 @@ public void run() { shardIterator = response.nextShardIterator(); - if (response.records().isEmpty()) { - if (backoff < MAX_GET_RECORD_BACKOFF) { - backoff++; - } - } else { - - if (response.records().size() == MAX_GET_RECORD_ITEM_COUNT) { - // Reduce backoff when reaching to max records per GetRecords call. - backoff--; - } - + if (!response.records().isEmpty()) { // Always use the last sequence number for checkpoint sequenceNumber = response.records().get(response.records().size() - 1).dynamodb().sequenceNumber(); @@ -193,8 +179,7 @@ public void run() { } try { // Idle between get records call. - // Add a backoff on sleep time: 200ms, 400ms, 800ms... - Thread.sleep((long) GET_RECORD_INTERVAL_MILLS << backoff); + Thread.sleep(GET_RECORD_INTERVAL_MILLS); } catch (InterruptedException e) { throw new RuntimeException(e); } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java index f533098b6d..4fa76da968 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java @@ -26,7 +26,7 @@ public class StreamScheduler implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(StreamScheduler.class); - private static final int MAX_JOB_COUNT = 10; + private static final int MAX_JOB_COUNT = 50; private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 30_000; private final AtomicInteger numOfWorkers = new AtomicInteger(0); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceTest.java index 2e1d0cae85..5b78965846 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceTest.java @@ -19,14 +19,19 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.SourceCoordinationStore; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStatus; import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.AwsAuthenticationConfig; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; +import java.util.Optional; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.lenient; @@ -78,6 +83,9 @@ void setup() { lenient().when(pluginFactory.loadPlugin(eq(SourceCoordinationStore.class), any(PluginSetting.class))) .thenReturn(coordinationStore); lenient().when(sourceConfig.getTableConfigs()).thenReturn(Collections.emptyList()); + lenient().doNothing().when(coordinationStore).initializeStore(); + lenient().when(coordinationStore.tryCreatePartitionItem(anyString(), anyString(), any(SourcePartitionStatus.class), anyLong(), anyString())).thenReturn(true); + lenient().when(coordinationStore.tryAcquireAvailablePartition(anyString(), anyString(), any(Duration.class))).thenReturn(Optional.empty()); } private DynamoDBSource createObjectUnderTest() {