Optimize performance
Signed-off-by: Aiden Dai <[email protected]>
daixba committed Sep 25, 2023
1 parent 63c26df commit 54f059a
Showing 15 changed files with 118 additions and 84 deletions.
@@ -139,7 +139,13 @@ public void init() {
}

LOG.info("Start validating table configurations");
List<TableInfo> tableInfos = tableConfigs.stream().map(this::getTableInfo).collect(Collectors.toList());
List<TableInfo> tableInfos;
try {
tableInfos = tableConfigs.stream().map(this::getTableInfo).collect(Collectors.toList());
} catch (Exception e) {
coordinator.giveUpPartition(initPartition.get());
throw e;
}

tableInfos.forEach(tableInfo -> {
// Create a Global state in the coordination table for the configuration.
@@ -259,15 +265,15 @@ private TableInfo getTableInfo(TableConfig tableConfig) {
}

StreamConfig.StartPosition streamStartPosition = null;
// Validate if DynamoDB Stream is turned on or not

if (tableConfig.getStreamConfig() != null) {
streamStartPosition = tableConfig.getStreamConfig().getStartPosition();
// Validate if DynamoDB Stream is turned on or not
if (describeTableResult.table().streamSpecification() == null) {
String errorMessage = "Steam is not enabled for table " + tableConfig.getTableArn();
LOG.error(errorMessage);
throw new InvalidPluginConfigurationException(errorMessage);
}

// Validate view type of DynamoDB stream
if (describeTableResult.table().streamSpecification() != null) {
String viewType = describeTableResult.table().streamSpecification().streamViewTypeAsString();
LOG.debug("The stream view type for table " + tableName + " is " + viewType);
@@ -278,6 +284,7 @@ private TableInfo getTableInfo(TableConfig tableConfig) {
throw new InvalidPluginConfigurationException(errorMessage);
}
}
streamStartPosition = tableConfig.getStreamConfig().getStartPosition();
}

// Construct metadata info
@@ -288,7 +295,7 @@ private TableInfo getTableInfo(TableConfig tableConfig) {
.streamArn(describeTableResult.table().latestStreamArn())
.streamRequired(tableConfig.getStreamConfig() != null)
.exportRequired(tableConfig.getExportConfig() != null)
.streamStartPosition(streamStartPosition.name())
.streamStartPosition(streamStartPosition == null ? null : streamStartPosition.name())
.exportBucket(tableConfig.getExportConfig() == null ? null : tableConfig.getExportConfig().getS3Bucket())
.exportPrefix(tableConfig.getExportConfig() == null ? null : tableConfig.getExportConfig().getS3Prefix())
.build();
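The null guard on streamStartPosition matters for export-only tables: without a stream configuration, the old code dereferenced streamStartPosition.name() and would have failed with a NullPointerException. A minimal illustration of the export-only case, assuming the builder shown in this hunk belongs to the TableMetadata class referenced elsewhere in this commit (other builder fields omitted):

    // Hypothetical export-only table: no stream config, so the start position stays null.
    TableMetadata metadata = TableMetadata.builder()
            .streamRequired(false)                                      // tableConfig.getStreamConfig() == null
            .exportRequired(true)
            .streamStartPosition(null)                                  // previously streamStartPosition.name() threw NPE
            .exportBucket(tableConfig.getExportConfig().getS3Bucket())
            .exportPrefix(tableConfig.getExportConfig().getS3Prefix())
            .build();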
@@ -34,7 +34,7 @@ public abstract class RecordConverter {

private static final String DEFAULT_ACTION = "index";

private static final int DEFAULT_WRITE_TIMEOUT_MILLIS = 10_000;
private static final int DEFAULT_WRITE_TIMEOUT_MILLIS = 60_000;

private final Buffer<Record<Event>> buffer;
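The timeout increase (10 s to 60 s) gives the shared buffer more room under backpressure before a write is abandoned. As a rough sketch of where such a constant is typically applied, assuming Data Prepper's Buffer#writeAll(Collection, timeoutInMillis) contract; the converter's actual flush method is not shown in this hunk:

    // Sketch: flush converted records into the shared buffer, waiting up to the
    // configured timeout before giving up (60s after this change, was 10s).
    void flushBuffer(List<Record<Event>> records) throws Exception {
        buffer.writeAll(records, DEFAULT_WRITE_TIMEOUT_MILLIS);
    }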

@@ -150,7 +150,7 @@ public <T> void giveUpPartition(SourcePartition<T> partition) {

// Throws UpdateException if update failed.
coordinationStore.tryUpdateSourcePartitionItem(updateItem);
LOG.info("Partition key {} was given up by owner {}", partition.getPartitionKey());
LOG.info("Partition key {} was given up by owner {}", partition.getPartitionKey(), hostName);

}

@@ -14,9 +14,9 @@
* <p>
* Still trying to make this interface generic, considering that it may be merged with {@link org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator} in the future.
* </p>
* Some of the key differences includes:
* The major differences include:
* <ul>
* <li>Support different types of partitions</li>
* <li>Support multiple types of partition</li>
* <li>Support multiple leases at the same time</li>
* </ul>
*/
@@ -41,7 +41,7 @@ public interface EnhancedSourceCoordinator {
Optional<SourcePartition> acquireAvailablePartition(String partitionType);

/**
* This method is used to upgate progress state for a partition in the coordination store.
* This method is used to update progress state for a partition in the coordination store.
* It will also extend the timeout for ownership.
*
* @param partition The partition to be updated.
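Taken together, the interface gives each worker a simple lease lifecycle. A rough usage sketch, assuming the method names visible in this commit (acquireAvailablePartition, completePartition, giveUpPartition); the partition type string and the work method are illustrative, and the progress-update call described in the Javadoc above is only noted in a comment because its exact name is not visible in this hunk:

    // Sketch of a worker's lease lifecycle with the enhanced coordinator.
    Optional<SourcePartition> partition = coordinator.acquireAvailablePartition("DATA_FILE"); // type is illustrative
    if (partition.isPresent()) {
        try {
            doWork(partition.get());                       // hypothetical work method
            // periodically: update progress state, which also extends the ownership timeout
            coordinator.completePartition(partition.get());
        } catch (Exception e) {
            coordinator.giveUpPartition(partition.get());  // release the lease so another node can retry
            throw e;
        }
    }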
@@ -10,7 +10,6 @@
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
@@ -51,7 +50,7 @@ private DataFileLoader(Builder builder) {
this.bucketName = builder.bucketName;
this.key = builder.key;
this.checkpointer = builder.checkpointer;
this.startLine = builder().startLine;
this.startLine = builder.startLine;
}

public static Builder builder() {
@@ -121,6 +120,7 @@ public void run() {

// line count regardless of the start line number
int lineCount = 0;
int lastLineProcessed = 0;

try (GZIPInputStream gzipInputStream = new GZIPInputStream(s3ObjectReader.readFile(bucketName, key))) {
BufferedReader reader = new BufferedReader(new InputStreamReader(gzipInputStream));
@@ -129,9 +129,9 @@
while ((line = reader.readLine()) != null) {

if (shouldStop) {
checkpointer.release(lineCount);
LOG.error("Load is interrupted");
break;
checkpointer.checkpoint(lastLineProcessed);
LOG.debug("Should Stop flag is set to True, looks like shutdown has triggered");
throw new RuntimeException("Load is interrupted");
}

lineCount += 1;
@@ -143,33 +143,34 @@ public void run() {
lines.add(line);

if ((lineCount - startLine) % DEFAULT_BATCH_SIZE == 0) {
LOG.debug("Write to buffer for line " + (lineCount - DEFAULT_BATCH_SIZE) + " to " + lineCount);
// LOG.debug("Write to buffer for line " + (lineCount - DEFAULT_BATCH_SIZE) + " to " + lineCount);
recordConverter.writeToBuffer(lines);
lines.clear();
lastLineProcessed = lineCount;
}

if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
LOG.debug("Perform regular checkpointing for Data File Loader");
checkpointer.checkpoint(lineCount);
checkpointer.checkpoint(lastLineProcessed);
lastCheckpointTime = System.currentTimeMillis();

}

}
if (!lines.isEmpty()) {
// Do final checkpoint.
recordConverter.writeToBuffer(lines);
checkpointer.checkpoint(lineCount);
}

lines.clear();
reader.close();
} catch (IOException e) {
LOG.error(e.getMessage());
checkpointer.release(lineCount);
}
if (!shouldStop) {
checkpointer.complete(lineCount);
LOG.debug("Data Loader completed successfully");
} catch (Exception e) {
checkpointer.checkpoint(lineCount);
throw new RuntimeException("Data Loader completed with Exception: " + e.getMessage());
}

LOG.debug("Quit DataLoader Thread");
}
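The net effect of the loader changes is a "flush first, checkpoint what was flushed" rule: lastLineProcessed only advances after a batch has been written to the buffer, and periodic checkpoints record that value, so a restart via startLine should not skip lines that were read but never delivered. A condensed sketch of the loop, using the names from the diff above (error handling and the startLine skip are omitted):

    // Condensed sketch of the batching/checkpointing loop.
    while ((line = reader.readLine()) != null) {
        lineCount++;
        lines.add(line);
        if ((lineCount - startLine) % DEFAULT_BATCH_SIZE == 0) {
            recordConverter.writeToBuffer(lines);        // flush first...
            lines.clear();
            lastLineProcessed = lineCount;               // ...then record progress
        }
        if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
            checkpointer.checkpoint(lastLineProcessed);  // never checkpoint past what was flushed
            lastCheckpointTime = System.currentTimeMillis();
        }
    }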

/**
@@ -15,12 +15,13 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import software.amazon.awssdk.services.s3.S3Client;

/**
* Factory class for DataFileLoader thread.
*/
public class DataFileLoaderFactory {


private final EnhancedSourceCoordinator coordinator;


private final S3ObjectReader fileReader;

private final PluginMetrics pluginMetrics;
@@ -35,13 +36,10 @@ public DataFileLoaderFactory(EnhancedSourceCoordinator coordinator, S3Client s3C
}

public Runnable createDataFileLoader(DataFilePartition dataFilePartition, TableInfo tableInfo) {
// String exportArn = dataFilePartition.getExportArn();
// String tableArn = getTableArn(exportArn);
//
// TableInfo tableInfo = getTableInfo(tableArn);
ExportRecordConverter recordProcessor = new ExportRecordConverter(buffer, tableInfo, pluginMetrics);

DataFileCheckpointer checkpointer = new DataFileCheckpointer(coordinator, dataFilePartition);

// Start a data loader thread.
DataFileLoader loader = DataFileLoader.builder()
.s3ObjectReader(fileReader)
@@ -53,12 +51,6 @@ public Runnable createDataFileLoader(DataFilePartition dataFilePartition, TableI
.build();

return loader;

}

// private TableInfo getTableInfo(String tableArn) {
// GlobalState tableState = (GlobalState) coordinator.getPartition(tableArn).get();
// TableInfo tableInfo = new TableInfo(tableArn, TableMetadata.fromMap(tableState.getProgressState().get()));
// return tableInfo;
// }
}
@@ -130,12 +130,15 @@ private BiConsumer completeDataLoader(DataFilePartition dataFilePartition) {
exportFileSuccessCounter.increment();
// Update global state
updateState(dataFilePartition.getExportArn(), dataFilePartition.getProgressState().get().getLoaded());
// After global state is updated, mark the partition as completed.
coordinator.completePartition(dataFilePartition);

} else {
// Do nothing
// The consumer must already do one last checkpointing and then release the lease.
LOG.debug("Shard consumer completed with exception");
// The data loader must have already done one last checkpointing.
LOG.debug("Data Loader completed with exception");
LOG.error(ex.toString());
// Release the ownership
coordinator.giveUpPartition(dataFilePartition);
}

};
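For orientation, the BiConsumer above is a completion callback: on success it records the loaded file in global state and completes the partition; on failure it now gives the partition back instead of leaving the lease to expire. A plausible wiring, assuming the loader Runnable comes from DataFileLoaderFactory and is run asynchronously (the scheduling code itself is not part of this diff; loaderFactory and executor are hypothetical local names):

    // Sketch: run the data file loader and hand the outcome to the callback above.
    // Uses java.util.concurrent.CompletableFuture with a caller-provided ExecutorService.
    Runnable loader = loaderFactory.createDataFileLoader(dataFilePartition, tableInfo);
    CompletableFuture.runAsync(loader, executor)
            .whenComplete(completeDataLoader(dataFilePartition));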
@@ -41,7 +41,7 @@ public ExportSummary parseSummaryFile(String bucket, String key) {
BufferedReader reader = new BufferedReader(new InputStreamReader(object));
try {
String line = reader.readLine();
System.out.println(line);
LOG.debug("Manifest summary: " + line);
ExportSummary summaryInfo = MAPPER.readValue(line, ExportSummary.class);
return summaryInfo;

@@ -29,10 +29,12 @@ public class ShardConsumer implements Runnable {
// A flag to interrupt the process
private static volatile boolean shouldStop = false;

private static final int GET_RECORD_MAX_ITEM_COUNT = 1000;
private static final int MAX_GET_RECORD_ITEM_COUNT = 1000;

// Maybe a good practice to add Idle Time between Reads
private static final int GET_RECORD_INTERVAL_MILLS = 1000;
// Idle Time between Reads
private static final int GET_RECORD_INTERVAL_MILLS = 200;

private static final int MAX_GET_RECORD_BACKOFF = 3;

private static final int DEFAULT_WAIT_FOR_EXPORT_INTERVAL_MILLS = 60_000;

@@ -123,11 +125,13 @@ public void run() {
long lastCheckpointTime = System.currentTimeMillis();
String sequenceNumber = "";

int backoff = 0;

while (!shouldStop) {
if (shardIterator == null) {
// End of Shard
LOG.debug("Reach end of shard");
checkpointer.complete(sequenceNumber);
checkpointer.checkpoint(sequenceNumber);
break;
}

@@ -140,14 +144,13 @@
// Use the shard iterator to read the stream records
GetRecordsRequest req = GetRecordsRequest.builder()
.shardIterator(shardIterator)
.limit(GET_RECORD_MAX_ITEM_COUNT)
.limit(MAX_GET_RECORD_ITEM_COUNT)
.build();


List<Record> records;
GetRecordsResponse response;
try {
// TODO: May need to add an idle time between read.
response = dynamoDbStreamsClient.getRecords(req);
} catch (SdkException e) {
checkpointer.checkpoint(sequenceNumber);
@@ -157,35 +160,51 @@ public void run() {
shardIterator = response.nextShardIterator();

if (response.records().isEmpty()) {
continue;
}

// Always use the last sequence number for checkpoint
sequenceNumber = response.records().get(response.records().size() - 1).dynamodb().sequenceNumber();
if (backoff < MAX_GET_RECORD_BACKOFF) {
backoff++;
}
} else {

if (waitForExport) {
Instant lastEventTime = response.records().get(response.records().size() - 1).dynamodb().approximateCreationDateTime();
if (lastEventTime.compareTo(startTime) <= 0) {
LOG.debug("Get {} events before start time, ignore...", response.records().size());
continue;
if (response.records().size() == MAX_GET_RECORD_ITEM_COUNT) {
// Reduce backoff when reaching the max records per GetRecords call.
backoff--;
}
checkpointer.checkpoint(sequenceNumber);
waitForExport();
waitForExport = false;

records = response.records().stream()
.filter(record -> record.dynamodb().approximateCreationDateTime().compareTo(startTime) > 0)
.collect(Collectors.toList());
} else {
records = response.records();
// Always use the last sequence number for checkpoint
sequenceNumber = response.records().get(response.records().size() - 1).dynamodb().sequenceNumber();

if (waitForExport) {
Instant lastEventTime = response.records().get(response.records().size() - 1).dynamodb().approximateCreationDateTime();
if (lastEventTime.compareTo(startTime) <= 0) {
LOG.debug("Get {} events before start time, ignore...", response.records().size());
continue;
}
checkpointer.checkpoint(sequenceNumber);
waitForExport();
waitForExport = false;

records = response.records().stream()
.filter(record -> record.dynamodb().approximateCreationDateTime().compareTo(startTime) > 0)
.collect(Collectors.toList());
} else {
records = response.records();
}
recordConverter.writeToBuffer(records);
}
try {
// Idle between get records call.
// Add a backoff on sleep time: 200ms, 400ms, 800ms...
Thread.sleep((long) GET_RECORD_INTERVAL_MILLS << backoff);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
recordConverter.writeToBuffer(records);
}

// interrupted
if (shouldStop) {
// Do last checkpoint and then release
checkpointer.release(sequenceNumber);
// Do last checkpoint and then quit
LOG.error("Should Stop flag is set to True, looks like shutdown has triggered");
checkpointer.checkpoint(sequenceNumber);
throw new RuntimeException("Shard Consumer is interrupted");
}
}
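The idle-time change is the core of the performance tuning: where the old code only carried a TODO about pausing between reads, the consumer now sleeps 200 ms shifted left by a backoff counter that grows while the shard returns no records (capped at 3) and shrinks when a call comes back full, i.e. 200, 400, 800, 1600 ms. A small worked sketch of just that calculation, assuming the constants introduced above (the lower bound on the decrement is an added safety assumption; the diff decrements unconditionally):

    // Idle time between GetRecords calls: 200 ms << backoff, i.e. 200, 400, 800, 1600 ms.
    long idleMillis = (long) GET_RECORD_INTERVAL_MILLS << backoff;   // backoff capped at MAX_GET_RECORD_BACKOFF = 3
    Thread.sleep(idleMillis);

    // Backoff adjustment per GetRecords response (per the loop above):
    if (response.records().isEmpty()) {
        if (backoff < MAX_GET_RECORD_BACKOFF) {
            backoff++;                      // empty poll: back off further
        }
    } else if (response.records().size() == MAX_GET_RECORD_ITEM_COUNT && backoff > 0) {
        backoff--;                          // full batch: poll faster again
    }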