From 2698de880d1d31e02abdfb05d2c3868a321c46c0 Mon Sep 17 00:00:00 2001 From: Sanchit Garg Date: Tue, 6 Dec 2022 20:55:33 +0530 Subject: [PATCH 1/6] CDAP-20176 : Save offset in state store for atleast once processing --- confluent-kafka-plugins/pom.xml | 65 ++++++--- .../source/ConfluentStreamingSource.java | 103 +++++++++++++- .../source/ConfluentStreamingSourceUtil.java | 129 +++++++++++------- .../confluent/source/ConfluentDStream.scala | 76 +++++++++++ .../sink/ConfluentStreamingSinkTest.java | 2 +- .../batch/source/KafkaPartitionOffsets.java | 4 + pom.xml | 6 +- 7 files changed, 306 insertions(+), 79 deletions(-) create mode 100644 confluent-kafka-plugins/src/main/scala/io/cdap/plugin/confluent/source/ConfluentDStream.scala diff --git a/confluent-kafka-plugins/pom.xml b/confluent-kafka-plugins/pom.xml index 8fd5d47..36234d7 100644 --- a/confluent-kafka-plugins/pom.xml +++ b/confluent-kafka-plugins/pom.xml @@ -69,7 +69,7 @@ org.apache.kafka - kafka_2.11 + kafka_2.12 ${kafka10.version} @@ -84,16 +84,16 @@ org.apache.spark - spark-streaming-kafka-0-10_2.11 - ${spark2.version} + spark-streaming-kafka-0-10_2.12 + ${spark3.version} org.apache.kafka - kafka_2.11 + kafka_2.12 org.apache.spark - spark-tags_2.11 + spark-tags_2.12 net.jpountz.lz4 @@ -103,8 +103,8 @@ org.apache.spark - spark-mllib_2.11 - ${spark2.version} + spark-mllib_2.12 + ${spark3.version} provided @@ -115,14 +115,14 @@ org.apache.spark - spark-streaming_2.11 - ${spark2.version} + spark-streaming_2.12 + ${spark3.version} provided org.apache.spark - spark-core_2.11 - ${spark2.version} + spark-core_2.12 + ${spark3.version} provided @@ -173,19 +173,19 @@ io.cdap.cdap - cdap-spark-core2_2.11 + cdap-spark-core3_2.12 ${cdap.version} test io.cdap.cdap - cdap-data-pipeline2_2.11 + cdap-data-pipeline3_2.12 ${cdap.version} test io.cdap.cdap - cdap-data-streams2_2.11 + cdap-data-streams3_2.12 ${cdap.version} test @@ -221,19 +221,44 @@ + + net.alchim31.maven + scala-maven-plugin + 3.3.1 + + + compile + + compile + + compile + + + test-compile + + testCompile + + test-compile + + + process-resources + + compile + + + + org.apache.felix maven-bundle-plugin - 3.3.0 + 3.5.1 <_exportcontents> io.cdap.plugin.confluent.*; org.apache.spark.streaming.kafka010.*; - org.apache.kafka.common.*; - org.apache.kafka.common.serialization.*; + org.apache.kafka.*; io.confluent.kafka.serializers.*; - org.apache.kafka.clients.*; *;inline=false;scope=compile true @@ -255,8 +280,8 @@ 1.1.0 - system:cdap-data-pipeline[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT) - system:cdap-data-streams[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT) + system:cdap-data-pipeline[6.8.0-SNAPSHOT,7.0.0-SNAPSHOT) + system:cdap-data-streams[6.8.0-SNAPSHOT,7.0.0-SNAPSHOT) diff --git a/confluent-kafka-plugins/src/main/java/io/cdap/plugin/confluent/streaming/source/ConfluentStreamingSource.java b/confluent-kafka-plugins/src/main/java/io/cdap/plugin/confluent/streaming/source/ConfluentStreamingSource.java index 8f4aed6..3b7c0ff 100644 --- a/confluent-kafka-plugins/src/main/java/io/cdap/plugin/confluent/streaming/source/ConfluentStreamingSource.java +++ b/confluent-kafka-plugins/src/main/java/io/cdap/plugin/confluent/streaming/source/ConfluentStreamingSource.java @@ -16,6 +16,7 @@ package io.cdap.plugin.confluent.streaming.source; +import com.google.gson.Gson; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; @@ -27,17 +28,36 @@ import io.cdap.cdap.etl.api.StageConfigurer; import io.cdap.cdap.etl.api.streaming.StreamingContext; import 
io.cdap.cdap.etl.api.streaming.StreamingSource; +import io.cdap.cdap.etl.api.streaming.StreamingStateHandler; +import io.cdap.plugin.batch.source.KafkaPartitionOffsets; import io.cdap.plugin.common.Constants; +import io.cdap.plugin.confluent.source.ConfluentDStream; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaInputDStream; +import org.apache.spark.streaming.kafka010.OffsetRange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; +import java.util.stream.Collectors; /** * Confluent Kafka Streaming source. @@ -45,7 +65,10 @@ @Plugin(type = StreamingSource.PLUGIN_TYPE) @Name(ConfluentStreamingSource.PLUGIN_NAME) @Description("Confluent Kafka streaming source.") -public class ConfluentStreamingSource extends StreamingSource { +public class ConfluentStreamingSource extends StreamingSource implements StreamingStateHandler { + + private static final Logger LOG = LoggerFactory.getLogger(ConfluentStreamingSource.class); + private static final Gson gson = new Gson(); public static final String PLUGIN_NAME = "Confluent"; private final ConfluentStreamingSourceConfig conf; @@ -79,7 +102,32 @@ public JavaDStream getStream(StreamingContext context) throws collector.getOrThrowException(); context.registerLineage(conf.referenceName); - return ConfluentStreamingSourceUtil.getStructuredRecordJavaDStream(context, conf, outputSchema, collector); + JavaInputDStream> javaInputDStream = ConfluentStreamingSourceUtil + .getConsumerRecordJavaDStream(context, conf, outputSchema, collector, getStateSupplier(context)); + + JavaDStream javaDStream; + javaDStream = ConfluentStreamingSourceUtil.getStructuredRecordJavaDStream(javaInputDStream, + new ConfluentStreamingSourceUtil.RecordTransform(conf, outputSchema)); + + if (conf.getSchemaRegistryUrl() != null) { + ConfluentStreamingSourceUtil.AvroRecordTransform transform = + new ConfluentStreamingSourceUtil.AvroRecordTransform(conf, outputSchema); + javaDStream = ConfluentStreamingSourceUtil.getStructuredRecordJavaDStream(javaInputDStream, transform); + } + + if (!context.isStateStoreEnabled()) { + // Return the serializable Dstream in case checkpointing is enabled. 
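+      // When the state store is not enabled, recovery relies on Spark checkpointing of this DStream.
+      // When it is enabled, the ConfluentDStream built below reports the consumed offset ranges via
+      // getStateConsumer() after each completed batch, which provides the at-least-once guarantee.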
+ return javaDStream; + } + + // Use the DStream that is state aware + + ConfluentDStream confluentDStream = new ConfluentDStream(context.getSparkStreamingContext().ssc(), + javaInputDStream.inputDStream(), + ConfluentStreamingSourceUtil + .getRecordTransformFunction(conf, outputSchema), + getStateConsumer(context)); + return confluentDStream.convertToJavaDStream(); } private Schema getOutputSchema(FailureCollector failureCollector) { @@ -138,4 +186,55 @@ private Schema fetchSchema(CachedSchemaRegistryClient schemaRegistryClient, Stri } return Schema.parseJson(schemaMetadata.getSchema()); } + + private VoidFunction getStateConsumer(StreamingContext context) { + return offsetRanges -> { + try { + saveState(context, offsetRanges); + } catch (IOException e) { + LOG.warn("Exception in saving state.", e); + } + }; + } + + private void saveState(StreamingContext context, OffsetRange[] offsetRanges) throws IOException { + if (offsetRanges.length > 0) { + Map partitionOffsetMap = Arrays.stream(offsetRanges) + .collect(Collectors.toMap(OffsetRange::partition, OffsetRange::untilOffset)); + byte[] state = gson.toJson(new KafkaPartitionOffsets(partitionOffsetMap)).getBytes(StandardCharsets.UTF_8); + context.saveState(conf.getTopic(), state); + } + } + + private Supplier> getStateSupplier(StreamingContext context) { + return () -> { + try { + return getSavedState(context); + } catch (IOException e) { + throw new RuntimeException("Exception in fetching state.", e); + } + }; + } + + private Map getSavedState(StreamingContext context) throws IOException { + //State store is not enabled, do not read state + if (!context.isStateStoreEnabled()) { + return Collections.emptyMap(); + } + + //If state is not present, use configured offsets or defaults + Optional state = context.getState(conf.getTopic()); + if (!state.isPresent()) { + return Collections.emptyMap(); + } + + byte[] bytes = state.get(); + try (Reader reader = new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8)) { + KafkaPartitionOffsets partitionOffsets = gson.fromJson(reader, KafkaPartitionOffsets.class); + return partitionOffsets.getPartitionOffsets().entrySet() + .stream() + .collect(Collectors.toMap(partitionOffset -> new TopicPartition(conf.getTopic(), partitionOffset.getKey()), + Map.Entry::getValue)); + } + } } diff --git a/confluent-kafka-plugins/src/main/java/io/cdap/plugin/confluent/streaming/source/ConfluentStreamingSourceUtil.java b/confluent-kafka-plugins/src/main/java/io/cdap/plugin/confluent/streaming/source/ConfluentStreamingSourceUtil.java index ccfcd9e..3bd3fa5 100644 --- a/confluent-kafka-plugins/src/main/java/io/cdap/plugin/confluent/streaming/source/ConfluentStreamingSourceUtil.java +++ b/confluent-kafka-plugins/src/main/java/io/cdap/plugin/confluent/streaming/source/ConfluentStreamingSourceUtil.java @@ -48,6 +48,7 @@ import org.apache.spark.api.java.function.Function2; import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaInputDStream; import org.apache.spark.streaming.kafka010.ConsumerStrategies; import org.apache.spark.streaming.kafka010.KafkaUtils; import org.apache.spark.streaming.kafka010.LocationStrategies; @@ -64,6 +65,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.function.Supplier; import javax.annotation.Nonnull; /** @@ -81,41 +83,36 @@ private ConfluentStreamingSourceUtil() { /** * Returns {@link JavaDStream} for {@link ConfluentStreamingSource}. 
- * @param context streaming context - * @param conf kafka conf + * + * @param context streaming context + * @param conf kafka conf * @param outputSchema source output schema - * @param collector failure collector + * @param collector failure collector */ - static JavaDStream getStructuredRecordJavaDStream( - StreamingContext context, ConfluentStreamingSourceConfig conf, Schema outputSchema, FailureCollector collector) { + static JavaInputDStream> getConsumerRecordJavaDStream( + StreamingContext context, ConfluentStreamingSourceConfig conf, Schema outputSchema, FailureCollector collector, + Supplier> stateSupplier) { String pipelineName = context.getPipelineName(); Map kafkaParams = getConsumerParams(conf, pipelineName); Properties properties = new Properties(); properties.putAll(kafkaParams); try (Consumer consumer = new KafkaConsumer<>(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { - Map offsets = getOffsets(conf, collector, consumer); + Map offsets = getOffsets(conf, collector, consumer, stateSupplier); LOG.info("Using initial offsets {}", offsets); - if (conf.getSchemaRegistryUrl() != null) { - AvroRecordTransform transform = new AvroRecordTransform(conf, outputSchema); - return createKafkaDirectStream(context, conf, kafkaParams, offsets, transform); - } - return createKafkaDirectStream(context, conf, kafkaParams, offsets, new RecordTransform(conf, outputSchema)); + return KafkaUtils.createDirectStream( + context.getSparkStreamingContext(), LocationStrategies.PreferConsistent(), + ConsumerStrategies.Subscribe(Collections.singleton(conf.getTopic()), kafkaParams, offsets) + ); } } - private static JavaDStream createKafkaDirectStream( - StreamingContext context, - ConfluentStreamingSourceConfig conf, - Map kafkaParams, - Map offsets, + static JavaDStream getStructuredRecordJavaDStream( + JavaInputDStream> consumerRecordJavaInputDStream, Function2>, Time, JavaRDD> transform ) { - return KafkaUtils.createDirectStream( - context.getSparkStreamingContext(), LocationStrategies.PreferConsistent(), - ConsumerStrategies.Subscribe(Collections.singleton(conf.getTopic()), kafkaParams, offsets) - ).transform(transform); + return consumerRecordJavaInputDStream.transform(transform); } @Nonnull @@ -166,10 +163,10 @@ private static Map getConsumerParams(ConfluentStreamingSourceCon @Nonnull private static Map getOffsets(ConfluentStreamingSourceConfig conf, FailureCollector collector, - Consumer consumer) { - Map offsets = conf.getInitialPartitionOffsets( - getPartitions(consumer, conf, collector), collector); - collector.getOrThrowException(); + Consumer consumer, + Supplier> stateSupplier) { + + Map offsets = getInitialPartitionOffsets(conf, stateSupplier, consumer, collector); // KafkaUtils doesn't understand -1 and -2 as smallest offset and latest offset. // so we have to replace them with the actual smallest and latest @@ -202,6 +199,23 @@ private static Map getOffsets(ConfluentStreamingSourceConf return offsets; } + static Map getInitialPartitionOffsets(ConfluentStreamingSourceConfig conf, + Supplier> stateSupplier, + Consumer consumer, + FailureCollector collector) { + Map savedPartitions = stateSupplier.get(); + if (!savedPartitions.isEmpty()) { + LOG.info("Saved partitions found {}. 
", savedPartitions); + return savedPartitions; + } + + LOG.info("No saved partitions found."); + Map offsets = conf.getInitialPartitionOffsets( + getPartitions(consumer, conf, collector), collector); + collector.getOrThrowException(); + return offsets; + } + private static Set getPartitions(Consumer consumer, ConfluentStreamingSourceConfig conf, FailureCollector collector) { Set partitions = conf.getPartitions(collector); @@ -218,10 +232,16 @@ private static Set getPartitions(Consumer consumer, Con return partitions; } + static Function2, Time, StructuredRecord> + getRecordTransformFunction(ConfluentStreamingSourceConfig conf, Schema outputSchema) { + return conf.getFormat() == null ? + new BytesFunction(conf, outputSchema) : new FormatFunction(conf, outputSchema); + } + /** * Applies the format function to each rdd. */ - private static class AvroRecordTransform + static class AvroRecordTransform implements Function2>, Time, JavaRDD> { private final ConfluentStreamingSourceConfig conf; @@ -234,15 +254,19 @@ private static class AvroRecordTransform @Override public JavaRDD call(JavaRDD> input, Time batchTime) { - return input.map(new AvroFunction(batchTime.milliseconds(), conf, outputSchema)); + Function2, Time, StructuredRecord> recordFunction = + new AvroFunction(conf, outputSchema); + + return input.map((Function, StructuredRecord>) consumerRecord -> + recordFunction.call(consumerRecord, batchTime)); } } /** * Applies the format function to each rdd. */ - private static class RecordTransform - implements Function2>, Time, JavaRDD> { + static class RecordTransform + implements Function2>, Time, JavaRDD> { private final ConfluentStreamingSourceConfig conf; private final Schema outputSchema; @@ -253,11 +277,12 @@ private static class RecordTransform } @Override - public JavaRDD call(JavaRDD> input, Time batchTime) { - Function, StructuredRecord> recordFunction = conf.getFormat() == null ? - new BytesFunction(batchTime.milliseconds(), conf, outputSchema) : - new FormatFunction(batchTime.milliseconds(), conf, outputSchema); - return input.map(recordFunction); + public JavaRDD call(JavaRDD> input, Time batchTime) { + Function2, Time, StructuredRecord> recordFunction = conf.getFormat() == null ? + new BytesFunction(conf, outputSchema) : + new FormatFunction(conf, outputSchema); + return input.map((Function, StructuredRecord>) consumerRecord -> + recordFunction.call(consumerRecord, batchTime)); } } @@ -265,26 +290,24 @@ public JavaRDD call(JavaRDD> in * Common logic for transforming kafka key, message, partition, and offset into a structured record. * Everything here should be serializable, as Spark Streaming will serialize all functions. 
*/ - private abstract static class BaseFunction implements Function, StructuredRecord> { + private abstract static class BaseFunction implements Function2, Time, StructuredRecord> { protected final ConfluentStreamingSourceConfig conf; - private final long ts; private final Schema outputSchema; - BaseFunction(long ts, ConfluentStreamingSourceConfig conf, Schema outputSchema) { - this.ts = ts; + BaseFunction(ConfluentStreamingSourceConfig conf, Schema outputSchema) { this.conf = conf; this.outputSchema = outputSchema; } @Override - public StructuredRecord call(ConsumerRecord in) throws Exception { + public StructuredRecord call(ConsumerRecord in, Time batchTime) throws Exception { String timeField = conf.getTimeField(); String keyField = conf.getKeyField(); String partitionField = conf.getPartitionField(); String offsetField = conf.getOffsetField(); StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema); if (timeField != null) { - builder.set(timeField, ts); + builder.set(timeField, batchTime.milliseconds()); } if (keyField != null) { builder.set(keyField, convertKey(in.key())); @@ -304,20 +327,20 @@ public StructuredRecord call(ConsumerRecord in) throws Exception { protected abstract void addMessage(StructuredRecord.Builder builder, V message) throws Exception; } - private abstract static class BinaryBaseFunction extends BaseFunction { - BinaryBaseFunction(long ts, ConfluentStreamingSourceConfig conf, Schema outputSchema) { - super(ts, conf, outputSchema); + private abstract static class BinaryBaseFunction extends BaseFunction { + BinaryBaseFunction(ConfluentStreamingSourceConfig conf, Schema outputSchema) { + super(conf, outputSchema); } @Override - protected Object convertKey(byte[] key) { + protected Object convertKey(Object key) { if (key == null) { return null; } Schema keySchemaNullable = conf.getSchema().getField(conf.getKeyField()).getSchema(); Schema keySchema = keySchemaNullable.isNullable() ? 
keySchemaNullable.getNonNullable() : keySchemaNullable; if (keySchema.getType() == Schema.Type.STRING) { - return new String(key, StandardCharsets.UTF_8); + return new String((byte[]) key, StandardCharsets.UTF_8); } if (keySchema.getType() == Schema.Type.BYTES) { return key; @@ -333,12 +356,12 @@ protected Object convertKey(byte[] key) { private static class BytesFunction extends BinaryBaseFunction { private transient String messageField; - BytesFunction(long ts, ConfluentStreamingSourceConfig conf, Schema outputSchema) { - super(ts, conf, outputSchema); + BytesFunction(ConfluentStreamingSourceConfig conf, Schema outputSchema) { + super(conf, outputSchema); } @Override - protected void addMessage(StructuredRecord.Builder builder, byte[] message) { + protected void addMessage(StructuredRecord.Builder builder, Object message) { builder.set(getMessageField(), message); } @@ -367,12 +390,12 @@ private String getMessageField() { private static class FormatFunction extends BinaryBaseFunction { private transient RecordFormat recordFormat; - FormatFunction(long ts, ConfluentStreamingSourceConfig conf, Schema outputSchema) { - super(ts, conf, outputSchema); + FormatFunction(ConfluentStreamingSourceConfig conf, Schema outputSchema) { + super(conf, outputSchema); } @Override - protected void addMessage(StructuredRecord.Builder builder, byte[] message) throws Exception { + protected void addMessage(StructuredRecord.Builder builder, Object message) throws Exception { // first time this was called, initialize record format if (recordFormat == null) { Schema messageSchema = conf.getMessageSchema(); @@ -380,7 +403,7 @@ protected void addMessage(StructuredRecord.Builder builder, byte[] message) thro recordFormat = RecordFormats.createInitializedFormat(spec); } - StructuredRecord messageRecord = recordFormat.read(ByteBuffer.wrap(message)); + StructuredRecord messageRecord = recordFormat.read(ByteBuffer.wrap((byte[]) message)); for (Schema.Field field : messageRecord.getSchema().getFields()) { String fieldName = field.getName(); builder.set(fieldName, messageRecord.get(fieldName)); @@ -391,8 +414,8 @@ protected void addMessage(StructuredRecord.Builder builder, byte[] message) thro private static class AvroFunction extends BaseFunction { private transient AvroToStructuredTransformer transformer; - AvroFunction(long ts, ConfluentStreamingSourceConfig conf, Schema outputSchema) { - super(ts, conf, outputSchema); + AvroFunction(ConfluentStreamingSourceConfig conf, Schema outputSchema) { + super(conf, outputSchema); } @Override diff --git a/confluent-kafka-plugins/src/main/scala/io/cdap/plugin/confluent/source/ConfluentDStream.scala b/confluent-kafka-plugins/src/main/scala/io/cdap/plugin/confluent/source/ConfluentDStream.scala new file mode 100644 index 0000000..27f94d7 --- /dev/null +++ b/confluent-kafka-plugins/src/main/scala/io/cdap/plugin/confluent/source/ConfluentDStream.scala @@ -0,0 +1,76 @@ +/* + * Copyright © 2022 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.confluent.source + +import io.cdap.cdap.api.data.format.StructuredRecord +import io.cdap.cdap.etl.api.streaming.StreamingEventHandler +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.spark.api.java.function.{Function2, VoidFunction} +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.api.java.JavaDStream +import org.apache.spark.streaming.{Duration, StreamingContext, Time} +import org.apache.spark.streaming.dstream.{DStream, InputDStream} +import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange} + +import java.util.function.Consumer + +/** + * DStream that implements {@link StreamingEventHandler} . + * This DStream will keep the Confluent offsets for each batch RDD before applying the _transformFunction. + * On calling onBatchCompleted, the _stateConsumer will be provided with these offsets. + * + * @param _ssc Spark streaming context + * @param _kafkaDStream DStream created through KafkaUtil.createDirectStream + * @param _kafkaConf Config object for Kafka Streaming Source + * @param _stateConsumer Consumer function for the state produced + */ +class ConfluentDStream(_ssc: StreamingContext, + _kafkaDStream: InputDStream[ConsumerRecord[Object, Object]], + _transformFunction: Function2[ConsumerRecord[Object, Object], Time, StructuredRecord], + _stateConsumer: VoidFunction[Array[OffsetRange]]) + extends DStream[StructuredRecord](_ssc) with StreamingEventHandler { + + // For keeping the offsets in each batch + private var offsetRanges: Array[OffsetRange] = Array[OffsetRange]() + + override def slideDuration: Duration = _kafkaDStream.slideDuration + + override def dependencies: List[DStream[_]] = List(_kafkaDStream) + + override def compute(validTime: Time): Option[RDD[StructuredRecord]] = { + val rddOption = _kafkaDStream.compute(validTime) + val transformFn = _transformFunction; + // If there is a RDD produced, cache the offsetRanges for the batch and then transform to RDD[StructuredRecord] + rddOption.map(rdd => { + offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + rdd.map(record => transformFn.call(record, validTime)) + }) + } + + override def onBatchCompleted(context: io.cdap.cdap.etl.api.streaming.StreamingContext): Unit = { + _stateConsumer.call(offsetRanges) + } + + /** + * Convert this to a {@link JavaDStream} + * + * @return JavaDStream + */ + def convertToJavaDStream(): JavaDStream[StructuredRecord] = { + JavaDStream.fromDStream(this) + } +} diff --git a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/sink/ConfluentStreamingSinkTest.java b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/sink/ConfluentStreamingSinkTest.java index 8bb7408..8fd0790 100644 --- a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/sink/ConfluentStreamingSinkTest.java +++ b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/sink/ConfluentStreamingSinkTest.java @@ -279,7 +279,7 @@ private List> waitForRecordsInKafka(Consumer c throws Exception { List> result = new ArrayList<>(); Stopwatch stopwatch = new Stopwatch(); - while (result.size() < expectedMessages && stopwatch.elapsed(TimeUnit.SECONDS) < 10) { + while (result.size() < expectedMessages && stopwatch.elapsedTime(TimeUnit.SECONDS) < 10) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { result.add(record); diff --git 
a/kafka-plugins-common/src/main/java/io/cdap/plugin/batch/source/KafkaPartitionOffsets.java b/kafka-plugins-common/src/main/java/io/cdap/plugin/batch/source/KafkaPartitionOffsets.java index 6c2353f..e0be8dd 100644 --- a/kafka-plugins-common/src/main/java/io/cdap/plugin/batch/source/KafkaPartitionOffsets.java +++ b/kafka-plugins-common/src/main/java/io/cdap/plugin/batch/source/KafkaPartitionOffsets.java @@ -53,6 +53,10 @@ public void setPartitionOffset(int partition, long offset) { public long getPartitionOffset(int partition, long defaultValue) { return partitionOffsets.getOrDefault(partition, defaultValue); } + + public Map getPartitionOffsets() { + return Collections.unmodifiableMap(partitionOffsets); + } /** * Loads the {@link KafkaPartitionOffsets} from the given input file. diff --git a/pom.xml b/pom.xml index 521b259..154e854 100644 --- a/pom.xml +++ b/pom.xml @@ -87,10 +87,10 @@ UTF-8 - 6.1.1 - 2.3.5 + 6.9.0-SNAPSHOT + 2.11.0-SNAPSHOT 1.6.1 - 2.3.0 + 3.1.2 widgets docs 0.8.2.2 From 501ef7fb55d658867a5ee69736be5a5cec39d2fd Mon Sep 17 00:00:00 2001 From: Sanchit Garg Date: Thu, 8 Dec 2022 10:53:53 +0530 Subject: [PATCH 2/6] CDAP-20176 : Upgrade hadoop version to 2.6.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 154e854..2850c73 100644 --- a/pom.xml +++ b/pom.xml @@ -95,7 +95,7 @@ docs 0.8.2.2 0.10.2.0 - 2.3.0 + 2.6.0 4.1.16.Final 1.3.0 1.8.2 From cad91d3a285ea955a44e132503a2d23c42e7257d Mon Sep 17 00:00:00 2001 From: Sanchit Garg Date: Thu, 8 Dec 2022 11:30:11 +0530 Subject: [PATCH 3/6] CDAP-20176 : Refactor imports and comments --- .../plugin/confluent/source/ConfluentDStream.scala | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/confluent-kafka-plugins/src/main/scala/io/cdap/plugin/confluent/source/ConfluentDStream.scala b/confluent-kafka-plugins/src/main/scala/io/cdap/plugin/confluent/source/ConfluentDStream.scala index 27f94d7..6fdfded 100644 --- a/confluent-kafka-plugins/src/main/scala/io/cdap/plugin/confluent/source/ConfluentDStream.scala +++ b/confluent-kafka-plugins/src/main/scala/io/cdap/plugin/confluent/source/ConfluentDStream.scala @@ -22,21 +22,19 @@ import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.spark.api.java.function.{Function2, VoidFunction} import org.apache.spark.rdd.RDD import org.apache.spark.streaming.api.java.JavaDStream -import org.apache.spark.streaming.{Duration, StreamingContext, Time} import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange} - -import java.util.function.Consumer +import org.apache.spark.streaming.{Duration, StreamingContext, Time} /** * DStream that implements {@link StreamingEventHandler} . * This DStream will keep the Confluent offsets for each batch RDD before applying the _transformFunction. * On calling onBatchCompleted, the _stateConsumer will be provided with these offsets. 
* - * @param _ssc Spark streaming context - * @param _kafkaDStream DStream created through KafkaUtil.createDirectStream - * @param _kafkaConf Config object for Kafka Streaming Source - * @param _stateConsumer Consumer function for the state produced + * @param _ssc Spark streaming context + * @param _kafkaDStream DStream created through KafkaUtil.createDirectStream + * @param _transformFunction Function for transforming consumer record into structured record + * @param _stateConsumer Consumer function for the state produced */ class ConfluentDStream(_ssc: StreamingContext, _kafkaDStream: InputDStream[ConsumerRecord[Object, Object]], From 478daea8f05678ce6d780e54866c1e88bb25756d Mon Sep 17 00:00:00 2001 From: Sanchit Garg Date: Fri, 9 Dec 2022 20:33:10 +0530 Subject: [PATCH 4/6] Upgrade version Upgrade version for cdap-data-pipeline and commons-lang3 --- kafka-plugins-common/pom.xml | 2 +- pom.xml | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/kafka-plugins-common/pom.xml b/kafka-plugins-common/pom.xml index 6084048..0345e7d 100644 --- a/kafka-plugins-common/pom.xml +++ b/kafka-plugins-common/pom.xml @@ -30,7 +30,7 @@ io.cdap.cdap - cdap-data-pipeline2_2.11 + cdap-data-pipeline3_2.12 ${cdap.version} test diff --git a/pom.xml b/pom.xml index 2850c73..9639f01 100644 --- a/pom.xml +++ b/pom.xml @@ -150,6 +150,10 @@ org.apache.spark spark-core_2.10 + + io.cdap.cdap + cdap-spark-core2_2.11 + @@ -277,7 +281,7 @@ org.apache.commons commons-lang3 - 3.0 + 3.9 com.google.guava From e4e106651d9a232b0c6d3a69a64cf2ca265e1ebd Mon Sep 17 00:00:00 2001 From: Sanchit Garg Date: Fri, 9 Dec 2022 20:36:37 +0530 Subject: [PATCH 5/6] Add Integration Test Cases --- .../confluent/integration/KafkaTestUtils.java | 10 +- .../streaming/ConfluentStreamingTestBase.java | 16 +- .../sink/ConfluentStreamingSinkTest.java | 2 +- ...tStreamingSourceStateStoreFailureTest.java | 187 ++++++++++++++++ ...StreamingSourceStateStoreRecoveryTest.java | 203 +++++++++++++++++ ...onfluentStreamingSourceStateStoreTest.java | 208 ++++++++++++++++++ .../source/ConfluentStreamingSourceTest.java | 2 +- 7 files changed, 621 insertions(+), 7 deletions(-) create mode 100644 confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceStateStoreFailureTest.java create mode 100644 confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceStateStoreRecoveryTest.java create mode 100644 confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceStateStoreTest.java diff --git a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/KafkaTestUtils.java b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/KafkaTestUtils.java index 59fa3c0..e599cf0 100644 --- a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/KafkaTestUtils.java +++ b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/KafkaTestUtils.java @@ -170,8 +170,14 @@ private static Map getSecurityProps() { props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "https"); props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); - props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required " + - "username=" + CLUSTER_API_KEY + " password=" + CLUSTER_API_SECRET + ";"); + 
/*props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"" + CLUSTER_API_KEY + "\" password=\"" + CLUSTER_API_SECRET + "\";");*/ + props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule " + + "required username=\"%s\" password=\"%s\";", CLUSTER_API_KEY, CLUSTER_API_SECRET)); return props; + + + + } } diff --git a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/ConfluentStreamingTestBase.java b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/ConfluentStreamingTestBase.java index 60b45d4..972c431 100644 --- a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/ConfluentStreamingTestBase.java +++ b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/ConfluentStreamingTestBase.java @@ -24,6 +24,7 @@ import io.cdap.cdap.datastreams.DataStreamsSparkLauncher; import io.cdap.cdap.etl.mock.batch.MockSink; import io.cdap.cdap.etl.mock.test.HydratorTestBase; +import io.cdap.cdap.etl.mock.transform.RecoveringTransform; import io.cdap.cdap.etl.proto.v2.DataStreamsConfig; import io.cdap.cdap.etl.proto.v2.ETLPlugin; import io.cdap.cdap.etl.proto.v2.ETLStage; @@ -72,7 +73,7 @@ public static void setupBasic() throws Exception { LOG.info("Setting up plugins"); addPluginArtifact( - NamespaceId.DEFAULT.artifact("confluent-kafka-plugins", "1.0.0"), + NamespaceId.DEFAULT.artifact("confluent-kafka-plugins", "1.1.0"), APP_ARTIFACT_ID, ConfluentStreamingSource.class, ConfluentStreamingSink.class, KafkaUtils.class, TopicPartition.class, @@ -81,9 +82,10 @@ public static void setupBasic() throws Exception { ); } - protected SparkManager deployETL(ETLPlugin sourcePlugin, ETLPlugin sinkPlugin, String appName) throws Exception { + protected SparkManager deployETL(ETLPlugin sourcePlugin, ETLPlugin sinkPlugin, String appName, boolean isRecovery) throws Exception { ETLStage source = new ETLStage("source", sourcePlugin); ETLStage sink = new ETLStage("sink", sinkPlugin); + DataStreamsConfig etlConfig = DataStreamsConfig.builder() .addStage(source) .addStage(sink) @@ -91,7 +93,15 @@ protected SparkManager deployETL(ETLPlugin sourcePlugin, ETLPlugin sinkPlugin, S .setBatchInterval("1s") .setStopGracefully(true) .build(); - + if (isRecovery) { + etlConfig = DataStreamsConfig.builder() + .addStage(source) + .addStage(new ETLStage("retry_transform", RecoveringTransform.getPlugin())) + .addStage(sink) + .addConnection("source", "retry_transform") + .addConnection("retry_transform", "sink") + .build(); + } AppRequest appRequest = new AppRequest<>(APP_ARTIFACT, etlConfig); ApplicationId appId = NamespaceId.DEFAULT.app(appName); ApplicationManager applicationManager = deployApplication(appId, appRequest); diff --git a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/sink/ConfluentStreamingSinkTest.java b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/sink/ConfluentStreamingSinkTest.java index 8fd0790..de81353 100644 --- a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/sink/ConfluentStreamingSinkTest.java +++ b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/sink/ConfluentStreamingSinkTest.java @@ -271,7 +271,7 @@ private SparkManager deploySourcePlugin(ETLPlugin sourcePlugin, Map kafkaProducer; + private static KafkaProducer 
kafkaAvroProducer; + private static final Gson GSON = new Gson(); + + @Rule + public TestName testName = new TestName(); + + private String topic; + private String outputTable; + private SparkManager programManager; + + @BeforeClass + public static void setupTestClass() { + kafkaProducer = KafkaTestUtils.createProducer(); + kafkaAvroProducer = KafkaTestUtils.createProducerForSchemaRegistry(); + } + + @AfterClass + public static void cleanupTestClass() { + RecoveringTransform.reset(); + kafkaProducer.close(); + kafkaAvroProducer.close(); + } + + @Before + public void setUp() { + outputTable = testName.getMethodName() + "_out"; + topic = ConfluentStreamingSourceStateStoreFailureTest.class.getSimpleName() + "_" + testName.getMethodName(); + KafkaTestUtils.deleteTopic(topic); + KafkaTestUtils.createTopic(topic, 2, 3); + } + + @After + public void tearDown() throws Exception { + KafkaTestUtils.deleteTopic(topic); + if (programManager != null) { + programManager.stop(); + programManager.waitForStopped(10, TimeUnit.SECONDS); + programManager.waitForRun(ProgramRunStatus.KILLED, 10, TimeUnit.SECONDS); + } + } + + @Test + public void testConfluentStreamingSource() throws Exception { + Schema schema = Schema.recordOf( + "user", + Schema.Field.of("id", Schema.of(Schema.Type.LONG)), + Schema.Field.of("first", Schema.of(Schema.Type.STRING)), + Schema.Field.of("last", Schema.of(Schema.Type.STRING)) + ); + Map properties = getConfigProperties(schema); + properties.put(ConfluentStreamingSourceConfig.NAME_FORMAT, "csv"); + programManager = deploySourcePlugin(properties); + + ApplicationId appId = NamespaceId.DEFAULT.app("KafkaSourceApp"); + + programManager.startAndWaitForRun(ProgramRunStatus.RUNNING, 2, TimeUnit.MINUTES); + + // write some messages to kafka + Map messages = new HashMap<>(); + messages.put("a", "1,samuel,jackson"); + messages.put("b", "2,dwayne,johnson"); + messages.put("c", "3,christopher,walken"); + for (Map.Entry entry : messages.entrySet()) { + sendKafkaMessage(topic, 0, entry.getKey(), entry.getValue()); + } + + // Launch the program with runtime args for timeout + programManager.start(ImmutableMap.of( + "cdap.streaming.maxRetryTimeInMins", "1", + "cdap.streaming.baseRetryDelayInSeconds", "60")); + programManager.waitForRun(ProgramRunStatus.RUNNING, 2, TimeUnit.MINUTES); + + // Should fail after retry times out + programManager.waitForRun(ProgramRunStatus.FAILED, 2, TimeUnit.MINUTES); + + // Verify that state is not saved. + AppStateStore appStateStore = TestBase.getAppStateStore(appId.getNamespace(), + appId.getApplication()); + Optional savedState = appStateStore.getState("source" + "." 
+ topic); + Assert.assertFalse(savedState.isPresent()); + + } + + private Map getConfigProperties(Schema schema) { + Map properties = new HashMap<>(); + properties.put(Constants.Reference.REFERENCE_NAME, "confluent"); + properties.put(ConfluentStreamingSourceConfig.NAME_BROKERS, KafkaTestUtils.KAFKA_SERVER); + properties.put(ConfluentStreamingSourceConfig.NAME_TOPIC, topic); + properties.put(ConfluentStreamingSourceConfig.NAME_DEFAULT_INITIAL_OFFSET, + String.valueOf(ListOffsetRequest.EARLIEST_TIMESTAMP)); + properties.put(ConfluentStreamingSourceConfig.NAME_CLUSTER_API_KEY, KafkaTestUtils.CLUSTER_API_KEY); + properties.put(ConfluentStreamingSourceConfig.NAME_CLUSTER_API_SECRET, KafkaTestUtils.CLUSTER_API_SECRET); + properties.put(ConfluentStreamingSourceConfig.NAME_SCHEMA, schema.toString()); + properties.put(ConfluentStreamingSourceConfig.NAME_MAX_RATE, "1000"); + return properties; + } + + private SparkManager deploySourcePlugin(Map properties) throws Exception { + return deployETL( + new ETLPlugin(ConfluentStreamingSource.PLUGIN_NAME, StreamingSource.PLUGIN_TYPE, properties, null), + MockSink.getPlugin(outputTable), + "KafkaSourceApp", true + ); + } + + private void sendKafkaMessage(String topic, @Nullable Integer partition, @Nullable String key, String value) { + byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); + byte[] keyBytes = key != null ? key.getBytes(StandardCharsets.UTF_8) : null; + try { + kafkaProducer.send(new ProducerRecord<>(topic, partition, keyBytes, valueBytes)).get(); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + } +} diff --git a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceStateStoreRecoveryTest.java b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceStateStoreRecoveryTest.java new file mode 100644 index 0000000..38af09b --- /dev/null +++ b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceStateStoreRecoveryTest.java @@ -0,0 +1,203 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.confluent.integration.streaming.source; + +import com.google.common.collect.ImmutableMap; +import com.google.gson.Gson; +import io.cdap.cdap.api.app.AppStateStore; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.dataset.table.Table; +import io.cdap.cdap.common.utils.Tasks; +import io.cdap.cdap.etl.api.streaming.StreamingSource; +import io.cdap.cdap.etl.mock.batch.MockSink; +import io.cdap.cdap.etl.mock.transform.RecoveringTransform; +import io.cdap.cdap.etl.proto.v2.ETLPlugin; +import io.cdap.cdap.proto.ProgramRunStatus; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.test.DataSetManager; +import io.cdap.cdap.test.SparkManager; +import io.cdap.cdap.test.TestBase; +import io.cdap.plugin.batch.source.KafkaPartitionOffsets; +import io.cdap.plugin.common.Constants; +import io.cdap.plugin.confluent.integration.KafkaTestUtils; +import io.cdap.plugin.confluent.integration.streaming.ConfluentStreamingTestBase; +import io.cdap.plugin.confluent.streaming.source.ConfluentStreamingSource; +import io.cdap.plugin.confluent.streaming.source.ConfluentStreamingSourceConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.requests.ListOffsetRequest; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.ByteArrayInputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + +/** + * Tests for Confluent Streaming Source plugin. 
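+ * Verifies that a pipeline containing the RecoveringTransform resumes from the partition offsets
+ * saved in the app state store (skipping the record at offset 0) and, once the retried batch
+ * completes, persists offset 3 for partition 0 as the next offset to read.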
+ */ +public class ConfluentStreamingSourceStateStoreRecoveryTest extends ConfluentStreamingTestBase { + + private static KafkaProducer kafkaProducer; + private static KafkaProducer kafkaAvroProducer; + private static final Gson GSON = new Gson(); + + @Rule + public TestName testName = new TestName(); + + private String topic; + private String outputTable; + private SparkManager programManager; + + @BeforeClass + public static void setupTestClass() { + kafkaProducer = KafkaTestUtils.createProducer(); + kafkaAvroProducer = KafkaTestUtils.createProducerForSchemaRegistry(); + } + + @AfterClass + public static void cleanupTestClass() { + RecoveringTransform.reset(); + kafkaProducer.close(); + kafkaAvroProducer.close(); + } + + @Before + public void setUp() { + outputTable = testName.getMethodName() + "_out"; + topic = ConfluentStreamingSourceStateStoreRecoveryTest.class.getSimpleName() + "_" + testName.getMethodName(); + KafkaTestUtils.deleteTopic(topic); + KafkaTestUtils.createTopic(topic, 2, 3); + } + + @After + public void tearDown() throws Exception { + KafkaTestUtils.deleteTopic(topic); + if (programManager != null) { + programManager.stop(); + programManager.waitForStopped(10, TimeUnit.SECONDS); + programManager.waitForRun(ProgramRunStatus.KILLED, 10, TimeUnit.SECONDS); + } + } + + @Test + public void testConfluentStreamingSource() throws Exception { + Schema schema = Schema.recordOf( + "user", + Schema.Field.of("id", Schema.of(Schema.Type.LONG)), + Schema.Field.of("first", Schema.of(Schema.Type.STRING)), + Schema.Field.of("last", Schema.of(Schema.Type.STRING)) + ); + Map properties = getConfigProperties(schema); + properties.put(ConfluentStreamingSourceConfig.NAME_FORMAT, "csv"); + programManager = deploySourcePlugin(properties); + + ApplicationId appId = NamespaceId.DEFAULT.app("KafkaSourceApp"); + // Save an entry for offset 1 (second in the data) in state store with reference name. + AppStateStore appStateStore = TestBase.getAppStateStore(appId.getNamespace(), appId.getApplication()); + appStateStore.saveState("source" + "." + topic, + GSON.toJson(new KafkaPartitionOffsets(Collections.singletonMap(0, 1L))) + .getBytes(StandardCharsets.UTF_8)); + + programManager.startAndWaitForRun(ProgramRunStatus.RUNNING, 2, TimeUnit.MINUTES); + + // write some messages to kafka + Map messages = new HashMap<>(); + messages.put("a", "1,samuel,jackson"); + messages.put("b", "2,dwayne,johnson"); + messages.put("c", "3,christopher,walken"); + for (Map.Entry entry : messages.entrySet()) { + sendKafkaMessage(topic, 0, entry.getKey(), entry.getValue()); + } + + final DataSetManager outputManager = getDataset("kafkaOutput"); + // Should start reading from offset 1, so should skip the message at offset 0. + Tasks.waitFor( + ImmutableMap.of(2L, "dwayne johnson", 3L, "christopher walken"), + () -> { + outputManager.flush(); + Map actual = new HashMap<>(); + for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) { + actual.put(outputRecord.get("id"), outputRecord.get("first") + " " + outputRecord.get("last")); + } + return actual; + }, 2, TimeUnit.MINUTES); + + // Verify that state is saved with the next offset to start from. + Tasks.waitFor(3L, () -> { + Optional savedState = appStateStore.getState("source" + "." 
+ topic); + try (Reader reader = new InputStreamReader(new ByteArrayInputStream(savedState.get()), + StandardCharsets.UTF_8)) { + KafkaPartitionOffsets partitionOffsets = GSON.fromJson(reader, KafkaPartitionOffsets.class); + Long savedOffset = partitionOffsets.getPartitionOffsets().get(0); + return savedOffset.longValue(); + } + }, 2, TimeUnit.MINUTES); + + + // stop the run + programManager.stop(); + programManager.waitForRun(ProgramRunStatus.KILLED, 2, TimeUnit.MINUTES); + } + + private Map getConfigProperties(Schema schema) { + Map properties = new HashMap<>(); + properties.put(Constants.Reference.REFERENCE_NAME, "confluent"); + properties.put(ConfluentStreamingSourceConfig.NAME_BROKERS, KafkaTestUtils.KAFKA_SERVER); + properties.put(ConfluentStreamingSourceConfig.NAME_TOPIC, topic); + properties.put(ConfluentStreamingSourceConfig.NAME_DEFAULT_INITIAL_OFFSET, + String.valueOf(ListOffsetRequest.EARLIEST_TIMESTAMP)); + properties.put(ConfluentStreamingSourceConfig.NAME_CLUSTER_API_KEY, KafkaTestUtils.CLUSTER_API_KEY); + properties.put(ConfluentStreamingSourceConfig.NAME_CLUSTER_API_SECRET, KafkaTestUtils.CLUSTER_API_SECRET); + properties.put(ConfluentStreamingSourceConfig.NAME_SCHEMA, schema.toString()); + properties.put(ConfluentStreamingSourceConfig.NAME_MAX_RATE, "1000"); + return properties; + } + + private SparkManager deploySourcePlugin(Map properties) throws Exception { + return deployETL( + new ETLPlugin(ConfluentStreamingSource.PLUGIN_NAME, StreamingSource.PLUGIN_TYPE, properties, null), + MockSink.getPlugin(outputTable), + "KafkaSourceApp", true + ); + } + + private void sendKafkaMessage(String topic, @Nullable Integer partition, @Nullable String key, String value) { + byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); + byte[] keyBytes = key != null ? key.getBytes(StandardCharsets.UTF_8) : null; + try { + kafkaProducer.send(new ProducerRecord<>(topic, partition, keyBytes, valueBytes)).get(); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + } +} diff --git a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceStateStoreTest.java b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceStateStoreTest.java new file mode 100644 index 0000000..047f31d --- /dev/null +++ b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceStateStoreTest.java @@ -0,0 +1,208 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.confluent.integration.streaming.source; + +import com.google.common.collect.ImmutableMap; +import com.google.gson.Gson; +import io.cdap.cdap.api.app.AppStateStore; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.dataset.table.Table; +import io.cdap.cdap.common.utils.Tasks; +import io.cdap.cdap.datastreams.DataStreamsSparkLauncher; +import io.cdap.cdap.etl.api.streaming.StreamingSource; +import io.cdap.cdap.etl.mock.batch.MockSink; +import io.cdap.cdap.etl.mock.transform.RecoveringTransform; +import io.cdap.cdap.etl.proto.v2.DataStreamsConfig; +import io.cdap.cdap.etl.proto.v2.ETLPlugin; +import io.cdap.cdap.etl.proto.v2.ETLStage; +import io.cdap.cdap.proto.ProgramRunStatus; +import io.cdap.cdap.proto.artifact.AppRequest; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.test.ApplicationManager; +import io.cdap.cdap.test.DataSetManager; +import io.cdap.cdap.test.SparkManager; +import io.cdap.cdap.test.TestBase; +import io.cdap.plugin.batch.source.KafkaPartitionOffsets; +import io.cdap.plugin.common.Constants; +import io.cdap.plugin.confluent.integration.KafkaTestUtils; +import io.cdap.plugin.confluent.integration.streaming.ConfluentStreamingTestBase; +import io.cdap.plugin.confluent.streaming.source.ConfluentStreamingSource; +import io.cdap.plugin.confluent.streaming.source.ConfluentStreamingSourceConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.requests.ListOffsetRequest; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.ByteArrayInputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + +/** + * Tests for Confluent Streaming Source plugin. 
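+ * Verifies that the source starts consuming from the partition offsets pre-populated in the app
+ * state store (skipping the record at offset 0) and saves offset 3 for partition 0 once the
+ * records have been processed.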
+ */ +public class ConfluentStreamingSourceStateStoreTest extends ConfluentStreamingTestBase { + + private static KafkaProducer kafkaProducer; + private static KafkaProducer kafkaAvroProducer; + private static final Gson GSON = new Gson(); + + @Rule + public TestName testName = new TestName(); + + private String topic; + private String outputTable; + private SparkManager programManager; + + @BeforeClass + public static void setupTestClass() { + kafkaProducer = KafkaTestUtils.createProducer(); + kafkaAvroProducer = KafkaTestUtils.createProducerForSchemaRegistry(); + } + + @AfterClass + public static void cleanupTestClass() { + kafkaProducer.close(); + kafkaAvroProducer.close(); + } + + @Before + public void setUp() { + outputTable = testName.getMethodName() + "_out"; + topic = ConfluentStreamingSourceStateStoreTest.class.getSimpleName() + "_" + testName.getMethodName(); + KafkaTestUtils.deleteTopic(topic); + KafkaTestUtils.createTopic(topic, 2, 3); + } + + @After + public void tearDown() throws Exception { + KafkaTestUtils.deleteTopic(topic); + if (programManager != null) { + programManager.stop(); + programManager.waitForStopped(10, TimeUnit.SECONDS); + programManager.waitForRun(ProgramRunStatus.KILLED, 10, TimeUnit.SECONDS); + } + } + + @Test + public void testConfluentStreamingSource() throws Exception { + Schema schema = Schema.recordOf( + "user", + Schema.Field.of("id", Schema.of(Schema.Type.LONG)), + Schema.Field.of("first", Schema.of(Schema.Type.STRING)), + Schema.Field.of("last", Schema.of(Schema.Type.STRING)) + ); + Map properties = getConfigProperties(schema); + properties.put(ConfluentStreamingSourceConfig.NAME_FORMAT, "csv"); + programManager = deploySourcePlugin(properties); + + ApplicationId appId = NamespaceId.DEFAULT.app("KafkaSourceApp"); + // Save an entry for offset 1 (second in the data) in state store with reference name. + AppStateStore appStateStore = TestBase.getAppStateStore(appId.getNamespace(), appId.getApplication()); + appStateStore.saveState("source" + "." + topic, + GSON.toJson(new KafkaPartitionOffsets(Collections.singletonMap(0, 1L))) + .getBytes(StandardCharsets.UTF_8)); + + programManager.startAndWaitForRun(ProgramRunStatus.RUNNING, 2, TimeUnit.MINUTES); + + // write some messages to kafka + Map messages = new HashMap<>(); + messages.put("a", "1,samuel,jackson"); + messages.put("b", "2,dwayne,johnson"); + messages.put("c", "3,christopher,walken"); + for (Map.Entry entry : messages.entrySet()) { + sendKafkaMessage(topic, 0, entry.getKey(), entry.getValue()); + } + + final DataSetManager
outputManager = getDataset("kafkaOutput"); + // Should start reading from offset 1, so should skip the message at offset 0. + Tasks.waitFor( + ImmutableMap.of(2L, "dwayne johnson", 3L, "christopher walken"), + () -> { + outputManager.flush(); + Map actual = new HashMap<>(); + for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) { + actual.put(outputRecord.get("id"), outputRecord.get("first") + " " + outputRecord.get("last")); + } + return actual; + }, 2, TimeUnit.MINUTES); + + // Verify that state is saved with the next offset to start from. + Tasks.waitFor(3L, () -> { + Optional savedState = appStateStore.getState("source" + "." + topic); + try (Reader reader = new InputStreamReader(new ByteArrayInputStream(savedState.get()), + StandardCharsets.UTF_8)) { + KafkaPartitionOffsets partitionOffsets = GSON.fromJson(reader, KafkaPartitionOffsets.class); + Long savedOffset = partitionOffsets.getPartitionOffsets().get(0); + return savedOffset.longValue(); + } + }, 2, TimeUnit.MINUTES); + + + // stop the run + programManager.stop(); + programManager.waitForRun(ProgramRunStatus.KILLED, 2, TimeUnit.MINUTES); + } + + private Map getConfigProperties(Schema schema) { + Map properties = new HashMap<>(); + properties.put(Constants.Reference.REFERENCE_NAME, "confluent"); + properties.put(ConfluentStreamingSourceConfig.NAME_BROKERS, KafkaTestUtils.KAFKA_SERVER); + properties.put(ConfluentStreamingSourceConfig.NAME_TOPIC, topic); + properties.put(ConfluentStreamingSourceConfig.NAME_DEFAULT_INITIAL_OFFSET, + String.valueOf(ListOffsetRequest.EARLIEST_TIMESTAMP)); + properties.put(ConfluentStreamingSourceConfig.NAME_CLUSTER_API_KEY, KafkaTestUtils.CLUSTER_API_KEY); + properties.put(ConfluentStreamingSourceConfig.NAME_CLUSTER_API_SECRET, KafkaTestUtils.CLUSTER_API_SECRET); + properties.put(ConfluentStreamingSourceConfig.NAME_SCHEMA, schema.toString()); + properties.put(ConfluentStreamingSourceConfig.NAME_MAX_RATE, "1000"); + return properties; + } + + private SparkManager deploySourcePlugin(Map properties) throws Exception { + return deployETL( + new ETLPlugin(ConfluentStreamingSource.PLUGIN_NAME, StreamingSource.PLUGIN_TYPE, properties, null), + MockSink.getPlugin(outputTable), + "KafkaSourceApp", false + ); + } + + private void sendKafkaMessage(String topic, @Nullable Integer partition, @Nullable String key, String value) { + byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); + byte[] keyBytes = key != null ? 
key.getBytes(StandardCharsets.UTF_8) : null; + try { + kafkaProducer.send(new ProducerRecord<>(topic, partition, keyBytes, valueBytes)).get(); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + } +} diff --git a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceTest.java b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceTest.java index 9dca9e5..101200e 100644 --- a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceTest.java +++ b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceTest.java @@ -429,7 +429,7 @@ private SparkManager deploySourcePlugin(Map properties) throws E return deployETL( new ETLPlugin(ConfluentStreamingSource.PLUGIN_NAME, StreamingSource.PLUGIN_TYPE, properties, null), MockSink.getPlugin(outputTable), - "KafkaSourceApp" + "KafkaSourceApp", false ); } From e523626b5cc91d4709484c302b5a0ddc44dd89eb Mon Sep 17 00:00:00 2001 From: Sanchit Garg Date: Tue, 20 Dec 2022 12:46:56 +0530 Subject: [PATCH 6/6] Fix test cases --- .../cdap/plugin/confluent/integration/KafkaTestUtils.java | 6 ++---- .../integration/streaming/ConfluentStreamingTestBase.java | 7 ++++++- .../streaming/source/ConfluentStreamingSourceTest.java | 4 ++-- pom.xml | 7 ++++++- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/KafkaTestUtils.java b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/KafkaTestUtils.java index e599cf0..af6a583 100644 --- a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/KafkaTestUtils.java +++ b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/KafkaTestUtils.java @@ -170,10 +170,8 @@ private static Map getSecurityProps() { props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "https"); props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); - /*props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required " + - "username=\"" + CLUSTER_API_KEY + "\" password=\"" + CLUSTER_API_SECRET + "\";");*/ - props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule " + - "required username=\"%s\" password=\"%s\";", CLUSTER_API_KEY, CLUSTER_API_SECRET)); + props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"" + CLUSTER_API_KEY + "\" password=\"" + CLUSTER_API_SECRET + "\";"); return props; diff --git a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/ConfluentStreamingTestBase.java b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/ConfluentStreamingTestBase.java index 972c431..fed460f 100644 --- a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/ConfluentStreamingTestBase.java +++ b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/ConfluentStreamingTestBase.java @@ -50,6 +50,7 @@ import org.awaitility.Awaitility; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -61,6 +62,9 @@ public abstract class ConfluentStreamingTestBase extends HydratorTestBase { public static final TestConfiguration CONFIG = new TestConfiguration(Constants.Explore.EXPLORE_ENABLED, false, Constants.AppFabric.SPARK_COMPAT, Compat.SPARK_COMPAT); + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + private static final Logger LOG = LoggerFactory.getLogger(ConfluentStreamingTestBase.class); private static final ArtifactId APP_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-streams", "1.0.0"); private static final ArtifactSummary APP_ARTIFACT = new ArtifactSummary("data-streams", "1.0.0"); @@ -82,7 +86,8 @@ public static void setupBasic() throws Exception { ); } - protected SparkManager deployETL(ETLPlugin sourcePlugin, ETLPlugin sinkPlugin, String appName, boolean isRecovery) throws Exception { + protected SparkManager deployETL(ETLPlugin sourcePlugin, ETLPlugin sinkPlugin, String appName, boolean isRecovery) + throws Exception { ETLStage source = new ETLStage("source", sourcePlugin); ETLStage sink = new ETLStage("sink", sinkPlugin); diff --git a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceTest.java b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceTest.java index 101200e..0be6b5b 100644 --- a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceTest.java +++ b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceTest.java @@ -99,7 +99,7 @@ public void tearDown() throws Exception { KafkaTestUtils.deleteTopic(topic); if (programManager != null) { programManager.stop(); - programManager.waitForStopped(10, TimeUnit.SECONDS); + programManager.waitForStopped(1, TimeUnit.MINUTES); programManager.waitForRun(ProgramRunStatus.KILLED, 10, TimeUnit.SECONDS); } } @@ -129,7 +129,7 @@ public void testConfluentStreamingSource() throws Exception { waitForRecords(outputTable, expectedRecords); programManager.stop(); - programManager.waitForStopped(10, TimeUnit.SECONDS); + programManager.waitForStopped(1, TimeUnit.MINUTES); // clear the output table DataSetManager
outputManager = getDataset(outputTable); diff --git a/pom.xml b/pom.xml index 9639f01..33534e4 100644 --- a/pom.xml +++ b/pom.xml @@ -87,7 +87,7 @@ UTF-8 - 6.9.0-SNAPSHOT + 6.8.0 2.11.0-SNAPSHOT 1.6.1 3.1.2 @@ -300,6 +300,11 @@ ${netty-http.version} test + + com.thoughtworks.paranamer + paranamer + 2.8 +
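Note on the persisted state format: the following is a minimal, illustrative sketch (not part of the patch) of how the offsets written by saveState() round-trip through Gson, mirroring getSavedState() in ConfluentStreamingSource. PartitionOffsets and OffsetStateRoundTripSketch are hypothetical stand-ins for io.cdap.plugin.batch.source.KafkaPartitionOffsets and a demo driver; the value stored per partition is the batch's untilOffset, i.e. the next offset to consume.

import com.google.gson.Gson;
import java.util.Collections;
import java.util.Map;

public class OffsetStateRoundTripSketch {

  // Hypothetical stand-in for io.cdap.plugin.batch.source.KafkaPartitionOffsets, used only in this sketch.
  static class PartitionOffsets {
    private Map<Integer, Long> partitionOffsets;

    PartitionOffsets(Map<Integer, Long> partitionOffsets) {
      this.partitionOffsets = partitionOffsets;
    }

    Map<Integer, Long> getPartitionOffsets() {
      return partitionOffsets;
    }
  }

  public static void main(String[] args) {
    Gson gson = new Gson();

    // After a batch that read offsets [1, 3) on partition 0, untilOffset 3 is saved under the topic key.
    String json = gson.toJson(new PartitionOffsets(Collections.singletonMap(0, 3L)));
    System.out.println(json); // {"partitionOffsets":{"0":3}}

    // On the next run the source reads the state back and seeds the Kafka consumer at offset 3.
    PartitionOffsets restored = gson.fromJson(json, PartitionOffsets.class);
    System.out.println(restored.getPartitionOffsets().get(0)); // 3
  }
}

This matches the shape asserted by the state-store integration tests above, which pre-populate the store with offset 1 for partition 0 and then wait for the saved value to advance to 3.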