Skip to content

Commit

Permalink
Minor updates and bug fixes for RDS source (#4887)
Browse files Browse the repository at this point in the history
* Minor updates and bug fixes to prepare for performance testing

Signed-off-by: Hai Yan <[email protected]>

* Address comments

Signed-off-by: Hai Yan <[email protected]>

---------

Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh authored Aug 29, 2024
1 parent b5358b5 commit 8eef2f6
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void start(Buffer<Record<Event>> buffer) {

if (sourceConfig.isStreamEnabled()) {
BinaryLogClient binaryLogClient = new BinlogClientFactory(sourceConfig, rdsClient, dbMetadata).create();
if (sourceConfig.getTlsConfig() == null || !sourceConfig.getTlsConfig().isInsecure()) {
if (sourceConfig.isTlsEnabled()) {
binaryLogClient.setSSLMode(SSLMode.REQUIRED);
} else {
binaryLogClient.setSSLMode(SSLMode.DISABLED);
Expand Down Expand Up @@ -146,7 +146,7 @@ private SchemaManager getSchemaManager(final RdsSourceConfig sourceConfig, final
dbMetadata.getPort(),
sourceConfig.getAuthenticationConfig().getUsername(),
sourceConfig.getAuthenticationConfig().getPassword(),
!sourceConfig.getTlsConfig().isInsecure());
sourceConfig.isTlsEnabled());
return new SchemaManager(connectionManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public class RdsSourceConfig {
@JsonProperty("tls")
private TlsConfig tlsConfig;

@JsonProperty("disable_s3_read_for_leader")
private boolean disableS3ReadForLeader = false;

public String getDbIdentifier() {
return dbIdentifier;
}
Expand Down Expand Up @@ -153,6 +156,14 @@ public TlsConfig getTlsConfig() {
return tlsConfig;
}

public boolean isTlsEnabled() {
return tlsConfig == null || !tlsConfig.isInsecure();
}

public boolean isDisableS3ReadForLeader() {
return disableS3ReadForLeader;
}

public AuthenticationConfig getAuthenticationConfig() {
return this.authenticationConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -63,6 +64,11 @@ public Event convert(final Map<String, Object> rowData,

EventMetadata eventMetadata = event.getMetadata();

// Only set external origination time for stream events, not export
final Instant externalOriginationTime = Instant.ofEpochMilli(eventCreateTimeEpochMillis);
event.getEventHandle().setExternalOriginationTime(externalOriginationTime);
eventMetadata.setExternalOriginationTime(externalOriginationTime);

eventMetadata.setAttribute(EVENT_DATABASE_NAME_METADATA_ATTRIBUTE, databaseName);
eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableName);
eventMetadata.setAttribute(BULK_ACTION_METADATA_ATTRIBUTE, bulkAction.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ public static ExportObjectKey fromString(final String objectKeyString) {
final String prefix = parts[0];
final String exportTaskId = parts[1];
final String databaseName = parts[2];
final String tableName = parts[3];
// fullTableName is in the format of "databaseName.tableName"
final String fullTableName = parts[3];
final String tableName = fullTableName.split("\\.")[1];
final String numberedFolder = parts[4];
final String fileName = parts[5];
return new ExportObjectKey(prefix, exportTaskId, databaseName, tableName, numberedFolder, fileName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.opensearch.dataprepper.model.source.s3.S3ScanEnvironmentVariables.STOP_S3_SCAN_PROCESSING_PROPERTY;


public class StreamScheduler implements Runnable {

Expand Down Expand Up @@ -57,18 +59,24 @@ public StreamScheduler(final EnhancedSourceCoordinator sourceCoordinator,
@Override
public void run() {
LOG.debug("Start running Stream Scheduler");
StreamPartition streamPartition = null;
while (!shutdownRequested && !Thread.currentThread().isInterrupted()) {
try {
final Optional<EnhancedSourcePartition> sourcePartition = sourceCoordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE);
if (sourcePartition.isPresent()) {
LOG.info("Acquired partition to read from stream");

final StreamPartition streamPartition = (StreamPartition) sourcePartition.get();
if (sourceConfig.isDisableS3ReadForLeader()) {
// Primary node that acquires the stream partition will not perform work on the S3 buffer
System.setProperty(STOP_S3_SCAN_PROCESSING_PROPERTY, "true");
}

streamPartition = (StreamPartition) sourcePartition.get();
final StreamCheckpointer streamCheckpointer = new StreamCheckpointer(sourceCoordinator, streamPartition, pluginMetrics);
binaryLogClient.registerEventListener(new BinlogEventListener(
buffer, sourceConfig, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager));
final StreamWorker streamWorker = StreamWorker.create(sourceCoordinator, binaryLogClient, pluginMetrics);
executorService.submit(() -> streamWorker.processStream(streamPartition));
executorService.submit(() -> streamWorker.processStream((StreamPartition) sourcePartition.get()));
}

try {
Expand All @@ -81,6 +89,13 @@ public void run() {

} catch (Exception e) {
LOG.error("Received an exception during stream processing, backing off and retrying", e);
if (streamPartition != null) {
if (sourceConfig.isDisableS3ReadForLeader()) {
System.clearProperty(STOP_S3_SCAN_PROCESSING_PROPERTY);
}
sourceCoordinator.giveUpPartition(streamPartition);
}

try {
Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
} catch (final InterruptedException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ void test_given_export_partition_and_export_task_id_then_complete_export() throw
String tableName = UUID.randomUUID().toString();
// objectKey needs to have this structure: "{prefix}/{export task ID}/{database name}/{table name}/{numbered folder}/{file name}"
S3Object s3Object = S3Object.builder()
.key("prefix/" + exportTaskId + "/my_db/" + tableName + "/1/file1" + PARQUET_SUFFIX)
.key("prefix/" + exportTaskId + "/my_db/my_db." + tableName + "/1/file1" + PARQUET_SUFFIX)
.build();
when(listObjectsV2Response.contents()).thenReturn(List.of(s3Object));
when(listObjectsV2Response.isTruncated()).thenReturn(false);
Expand Down Expand Up @@ -185,7 +185,7 @@ void test_given_export_partition_without_export_task_id_then_start_and_complete_
String tableName = UUID.randomUUID().toString();
// objectKey needs to have this structure: "{prefix}/{export task ID}/{database name}/{table name}/{numbered folder}/{file name}"
S3Object s3Object = S3Object.builder()
.key("prefix/" + exportTaskId + "/my_db/" + tableName + "/1/file1" + PARQUET_SUFFIX)
.key("prefix/" + exportTaskId + "/my_db/my_db." + tableName + "/1/file1" + PARQUET_SUFFIX)
.build();
when(listObjectsV2Response.contents()).thenReturn(List.of(s3Object));
when(listObjectsV2Response.isTruncated()).thenReturn(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ExportObjectKeyTest {

@Test
void test_fromString_with_valid_input_string() {
final String objectKeyString = "prefix/export-task-id/db-name/table-name/1/file-name.parquet";
final String objectKeyString = "prefix/export-task-id/db-name/db-name.table-name/1/file-name.parquet";
final ExportObjectKey exportObjectKey = ExportObjectKey.fromString(objectKeyString);

assertThat(exportObjectKey.getPrefix(), equalTo("prefix"));
Expand Down

0 comments on commit 8eef2f6

Please sign in to comment.