From aefc57b3fa45be30a9d0b55ede3bc118d610d9ab Mon Sep 17 00:00:00 2001
From: Aiden Dai
Date: Tue, 26 Sep 2023 14:33:02 +0800
Subject: [PATCH 1/8] Add DynamoDB source plugin
Signed-off-by: Aiden Dai
---
.../dynamodb-source/README.md | 83 +++++
.../dynamodb-source/build.gradle | 42 +++
.../source/dynamodb/ClientFactory.java | 50 +++
.../source/dynamodb/DynamoDBService.java | 306 +++++++++++++++++
.../source/dynamodb/DynamoDBSource.java | 94 ++++++
.../source/dynamodb/DynamoDBSourceConfig.java | 52 +++
.../AwsAuthenticationConfig.java | 48 +++
.../dynamodb/configuration/ExportConfig.java | 26 ++
.../dynamodb/configuration/StreamConfig.java | 24 ++
.../dynamodb/configuration/TableConfig.java | 37 +++
.../converter/ExportRecordConverter.java | 80 +++++
.../dynamodb/converter/RecordConverter.java | 95 ++++++
.../converter/StreamRecordConverter.java | 80 +++++
.../DefaultEnhancedSourceCoordinator.java | 223 +++++++++++++
.../EnhancedSourceCoordinator.java | 103 ++++++
.../dynamodb/coordination/Partition.java | 22 ++
.../coordination/PartitionFactory.java | 43 +++
.../coordination/SourcePartition.java | 100 ++++++
.../partition/DataFilePartition.java | 71 ++++
.../partition/ExportPartition.java | 70 ++++
.../coordination/partition/GlobalState.java | 59 ++++
.../coordination/partition/InitPartition.java | 46 +++
.../partition/StreamPartition.java | 61 ++++
.../state/DataFileProgressState.java | 34 ++
.../state/EmptyProgressState.java | 9 +
.../state/ExportProgressState.java | 67 ++++
.../state/StreamProgressState.java | 45 +++
.../dynamodb/export/DataFileCheckpointer.java | 78 +++++
.../dynamodb/export/DataFileLoader.java | 182 ++++++++++
.../export/DataFileLoaderFactory.java | 56 ++++
.../dynamodb/export/DataFileScheduler.java | 185 +++++++++++
.../dynamodb/export/ExportScheduler.java | 265 +++++++++++++++
.../dynamodb/export/ExportTaskManager.java | 88 +++++
.../dynamodb/export/ManifestFileReader.java | 83 +++++
.../dynamodb/export/S3ObjectReader.java | 41 +++
.../source/dynamodb/model/ExportSummary.java | 123 +++++++
.../source/dynamodb/model/LoadStatus.java | 81 +++++
.../source/dynamodb/model/TableInfo.java | 27 ++
.../source/dynamodb/model/TableMetadata.java | 174 ++++++++++
.../source/dynamodb/stream/ShardConsumer.java | 220 ++++++++++++
.../dynamodb/stream/ShardConsumerFactory.java | 98 ++++++
.../source/dynamodb/stream/ShardManager.java | 158 +++++++++
.../dynamodb/stream/StreamCheckpointer.java | 94 ++++++
.../dynamodb/stream/StreamScheduler.java | 125 +++++++
.../source/dynamodb/DynamoDBServiceTest.java | 312 ++++++++++++++++++
.../dynamodb/DynamoDBSourceConfigTest.java | 87 +++++
.../source/dynamodb/DynamoDBSourceTest.java | 109 ++++++
.../converter/ExportRecordConverterTest.java | 112 +++++++
.../converter/StreamRecordConverterTest.java | 146 ++++++++
.../coordination/DefaultCoordinatorTest.java | 231 +++++++++++++
.../coordination/PartitionFactoryTest.java | 180 ++++++++++
.../export/DataFileLoaderFactoryTest.java | 94 ++++++
.../dynamodb/export/DataFileLoaderTest.java | 143 ++++++++
.../export/DataFileSchedulerTest.java | 144 ++++++++
.../dynamodb/export/ExportSchedulerTest.java | 171 ++++++++++
.../export/ManifestFileReaderTest.java | 110 ++++++
.../stream/ShardConsumerFactoryTest.java | 97 ++++++
.../dynamodb/stream/ShardConsumerTest.java | 154 +++++++++
.../dynamodb/stream/ShardManagerTest.java | 150 +++++++++
.../dynamodb/stream/StreamSchedulerTest.java | 110 ++++++
settings.gradle | 2 +-
61 files changed, 6399 insertions(+), 1 deletion(-)
create mode 100644 data-prepper-plugins/dynamodb-source/README.md
create mode 100644 data-prepper-plugins/dynamodb-source/build.gradle
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/ClientFactory.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/AwsAuthenticationConfig.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/ExportConfig.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/StreamConfig.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/TableConfig.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/DefaultEnhancedSourceCoordinator.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/EnhancedSourceCoordinator.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/Partition.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/PartitionFactory.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/SourcePartition.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/DataFilePartition.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/ExportPartition.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/GlobalState.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/InitPartition.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/StreamPartition.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/state/DataFileProgressState.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/state/EmptyProgressState.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/state/ExportProgressState.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/state/StreamProgressState.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileCheckpointer.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportTaskManager.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ManifestFileReader.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/S3ObjectReader.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/ExportSummary.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/LoadStatus.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/TableInfo.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/TableMetadata.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardManager.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBServiceTest.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfigTest.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceTest.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/DefaultCoordinatorTest.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/PartitionFactoryTest.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactoryTest.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ManifestFileReaderTest.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardManagerTest.java
create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java
diff --git a/data-prepper-plugins/dynamodb-source/README.md b/data-prepper-plugins/dynamodb-source/README.md
new file mode 100644
index 0000000000..1219b9abe5
--- /dev/null
+++ b/data-prepper-plugins/dynamodb-source/README.md
@@ -0,0 +1,83 @@
+# DynamoDB Source
+
+This is a source plugin that supports retrieving data from DynamoDB tables. The basic use case of this source plugin is to
+sync the data from DynamoDB tables to OpenSearch indexes. With this change data capture (CDC) support, customers can run an end-to-end data
+sync pipeline and capture changed data in near real-time without writing any code and without any business downtime.
+Such a pipeline can run on multiple nodes in parallel to support data capture from large-scale tables.
+
+This plugin supports the following three modes:
+
+1. Full load only: One time full data export and load
+2. CDC Only: DynamoDB Stream
+3. Full load + CDC: One time full export and load + DynamoDB Stream.
+
+## Usages
+
+To get started with this DynamoDB source, create the following source configuration:
+
+```yaml
+source:
+ dynamodb:
+ tables:
+ - table_arn: "arn:aws:dynamodb:us-west-2:123456789012:table/my-table"
+ stream:
+ start_position:
+ export:
+ s3_bucket: "my-bucket"
+ s3_prefix: "export/"
+ aws:
+ region: "us-west-2"
+ sts_role_arn: "arn:aws:iam::123456789012:role/DataPrepperRole"
+
+ coordinator:
+ dynamodb:
+ table_name: "coordinator-demo"
+ region: "us-west-2"
+
+
+```
+
+## Configurations
+
+### Shared Configurations:
+
+* coordinator (Required): Coordination store setting. This design creates a custom coordinator based on the existing
+ coordination store implementation. Only DynamoDB has been tested so far.
+* aws (Required): High-level AWS auth. Note that Data Prepper uses the same AWS auth to access all tables; check
+ Security for more details.
+ * region
+ * sts_role_arn
+
+### Export Configurations:
+
+* s3_bucket (Required): The destination bucket to store the exported data files
+* s3_prefix (Optional): Custom prefix.
+
+### Stream Configurations
+
+* start_position (Optional): Start position of the stream; can be either BEGINNING or LATEST. If export is required,
+ this value will be ignored and set to LATEST by default. This is useful if customers don’t want to run an initial export,
+ so they can
+ choose to read either from the beginning of the stream (up to 24 hours) or from the latest position (from the time the pipeline is
+ started).
+
+## Metrics
+
+### Counter
+
+- `exportJobsSuccess`: measures the total number of export jobs that ran with status completed.
+- `exportJobsErrors`: measures the total number of export jobs that could not be submitted or that ran with status failed.
+- `exportFilesTotal`: measures the total number of export files generated.
+- `exportFilesSuccess`: measures the total number of export files read successfully (through the last line).
+- `exportRecordsTotal`: measures the total number of export records generated.
+- `exportRecordsSuccess`: measures the total number of export records processed successfully.
+- `exportRecordsErrors`: measures the total number of export records that failed to be processed.
+- `changeEventsSucceeded`: measures the total number of change events processed successfully.
+- `changeEventsFailed`: measures the total number of change events that failed to be processed.
+
+## Developer Guide
+
+This plugin is compatible with Java 17. See
+
+- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
+- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
diff --git a/data-prepper-plugins/dynamodb-source/build.gradle b/data-prepper-plugins/dynamodb-source/build.gradle
new file mode 100644
index 0000000000..b93a79262f
--- /dev/null
+++ b/data-prepper-plugins/dynamodb-source/build.gradle
@@ -0,0 +1,42 @@
+plugins {
+ id 'java'
+}
+// Build script for the dynamodb-source plugin module.
+group = 'org.opensearch.dataprepper.plugins.source'
+version = '2.4.0-SNAPSHOT'
+
+repositories {
+ mavenCentral()
+}
+
+dependencies {
+ implementation project(path: ':data-prepper-api')
+ implementation project(path: ':data-prepper-core')
+ implementation project(path: ':data-prepper-plugins:dynamodb-source-coordination-store')
+
+ implementation libs.armeria.core
+ implementation 'io.micrometer:micrometer-core'
+ // AWS SDK modules: STS, ARN parsing, DynamoDB (plus the enhanced client), S3 and the Netty NIO HTTP client.
+ implementation 'software.amazon.awssdk:sts'
+ implementation 'software.amazon.awssdk:arns'
+ implementation 'software.amazon.awssdk:dynamodb'
+ implementation 'software.amazon.awssdk:dynamodb-enhanced'
+ implementation 'software.amazon.awssdk:s3'
+ implementation 'software.amazon.awssdk:netty-nio-client'
+ // Jackson core and databind, plus the Ion data format module.
+ implementation 'com.fasterxml.jackson.core:jackson-core'
+ implementation 'com.fasterxml.jackson.core:jackson-databind'
+ // https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-ion
+ implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion'
+
+ implementation project(path: ':data-prepper-plugins:aws-plugin-api')
+
+ // Test-only dependencies: JUnit Jupiter (version taken from the JUnit BOM) and the Jackson YAML data format.
+ testImplementation platform('org.junit:junit-bom:5.9.1')
+ testImplementation 'org.junit.jupiter:junit-jupiter'
+ testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
+}
+// Run the tests on the JUnit Platform.
+test {
+ useJUnitPlatform()
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/ClientFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/ClientFactory.java
new file mode 100644
index 0000000000..867903e8e9
--- /dev/null
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/ClientFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.dynamodb;
+
+import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
+import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
+import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.AwsAuthenticationConfig;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
+import software.amazon.awssdk.services.s3.S3Client;
+
+public class ClientFactory {
+ // Factory for the DynamoDB, DynamoDB Streams and S3 clients; every client shares the credentials provider below.
+ private final AwsCredentialsProvider awsCredentialsProvider;
+ // Resolves the shared provider from the configured region, STS role ARN, STS external ID and STS header overrides.
+ public ClientFactory(AwsCredentialsSupplier awsCredentialsSupplier, AwsAuthenticationConfig awsAuthenticationConfig) {
+ awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder()
+ .withRegion(awsAuthenticationConfig.getAwsRegion())
+ .withStsRoleArn(awsAuthenticationConfig.getAwsStsRoleArn())
+ .withStsExternalId(awsAuthenticationConfig.getAwsStsExternalId())
+ .withStsHeaderOverrides(awsAuthenticationConfig.getAwsStsHeaderOverrides())
+ .build());
+ }
+ // NOTE(review): none of the builders below sets a region explicitly; confirm the SDK default region resolution is intended.
+ // Builds a new DynamoDB Streams client that uses the shared credentials provider.
+ public DynamoDbStreamsClient buildDynamoDbStreamClient() {
+ return DynamoDbStreamsClient.builder()
+ .credentialsProvider(awsCredentialsProvider)
+ .build();
+ }
+
+ // Builds a new DynamoDB client that uses the shared credentials provider.
+ public DynamoDbClient buildDynamoDBClient() {
+ return DynamoDbClient.builder()
+ .credentialsProvider(awsCredentialsProvider)
+ .build();
+ }
+
+ // Builds a new S3 client that uses the shared credentials provider.
+ public S3Client buildS3Client() {
+ return S3Client.builder()
+ .credentialsProvider(awsCredentialsProvider)
+ .build();
+ }
+
+}
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java
new file mode 100644
index 0000000000..8723a165a9
--- /dev/null
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java
@@ -0,0 +1,306 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.dynamodb;
+
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig;
+import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.TableConfig;
+import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator;
+import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.SourcePartition;
+import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.ExportPartition;
+import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState;
+import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.InitPartition;
+import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition;
+import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.ExportProgressState;
+import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState;
+import org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileLoaderFactory;
+import org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileScheduler;
+import org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler;
+import org.opensearch.dataprepper.plugins.source.dynamodb.export.ManifestFileReader;
+import org.opensearch.dataprepper.plugins.source.dynamodb.export.S3ObjectReader;
+import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
+import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata;
+import org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumerFactory;
+import org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardManager;
+import org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.arns.Arn;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.DescribeContinuousBackupsRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeContinuousBackupsResponse;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+public class DynamoDBService {
+ // Validates the table configurations, seeds the coordination store with partitions and runs the export, data file and stream schedulers.
+ private static final Logger LOG = LoggerFactory.getLogger(DynamoDBService.class);
+
+ private final List tableConfigs;
+
+ private final EnhancedSourceCoordinator coordinator;
+
+ private final DynamoDbClient dynamoDbClient;
+ // Client for DynamoDB Streams; it is also handed to the ShardManager and the ShardConsumerFactory.
+ private final DynamoDbStreamsClient dynamoDbStreamsClient;
+
+ private final S3Client s3Client;
+
+ private final ShardManager shardManager;
+
+ private final ExecutorService executor;
+
+ private final PluginMetrics pluginMetrics;
+
+ // Builds the AWS clients through the given factory and sets up the shard manager and the scheduler thread pool.
+ public DynamoDBService(EnhancedSourceCoordinator coordinator, ClientFactory clientFactory, DynamoDBSourceConfig sourceConfig, PluginMetrics pluginMetrics) {
+ this.coordinator = coordinator;
+ this.pluginMetrics = pluginMetrics;
+
+ // Initialize AWS clients
+ dynamoDbClient = clientFactory.buildDynamoDBClient();
+ dynamoDbStreamsClient = clientFactory.buildDynamoDbStreamClient();
+ s3Client = clientFactory.buildS3Client();
+
+ // A shard manager is responsible for retrieving the shard information from streams.
+ shardManager = new ShardManager(dynamoDbStreamsClient);
+ tableConfigs = sourceConfig.getTableConfigs();
+ executor = Executors.newFixedThreadPool(3); // three threads; start() submits exactly three schedulers
+ }
+
+ /**
+ * This service starts three long-running threads (schedulers).
+ * Each thread is responsible for one type of job: export, data file loading or stream reading.
+ * The data will be guaranteed to be sent to {@link Buffer} in order.
+ *
+ * @param buffer Data Prepper Buffer
+ */
+ public void start(Buffer> buffer) {
+
+ LOG.info("Start running DynamoDB service");
+ ManifestFileReader manifestFileReader = new ManifestFileReader(new S3ObjectReader(s3Client));
+ Runnable exportScheduler = new ExportScheduler(coordinator, dynamoDbClient, manifestFileReader, pluginMetrics);
+
+ DataFileLoaderFactory loaderFactory = new DataFileLoaderFactory(coordinator, s3Client, pluginMetrics, buffer);
+ Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics);
+
+ ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, shardManager, buffer);
+ Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, shardManager);
+
+ // May consider starting or shutting down the schedulers on demand.
+ // Currently, even after the exports are done, the related scheduler will not be shut down.
+ // This is because in the future we may support incremental exports.
+ executor.submit(exportScheduler);
+ executor.submit(fileLoaderScheduler);
+ executor.submit(streamScheduler);
+
+ }
+
+ /**
+ * Interrupts the running schedulers by calling shutdownNow on the executor.
+ * Each scheduler must implement logic for graceful shutdown.
+ */
+ public void shutdown() {
+ LOG.info("shutdown DynamoDB schedulers");
+ executor.shutdownNow();
+ }
+
+ /**
+ * Performs initialization of the service from the pipeline configuration.
+ * The initialization is currently performed once per pipeline.
+ * Hence, configuration changes made after the first initialization will be ignored.
+ * This is controlled by a lease in the coordination table.
+ * Future optimization can be done to accept configuration changes.
+ */
+ public void init() {
+ LOG.info("Start initialize DynamoDB service");
+
+ final Optional initPartition = coordinator.acquireAvailablePartition(InitPartition.PARTITION_TYPE);
+ if (initPartition.isEmpty()) {
+ // Already initialized. Do nothing.
+ return;
+ }
+
+ LOG.info("Start validating table configurations");
+ List tableInfos;
+ try {
+ tableInfos = tableConfigs.stream().map(this::getTableInfo).collect(Collectors.toList());
+ } catch (Exception e) {
+ coordinator.giveUpPartition(initPartition.get());
+ throw e;
+ }
+ // NOTE(review): a failure inside the loop below does not give the init partition back, unlike the validation step above; confirm this is intended.
+ tableInfos.forEach(tableInfo -> {
+ // Create a Global state in the coordination table for the configuration.
+ // Global State here is designed to be able to read whenever needed
+ // So that the jobs can refer to the configuration.
+ coordinator.createPartition(new GlobalState(tableInfo.getTableArn(), Optional.of(tableInfo.getMetadata().toMap())));
+
+ Instant startTime = Instant.now(); // shared by the export partition and the stream partitions of this table
+
+ if (tableInfo.getMetadata().isExportRequired()) {
+// exportTime = Instant.now();
+ createExportPartition(tableInfo.getTableArn(), startTime, tableInfo.getMetadata().getExportBucket(), tableInfo.getMetadata().getExportPrefix());
+ }
+
+ if (tableInfo.getMetadata().isStreamRequired()) {
+ List shardIds;
+ // The start position defaults to the beginning of the stream if not provided.
+ if (tableInfo.getMetadata().isExportRequired() || "LATEST".equals(tableInfo.getMetadata().getStreamStartPosition())) {
+ // For a continued data extraction process that involves both export and stream
+ // The export must be completed and loaded before stream can start.
+ // Moreover, there should not be any gaps between the export time and the time start reading the stream
+ // The design here is to start reading from the beginning of current active shards
+ // and then check if the change event datetime is greater than the export time.
+ shardIds = shardManager.getActiveShards(tableInfo.getMetadata().getStreamArn());
+ shardIds.forEach(shardId -> {
+ createStreamPartition(tableInfo.getMetadata().getStreamArn(), shardId, startTime, tableInfo.getMetadata().isExportRequired());
+ });
+ } else {
+ shardIds = shardManager.getRootShardIds(tableInfo.getMetadata().getStreamArn());
+ shardIds.forEach(shardId -> {
+ createStreamPartition(tableInfo.getMetadata().getStreamArn(), shardId, null, false);
+ });
+ }
+ }
+ });
+ // Mark initialization as done, so that it won't be triggered again.
+ coordinator.completePartition(initPartition.get());
+ }
+
+
+ /**
+ * Create a partition for an export job in the coordination table. The bucket and prefix will be stored in the progress state.
+ * This is to support that different tables can use different destinations.
+ *
+ * @param tableArn Table Arn
+ * @param exportTime Export Time
+ * @param bucket Export bucket
+ * @param prefix Export Prefix
+ */
+ private void createExportPartition(String tableArn, Instant exportTime, String bucket, String prefix) {
+ ExportProgressState exportProgressState = new ExportProgressState();
+ exportProgressState.setBucket(bucket);
+ exportProgressState.setPrefix(prefix);
+ exportProgressState.setExportTime(exportTime.toString()); // for informational purposes
+ ExportPartition exportPartition = new ExportPartition(tableArn, exportTime, Optional.of(exportProgressState));
+ coordinator.createPartition(exportPartition);
+ }
+
+
+ /**
+ * Create a partition for a stream job in the coordination table.
+ * @param streamArn Stream Arn
+ * @param shardId Shard Id
+ * @param exportTime the start time for change events, any change events with creation datetime before this should be ignored; when null, no start time is set.
+ * @param waitForExport flag stored in the stream progress state via setWaitForExport
+ */
+ private void createStreamPartition(String streamArn, String shardId, Instant exportTime, boolean waitForExport) {
+ StreamProgressState streamProgressState = new StreamProgressState();
+ streamProgressState.setWaitForExport(waitForExport);
+ if (exportTime != null) {
+ streamProgressState.setStartTime(exportTime.toEpochMilli());
+ }
+ coordinator.createPartition(new StreamPartition(streamArn, shardId, Optional.of(streamProgressState)));
+ }
+ // Returns the point-in-time recovery status reported by DescribeContinuousBackups for the table, as a string.
+ private String getContinuousBackupsStatus(String tableName) {
+ // Look up whether point-in-time recovery is enabled for the table.
+ DescribeContinuousBackupsRequest req = DescribeContinuousBackupsRequest.builder()
+ .tableName(tableName)
+ .build();
+ DescribeContinuousBackupsResponse resp = dynamoDbClient.describeContinuousBackups(req);
+ return resp.continuousBackupsDescription().pointInTimeRecoveryDescription().pointInTimeRecoveryStatus().toString();
+ }
+ // Extracts the table name from a table ARN.
+ private String getTableName(String tableArn) {
+ Arn arn = Arn.fromString(tableArn);
+ // resourceAsString is table/xxx
+ return arn.resourceAsString().substring(6); // drop the leading table/ prefix (6 characters)
+ }
+
+ /**
+ * Construct the metadata info for a table and validate its configuration.
+ * Once created, the info should not be changed.
+ */
+ private TableInfo getTableInfo(TableConfig tableConfig) {
+ String tableName = getTableName(tableConfig.getTableArn());
+
+ // Need to call describe table to get the Key schema for table
+ // The key schema will be used when adding the metadata to event.
+ DescribeTableRequest req = DescribeTableRequest.builder()
+ .tableName(tableName)
+ .build();
+ DescribeTableResponse describeTableResult = dynamoDbClient.describeTable(req);
+ Map keys = describeTableResult.table().keySchema().stream().collect(Collectors.toMap(
+ e -> e.keyTypeAsString(), e -> e.attributeName()
+ ));
+
+ // Validate that PITR is turned on when an export is configured.
+ if (tableConfig.getExportConfig() != null) {
+ String status = getContinuousBackupsStatus(tableName);
+ LOG.debug("The PITR status for table " + tableName + " is " + status);
+ if (!"ENABLED".equals(status)) {
+ String errorMessage = "Point-in-time recovery (PITR) needs to be enabled for exporting data from table " + tableConfig.getTableArn();
+ LOG.error(errorMessage);
+ throw new InvalidPluginConfigurationException(errorMessage);
+ }
+ }
+
+ StreamConfig.StartPosition streamStartPosition = null;
+
+ if (tableConfig.getStreamConfig() != null) {
+ // Validate that a DynamoDB stream is turned on when a stream is configured.
+ if (describeTableResult.table().streamSpecification() == null) {
+ String errorMessage = "Steam is not enabled for table " + tableConfig.getTableArn(); // NOTE(review): the message reads Steam; presumably Stream was intended.
+ LOG.error(errorMessage);
+ throw new InvalidPluginConfigurationException(errorMessage);
+ }
+ // Validate view type of DynamoDB stream
+ if (describeTableResult.table().streamSpecification() != null) { // always true here: the null case threw above
+ String viewType = describeTableResult.table().streamSpecification().streamViewTypeAsString();
+ LOG.debug("The stream view type for table " + tableName + " is " + viewType);
+ List supportedType = List.of("NEW_IMAGE", "NEW_AND_OLD_IMAGES");
+ if (!supportedType.contains(viewType)) {
+ String errorMessage = "Stream " + tableConfig.getTableArn() + " is enabled with " + viewType + ". Supported types are " + supportedType;
+ LOG.error(errorMessage);
+ throw new InvalidPluginConfigurationException(errorMessage);
+ }
+ }
+ streamStartPosition = tableConfig.getStreamConfig().getStartPosition();
+ }
+
+ // Construct the metadata info
+ // May consider removing the export bucket and prefix
+ TableMetadata metadata = TableMetadata.builder()
+ .partitionKeyAttributeName(keys.get("HASH"))
+ .sortKeyAttributeName(keys.get("RANGE"))
+ .streamArn(describeTableResult.table().latestStreamArn())
+ .streamRequired(tableConfig.getStreamConfig() != null)
+ .exportRequired(tableConfig.getExportConfig() != null)
+ .streamStartPosition(streamStartPosition == null ? null : streamStartPosition.name())
+ .exportBucket(tableConfig.getExportConfig() == null ? null : tableConfig.getExportConfig().getS3Bucket())
+ .exportPrefix(tableConfig.getExportConfig() == null ? null : tableConfig.getExportConfig().getS3Prefix())
+ .build();
+ return new TableInfo(tableConfig.getTableArn(), metadata);
+ }
+
+
+}
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java
new file mode 100644
index 0000000000..06e5ce23ae
--- /dev/null
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.dynamodb;
+
+import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
+import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
+import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.configuration.PluginSetting;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.plugin.PluginFactory;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.model.source.Source;
+import org.opensearch.dataprepper.model.source.SourceCoordinationStore;
+import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.DefaultEnhancedSourceCoordinator;
+import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator;
+import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.PartitionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+/** Data Prepper source plugin registered as "dynamodb"; delegates reading to {@link DynamoDBService}. */
+@DataPrepperPlugin(name = "dynamodb", pluginType = Source.class, pluginConfigurationType = DynamoDBSourceConfig.class)
+public class DynamoDBSource implements Source<Record<Event>> {
+    private static final Logger LOG = LoggerFactory.getLogger(DynamoDBSource.class);
+    private static final String SOURCE_COORDINATOR_METRIC_PREFIX = "source-coordinator";
+
+    private final PluginMetrics pluginMetrics;
+    private final DynamoDBSourceConfig sourceConfig;
+    private final PluginFactory pluginFactory;
+    private final SourceCoordinationStore coordinationStore;
+    private final EnhancedSourceCoordinator coordinator;
+    private final DynamoDBService dynamoDBService;
+
+    /**
+     * Loads the coordination store, initializes the coordinator and prepares the DynamoDB service.
+     * @param pluginMetrics          metrics handle passed on to the DynamoDB service
+     * @param sourceConfig           source configuration; its {@code coordinator} section is required
+     * @param pluginFactory          factory used to load the coordination store plugin
+     * @param pluginSetting          plugin setting that supplies the pipeline name
+     * @param awsCredentialsSupplier credentials supplier handed to the client factory
+     * @throws NullPointerException if the {@code coordinator} section is missing
+     */
+    @DataPrepperPluginConstructor
+    public DynamoDBSource(PluginMetrics pluginMetrics, final DynamoDBSourceConfig sourceConfig, final PluginFactory pluginFactory, final PluginSetting pluginSetting, final AwsCredentialsSupplier awsCredentialsSupplier) {
+        LOG.info("Create DynamoDB Source");
+        this.pluginMetrics = pluginMetrics;
+        this.sourceConfig = sourceConfig;
+        this.pluginFactory = pluginFactory;
+
+        // Load Coordination Store via PluginFactory; fail with a clear message when the section is absent
+        Objects.requireNonNull(sourceConfig.getCoordinationStoreConfig(), "The 'coordinator' configuration is required for the dynamodb source");
+        PluginSetting sourceCoordinationStoreSetting = new PluginSetting(sourceConfig.getCoordinationStoreConfig().getPluginName(), sourceConfig.getCoordinationStoreConfig().getPluginSettings());
+        sourceCoordinationStoreSetting.setPipelineName(SOURCE_COORDINATOR_METRIC_PREFIX);
+        coordinationStore = pluginFactory.loadPlugin(SourceCoordinationStore.class, sourceCoordinationStoreSetting);
+        String pipelineName = pluginSetting.getPipelineName();
+
+        // Create and initialize coordinator
+        coordinator = new DefaultEnhancedSourceCoordinator(coordinationStore, pipelineName, new PartitionFactory());
+        coordinator.initialize();
+
+        // Create DynamoDB Service
+        ClientFactory clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig());
+        dynamoDBService = new DynamoDBService(coordinator, clientFactory, sourceConfig, pluginMetrics);
+        dynamoDBService.init();
+    }
+
+    /** Passes the pipeline buffer to the DynamoDB service and starts it. */
+    @Override
+    public void start(Buffer<Record<Event>> buffer) {
+        LOG.info("Start Processing");
+        dynamoDBService.start(buffer);
+    }
+
+    /** Shuts down the DynamoDB service. */
+    @Override
+    public void stop() {
+        LOG.info("Stop DynamoDB Source");
+        if (Objects.nonNull(dynamoDBService)) {
+            dynamoDBService.shutdown();
+        }
+    }
+
+    /** Defers to the default implementation declared on {@link Source}. */
+    @Override
+    public boolean areAcknowledgementsEnabled() {
+        return Source.super.areAcknowledgementsEnabled();
+    }
+}
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java
new file mode 100644
index 0000000000..379f837767
--- /dev/null
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.dynamodb;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.validation.Valid;
+import jakarta.validation.constraints.NotNull;
+import org.opensearch.dataprepper.model.configuration.PluginModel;
+import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.AwsAuthenticationConfig;
+import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.TableConfig;
+
+import java.util.List;
+
+/**
+ * Configuration for DynamoDB Source, bound from the {@code tables}, {@code aws} and {@code coordinator} keys.
+ */
+public class DynamoDBSourceConfig {
+
+    /** Per-table settings; {@code @Valid} cascades validation into each {@link TableConfig}. */
+    @JsonProperty("tables")
+    @Valid
+    private List<TableConfig> tableConfigs;
+
+    /** AWS region and STS settings. */
+    @JsonProperty("aws")
+    @NotNull
+    @Valid
+    private AwsAuthenticationConfig awsAuthenticationConfig;
+
+    /** Coordination store plugin; required because DynamoDBSource reads it unconditionally at construction. */
+    @JsonProperty("coordinator")
+    @NotNull
+    private PluginModel coordinationStoreConfig;
+
+    public DynamoDBSourceConfig() {
+    }
+
+    public List<TableConfig> getTableConfigs() {
+        return tableConfigs;
+    }
+
+    public AwsAuthenticationConfig getAwsAuthenticationConfig() {
+        return awsAuthenticationConfig;
+    }
+
+    public PluginModel getCoordinationStoreConfig() {
+        return coordinationStoreConfig;
+    }
+}
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/AwsAuthenticationConfig.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/AwsAuthenticationConfig.java
new file mode 100644
index 0000000000..124806e0c7
--- /dev/null
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/AwsAuthenticationConfig.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.dynamodb.configuration;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.validation.constraints.Size;
+import software.amazon.awssdk.regions.Region;
+
+import java.util.Map;
+
+/** AWS region and STS settings bound from the source's {@code aws} configuration section. */
+public class AwsAuthenticationConfig {
+    @JsonProperty("region")
+    @Size(min = 1, message = "Region cannot be empty string")
+    private String awsRegion;
+
+    @JsonProperty("sts_role_arn")
+    @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 20 and 2048 characters")
+    private String awsStsRoleArn;
+
+    @JsonProperty("sts_external_id")
+    @Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters")
+    private String awsStsExternalId;
+
+    @JsonProperty("sts_header_overrides")
+    @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
+    private Map<String, String> awsStsHeaderOverrides;
+
+    public String getAwsStsRoleArn() {
+        return awsStsRoleArn;
+    }
+
+    public String getAwsStsExternalId() {
+        return awsStsExternalId;
+    }
+
+    public Region getAwsRegion() {
+        return awsRegion == null ? null : Region.of(awsRegion); // null when no region was configured
+    }
+
+    public Map<String, String> getAwsStsHeaderOverrides() {
+        return awsStsHeaderOverrides;
+    }
+}
+
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/ExportConfig.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/ExportConfig.java
new file mode 100644
index 0000000000..cb3463a3b6
--- /dev/null
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/ExportConfig.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.dynamodb.configuration;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.validation.constraints.NotBlank;
+
+/** S3 bucket and prefix settings bound from a table's {@code export} section. */
+public class ExportConfig {
+    @NotBlank(message = "Bucket Name is required for export")
+    @JsonProperty("s3_bucket")
+    private String s3Bucket;
+    @JsonProperty("s3_prefix")
+    private String s3Prefix;
+
+    public String getS3Bucket() {
+        return this.s3Bucket;
+    }
+
+    public String getS3Prefix() {
+        return this.s3Prefix;
+    }
+}
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/StreamConfig.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/StreamConfig.java
new file mode 100644
index 0000000000..c1a698c366
--- /dev/null
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/StreamConfig.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.dynamodb.configuration;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Stream settings bound from a table's {@code stream} section. */
+public class StreamConfig {
+    /** Allowed values for the {@code start_position} key. */
+    public enum StartPosition {
+        LATEST,
+        BEGINNING
+    }
+
+    @JsonProperty("start_position")
+    private StartPosition startPosition;
+
+    public StartPosition getStartPosition() {
+        return this.startPosition;
+    }
+}
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/TableConfig.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/TableConfig.java
new file mode 100644
index 0000000000..d7c90ea1dd
--- /dev/null
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/TableConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.dynamodb.configuration;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.validation.constraints.NotEmpty;
+import jakarta.validation.constraints.NotNull;
+
+/** Settings for one DynamoDB table: its ARN plus its {@code export} and {@code stream} sections. */
+public class TableConfig {
+
+    @NotNull
+    @NotEmpty(message = "DynamoDB Table ARN cannot be null or empty")
+    @JsonProperty("table_arn")
+    private String tableArn;
+
+    @JsonProperty("export")
+    private ExportConfig exportConfig;
+
+    @JsonProperty("stream")
+    private StreamConfig streamConfig;
+
+    public String getTableArn() {
+        return this.tableArn;
+    }
+
+    public ExportConfig getExportConfig() {
+        return this.exportConfig;
+    }
+
+    public StreamConfig getStreamConfig() {
+        return this.streamConfig;
+    }
+}
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java
new file mode 100644
index 0000000000..b53d5df2b6
--- /dev/null
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.dynamodb.converter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.dataformat.ion.IonObjectMapper;
+import io.micrometer.core.instrument.Counter;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ExportRecordConverter extends RecordConverter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ExportRecordConverter.class);
+
+ private static final String ITEM_KEY = "Item";
+
+ static final String EXPORT_RECORD_SUCCESS_COUNT = "exportRecordSuccess";
+ static final String EXPORT_RECORD_ERROR_COUNT = "exportRecordErrors";
+
+
+ IonObjectMapper MAPPER = new IonObjectMapper();
+
+ private final PluginMetrics pluginMetrics;
+
+ private final Counter exportRecordSuccessCounter;
+ private final Counter exportRecordErrorCounter;
+
+ public ExportRecordConverter(Buffer> buffer, TableInfo tableInfo, PluginMetrics pluginMetrics) {
+ super(buffer, tableInfo);
+ this.pluginMetrics = pluginMetrics;
+ this.exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORD_SUCCESS_COUNT);
+ this.exportRecordErrorCounter = pluginMetrics.counter(EXPORT_RECORD_ERROR_COUNT);
+
+ }
+
+ private Map convertToMap(String jsonData) {
+ try {
+ return MAPPER.readValue(jsonData, Map.class);
+ } catch (JsonProcessingException e) {
+ return null;
+ }
+ }
+
+
+ @Override
+ String getEventType() {
+ return "EXPORT";
+ }
+
+ public void writeToBuffer(List lines) {
+ List
*/
+@JsonIgnoreProperties(ignoreUnknown = true)
public class ExportSummary {
@JsonProperty("version")
private String version;
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
index f9fa6b355f..fd9855547b 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
@@ -26,16 +26,29 @@ public class ShardConsumer implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
- // A flag to interrupt the process
+ /**
+ * A flag to interrupt the process
+ */
private static volatile boolean shouldStop = false;
+ /**
+ * Max number of items to return per GetRecords call, maximum 1000.
+ */
private static final int MAX_GET_RECORD_ITEM_COUNT = 1000;
- // Idle Time between Reads
+ /**
+ * Idle Time between GetRecords Reads
+ */
private static final int GET_RECORD_INTERVAL_MILLS = 200;
+ /**
+ * Default interval to check if export is completed.
+ */
private static final int DEFAULT_WAIT_FOR_EXPORT_INTERVAL_MILLS = 60_000;
+ /**
+ * Default regular checkpoint interval
+ */
private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 2 * 60_000;
private final DynamoDbStreamsClient dynamoDbStreamsClient;
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java
index e69695b0a6..6e141c6e27 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java
@@ -29,6 +29,8 @@
public class ShardConsumerFactory {
private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
+ private static final int STREAM_TO_TABLE_OFFSET = "stream/".length();
+
private final DynamoDbStreamsClient streamsClient;
private final EnhancedSourceCoordinator enhancedSourceCoordinator;
@@ -93,6 +95,6 @@ private TableInfo getTableInfo(String tableArn) {
private String getTableArn(String streamArn) {
// e.g. Given a stream arn: arn:aws:dynamodb:us-west-2:xxx:table/test-table/stream/2023-07-31T04:59:58.190
// Returns arn:aws:dynamodb:us-west-2:xxx:table/test-table
- return streamArn.substring(0, streamArn.lastIndexOf('/') - 7);
+ return streamArn.substring(0, streamArn.lastIndexOf('/') - STREAM_TO_TABLE_OFFSET);
}
}
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
index 4fa76da968..cd7775c5ba 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
@@ -93,7 +93,7 @@ private BiConsumer completeConsumer(StreamPartition streamPartition) {
LOG.debug("Start creating new stream partitions for Child Shards");
List childShardIds = shardManager.getChildShardIds(streamPartition.getStreamArn(), streamPartition.getShardId());
- LOG.debug("Child Ids Retrieved: " + childShardIds);
+ LOG.debug("Child Ids Retrieved: {}", childShardIds);
createStreamPartitions(streamPartition.getStreamArn(), childShardIds);
LOG.debug("Create child shard completed");