Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Aiden Dai <[email protected]>
  • Loading branch information
daixba committed Oct 3, 2023
1 parent 7d513f8 commit c12c051
Show file tree
Hide file tree
Showing 16 changed files with 101 additions and 81 deletions.
2 changes: 0 additions & 2 deletions data-prepper-plugins/dynamodb-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ plugins {
id 'java'
}

group = 'org.opensearch.dataprepper.plugins.source'
version = '2.4.0-SNAPSHOT'

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public DynamoDBSource(PluginMetrics pluginMetrics, final DynamoDBSourceConfig so

@Override
public void start(Buffer<Record<Event>> buffer) {
LOG.info("Start Processing");
LOG.info("Start DynamoDB service");
dynamoDBService.start(buffer);
}

Expand All @@ -86,10 +86,4 @@ public void stop() {

}


@Override
public boolean areAcknowledgementsEnabled() {
return Source.super.areAcknowledgementsEnabled();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,12 @@ public RecordConverter(Buffer<Record<Event>> buffer, TableInfo tableInfo) {
* Using partition key plus sort key (if any)
*/
String getId(Map<String, Object> data) {
String result;
String partitionKey = String.valueOf(data.get(tableInfo.getMetadata().getPartitionKeyAttributeName()));
if (tableInfo.getMetadata().getSortKeyAttributeName() == null) {
result = partitionKey;
} else {
String sortKey = String.valueOf(data.get(tableInfo.getMetadata().getSortKeyAttributeName()));
return partitionKey + "_" + sortKey;
return partitionKey;
}
return result.replaceAll("\\s", "_");
String sortKey = String.valueOf(data.get(tableInfo.getMetadata().getSortKeyAttributeName()));
return partitionKey + "_" + sortKey;
}

void writeEventsToBuffer(List<Record<Event>> events) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,19 @@ public class DefaultEnhancedSourceCoordinator implements EnhancedSourceCoordinat

private static final Logger LOG = LoggerFactory.getLogger(DefaultEnhancedSourceCoordinator.class);

//Default time out duration for lease.
/**
* Default time out duration for lease.
*/
private static final Duration DEFAULT_LEASE_TIMEOUT = Duration.ofMinutes(10);

// Default identifier For global state
/**
* Default identifier For global state
*/
private static final String DEFAULT_GLOBAL_STATE_PARTITION_TYPE = "GLOBAL";

/**
* A backend coordination store
*/
private final SourceCoordinationStore coordinationStore;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,34 +51,20 @@ protected void setSourcePartitionStoreItem(SourcePartitionStoreItem sourcePartit
* Helper method to convert progress state.
* This is because the state is currently stored as a String in the coordination store.
*/
protected Optional<T> convertStringToPartitionProgressState(Class<T> T, final String serializedPartitionProgressState) {
protected T convertStringToPartitionProgressState(Class<T> progressStateClass, final String serializedPartitionProgressState) {
if (Objects.isNull(serializedPartitionProgressState)) {
return Optional.empty();
}

try {
return Optional.of(MAPPER.readValue(serializedPartitionProgressState, T));
} catch (final JsonProcessingException e) {
LOG.error("Unable to convert string to partition progress state class {}: {}", T.getName(), e);
return Optional.empty();
}
}

/**
* Helper method to convert progress state to map (for global state)
* This is because the state is currently stored as a String in the coordination store.
*/
protected Optional<T> convertStringToMap(final String serializedPartitionProgressState) {
if (Objects.isNull(serializedPartitionProgressState)) {
return Optional.empty();
return null;
}

try {
return Optional.of(MAPPER.readValue(serializedPartitionProgressState, new TypeReference<>() {
}));
if (progressStateClass != null) {
return MAPPER.readValue(serializedPartitionProgressState, progressStateClass);
}
return MAPPER.readValue(serializedPartitionProgressState, new TypeReference<>() {
});
} catch (final JsonProcessingException e) {
LOG.error("Unable to convert string to map: {}", e);
return Optional.empty();
LOG.error("Unable to convert string to partition progress state class {}: {}", progressStateClass.getName(), e);
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class DataFilePartition extends SourcePartition<DataFileProgressState> {
private final String bucket;
private final String key;

private final Optional<DataFileProgressState> state;
private final DataFileProgressState state;

public DataFilePartition(SourcePartitionStoreItem sourcePartitionStoreItem) {
setSourcePartitionStoreItem(sourcePartitionStoreItem);
Expand All @@ -39,7 +39,7 @@ public DataFilePartition(String exportArn, String bucket, String key, Optional<D
this.exportArn = exportArn;
this.bucket = bucket;
this.key = key;
this.state = state;
this.state = state.orElse(null);
}

@Override
Expand All @@ -54,7 +54,10 @@ public String getPartitionKey() {

@Override
public Optional<DataFileProgressState> getProgressState() {
return state;
if (state != null) {
return Optional.of(state);
}
return Optional.empty();
}

public String getExportArn() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class ExportPartition extends SourcePartition<ExportProgressState> {

private final Instant exportTime;

private final Optional<ExportProgressState> state;
private final ExportProgressState state;

public ExportPartition(SourcePartitionStoreItem sourcePartitionStoreItem) {
setSourcePartitionStoreItem(sourcePartitionStoreItem);
Expand All @@ -39,18 +39,10 @@ public ExportPartition(SourcePartitionStoreItem sourcePartitionStoreItem) {
public ExportPartition(String tableArn, Instant exportTime, Optional<ExportProgressState> state) {
this.tableArn = tableArn;
this.exportTime = exportTime;
this.state = state;
this.state = state.orElse(null);

}

public String getTableArn() {
return tableArn;
}

public Instant getExportTime() {
return exportTime;
}


@Override
public String getPartitionType() {
return PARTITION_TYPE;
Expand All @@ -63,7 +55,19 @@ public String getPartitionKey() {

@Override
public Optional<ExportProgressState> getProgressState() {
return state;
if (state != null) {
return Optional.of(state);
}
return Optional.empty();
}


public String getTableArn() {
return tableArn;
}

public Instant getExportTime() {
return exportTime;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ public class GlobalState extends SourcePartition<Map<String, Object>> {

private final String stateName;

private Optional<Map<String, Object>> state;
private Map<String, Object> state;

public GlobalState(SourcePartitionStoreItem sourcePartitionStoreItem) {
setSourcePartitionStoreItem(sourcePartitionStoreItem);
this.stateName = sourcePartitionStoreItem.getSourcePartitionKey();
this.state = convertStringToMap(sourcePartitionStoreItem.getPartitionProgressState());
this.state = convertStringToPartitionProgressState(null, sourcePartitionStoreItem.getPartitionProgressState());

}

public GlobalState(String stateName, Optional<Map<String, Object>> state) {
this.stateName = stateName;
this.state = state;
this.state = state.orElse(null);

}

Expand All @@ -48,11 +48,14 @@ public String getPartitionKey() {

@Override
public Optional<Map<String, Object>> getProgressState() {
return state;
if (state != null) {
return Optional.of(state);
}
return Optional.empty();
}

public void setProgressState(Map<String, Object> state) {
this.state = Optional.of(state);
this.state = state;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ public class StreamPartition extends SourcePartition<StreamProgressState> {

private final String shardId;

private final Optional<StreamProgressState> state;
private final StreamProgressState state;

public StreamPartition(String streamArn, String shardId, Optional<StreamProgressState> state) {
this.streamArn = streamArn;
this.shardId = shardId;
this.state = state;
this.state = state.orElse(null);
}

public StreamPartition(SourcePartitionStoreItem sourcePartitionStoreItem) {
Expand All @@ -48,7 +48,10 @@ public String getPartitionKey() {

@Override
public Optional<StreamProgressState> getProgressState() {
return state;
if (state != null) {
return Optional.of(state);
}
return Optional.empty();
}

public String getStreamArn() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,19 @@ public class DataFileLoader implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(DataFileLoader.class);

// A flag to interrupt the process
/**
* A flag to interrupt the process
*/
private static volatile boolean shouldStop = false;

// Number of lines to be read in a batch
/**
* Number of lines to be read in a batch
*/
private static final int DEFAULT_BATCH_SIZE = 1000;

// Default checkpoint interval
/**
* Default regular checkpoint interval
*/
private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 2 * 60_000;

private final String bucketName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private BiConsumer completeDataLoader(DataFilePartition dataFilePartition) {
} else {
// The data loader must have already done one last checkpointing.
LOG.debug("Data Loader completed with exception");
LOG.error(ex.toString());
LOG.error("{}", ex);
// Release the ownership
coordinator.giveUpPartition(dataFilePartition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,37 @@ public class ManifestFileReader {

private static final Logger LOG = LoggerFactory.getLogger(ManifestFileReader.class);

private static final TypeReference<Map<String, String>> MAP_TYPE_REFERENCE = new TypeReference<>() {
};

private static final String DATA_FILE_S3_KEY = "dataFileS3Key";
private static final String DATA_FILE_ITEM_COUNT_KEY = "itemCount";

private static final ObjectMapper MAPPER = new ObjectMapper();

private final S3ObjectReader fileReader;
private final S3ObjectReader objectReader;

public ManifestFileReader(S3ObjectReader fileReader) {
this.fileReader = fileReader;
public ManifestFileReader(S3ObjectReader objectReader) {
this.objectReader = objectReader;
}

public ExportSummary parseSummaryFile(String bucket, String key) {
LOG.debug("Try to read the manifest summary file");
InputStream object = fileReader.readFile(bucket, key);
InputStream object = objectReader.readFile(bucket, key);

BufferedReader reader = new BufferedReader(new InputStreamReader(object));
try {
String line = reader.readLine();
LOG.debug("Manifest summary: " + line);
LOG.debug("Manifest summary: {}", line);
ExportSummary summaryInfo = MAPPER.readValue(line, ExportSummary.class);
return summaryInfo;

} catch (JsonProcessingException e) {
LOG.error("Failed to parse the summary info due to :" + e.getMessage());
LOG.error("Failed to parse the summary info due to {}", e.getMessage());
throw new RuntimeException(e);

} catch (IOException e) {
LOG.error("IO Exception due to :" + e.getMessage());
LOG.error("IO Exception due to {}", e.getMessage());
throw new RuntimeException(e);
}

Expand All @@ -60,16 +63,15 @@ public Map<String, Integer> parseDataFile(String bucket, String key) {
LOG.debug("Try to read the manifest data file");

Map<String, Integer> result = new HashMap<>();
InputStream object = fileReader.readFile(bucket, key);
InputStream object = objectReader.readFile(bucket, key);
BufferedReader reader = new BufferedReader(new InputStreamReader(object));

String line;
try {
while ((line = reader.readLine()) != null) {
// An example line as below:
// {"itemCount":46331,"md5Checksum":"a0k21IY3eelgr2PuWJLjJw==","etag":"51f9f394903c5d682321c6211aae8b6a-1","dataFileS3Key":"test-table-export/AWSDynamoDB/01692350182719-6de2c037/data/fpgzwz7ome3s7a5gqn2mu3ogtq.json.gz"}
Map<String, String> map = MAPPER.readValue(line, new TypeReference<>() {
});
Map<String, String> map = MAPPER.readValue(line, MAP_TYPE_REFERENCE);
LOG.debug("Get a file {} with item count {}", map.get(DATA_FILE_S3_KEY), map.get(DATA_FILE_ITEM_COUNT_KEY));
result.put(map.get(DATA_FILE_S3_KEY), Integer.valueOf(map.get(DATA_FILE_ITEM_COUNT_KEY)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.dynamodb.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
Expand All @@ -14,6 +15,7 @@
* https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/S3DataExport.Output.html#S3DataExport.Output_Manifest
* </p>
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class ExportSummary {
@JsonProperty("version")
private String version;
Expand Down
Loading

0 comments on commit c12c051

Please sign in to comment.