Skip to content

Commit

Permalink
Optimize performance
Browse files Browse the repository at this point in the history
Signed-off-by: Aiden Dai <[email protected]>
  • Loading branch information
daixba committed Sep 26, 2023
1 parent 54f059a commit 12b2474
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ public class ShardConsumer implements Runnable {
// Idle Time between Reads
private static final int GET_RECORD_INTERVAL_MILLS = 200;

private static final int MAX_GET_RECORD_BACKOFF = 3;

private static final int DEFAULT_WAIT_FOR_EXPORT_INTERVAL_MILLS = 60_000;

private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 2 * 60_000;
Expand Down Expand Up @@ -125,8 +123,6 @@ public void run() {
long lastCheckpointTime = System.currentTimeMillis();
String sequenceNumber = "";

int backoff = 0;

while (!shouldStop) {
if (shardIterator == null) {
// End of Shard
Expand Down Expand Up @@ -159,17 +155,7 @@ public void run() {

shardIterator = response.nextShardIterator();

if (response.records().isEmpty()) {
if (backoff < MAX_GET_RECORD_BACKOFF) {
backoff++;
}
} else {

if (response.records().size() == MAX_GET_RECORD_ITEM_COUNT) {
// Reduce backoff when reaching to max records per GetRecords call.
backoff--;
}

if (!response.records().isEmpty()) {
// Always use the last sequence number for checkpoint
sequenceNumber = response.records().get(response.records().size() - 1).dynamodb().sequenceNumber();

Expand All @@ -193,8 +179,7 @@ public void run() {
}
try {
// Idle between get records call.
// Add a backoff on sleep time: 200ms, 400ms, 800ms...
Thread.sleep((long) GET_RECORD_INTERVAL_MILLS << backoff);
Thread.sleep(GET_RECORD_INTERVAL_MILLS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class StreamScheduler implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(StreamScheduler.class);

private static final int MAX_JOB_COUNT = 10;
private static final int MAX_JOB_COUNT = 50;
private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 30_000;

private final AtomicInteger numOfWorkers = new AtomicInteger(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.SourceCoordinationStore;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStatus;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.AwsAuthenticationConfig;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.lenient;

Expand Down Expand Up @@ -78,6 +83,9 @@ void setup() {
lenient().when(pluginFactory.loadPlugin(eq(SourceCoordinationStore.class), any(PluginSetting.class)))
.thenReturn(coordinationStore);
lenient().when(sourceConfig.getTableConfigs()).thenReturn(Collections.emptyList());
lenient().doNothing().when(coordinationStore).initializeStore();
lenient().when(coordinationStore.tryCreatePartitionItem(anyString(), anyString(), any(SourcePartitionStatus.class), anyLong(), anyString())).thenReturn(true);
lenient().when(coordinationStore.tryAcquireAvailablePartition(anyString(), anyString(), any(Duration.class))).thenReturn(Optional.empty());
}

private DynamoDBSource createObjectUnderTest() {
Expand Down

0 comments on commit 12b2474

Please sign in to comment.