diff --git a/examples/sql/pom.xml b/examples/sql/pom.xml
index e06fe6ce00..45eb88eda3 100644
--- a/examples/sql/pom.xml
+++ b/examples/sql/pom.xml
@@ -90,6 +90,19 @@
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-kafka-common</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaConsumer.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaConsumer.java
index ebb46e5acf..250201e027 100644
--- a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaConsumer.java
+++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaConsumer.java
@@ -27,6 +27,7 @@
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
/**
@@ -53,7 +54,7 @@ public interface AbstractKafkaConsumer
* @param timeOut time in milliseconds, spent waiting in poll if data is not available in buffer.
* @return records
*/
- ConsumerRecords pollRecords(long timeOut);
+ ConsumerRecords pollRecords(long timeOut);
/**
* Commit the specified offsets for the specified list of topics and partitions to Kafka.
@@ -124,4 +125,6 @@ public interface AbstractKafkaConsumer
* @param tp partition
*/
long positionPartition(TopicPartition tp);
+
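+ /**
+ * Wrapper for the partitionsFor(topic) call of the wrapped KafkaConsumer.
+ * @param topic topic name
+ * @return partition metadata for the given topic
+ */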
+ List<PartitionInfo> partitionsFor(String topic);
}
diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaExactlyOnceOutputOperator.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaExactlyOnceOutputOperator.java
new file mode 100644
index 0000000000..ec99931b0e
--- /dev/null
+++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaExactlyOnceOutputOperator.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+/**
+ * Kafka output operator with exactly once processing semantics.
+ *
+ * <p>
+ * <b>Requirements</b>
+ * <ol>
+ * <li>In the Kafka message, only the Value is available to users.</li>
+ * <li>Users need to provide a Value deserializer for the Kafka message, as it is used during recovery.</li>
+ * <li>The Value type should have well-defined equals() and hashCode() methods,
+ * as the messages are stored in HashMaps for comparison during recovery.</li>
+ * </ol>
+ * <p>
+ * <b>Recovery handling</b>
+ * <ol>
+ * <li>Offsets of the Kafka partitions are stored in the WindowDataManager at endWindow.</li>
+ * <li>During recovery:
+ * <ul>
+ * <li>the partially written Streaming Window before the crash is reconstructed (explained below),</li>
+ * <li>tuples from completed Streaming Windows are skipped,</li>
+ * <li>tuples arriving for the partially written Streaming Window are skipped
+ * (no assumption is made on the order or uniqueness of the tuples).</li>
+ * </ul>
+ * </li>
+ * </ol>
+ * <p>
+ * <b>Partial Window Construction</b>
+ * <ol>
+ * <li>The operator uses the Key in the Kafka message, which is not available for use by operator users.</li>
+ * <li>The Key uniquely identifies the messages written by this particular operator instance,
+ * which allows multiple writers to the same Kafka partitions. The format of the Key is "APPLICATION_ID#OPERATOR_ID".</li>
+ * <li>During recovery, the Kafka partitions are read between the latest offsets and the last written offsets.
+ * All the tuples written by this particular instance are kept in a Map.</li>
+ * </ol>
+ * <p>
+ * <b>Limitations</b>
+ * <ol>
+ * <li>The Key in the Kafka message is reserved for the operator's use.</li>
+ * <li>During recovery, the operator needs to read tuples between two offsets; if there is a lot of data to read,
+ * the operator may appear blocked to the Stram, which can then kill the operator.</li>
+ * </ol>
+ *
+ * @displayName Kafka Single Port Exactly Once Output(0.9.0)
+ * @category Messaging
+ * @tags output operator
+ * @since 3.5.0
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class AbstractKafkaExactlyOnceOutputOperator<T> extends AbstractKafkaOutputOperator<String, T>
+ implements Operator.CheckpointNotificationListener
+{
+ private transient String key;
+ private transient String appName;
+ private transient Integer operatorId;
+ private transient Long windowId;
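+ // Tuples found in Kafka for the partially written window, mapped to how many copies were written;
+ // replayed tuples that match an entry are skipped until the count is exhausted.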
+ private transient Map<T, Integer> partialWindowTuples = new HashMap<>();
+ private transient AbstractKafkaConsumer consumer;
+
+ private WindowDataManager windowDataManager = new FSWindowDataManager();
+ private final int KAFKA_CONNECT_ATTEMPT = 10;
+ private final String KEY_SEPARATOR = "#";
+
+ public static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+ public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+
+ public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>()
+ {
+ @Override
+ public void process(T tuple)
+ {
+ sendTuple(tuple);
+ }
+ };
+
+ // Creates the consumer object and it wraps KafkaConsumer.
+ public abstract AbstractKafkaConsumer createConsumer(Properties prop);
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+
+ if (getProperties().getProperty(VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
+ throw new IllegalArgumentException(
+ "Value deserializer needs to be set for the operator, as it is used during recovery.");
+ }
+
+ super.setup(context);
+
+ this.operatorId = context.getId();
+ this.windowDataManager.setup(context);
+ this.appName = context.getValue(Context.DAGContext.APPLICATION_NAME);
+ this.key = appName + KEY_SEPARATOR + (new Integer(operatorId));
+
+ this.consumer = KafkaConsumerInit();
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ this.windowId = windowId;
+
+ if (windowId == windowDataManager.getLargestCompletedWindow()) {
+ rebuildPartialWindow();
+ }
+ }
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ try {
+ windowDataManager.committed(windowId);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void beforeCheckpoint(long windowId)
+ {
+ }
+
+ @Override
+ public void teardown()
+ {
+ consumer.close();
+ super.teardown();
+ }
+
+ @Override
+ public void endWindow()
+ {
+ if (windowId <= windowDataManager.getLargestCompletedWindow()) {
+ return;
+ }
+
+ if (!partialWindowTuples.isEmpty()) {
+ throw new RuntimeException("Violates Exactly once. Not all the tuples received after operator reset.");
+ }
+
+ // Every tuple should be written before the offsets are stored in the window data manager.
+ getProducer().flush();
+
+ try {
+ this.windowDataManager.save(getPartitionsAndOffsets(true), windowId);
+ } catch (IOException | InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public WindowDataManager getWindowDataManager()
+ {
+ return windowDataManager;
+ }
+
+ public void setWindowDataManager(WindowDataManager windowDataManager)
+ {
+ this.windowDataManager = windowDataManager;
+ }
+
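+ // The Kafka message key has the form "APPLICATION_ID#OPERATOR_ID"; a message belongs to this operator
+ // instance only if both parts match.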
+ private boolean doesKeyBelongsToThisInstance(int operatorId, String key)
+ {
+ String[] split = key.split(KEY_SEPARATOR);
+
+ if (split.length != 2) {
+ return false;
+ }
+
+ if ((Integer.parseInt(split[1]) == operatorId) && (split[0].equals(appName))) {
+ return true;
+ }
+
+ return false;
+ }
+
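+ // Returns true if the tuple is already present in Kafka, either because its window was fully
+ // committed or because it was found in the partially written window during recovery.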
+ private boolean alreadyInKafka(T message)
+ {
+ if (windowId <= windowDataManager.getLargestCompletedWindow()) {
+ return true;
+ }
+
+ if (partialWindowTuples.containsKey(message)) {
+
+ Integer val = partialWindowTuples.get(message);
+
+ if (val == 0) {
+ return false;
+ } else if (val == 1) {
+ partialWindowTuples.remove(message);
+ } else {
+ partialWindowTuples.put(message, val - 1);
+ }
+ return true;
+ }
+ return false;
+ }
+
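+ // Returns, for every partition of the output topic, either the latest offset (latest == true)
+ // or the earliest offset (latest == false).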
+ private Map<Integer, Long> getPartitionsAndOffsets(boolean latest) throws ExecutionException, InterruptedException
+ {
+ List<PartitionInfo> partitionInfoList = consumer.partitionsFor(getTopic());
+ List<TopicPartition> topicPartitionList = new ArrayList<>();
+
+ for (PartitionInfo partitionInfo : partitionInfoList) {
+ topicPartitionList.add(new TopicPartition(getTopic(), partitionInfo.partition()));
+ }
+
+ Map<Integer, Long> parttionsAndOffset = new HashMap<>();
+ consumer.assignPartitions(topicPartitionList);
+
+ for (PartitionInfo partitionInfo : partitionInfoList) {
+ try {
+ TopicPartition topicPartition = new TopicPartition(getTopic(), partitionInfo.partition());
+ if (latest) {
+ consumer.seekToEnd(topicPartition);
+ } else {
+ consumer.seekToBeginning(topicPartition);
+ }
+ parttionsAndOffset.put(partitionInfo.partition(), consumer.positionPartition(topicPartition));
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ return parttionsAndOffset;
+ }
+
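+ // Re-reads the output topic between the offsets saved for the last completed window and the current
+ // latest offsets, counting the tuples this instance already wrote so they are not written again.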
+ private void rebuildPartialWindow()
+ {
+ logger.info("Rebuild the partial window after " + windowDataManager.getLargestCompletedWindow());
+
+ Map<Integer, Long> storedOffsets;
+ Map<Integer, Long> currentOffsets;
+
+ try {
+ storedOffsets = (Map<Integer, Long>)this.windowDataManager.retrieve(windowId);
+ currentOffsets = getPartitionsAndOffsets(true);
+ } catch (IOException | ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (currentOffsets == null) {
+ logger.info("No tuples found while building partial window " + windowDataManager.getLargestCompletedWindow());
+ return;
+ }
+
+ if (storedOffsets == null) {
+
+ logger.info("Stored offset not available, seeking to the beginning of the Kafka Partition.");
+
+ try {
+ storedOffsets = getPartitionsAndOffsets(false);
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ List<TopicPartition> topicPartitions = new ArrayList<>();
+
+ for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
+ topicPartitions.add(new TopicPartition(getTopic(), entry.getKey()));
+ }
+
+ consumer.assignPartitions(topicPartitions);
+
+ for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
+ Long storedOffset = 0L;
+ Integer currentPartition = entry.getKey();
+ Long currentOffset = entry.getValue();
+
+ if (storedOffsets.containsKey(currentPartition)) {
+ storedOffset = storedOffsets.get(currentPartition);
+ }
+
+ if (storedOffset >= currentOffset) {
+ continue;
+ }
+
+ try {
+ consumer.seekToOffset(new TopicPartition(getTopic(), currentPartition), storedOffset);
+ } catch (Exception ex) {
+ logger.info("Rebuilding of the partial window is not complete, exactly once recovery is not possible.");
+ throw new RuntimeException(ex);
+ }
+
+ int kafkaAttempt = 0;
+
+ while (true) {
+
+ ConsumerRecords<String, T> consumerRecords = consumer.pollRecords(100);
+
+ if (consumerRecords.count() == 0) {
+ if (kafkaAttempt++ == KAFKA_CONNECT_ATTEMPT) {
+ break;
+ }
+ } else {
+ kafkaAttempt = 0;
+ }
+
+ boolean crossedBoundary = false;
+
+ for (ConsumerRecord<String, T> consumerRecord : consumerRecords) {
+
+ if (consumerRecord.offset() >= currentOffset) {
+ crossedBoundary = true;
+ break;
+ }
+
+ if (!doesKeyBelongsToThisInstance(operatorId, consumerRecord.key())) {
+ continue;
+ }
+
+ T value = consumerRecord.value();
+
+ if (partialWindowTuples.containsKey(value)) {
+ Integer count = partialWindowTuples.get(value);
+ partialWindowTuples.put(value, count + 1);
+ } else {
+ partialWindowTuples.put(value, 1);
+ }
+
+ }
+
+ if (crossedBoundary) {
+ break;
+ }
+ }
+ }
+ }
+
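+ // Builds the recovery consumer: the key is always a String ("APPLICATION_ID#OPERATOR_ID"), while the
+ // value deserializer is taken from the operator's configured properties.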
+ private AbstractKafkaConsumer KafkaConsumerInit()
+ {
+ Properties props = new Properties();
+
+ props.put(BOOTSTRAP_SERVERS_CONFIG, getProperties().get(BOOTSTRAP_SERVERS_CONFIG));
+ props.put(KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+ props.put(VALUE_DESERIALIZER_CLASS_CONFIG, getProperties().get(VALUE_DESERIALIZER_CLASS_CONFIG));
+
+ return createConsumer(props);
+ }
+
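+ // Writes the tuple to Kafka, tagging it with this instance's key, unless recovery determined that it
+ // was already written in a previous run.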
+ protected void sendTuple(T tuple)
+ {
+ if (alreadyInKafka(tuple)) {
+ return;
+ }
+
+ getProducer().send(new ProducerRecord<>(getTopic(), key, tuple), new Callback()
+ {
+ public void onCompletion(RecordMetadata metadata, Exception e)
+ {
+ if (e != null) {
+ logger.info("Wrting to Kafka failed with an exception {}" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaExactlyOnceOutputOperator.class);
+}
+
diff --git a/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/AbstractEmbeddedKafka.java b/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/AbstractEmbeddedKafka.java
new file mode 100644
index 0000000000..cd690625d7
--- /dev/null
+++ b/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/AbstractEmbeddedKafka.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+import kafka.server.KafkaServer;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+public abstract class AbstractEmbeddedKafka
+{
+ private static final String[] KAFKA_PATH = new String[]{"/tmp/kafka-test1","/tmp/kafka-test2"};
+
+ private ZkClient[] zkClient = new ZkClient[2];
+ private ZkUtils[] zkUtils = new ZkUtils[2];
+ private String BROKERHOST = "localhost";
+ private ZooKeeperServer[] zkServer = new ZooKeeperServer[2];
+ private KafkaServer[] kafkaServer = new KafkaServer[2];
+ public static int[] TEST_ZOOKEEPER_PORT;
+ public static int[] TEST_KAFKA_BROKER_PORT;
+ public static String baseDir = "target";
+ private int clusterId = 0;
+ public static final String TEST_TOPIC = "testtopic";
+ public static int testCounter = 0;
+ public static final String END_TUPLE = "END_TUPLE";
+
+ private static final String zkBaseDir = "zookeeper-server-data";
+ private static final String kafkaBaseDir = "kafka-server-data";
+ private static final String[] zkdir = new String[]{"zookeeper-server-data1", "zookeeper-server-data2"};
+ private static final String[] zklogdir = new String[]{"zookeeper-log-data1", "zookeeper-log-data2"};
+ private static ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2];
+ static final org.slf4j.Logger logger = LoggerFactory.getLogger(AbstractEmbeddedKafka.class);
+
+ // get available ports
+ private void getAvailablePorts()
+ {
+ ServerSocket[] listeners = new ServerSocket[4];
+ int[] p = new int[4];
+
+ try {
+ for (int i = 0; i < 4; i++) {
+ listeners[i] = new ServerSocket(0);
+ p[i] = listeners[i].getLocalPort();
+ }
+
+ for (int i = 0; i < 4; i++) {
+ listeners[i].close();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ TEST_ZOOKEEPER_PORT = new int[]{p[0], p[1]};
+ TEST_KAFKA_BROKER_PORT = new int[]{p[2], p[3]};
+ }
+
+ public void startZookeeper(int clusterId)
+ {
+ try {
+ int numConnections = 100;
+ int tickTime = 2000;
+ File snapshotDir;
+ File logDir;
+ try {
+ snapshotDir = java.nio.file.Files.createTempDirectory(zkdir[clusterId]).toFile();
+ logDir = java.nio.file.Files.createTempDirectory(zklogdir[clusterId]).toFile();
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to start Kafka", e);
+ }
+
+ snapshotDir.deleteOnExit();
+ logDir.deleteOnExit();
+ zkServer[clusterId] = new ZooKeeperServer(snapshotDir, logDir, tickTime);
+ zkFactory[clusterId] = new NIOServerCnxnFactory();
+ zkFactory[clusterId].configure(new InetSocketAddress(TEST_ZOOKEEPER_PORT[clusterId]), numConnections);
+
+ zkFactory[clusterId].startup(zkServer[clusterId]); // start the zookeeper server.
+ Thread.sleep(2000);
+ //kserver.startup();
+ } catch (Exception ex) {
+ logger.error(ex.getLocalizedMessage());
+ }
+ }
+
+ public void stopZookeeper(int clusterId)
+ {
+ zkServer[clusterId].shutdown();
+ zkFactory[clusterId].closeAll();
+ zkFactory[clusterId].shutdown();
+ }
+
+ public String getBroker()
+ {
+ return getBroker(clusterId);
+ }
+
+ public String getBroker(int clusterId)
+ {
+ return BROKERHOST + ":" + TEST_KAFKA_BROKER_PORT[clusterId];
+ }
+
+ public void start() throws IOException
+ {
+ getAvailablePorts();
+ FileUtils.deleteDirectory(new File(KAFKA_PATH[0]));
+ FileUtils.deleteDirectory(new File(KAFKA_PATH[1]));
+ // Setup Zookeeper
+ startZookeeper(0);
+ startZookeeper(1);
+ // Setup brokers
+ cleanupDir();
+ startBroker(0);
+ startBroker(1);
+ }
+
+ public abstract KafkaServer createKafkaServer(Properties prop);
+
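+ // Starts one embedded broker that registers with its cluster's ZooKeeper and listens on the port
+ // reserved in getAvailablePorts().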
+ public void startBroker(int clusterId)
+ {
+ String zkConnect = BROKERHOST + ":" + TEST_ZOOKEEPER_PORT[clusterId];
+ zkClient[clusterId] = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+ zkUtils[clusterId] = ZkUtils.apply(zkClient[clusterId], false);
+
+ Properties props = new Properties();
+ props.setProperty("zookeeper.connect", zkConnect);
+ props.setProperty("broker.id", "" + clusterId);
+ props.setProperty("log.dirs", KAFKA_PATH[clusterId]);
+ props.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + TEST_KAFKA_BROKER_PORT[clusterId]);
+ kafkaServer[clusterId] = createKafkaServer(props);
+
+ }
+
+ public void stopBroker(int clusterId)
+ {
+ kafkaServer[clusterId].shutdown();
+ zkClient[clusterId].close();
+ }
+
+ public void stop() throws IOException
+ {
+ stopBroker(0);
+ stopBroker(1);
+ stopZookeeper(0);
+ stopZookeeper(1);
+ cleanupDir();
+ }
+
+ private void cleanupDir() throws IOException
+ {
+ FileUtils.deleteDirectory(new File(KAFKA_PATH[0]));
+ FileUtils.deleteDirectory(new File(KAFKA_PATH[1]));
+ }
+
+ public abstract void createTopic(String topic, ZkUtils zkUtils, int noOfPartitions);
+
+ public void createTopic(String topic)
+ {
+ createTopic(topic, clusterId, 1);
+ }
+
+ public void createTopic(String topic, int clusterId, int numOfPartitions)
+ {
+ createTopic(topic, zkUtils[clusterId], numOfPartitions);
+ List servers = new ArrayList();
+ servers.add(kafkaServer[clusterId]);
+ TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 30000);
+ }
+
+ public void publish(String topic, List<String> messages)
+ {
+ Properties producerProps = new Properties();
+ producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + TEST_KAFKA_BROKER_PORT[clusterId]);
+ producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
+ producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+ try (KafkaProducer producer = new KafkaProducer<>(producerProps)) {
+ for (String message : messages) {
+ ProducerRecord data = new ProducerRecord<>(topic, message.getBytes(StandardCharsets.UTF_8));
+ producer.send(data);
+ }
+ }
+
+ List servers = new ArrayList();
+ servers.add(kafkaServer[clusterId]);
+ TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 30000);
+ }
+
+ public List consume(String topic, int timeout)
+ {
+ return consume(topic, timeout, true);
+ }
+
+ public List consume(String topic, int timeout, boolean earliest)
+ {
+ Properties consumerProps = new Properties();
+ consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + TEST_KAFKA_BROKER_PORT[clusterId]);
+ consumerProps.setProperty("group.id", "group0");
+ consumerProps.setProperty("client.id", "consumer0");
+ consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+ consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ // start from the beginning or the end of the topic depending on the "earliest" flag
+ consumerProps.put("auto.offset.reset", earliest ? "earliest" : "latest");
+ KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
+ consumer.subscribe(Arrays.asList(topic));
+
+ List<String> messages = new ArrayList<>();
+
+ ConsumerRecords<Integer, byte[]> records = consumer.poll(timeout);
+ for (ConsumerRecord<Integer, byte[]> record : records) {
+ messages.add(new String(record.value()));
+ }
+
+ consumer.close();
+
+ return messages;
+ }
+}
diff --git a/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperatorTest.java b/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperatorTest.java
new file mode 100644
index 0000000000..2c3aedda6a
--- /dev/null
+++ b/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperatorTest.java
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runners.Parameterized;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * A set of tests verifying that the input operator is automatically partitioned
+ * per Kafka partition. These tests launch their
+ * own Kafka cluster.
+ */
+public abstract class AbstractKafkaInputOperatorTest
+{
+
+ private int totalBrokers = 0;
+
+ private String partition = null;
+
+ public String testName = "";
+ protected boolean hasMultiPartition = false;
+ protected boolean hasMultiCluster = false;
+
+ public static String APPLICATION_PATH = AbstractEmbeddedKafka.baseDir + File.separator + StramLocalCluster.class.getName() + File.separator;
+
+ public class KafkaTestInfo extends TestWatcher
+ {
+ public org.junit.runner.Description desc;
+
+ public String getDir()
+ {
+ String methodName = desc.getMethodName();
+ String className = desc.getClassName();
+ return "target/" + className + "/" + methodName + "/" + testName;
+ }
+
+ @Override
+ protected void starting(org.junit.runner.Description description)
+ {
+ this.desc = description;
+ }
+ }
+
+ public static AbstractEmbeddedKafka kafkaServer;
+
+ @Rule
+ public final KafkaTestInfo testInfo = new KafkaTestInfo();
+
+ @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
+ public static Collection<Object[]> testScenario()
+ {
+ return Arrays.asList(new Object[][]{
+ {true, false, "one_to_one"},// multi cluster with single partition
+ {true, false, "one_to_many"},
+ {true, true, "one_to_one"},// multi cluster with multi partitions
+ {true, true, "one_to_many"},
+ {false, true, "one_to_one"}, // single cluster with multi partitions
+ {false, true, "one_to_many"},
+ {false, false, "one_to_one"}, // single cluster with single partitions
+ {false, false, "one_to_many"}
+ });
+ }
+
+ @Before
+ public void before()
+ {
+ testName = AbstractEmbeddedKafka.TEST_TOPIC + AbstractEmbeddedKafka.testCounter++;
+ logger.info("before() test case: {}", testName);
+ tupleCollection.clear();
+ //reset count for next new test case
+ k = 0;
+
+ int noOfPartitions = hasMultiPartition ? 2 : 1;
+ kafkaServer.createTopic(testName, 0, noOfPartitions);
+ if (hasMultiCluster) {
+ kafkaServer.createTopic(testName, 1, noOfPartitions);
+ }
+ }
+
+ public AbstractKafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition)
+ {
+ // This class wants to initialize several Kafka brokers for multiple partitions
+ this.hasMultiCluster = hasMultiCluster;
+ this.hasMultiPartition = hasMultiPartition;
+ int cluster = 1 + (hasMultiCluster ? 1 : 0);
+ totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
+ this.partition = partition;
+ }
+
+ private static final org.slf4j.Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperatorTest.class);
+ private static List tupleCollection = new LinkedList<>();
+
+ /**
+ * whether countDown latch count all tuples or just END_TUPLE
+ */
+ private static final boolean countDownAll = false;
+ private static final int scale = 2;
+ private static final int totalCount = 10 * scale;
+ private static final int failureTrigger = 3 * scale;
+ private static final int tuplesPerWindow = 5 * scale;
+ private static final int waitTime = 60000 + 300 * scale;
+
+ //This latch was used to count the END_TUPLEs, but the order of tuples can't be guaranteed,
+ //so count valid tuples instead.
+ private static CountDownLatch latch;
+ private static boolean hasFailure = false;
+ private static int k = 0;
+ private static Thread monitorThread;
+
+ /**
+ * Test Operator to collect tuples from KafkaSingleInputStringOperator.
+ *
+ * @param
+ */
+ public static class CollectorModule extends BaseOperator
+ {
+ public final transient DefaultInputPort<byte[]> inputPort = new DefaultInputPort<byte[]>()
+ {
+ @Override
+ public void process(byte[] bt)
+ {
+ processTuple(bt);
+ }
+ };
+
+ long currentWindowId;
+
+ long operatorId;
+
+ boolean isIdempotentTest = false;
+
+ transient List<String> windowTupleCollector = Lists.newArrayList();
+ private transient Map<String, List<String>> tupleCollectedInWindow = new HashMap<>();
+ private int endTuples = 0;
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ operatorId = context.getId();
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ currentWindowId = windowId;
+ windowTupleCollector.clear();
+ endTuples = 0;
+ }
+
+ public void processTuple(byte[] bt)
+ {
+ String tuple = new String(bt);
+ if (hasFailure && k++ == failureTrigger) {
+ //you can only kill yourself once
+ hasFailure = false;
+ throw new RuntimeException();
+ }
+ if (tuple.startsWith(AbstractEmbeddedKafka.END_TUPLE)) {
+ endTuples++;
+ }
+
+ windowTupleCollector.add(tuple);
+ }
+
+ @Override
+ public void endWindow()
+ {
+ super.endWindow();
+ if (isIdempotentTest) {
+ String key = operatorId + "," + currentWindowId;
+ List msgsInWin = tupleCollectedInWindow.get(key);
+ if (msgsInWin != null) {
+ Assert.assertEquals(
+ "replay messages should be exactly same as previous window", msgsInWin, windowTupleCollector);
+ } else {
+ List newList = Lists.newArrayList();
+ newList.addAll(windowTupleCollector);
+ tupleCollectedInWindow.put(key, newList);
+ }
+ }
+
+ //discard the tuples of this window if an exception happened
+ int tupleSize = windowTupleCollector.size();
+ tupleCollection.addAll(windowTupleCollector);
+
+ int countDownTupleSize = countDownAll ? tupleSize : endTuples;
+
+ if (latch != null) {
+ Assert.assertTrue(
+ "received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize);
+ // If it is a failure test, then clear the tupleCollection and don't decrement the count of latch.
+ // Because tupleCollection and latch are static global values
+ if (hasFailure && latch.getCount() != countDownTupleSize) {
+ tupleCollection.clear();
+ return;
+ }
+ while (countDownTupleSize > 0) {
+ latch.countDown();
+ --countDownTupleSize;
+ }
+ if (latch.getCount() == 0) {
+ /**
+ * The time before countDown() and the shutdown() of the application
+ * will cause fatal error:
+ * "Catastrophic Error: Invalid State - the operator blocked forever!"
+ * as the activeQueues could be cleared but alive haven't changed yet.
+ * throw the ShutdownException to let the engine shutdown;
+ */
+ try {
+ throw new ShutdownException();
+ //lc.shutdown();
+ } finally {
+ /**
+ * interrupt the engine thread, let it wake from sleep and handle
+ * the shutdown at this time, all payload should be handled. so it
+ * should be ok to interrupt
+ */
+ monitorThread.interrupt();
+ }
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka consumer). This module receives
+ * data from an outside test generator through Kafka message bus and feed that data into Malhar streaming platform.
+ *
+ * [Generate message and send that to Kafka message bus] ==> [Receive that message through Kafka input adapter(i.e.
+ * consumer) and send using emitTuples() interface on output port]
+ *
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testInputOperator() throws Exception
+ {
+ hasFailure = false;
+ testInputOperator(false, false);
+ }
+
+ @Test
+ public void testInputOperatorWithFailure() throws Exception
+ {
+ hasFailure = true;
+ testInputOperator(true, false);
+ }
+
+ @Test
+ public void testIdempotentInputOperatorWithFailure() throws Exception
+ {
+ hasFailure = true;
+ testInputOperator(true, true);
+ }
+
+ public abstract AbstractKafkaInputOperator createKafkaInputOperator(DAG dag, DefaultInputPort inputPort);
+
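+ // End-to-end scenario: a producer thread writes totalCount messages plus END_TUPLEs to Kafka, the DAG
+ // consumes them through the Kafka input operator, and the collector counts them down on the latch.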
+ public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exception
+ {
+ // each broker should get an END_TUPLE message
+ latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers);
+ logger.info(
+ "Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {};" +
+ " hasMultiPartition: {}, partition: {}",
+ testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition);
+
+ // Start producer
+ KafkaTestProducer p = new KafkaTestProducer(testName, hasMultiPartition, hasMultiCluster, kafkaServer);
+ p.setSendCount(totalCount);
+ Thread t = new Thread(p);
+ t.start();
+
+ int expectedReceiveCount = totalCount + totalBrokers;
+
+ // Create DAG for testing.
+ LocalMode lma = LocalMode.newInstance();
+ DAG dag = lma.getDAG();
+
+ // Create Test tuple collector
+ CollectorModule collector = dag.addOperator("TestMessageCollector", CollectorModule.class);
+ collector.isIdempotentTest = idempotent;
+
+ // Create KafkaSinglePortStringInputOperator
+ AbstractKafkaInputOperator node = createKafkaInputOperator(dag, collector.inputPort);
+ node.setInitialPartitionCount(1);
+ // set topic
+ node.setTopics(testName);
+ node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
+ node.setClusters(getClusterConfig());
+ node.setStrategy(partition);
+ if (idempotent) {
+ node.setWindowDataManager(new FSWindowDataManager());
+ }
+
+ if (hasFailure) {
+ setupHasFailureTest(node, dag);
+ }
+
+ // Create local cluster
+ LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(false);
+
+ //Let the Controller run inside another thread. It is almost the same as calling Controller.runAsync(),
+ //but Controller.runAsync() doesn't expose the thread that runs it,
+ //so we wouldn't know when that thread terminates.
+ //Create the thread here and call join() later to make sure the Controller shuts down completely.
+ monitorThread = new Thread((StramLocalCluster)lc, "master");
+ monitorThread.start();
+
+ boolean notTimeout = true;
+ try {
+ // Wait up to 60s for the consumer to finish consuming all the messages
+ notTimeout = latch.await(waitTime, TimeUnit.MILLISECONDS);
+ lc.shutdown();
+
+ //wait until control thread finished.
+ monitorThread.join();
+ } catch (Exception e) {
+ logger.warn(e.getMessage());
+ }
+
+ t.join();
+
+ if (!notTimeout || expectedReceiveCount != tupleCollection.size()) {
+ logger.info("Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", tupleCollection.size(),
+ expectedReceiveCount, testName, tupleCollection);
+ }
+ Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: "
+ + tupleCollection, notTimeout);
+
+ // Check results
+ Assert.assertTrue("testName: " + testName + "; Collected tuple size: " + tupleCollection.size()
+ + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection,
+ expectedReceiveCount == tupleCollection.size());
+
+ logger.info("End of test case: {}", testName);
+ }
+
+ private void setupHasFailureTest(AbstractKafkaInputOperator operator, DAG dag)
+ {
+ operator.setHoldingBufferSize(5000);
+ dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
+ //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(
+ // APPLICATION_PATH + "failureck", new Configuration()));
+ operator.setMaxTuplesPerWindow(tuplesPerWindow);
+ }
+
+ private String getClusterConfig()
+ {
+ return kafkaServer.getBroker(0) +
+ (hasMultiCluster ? ";" + kafkaServer.getBroker(1) : "");
+ }
+
+}
diff --git a/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperatorTest.java b/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperatorTest.java
new file mode 100644
index 0000000000..3bfeab5132
--- /dev/null
+++ b/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperatorTest.java
@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Operator;
+import com.datatorrent.common.util.BaseOperator;
+
+import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+public abstract class AbstractKafkaOutputOperatorTest
+{
+ String testName;
+ private static List tupleCollection = new LinkedList<>();
+ public static AbstractEmbeddedKafka kafkaserver;
+ private final String VALUE_DESERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper";
+ private final String VALUE_SERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper";
+ protected boolean hasMultiCluster = false;
+
+ public static String APPLICATION_PATH = AbstractEmbeddedKafka.baseDir + File.separator + "MyKafkaApp" + File.separator;
+
+ @Before
+ public void before()
+ {
+ FileUtils.deleteQuietly(new File(APPLICATION_PATH));
+ testName = AbstractEmbeddedKafka.TEST_TOPIC + AbstractEmbeddedKafka.testCounter++;
+ kafkaserver.createTopic(testName, 0, 1);
+ if (hasMultiCluster) {
+ kafkaserver.createTopic(testName, 1, 1);
+ }
+ }
+
+ @After
+ public void after()
+ {
+ FileUtils.deleteQuietly(new File(APPLICATION_PATH));
+ }
+
+ @Test
+ public void testExactlyOnceWithFailure() throws Exception
+ {
+ List toKafka = GenerateList();
+
+ sendDataToKafka(true, toKafka, true, false);
+
+ List fromKafka = ReadFromKafka();
+
+ Assert.assertTrue("With Failure", compare(fromKafka, toKafka));
+ }
+
+ @Test
+ public void testExactlyOnceWithNoFailure() throws Exception
+ {
+ List toKafka = GenerateList();
+
+ sendDataToKafka(true, toKafka, false, false);
+
+ List fromKafka = ReadFromKafka();
+
+ Assert.assertTrue("With No Failure", compare(fromKafka, toKafka));
+ }
+
+ @Test
+ public void testExactlyOnceWithDifferentTuplesAfterRecovery() throws Exception
+ {
+ List toKafka = GenerateList();
+
+ try {
+ sendDataToKafka(true, toKafka, true, true);
+ } catch (RuntimeException ex) {
+
+ boolean expectedException = false;
+ if (ex.getMessage().contains("Violates")) {
+ expectedException = true;
+ }
+
+ Assert.assertTrue("Different tuples after recovery", expectedException);
+ return;
+ }
+
+ Assert.assertTrue("Wrong tuples during replay, should throw exception", false);
+ }
+
+ @Test
+ public void testKafkaOutput() throws Exception
+ {
+ List toKafka = GenerateList();
+
+ sendDataToKafka(false, toKafka, false, false);
+
+ List fromKafka = ReadFromKafka();
+
+ Assert.assertTrue("No failure", compare(fromKafka, toKafka));
+ }
+
+ @Test
+ public void testKafkaOutputWithFailure() throws Exception
+ {
+ List toKafka = GenerateList();
+
+ sendDataToKafka(false, toKafka, true, true);
+
+ List fromKafka = ReadFromKafka();
+
+ Assert.assertTrue("No failure", fromKafka.size() > toKafka.size());
+ }
+
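+ // Drives the operator life cycle by hand: windows 1-2 are completed and window 3 is partially fed;
+ // when hasFailure is set the operator is recreated and windows 2-3 are replayed, mimicking recovery
+ // from the last checkpoint before the remaining tuples are sent.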
+ private void sendDataToKafka(boolean exactlyOnce, List toKafka, boolean hasFailure,
+ boolean differentTuplesAfterRecovery) throws InterruptedException
+ {
+ Properties props = new Properties();
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER);
+ if (!exactlyOnce) {
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, AbstractKafkaExactlyOnceOutputOperator.KEY_SERIALIZER);
+ }
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
+ props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
+
+ Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributeMap.put(Context.DAGContext.APPLICATION_NAME, "MyKafkaApp");
+ attributeMap.put(DAG.APPLICATION_PATH, APPLICATION_PATH);
+
+ OperatorContext operatorContext = mockOperatorContext(2, attributeMap);
+
+ cleanUp(operatorContext);
+
+ Operator kafkaOutput;
+ DefaultInputPort inputPort;
+
+ if (exactlyOnce) {
+ AbstractKafkaExactlyOnceOutputOperator kafkaOutputTemp =
+ ResetKafkaOutput(testName, props, operatorContext);
+ inputPort = kafkaOutputTemp.inputPort;
+ kafkaOutput = kafkaOutputTemp;
+ } else {
+ KafkaSinglePortOutputOperator kafkaOutputTemp =
+ ResetKafkaSimpleOutput(testName, props, operatorContext);
+ inputPort = kafkaOutputTemp.inputPort;
+ kafkaOutput = kafkaOutputTemp;
+ }
+
+ kafkaOutput.beginWindow(1);
+ inputPort.getSink().put(toKafka.get(0));
+ inputPort.getSink().put(toKafka.get(1));
+ inputPort.getSink().put(toKafka.get(2));
+ kafkaOutput.endWindow();
+ kafkaOutput.beginWindow(2);
+ inputPort.getSink().put(toKafka.get(3));
+ inputPort.getSink().put(toKafka.get(4));
+ inputPort.getSink().put(toKafka.get(5));
+ kafkaOutput.endWindow();
+ kafkaOutput.beginWindow(3);
+ inputPort.getSink().put(toKafka.get(6));
+ inputPort.getSink().put(toKafka.get(7));
+
+ if (hasFailure) {
+ if (exactlyOnce) {
+ AbstractKafkaExactlyOnceOutputOperator kafkaOutputTemp =
+ ResetKafkaOutput(testName, props, operatorContext);
+ inputPort = kafkaOutputTemp.inputPort;
+ kafkaOutput = kafkaOutputTemp;
+ } else {
+ KafkaSinglePortOutputOperator kafkaOutputTemp =
+ ResetKafkaSimpleOutput(testName, props, operatorContext);
+ inputPort = kafkaOutputTemp.inputPort;
+ kafkaOutput = kafkaOutputTemp;
+ }
+
+ kafkaOutput.beginWindow(2);
+ inputPort.getSink().put(toKafka.get(3));
+ inputPort.getSink().put(toKafka.get(4));
+ inputPort.getSink().put(toKafka.get(5));
+ kafkaOutput.endWindow();
+ kafkaOutput.beginWindow(3);
+ inputPort.getSink().put(toKafka.get(6));
+
+ if (!differentTuplesAfterRecovery) {
+ inputPort.getSink().put(toKafka.get(7));
+ }
+ }
+
+ inputPort.getSink().put(toKafka.get(8));
+ inputPort.getSink().put(toKafka.get(9));
+ kafkaOutput.endWindow();
+ kafkaOutput.beginWindow(4);
+ inputPort.getSink().put(toKafka.get(10));
+ inputPort.getSink().put(toKafka.get(11));
+ kafkaOutput.endWindow();
+
+ cleanUp(operatorContext);
+ }
+
+ public abstract AbstractKafkaExactlyOnceOutputOperator createExaactlyOnceOutputOperator();
+
+ public abstract AbstractKafkaInputOperator createKafkaInputOperator(DAG dag, DefaultInputPort inputPort);
+
+ private AbstractKafkaExactlyOnceOutputOperator ResetKafkaOutput(
+ String testName, Properties props, OperatorContext operatorContext)
+ {
+ AbstractKafkaExactlyOnceOutputOperator kafkaOutput = createExaactlyOnceOutputOperator();
+ kafkaOutput.setTopic(testName);
+ kafkaOutput.setProperties(props);
+ kafkaOutput.setup(operatorContext);
+
+ return kafkaOutput;
+ }
+
+ private KafkaSinglePortOutputOperator ResetKafkaSimpleOutput(
+ String testName, Properties props, OperatorContext operatorContext)
+ {
+ KafkaSinglePortOutputOperator kafkaOutput = new KafkaSinglePortOutputOperator<>();
+ kafkaOutput.setTopic(testName);
+ kafkaOutput.setProperties(props);
+ kafkaOutput.setup(operatorContext);
+
+ return kafkaOutput;
+ }
+
+ private void cleanUp(OperatorContext operatorContext)
+ {
+ FSWindowDataManager windowDataManager = new FSWindowDataManager();
+ windowDataManager.setup(operatorContext);
+ try {
+ windowDataManager.committed(windowDataManager.getLargestCompletedWindow());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private boolean compare(List fromKafka, List toKafka)
+ {
+ if (fromKafka.size() != toKafka.size()) {
+ return false;
+ }
+
+ for (int i = 0; i < fromKafka.size(); ++i) {
+ if (!fromKafka.get(i).equals(toKafka.get(i))) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private String getClusterConfig()
+ {
+ return kafkaserver.getBroker(0) +
+ (hasMultiCluster ? ";" + kafkaserver.getBroker(1) : "");
+ }
+
+ private List GenerateList()
+ {
+ List people = new ArrayList<>();
+
+ for (Integer i = 0; i < 12; ++i) {
+ people.add(new Person(i.toString(), i));
+ }
+
+ return people;
+ }
+
+ private List ReadFromKafka()
+ {
+ tupleCollection.clear();
+
+ // Create KafkaSinglePortStringInputOperator
+ Properties props = new Properties();
+ props.put(BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
+ props.put(KEY_DESERIALIZER_CLASS_CONFIG, AbstractKafkaExactlyOnceOutputOperator.KEY_DESERIALIZER);
+ props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
+ props.put(GROUP_ID_CONFIG, "KafkaTest");
+
+ LocalMode lma = LocalMode.newInstance();
+ DAG dag = lma.getDAG();
+
+ // Create Test tuple collector
+ CollectorModule collector1 = dag.addOperator("collector", new CollectorModule());
+
+ // Create KafkaSinglePortStringInputOperator
+ AbstractKafkaInputOperator node = createKafkaInputOperator(dag, collector1.inputPort);
+ node.setConsumerProps(props);
+ node.setInitialPartitionCount(1);
+ // set topic
+ node.setTopics(testName);
+ node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
+ node.setClusters(getClusterConfig());
+ node.setStrategy("one_to_one");
+
+ // Create local cluster
+ final LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(false);
+
+ lc.run(30000);
+
+ return tupleCollection;
+ }
+
+ public static class CollectorModule extends BaseOperator
+ {
+ public final transient CollectorInputPort inputPort = new CollectorInputPort(this);
+
+ long currentWindowId;
+ long operatorId;
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+ operatorId = context.getId();
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ currentWindowId = windowId;
+ }
+
+ @Override
+ public void endWindow()
+ {
+ super.endWindow();
+ }
+ }
+
+ public static class CollectorInputPort extends DefaultInputPort<byte[]>
+ {
+ CollectorModule ownerNode;
+
+ CollectorInputPort(CollectorModule node)
+ {
+ this.ownerNode = node;
+ }
+
+ @Override
+ public void process(byte[] bt)
+ {
+ tupleCollection.add(new KafkaHelper().deserialize("r", bt));
+ }
+ }
+
+ public static class Person
+ {
+ public String name;
+ public Integer age;
+
+ public Person(String name, Integer age)
+ {
+ this.name = name;
+ this.age = age;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Person person = (Person)o;
+
+ if (name != null ? !name.equals(person.name) : person.name != null) {
+ return false;
+ }
+
+ return age != null ? age.equals(person.age) : person.age == null;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = name != null ? name.hashCode() : 0;
+ result = 31 * result + (age != null ? age.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString()
+ {
+ return name + age.toString();
+ }
+ }
+}
diff --git a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java b/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
similarity index 79%
rename from kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
rename to kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
index abf3a5b849..e8d4158fec 100644
--- a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
+++ b/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
@@ -25,11 +25,11 @@
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-public class KafkaHelper implements Serializer<KafkaOutputOperatorTest.Person>,
- Deserializer<KafkaOutputOperatorTest.Person>
+public class KafkaHelper implements Serializer<AbstractKafkaOutputOperatorTest.Person>,
+ Deserializer<AbstractKafkaOutputOperatorTest.Person>
{
@Override
- public KafkaOutputOperatorTest.Person deserialize(String s, byte[] bytes)
+ public AbstractKafkaOutputOperatorTest.Person deserialize(String s, byte[] bytes)
{
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
int nameLength = byteBuffer.getInt();
@@ -37,11 +37,11 @@ public KafkaOutputOperatorTest.Person deserialize(String s, byte[] bytes)
byteBuffer.get(name, 0, nameLength);
- return new KafkaOutputOperatorTest.Person(new String(name), byteBuffer.getInt());
+ return new AbstractKafkaOutputOperatorTest.Person(new String(name), byteBuffer.getInt());
}
@Override
- public byte[] serialize(String s, KafkaOutputOperatorTest.Person person)
+ public byte[] serialize(String s, AbstractKafkaOutputOperatorTest.Person person)
{
byte[] name = person.name.getBytes();
diff --git a/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java b/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
similarity index 100%
rename from kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
rename to kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
diff --git a/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java b/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
similarity index 84%
rename from kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
rename to kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
index 322f070779..ae4fffd6c4 100644
--- a/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
+++ b/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
@@ -68,13 +68,18 @@ public void setMessages(List messages)
}
private Properties createProducerConfig(int cid)
+ {
+ return createProducerConfig(cid, "localhost:" + AbstractEmbeddedKafka.TEST_KAFKA_BROKER_PORT[cid]);
+ }
+
+ private Properties createProducerConfig(int cid, String brokerId)
{
Properties props = new Properties();
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaTestPartitioner.class.getName());
- String brokerList = "localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid];
- brokerList += hasPartition ? (",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid]) : "";
+ String brokerList = brokerId;
+ brokerList += hasPartition ? (",localhost:" + AbstractEmbeddedKafka.TEST_KAFKA_BROKER_PORT[cid]) : "";
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.setProperty(ProducerConfig.METADATA_MAX_AGE_CONFIG, "20000");
props.setProperty(ProducerConfig.ACKS_CONFIG, getAckType());
@@ -104,6 +109,21 @@ public KafkaTestProducer(String topic, boolean hasPartition, boolean hasMultiClu
}
}
+ public KafkaTestProducer(String topic, boolean hasPartition, boolean hasMultiCluster, AbstractEmbeddedKafka server)
+ {
+ // Use random partitioner. Don't need the key type. Just set it to Integer.
+ // The message is of type String.
+ this.topic = topic;
+ this.hasPartition = hasPartition;
+ this.hasMultiCluster = hasMultiCluster;
+ producer = new KafkaProducer<>(createProducerConfig(0, server.getBroker(0)));
+ if (hasMultiCluster) {
+ producer1 = new KafkaProducer<>(createProducerConfig(1, server.getBroker(1)));
+ } else {
+ producer1 = null;
+ }
+ }
+
public KafkaTestProducer(String topic, boolean hasPartition)
{
this(topic, hasPartition, false);
@@ -127,9 +147,9 @@ private void generateMessages()
}
// produce the end tuple to let the test input operator know it's done produce messages
for (int i = 0; i < (hasPartition ? 2 : 1); ++i) {
- sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + i, KafkaOperatorTestBase.END_TUPLE)));
+ sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + i, AbstractEmbeddedKafka.END_TUPLE)));
if (hasMultiCluster) {
- sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + i, KafkaOperatorTestBase.END_TUPLE)));
+ sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + i, AbstractEmbeddedKafka.END_TUPLE)));
}
}
}
diff --git a/kafka/kafka010/pom.xml b/kafka/kafka010/pom.xml
index 02be49615f..f38b95895c 100755
--- a/kafka/kafka010/pom.xml
+++ b/kafka/kafka010/pom.xml
@@ -94,5 +94,12 @@
test
test
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>0.10.2.1</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
diff --git a/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumer010.java b/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumer010.java
index fb4115a685..3a45e2cd12 100644
--- a/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumer010.java
+++ b/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumer010.java
@@ -31,16 +31,17 @@
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
/**
* Wrapper for 0.10.x version of Kafka consumer
*/
@InterfaceStability.Evolving
-public class KafkaConsumer010 implements AbstractKafkaConsumer
+public class KafkaConsumer010 implements AbstractKafkaConsumer
{
- private KafkaConsumer consumer;
+ private KafkaConsumer consumer;
public KafkaConsumer010(Properties properties)
{
@@ -75,7 +76,7 @@ public void seekToOffset(TopicPartition topicPartition, long offset)
* @return records
*/
@Override
- public ConsumerRecords pollRecords(long timeOut)
+ public ConsumerRecords pollRecords(long timeOut)
{
return consumer.poll(timeOut);
}
@@ -197,4 +198,10 @@ public long positionPartition(TopicPartition tp)
{
return consumer.position(tp);
}
+
+ @Override
+ public List<PartitionInfo> partitionsFor(String topic)
+ {
+ return consumer.partitionsFor(topic);
+ }
}
diff --git a/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
new file mode 100644
index 0000000000..19c98c19d0
--- /dev/null
+++ b/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.util.Properties;
+
+/**
+ * Kafka output operator with exactly once processing semantics.
+ * <p>
+ * <b>Requirements</b>
+ * <ul>
+ * <li>In the Kafka message, only the value is available to users.</li>
+ * <li>Users need to provide a value deserializer for the Kafka messages, as it is used during recovery.</li>
+ * <li>The value type should have well-defined equals() and hashCode() implementations,
+ * because messages are stored in HashMaps for comparison during recovery.</li>
+ * </ul>
+ * <p>
+ * <b>Recovery handling</b>
+ * <ul>
+ * <li>Offsets of the Kafka partitions are stored in the WindowDataManager at endWindow.</li>
+ * <li>During recovery,
+ * <ul>
+ * <li>the partially written streaming window before the crash is reconstructed (explained below),</li>
+ * <li>tuples from completed streaming windows are skipped,</li>
+ * <li>tuples arriving for the partially written streaming window are skipped
+ * (no assumption is made on the order or the uniqueness of the tuples).</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * <p>
+ * <b>Partial window construction</b>
+ * <ul>
+ * <li>The operator uses the key of the Kafka message, so the key is not available to users.</li>
+ * <li>The key uniquely identifies the messages written by this particular operator instance,
+ * which allows multiple writers to the same Kafka partitions. The key format is "APPLICATION_ID#OPERATOR_ID".</li>
+ * <li>During recovery, Kafka partitions are read between the latest offsets and the last written offsets.</li>
+ * <li>All tuples written by this particular instance are kept in a map.</li>
+ * </ul>
+ * <p>
+ * <b>Limitations</b>
+ * <ul>
+ * <li>The key of the Kafka message is reserved for the operator's use.</li>
+ * <li>During recovery, the operator needs to read tuples between two offsets;
+ * if there is a lot of data to read, the operator may appear blocked to the Stram,
+ * which can then kill the operator.</li>
+ * </ul>
+ *
+ * @displayName Kafka Single Port Exactly Once Output(0.10.0)
+ * @category Messaging
+ * @tags output operator
+ * @since 3.5.0
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaExactlyOnceOutputOperator
+{
+
+ @Override
+ public AbstractKafkaConsumer createConsumer(Properties prop)
+ {
+ return new KafkaConsumer010(prop);
+ }
+}
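For context, a hedged sketch of how this new operator would typically be configured inside an
application's populateDAG(). The topic, broker address, serializer/deserializer class names and
the upstream "source" operator are illustrative assumptions, not part of this patch; the setters
are the ones inherited from AbstractKafkaOutputOperator, and the generic tuple parameter is
assumed to carry over from the kafka09 predecessor. The value deserializer is the one property
the operator insists on, because it re-reads the topic during recovery:

    // Sketch only: "source" is a hypothetical upstream operator with a String output port.
    KafkaSinglePortExactlyOnceOutputOperator<String> sink = dag.addOperator(
        "kafkaExactlyOnceOut", new KafkaSinglePortExactlyOnceOutputOperator<String>());
    sink.setTopic("exactly-once-topic");                           // illustrative topic name
    sink.setProperty("bootstrap.servers", "localhost:9092");       // illustrative broker address
    sink.setProperty("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");      // producer side
    sink.setProperty("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");    // required for recovery
    dag.addStream("toKafka", source.output, sink.inputPort);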
diff --git a/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java b/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
index f314660a30..bdc3f28b48 100644
--- a/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
+++ b/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
@@ -42,7 +42,7 @@ public class KafkaSinglePortInputOperator extends AbstractKafkaInputOperator
@Override
public AbstractKafkaConsumer createConsumer(Properties prop)
{
- return new KafkaConsumer010(prop);
+ return new KafkaConsumer010(prop);
}
/**
diff --git a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java b/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
similarity index 52%
rename from kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
rename to kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
index 6098bde06c..aca210550d 100644
--- a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
+++ b/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
@@ -18,47 +18,29 @@
*/
package org.apache.apex.malhar.kafka;
-import java.util.Map;
+import java.util.Properties;
-import org.apache.kafka.clients.producer.Partitioner;
-import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.utils.SystemTime;
-import kafka.utils.VerifiableProperties;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.TestUtils;
+import kafka.utils.ZkUtils;
-/**
- * A simple partitioner class for test purpose
- * Key is a int string
- * Messages are distributed to all partitions
- * One for even number, the other for odd
- */
-public class KafkaTestPartitioner implements Partitioner
+public class EmbeddedKafka extends AbstractEmbeddedKafka
{
- public KafkaTestPartitioner(VerifiableProperties props)
- {
-
- }
-
- public KafkaTestPartitioner()
- {
-
- }
-
- @Override
- public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster)
- {
- int num_partitions = cluster.partitionsForTopic(topic).size();
- return Integer.parseInt((String)key) % num_partitions;
- }
-
@Override
- public void close()
+ public KafkaServer createKafkaServer(Properties prop)
{
-
+ KafkaConfig config = new KafkaConfig(prop);
+ return TestUtils.createServer(config, new SystemTime());
}
@Override
- public void configure(Map map)
+ public void createTopic(String topic, ZkUtils zkUtils, int noOfPartitions)
{
-
+ AdminUtils.createTopic(zkUtils, topic, noOfPartitions, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
}
}
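With broker start-up and topic creation reduced to these two overrides, the version-specific
EmbeddedKafka is just a plug-in for the shared AbstractEmbeddedKafka lifecycle that the rewritten
tests below rely on. A minimal usage sketch (only start() and stop() are visible in this patch,
both declaring IOException as the test fixtures show; the topic-creation and publish/consume
helpers are assumed to live on the base class):

    EmbeddedKafka kafka = new EmbeddedKafka();
    kafka.start();                 // brings up the embedded ZooKeeper and the 0.10 broker
    try {
      // ... create the test topic, publish messages, run the DAG under test ...
    } finally {
      kafka.stop();                // shuts the broker down and cleans up its data directories
    }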
diff --git a/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index 09d878d25d..dae839e605 100644
--- a/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ b/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -18,36 +18,14 @@
*/
package org.apache.apex.malhar.kafka;
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
+import java.io.IOException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.slf4j.LoggerFactory;
-
-import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.stram.StramLocalCluster;
/**
* A bunch of test to verify the input operator will be automatically partitioned
@@ -55,343 +33,43 @@
* own Kafka cluster.
*/
@RunWith(Parameterized.class)
-public class KafkaInputOperatorTest extends KafkaOperatorTestBase
+public class KafkaInputOperatorTest extends AbstractKafkaInputOperatorTest
{
-
- private int totalBrokers = 0;
-
- private String partition = null;
-
- private String testName = "";
-
- public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator;
-
- public class KafkaTestInfo extends TestWatcher
+ @BeforeClass
+ public static void beforeClass()
{
- public org.junit.runner.Description desc;
-
- public String getDir()
- {
- String methodName = desc.getMethodName();
- String className = desc.getClassName();
- return "target/" + className + "/" + methodName + "/" + testName;
- }
-
- @Override
- protected void starting(org.junit.runner.Description description)
- {
- this.desc = description;
+ try {
+ kafkaServer = new EmbeddedKafka();
+ kafkaServer.start();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
- @Rule
- public final KafkaTestInfo testInfo = new KafkaTestInfo();
-
- @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
- public static Collection testScenario()
- {
- return Arrays.asList(new Object[][]{
- {true, false, "one_to_one"},// multi cluster with single partition
- {true, false, "one_to_many"},
- {true, true, "one_to_one"},// multi cluster with multi partitions
- {true, true, "one_to_many"},
- {false, true, "one_to_one"}, // single cluster with multi partitions
- {false, true, "one_to_many"},
- {false, false, "one_to_one"}, // single cluster with single partitions
- {false, false, "one_to_many"}
- });
- }
-
- @Before
- public void before()
+ @AfterClass
+ public static void afterClass()
{
- testName = TEST_TOPIC + testCounter++;
- logger.info("before() test case: {}", testName);
- tupleCollection.clear();
- //reset count for next new test case
- k = 0;
-
- createTopic(0, testName);
- if (hasMultiCluster) {
- createTopic(1, testName);
+ try {
+ kafkaServer.stop();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
-
}
public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition)
{
- // This class want to initialize several kafka brokers for multiple partitions
- this.hasMultiCluster = hasMultiCluster;
- this.hasMultiPartition = hasMultiPartition;
- int cluster = 1 + (hasMultiCluster ? 1 : 0);
- totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
- this.partition = partition;
+ super(hasMultiCluster, hasMultiPartition, partition);
}
- private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
- private static List tupleCollection = new LinkedList<>();
-
- /**
- * whether countDown latch count all tuples or just END_TUPLE
- */
- private static final boolean countDownAll = false;
- private static final int scale = 2;
- private static final int totalCount = 10 * scale;
- private static final int failureTrigger = 3 * scale;
- private static final int tuplesPerWindow = 5 * scale;
- private static final int waitTime = 60000 + 300 * scale;
-
- //This latch was used to count the END_TUPLE, but the order of tuple can't be guaranteed,
- //so, count valid tuple instead.
- private static CountDownLatch latch;
- private static boolean hasFailure = false;
- private static int k = 0;
- private static Thread monitorThread;
-
- /**
- * Test Operator to collect tuples from KafkaSingleInputStringOperator.
- *
- * @param
- */
- public static class CollectorModule extends BaseOperator
+ @Override
+ public AbstractKafkaInputOperator createKafkaInputOperator(DAG dag, DefaultInputPort inputPort)
{
- public final transient DefaultInputPort inputPort = new DefaultInputPort()
- {
- @Override
- public void process(byte[] bt)
- {
- processTuple(bt);
- }
- };
-
- long currentWindowId;
-
- long operatorId;
-
- boolean isIdempotentTest = false;
-
- transient List windowTupleCollector = Lists.newArrayList();
- private transient Map> tupleCollectedInWindow = new HashMap<>();
- private int endTuples = 0;
-
- @Override
- public void setup(Context.OperatorContext context)
- {
- super.setup(context);
- operatorId = context.getId();
- }
-
- @Override
- public void beginWindow(long windowId)
- {
- super.beginWindow(windowId);
- currentWindowId = windowId;
- windowTupleCollector.clear();
- endTuples = 0;
- }
-
- public void processTuple(byte[] bt)
- {
- String tuple = new String(bt);
- if (hasFailure && k++ == failureTrigger) {
- //you can only kill yourself once
- hasFailure = false;
- throw new RuntimeException();
- }
- if (tuple.startsWith(KafkaOperatorTestBase.END_TUPLE)) {
- endTuples++;
- }
-
- windowTupleCollector.add(tuple);
- }
-
- @Override
- public void endWindow()
- {
- super.endWindow();
- if (isIdempotentTest) {
- String key = operatorId + "," + currentWindowId;
- List msgsInWin = tupleCollectedInWindow.get(key);
- if (msgsInWin != null) {
- Assert.assertEquals(
- "replay messages should be exactly same as previous window", msgsInWin, windowTupleCollector);
- } else {
- List newList = Lists.newArrayList();
- newList.addAll(windowTupleCollector);
- tupleCollectedInWindow.put(key, newList);
- }
- }
-
- //discard the tuples of this window if except happened
- int tupleSize = windowTupleCollector.size();
- tupleCollection.addAll(windowTupleCollector);
-
- int countDownTupleSize = countDownAll ? tupleSize : endTuples;
-
- if (latch != null) {
- Assert.assertTrue(
- "received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize);
- while (countDownTupleSize > 0) {
- latch.countDown();
- --countDownTupleSize;
- }
- if (latch.getCount() == 0) {
- /**
- * The time before countDown() and the shutdown() of the application
- * will cause fatal error:
- * "Catastrophic Error: Invalid State - the operator blocked forever!"
- * as the activeQueues could be cleared but alive haven't changed yet.
- * throw the ShutdownException to let the engine shutdown;
- */
- try {
- throw new ShutdownException();
- //lc.shutdown();
- } finally {
- /**
- * interrupt the engine thread, let it wake from sleep and handle
- * the shutdown at this time, all payload should be handled. so it
- * should be ok to interrupt
- */
- monitorThread.interrupt();
- }
- }
- }
- }
-
- }
-
- /**
- * Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka consumer). This module receives
- * data from an outside test generator through Kafka message bus and feed that data into Malhar streaming platform.
- *
- * [Generate message and send that to Kafka message bus] ==> [Receive that message through Kafka input adapter(i.e.
- * consumer) and send using emitTuples() interface on output port]
- *
- *
- * @throws Exception
- */
- @Test
- public void testInputOperator() throws Exception
- {
- hasFailure = false;
- testInputOperator(false, false);
- }
-
- @Test
- public void testInputOperatorWithFailure() throws Exception
- {
- hasFailure = true;
- testInputOperator(true, false);
- }
-
- @Test
- public void testIdempotentInputOperatorWithFailure() throws Exception
- {
- hasFailure = true;
- testInputOperator(true, true);
- }
-
- public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exception
- {
- // each broker should get a END_TUPLE message
- latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers);
-
- logger.info(
- "Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {};" +
- " hasMultiPartition: {}, partition: {}",
- testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition);
-
- // Start producer
- KafkaTestProducer p = new KafkaTestProducer(testName, hasMultiPartition, hasMultiCluster);
- p.setSendCount(totalCount);
- Thread t = new Thread(p);
- t.start();
-
- int expectedReceiveCount = totalCount + totalBrokers;
-
- // Create DAG for testing.
- LocalMode lma = LocalMode.newInstance();
- DAG dag = lma.getDAG();
-
- // Create KafkaSinglePortStringInputOperator
KafkaSinglePortInputOperator node = dag.addOperator(
"Kafka input" + testName, KafkaSinglePortInputOperator.class);
- node.setInitialPartitionCount(1);
- // set topic
- node.setTopics(testName);
- node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
- node.setClusters(getClusterConfig());
- node.setStrategy(partition);
- if (idempotent) {
- node.setWindowDataManager(new FSWindowDataManager());
- }
-
- // Create Test tuple collector
- CollectorModule collector = dag.addOperator("TestMessageCollector", CollectorModule.class);
- collector.isIdempotentTest = idempotent;
// Connect ports
- dag.addStream("Kafka message" + testName, node.outputPort, collector.inputPort)
- .setLocality(Locality.CONTAINER_LOCAL);
-
- if (hasFailure) {
- setupHasFailureTest(node, dag);
- }
-
- // Create local cluster
- LocalMode.Controller lc = lma.getController();
- lc.setHeartbeatMonitoringEnabled(false);
-
- //let the Controller to run the inside another thread. It is almost same as call Controller.runAsync(),
- //but Controller.runAsync() don't expose the thread which run it,
- //so we don't know when the thread will be terminated.
- //create this thread and then call join() to make sure the Controller shutdown completely.
- monitorThread = new Thread((StramLocalCluster)lc, "master");
- monitorThread.start();
-
- boolean notTimeout = true;
- try {
- // Wait 60s for consumer finish consuming all the messages
- notTimeout = latch.await(waitTime, TimeUnit.MILLISECONDS);
- lc.shutdown();
-
- //wait until control thread finished.
- monitorThread.join();
- } catch (Exception e) {
- logger.warn(e.getMessage());
- }
-
- t.join();
-
- if (!notTimeout || expectedReceiveCount != tupleCollection.size()) {
- logger.info("Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", tupleCollection.size(),
- expectedReceiveCount, testName, tupleCollection);
- }
- Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: "
- + tupleCollection, notTimeout);
-
- // Check results
- Assert.assertTrue("testName: " + testName + "; Collected tuple size: " + tupleCollection.size()
- + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection,
- expectedReceiveCount == tupleCollection.size());
-
- logger.info("End of test case: {}", testName);
- }
-
- private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag)
- {
- operator.setHoldingBufferSize(5000);
- dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
- //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(
- // APPLICATION_PATH + "failureck", new Configuration()));
- operator.setMaxTuplesPerWindow(tuplesPerWindow);
+ dag.addStream("Kafka message" + testName, node.outputPort, inputPort)
+ .setLocality(Locality.CONTAINER_LOCAL);
+ return node;
}
-
- private String getClusterConfig()
- {
- String l = "localhost:";
- return l + TEST_KAFKA_BROKER_PORT[0] +
- (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1] : "");
- }
-
}
diff --git a/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java b/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
deleted file mode 100644
index 235eeba615..0000000000
--- a/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.kafka;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.util.Properties;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-
-import kafka.admin.TopicCommand;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-import kafka.utils.ZkUtils;
-
-/**
- * This is a base class setup/clean Kafka testing environment for all the input/output test If it's a multipartition
- * test, this class creates 2 kafka partitions
- */
-public class KafkaOperatorTestBase
-{
-
- public static final String END_TUPLE = "END_TUPLE";
- public static final int[] TEST_ZOOKEEPER_PORT;
- public static final int[] TEST_KAFKA_BROKER_PORT;
- public static final String TEST_TOPIC = "testtopic";
- public static int testCounter = 0;
-
- // get available ports
- static {
- ServerSocket[] listeners = new ServerSocket[6];
- int[] p = new int[6];
-
- try {
- for (int i = 0; i < 6; i++) {
- listeners[i] = new ServerSocket(0);
- p[i] = listeners[i].getLocalPort();
- }
-
- for (int i = 0; i < 6; i++) {
- listeners[i].close();
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- TEST_ZOOKEEPER_PORT = new int[]{p[0], p[1]};
- TEST_KAFKA_BROKER_PORT = new int[]{p[2], p[3]};
- }
-
- static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaOperatorTestBase.class);
- // since Kafka 0.8 use KafkaServerStatble instead of KafkaServer
-
- // multiple brokers in multiple cluster
- private static KafkaServerStartable[] broker = new KafkaServerStartable[2];
-
- // multiple cluster
- private static ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2];
-
- private static ZooKeeperServer[] zkServer = new ZooKeeperServer[2];
-
- public static String baseDir = "target";
-
- private static final String zkBaseDir = "zookeeper-server-data";
- private static final String kafkaBaseDir = "kafka-server-data";
- private static final String[] zkdir = new String[]{"zookeeper-server-data/1", "zookeeper-server-data/2"};
- private static final String[] kafkadir = new String[]{"kafka-server-data/1/1", "kafka-server-data/1/2"};
- protected boolean hasMultiPartition = false;
- protected boolean hasMultiCluster = false;
-
- public static void startZookeeper(final int clusterId)
- {
- try {
-
- int numConnections = 100;
- int tickTime = 2000;
- File dir = new File(baseDir, zkdir[clusterId]);
-
- zkServer[clusterId] = new TestZookeeperServer(dir, dir, tickTime);
- zkFactory[clusterId] = new NIOServerCnxnFactory();
- zkFactory[clusterId].configure(new InetSocketAddress(TEST_ZOOKEEPER_PORT[clusterId]), numConnections);
-
- zkFactory[clusterId].startup(zkServer[clusterId]); // start the zookeeper server.
- Thread.sleep(2000);
- //kserver.startup();
- } catch (Exception ex) {
- logger.error(ex.getLocalizedMessage());
- }
- }
-
- public static void stopZookeeper()
- {
- for (ZooKeeperServer zs : zkServer) {
- if (zs != null) {
- zs.shutdown();
- }
- }
-
- for (ServerCnxnFactory zkf : zkFactory) {
- if (zkf != null) {
- zkf.closeAll();
- zkf.shutdown();
- }
- }
- zkServer = new ZooKeeperServer[2];
- zkFactory = new ServerCnxnFactory[2];
- }
-
- public static void startKafkaServer(int clusterid, int brokerid)
- {
- Properties props = new Properties();
- props.setProperty("broker.id", "" + clusterid * 10 + brokerid);
- props.setProperty("log.dirs", new File(baseDir, kafkadir[clusterid]).toString());
- props.setProperty("zookeeper.connect", "localhost:" + TEST_ZOOKEEPER_PORT[clusterid]);
- props.setProperty("port", "" + TEST_KAFKA_BROKER_PORT[clusterid]);
- props.setProperty("default.replication.factor", "1");
- // set this to 50000 to boost the performance so most test data are in memory before flush to disk
- props.setProperty("log.flush.interval.messages", "50000");
-
- broker[clusterid] = new KafkaServerStartable(new KafkaConfig(props));
- broker[clusterid].startup();
-
- }
-
- public static void startKafkaServer()
- {
-
- FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir));
- //boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition },
- // new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } };
- startKafkaServer(0, 0);
- //startKafkaServer(0, 1);
- startKafkaServer(1, 0);
- //startKafkaServer(1, 1);
-
- // startup is asynch operation. wait 2 sec for server to startup
-
- }
-
- public static void stopKafkaServer()
- {
- for (int i = 0; i < broker.length; i++) {
- if (broker[i] != null) {
- broker[i].shutdown();
- broker[i].awaitShutdown();
- broker[i] = null;
- }
- }
- }
-
- @BeforeClass
- public static void beforeTest()
- {
- try {
- startZookeeper();
- startKafkaServer();
- } catch (java.nio.channels.CancelledKeyException ex) {
- logger.debug("LSHIL {}", ex.getLocalizedMessage());
- }
- }
-
- public static void startZookeeper()
- {
- FileUtils.deleteQuietly(new File(baseDir, zkBaseDir));
- startZookeeper(0);
- startZookeeper(1);
- }
-
- public void createTopic(int clusterid, String topicName)
- {
- String[] args = new String[9];
- args[0] = "--zookeeper";
- args[1] = "localhost:" + TEST_ZOOKEEPER_PORT[clusterid];
- args[2] = "--replication-factor";
- args[3] = "1";
- args[4] = "--partitions";
- if (hasMultiPartition) {
- args[5] = "2";
- } else {
- args[5] = "1";
- }
- args[6] = "--topic";
- args[7] = topicName;
- args[8] = "--create";
-
- ZkUtils zu = ZkUtils.apply("localhost:" + TEST_ZOOKEEPER_PORT[clusterid], 30000, 30000, false);
- TopicCommand.createTopic(zu, new TopicCommand.TopicCommandOptions(args));
-
- }
-
- @AfterClass
- public static void afterTest()
- {
- try {
- stopKafkaServer();
- stopZookeeper();
- } catch (Exception ex) {
- logger.debug("LSHIL {}", ex.getLocalizedMessage());
- }
- }
-
- public void setHasMultiPartition(boolean hasMultiPartition)
- {
- this.hasMultiPartition = hasMultiPartition;
- }
-
- public void setHasMultiCluster(boolean hasMultiCluster)
- {
- this.hasMultiCluster = hasMultiCluster;
- }
-
- public static class TestZookeeperServer extends ZooKeeperServer
- {
-
- public TestZookeeperServer()
- {
- super();
- // TODO Auto-generated constructor stub
- }
-
- public TestZookeeperServer(File snapDir, File logDir, int tickTime) throws IOException
- {
- super(snapDir, logDir, tickTime);
- // TODO Auto-generated constructor stub
- }
-
- public TestZookeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder treeBuilder) throws IOException
- {
- super(txnLogFactory, treeBuilder);
- // TODO Auto-generated constructor stub
- }
-
- public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder)
- throws IOException
- {
- super(txnLogFactory, tickTime, treeBuilder);
- // TODO Auto-generated constructor stub
- }
-
- public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout,
- int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb)
- {
- super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb);
- // TODO Auto-generated constructor stub
- }
-
- @Override
- protected void registerJMX()
- {
- }
-
- @Override
- protected void unregisterJMX()
- {
- }
-
- }
-}
diff --git a/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java b/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
new file mode 100644
index 0000000000..2f2c81558b
--- /dev/null
+++ b/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+
+public class KafkaOutputOperatorTest extends AbstractKafkaOutputOperatorTest
+{
+ @BeforeClass
+ public static void beforeClass()
+ {
+ try {
+ kafkaserver = new EmbeddedKafka();
+ kafkaserver.start();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @AfterClass
+ public static void afterClass()
+ {
+ try {
+ kafkaserver.stop();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public AbstractKafkaExactlyOnceOutputOperator createExaactlyOnceOutputOperator()
+ {
+ return new KafkaSinglePortExactlyOnceOutputOperator();
+ }
+
+ @Override
+ public AbstractKafkaInputOperator createKafkaInputOperator(DAG dag, DefaultInputPort inputPort)
+ {
+ KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class);
+ // Connect ports
+ dag.addStream("Kafka message", node.outputPort, inputPort);
+
+ return node;
+ }
+}
diff --git a/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumer09.java b/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumer09.java
index faa5171d43..c196f6fb45 100644
--- a/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumer09.java
+++ b/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumer09.java
@@ -30,6 +30,7 @@
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import com.google.common.collect.Iterables;
@@ -38,9 +39,9 @@
* Wrapper for 0.9.x version of Kafka consumer
*/
@InterfaceStability.Evolving
-public class KafkaConsumer09 implements AbstractKafkaConsumer
+public class KafkaConsumer09 implements AbstractKafkaConsumer
{
- private KafkaConsumer consumer;
+ private KafkaConsumer consumer;
public KafkaConsumer09(Properties properties)
{
@@ -75,7 +76,7 @@ public void seekToOffset(TopicPartition topicPartition, long offset)
* @return records
*/
@Override
- public ConsumerRecords pollRecords(long timeOut)
+ public ConsumerRecords pollRecords(long timeOut)
{
return consumer.poll(timeOut);
}
@@ -197,4 +198,10 @@ public long positionPartition(TopicPartition tp)
{
return consumer.position(tp);
}
+
+ @Override
+ public List partitionsFor(String topic)
+ {
+ return consumer.partitionsFor(topic);
+ }
}
diff --git a/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
index 23c519fb0b..94e0977e1d 100644
--- a/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
+++ b/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
@@ -19,36 +19,7 @@
package org.apache.apex.malhar.kafka;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
-import org.apache.apex.malhar.lib.wal.WindowDataManager;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Operator;
-
-import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
/**
* Kafka output operator with exactly once processing semantics.
@@ -96,318 +67,12 @@
* @since 3.5.0
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
-public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator
- implements Operator.CheckpointNotificationListener
+public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaExactlyOnceOutputOperator
{
- private transient String key;
- private transient String appName;
- private transient Integer operatorId;
- private transient Long windowId;
- private transient Map partialWindowTuples = new HashMap<>();
- private transient KafkaConsumer consumer;
-
- private WindowDataManager windowDataManager = new FSWindowDataManager();
- private final int KAFKA_CONNECT_ATTEMPT = 10;
- private final String KEY_SEPARATOR = "#";
-
- public static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
- public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
-
- public final transient DefaultInputPort inputPort = new DefaultInputPort()
- {
- @Override
- public void process(T tuple)
- {
- sendTuple(tuple);
- }
- };
-
- @Override
- public void setup(Context.OperatorContext context)
- {
- setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
-
- if (getProperties().getProperty(VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
- throw new IllegalArgumentException(
- "Value deserializer needs to be set for the operator, as it is used during recovery.");
- }
-
- super.setup(context);
-
- this.operatorId = context.getId();
- this.windowDataManager.setup(context);
- this.appName = context.getValue(Context.DAGContext.APPLICATION_NAME);
- this.key = appName + KEY_SEPARATOR + (new Integer(operatorId));
-
- this.consumer = KafkaConsumerInit();
- }
-
- @Override
- public void beginWindow(long windowId)
- {
- this.windowId = windowId;
-
- if (windowId == windowDataManager.getLargestCompletedWindow()) {
- rebuildPartialWindow();
- }
- }
-
- @Override
- public void checkpointed(long windowId)
- {
- }
-
- @Override
- public void committed(long windowId)
- {
- try {
- windowDataManager.committed(windowId);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void beforeCheckpoint(long windowId)
- {
- }
@Override
- public void teardown()
- {
- consumer.close();
- super.teardown();
- }
-
- @Override
- public void endWindow()
- {
- if (windowId <= windowDataManager.getLargestCompletedWindow()) {
- return;
- }
-
- if (!partialWindowTuples.isEmpty()) {
- throw new RuntimeException("Violates Exactly once. Not all the tuples received after operator reset.");
- }
-
- // Every tuples should be written before the offsets are stored in the window data manager.
- getProducer().flush();
-
- try {
- this.windowDataManager.save(getPartitionsAndOffsets(true), windowId);
- } catch (IOException | InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
-
- public WindowDataManager getWindowDataManager()
- {
- return windowDataManager;
- }
-
- public void setWindowDataManager(WindowDataManager windowDataManager)
- {
- this.windowDataManager = windowDataManager;
- }
-
- private boolean doesKeyBelongsToThisInstance(int operatorId, String key)
- {
- String[] split = key.split(KEY_SEPARATOR);
-
- if (split.length != 2) {
- return false;
- }
-
- if ((Integer.parseInt(split[1]) == operatorId) && (split[0].equals(appName))) {
- return true;
- }
-
- return false;
- }
-
- private boolean alreadyInKafka(T message)
- {
- if (windowId <= windowDataManager.getLargestCompletedWindow()) {
- return true;
- }
-
- if (partialWindowTuples.containsKey(message)) {
-
- Integer val = partialWindowTuples.get(message);
-
- if (val == 0) {
- return false;
- } else if (val == 1) {
- partialWindowTuples.remove(message);
- } else {
- partialWindowTuples.put(message, val - 1);
- }
- return true;
- }
- return false;
- }
-
- private Map getPartitionsAndOffsets(boolean latest) throws ExecutionException, InterruptedException
- {
- List partitionInfoList = consumer.partitionsFor(getTopic());
- List topicPartitionList = new java.util.ArrayList<>();
-
- for (PartitionInfo partitionInfo : partitionInfoList) {
- topicPartitionList.add(new TopicPartition(getTopic(), partitionInfo.partition()));
- }
-
- Map parttionsAndOffset = new HashMap<>();
- consumer.assign(topicPartitionList);
-
- for (PartitionInfo partitionInfo : partitionInfoList) {
- try {
- TopicPartition topicPartition = new TopicPartition(getTopic(), partitionInfo.partition());
- if (latest) {
- consumer.seekToEnd(topicPartition);
- } else {
- consumer.seekToBeginning(topicPartition);
- }
- parttionsAndOffset.put(partitionInfo.partition(), consumer.position(topicPartition));
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
-
- return parttionsAndOffset;
- }
-
- private void rebuildPartialWindow()
- {
- logger.info("Rebuild the partial window after " + windowDataManager.getLargestCompletedWindow());
-
- Map storedOffsets;
- Map currentOffsets;
-
- try {
- storedOffsets = (Map)this.windowDataManager.retrieve(windowId);
- currentOffsets = getPartitionsAndOffsets(true);
- } catch (IOException | ExecutionException | InterruptedException e) {
- throw new RuntimeException(e);
- }
-
- if (currentOffsets == null) {
- logger.info("No tuples found while building partial window " + windowDataManager.getLargestCompletedWindow());
- return;
- }
-
- if (storedOffsets == null) {
-
- logger.info("Stored offset not available, seeking to the beginning of the Kafka Partition.");
-
- try {
- storedOffsets = getPartitionsAndOffsets(false);
- } catch (ExecutionException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- List topicPartitions = new ArrayList<>();
-
- for (Map.Entry entry : currentOffsets.entrySet()) {
- topicPartitions.add(new TopicPartition(getTopic(), entry.getKey()));
- }
-
- consumer.assign(topicPartitions);
-
- for (Map.Entry entry : currentOffsets.entrySet()) {
- Long storedOffset = 0L;
- Integer currentPartition = entry.getKey();
- Long currentOffset = entry.getValue();
-
- if (storedOffsets.containsKey(currentPartition)) {
- storedOffset = storedOffsets.get(currentPartition);
- }
-
- if (storedOffset >= currentOffset) {
- continue;
- }
-
- try {
- consumer.seek(new TopicPartition(getTopic(), currentPartition), storedOffset);
- } catch (Exception ex) {
- logger.info("Rebuilding of the partial window is not complete, exactly once recovery is not possible.");
- throw new RuntimeException(ex);
- }
-
- int kafkaAttempt = 0;
-
- while (true) {
-
- ConsumerRecords consumerRecords = consumer.poll(100);
-
- if (consumerRecords.count() == 0) {
- if (kafkaAttempt++ == KAFKA_CONNECT_ATTEMPT) {
- break;
- }
- } else {
- kafkaAttempt = 0;
- }
-
- boolean crossedBoundary = false;
-
- for (ConsumerRecord consumerRecord : consumerRecords) {
-
- if (consumerRecord.offset() >= currentOffset) {
- crossedBoundary = true;
- break;
- }
-
- if (!doesKeyBelongsToThisInstance(operatorId, consumerRecord.key())) {
- continue;
- }
-
- T value = consumerRecord.value();
-
- if (partialWindowTuples.containsKey(value)) {
- Integer count = partialWindowTuples.get(value);
- partialWindowTuples.put(value, count + 1);
- } else {
- partialWindowTuples.put(value, 1);
- }
-
- }
-
- if (crossedBoundary) {
- break;
- }
- }
- }
- }
-
- private KafkaConsumer KafkaConsumerInit()
+ public AbstractKafkaConsumer createConsumer(Properties prop)
{
- Properties props = new Properties();
-
- props.put(BOOTSTRAP_SERVERS_CONFIG, getProperties().get(BOOTSTRAP_SERVERS_CONFIG));
- props.put(KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
- props.put(VALUE_DESERIALIZER_CLASS_CONFIG, getProperties().get(VALUE_DESERIALIZER_CLASS_CONFIG));
-
- return new KafkaConsumer<>(props);
+ return new KafkaConsumer09(prop);
}
-
- protected void sendTuple(T tuple)
- {
- if (alreadyInKafka(tuple)) {
- return;
- }
-
- getProducer().send(new ProducerRecord<>(getTopic(), key, tuple), new Callback()
- {
- public void onCompletion(RecordMetadata metadata, Exception e)
- {
- if (e != null) {
- logger.info("Wrting to Kafka failed with an exception {}" + e.getMessage());
- throw new RuntimeException(e);
- }
- }
- });
- }
-
- private static final Logger logger = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
}
-
diff --git a/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java b/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
index 5b54979cdd..d68be7caba 100644
--- a/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
+++ b/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
@@ -48,7 +48,7 @@ public class KafkaSinglePortInputOperator extends AbstractKafkaInputOperator
@Override
public AbstractKafkaConsumer createConsumer(Properties properties)
{
- return new KafkaConsumer09(properties);
+ return new KafkaConsumer09(properties);
}
@Override
diff --git a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
index e9fcc36e1a..0b14f35d30 100644
--- a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
+++ b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
@@ -18,149 +18,29 @@
*/
package org.apache.apex.malhar.kafka;
-import java.io.File;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import java.util.Properties;
-import org.I0Itec.zkclient.ZkClient;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-import com.google.common.base.Throwables;
-
import kafka.admin.AdminUtils;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.Time;
-import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
-public class EmbeddedKafka
+public class EmbeddedKafka extends AbstractEmbeddedKafka
{
- private static final String KAFKA_PATH = "/tmp/kafka-test";
-
- private ZkClient zkClient;
- private ZkUtils zkUtils;
- private String BROKERHOST = "127.0.0.1";
- private String BROKERPORT = "9092";
- private EmbeddedZookeeper zkServer;
- private KafkaServer kafkaServer;
-
- public String getBroker()
- {
- return BROKERHOST + ":" + BROKERPORT;
- }
-
- public void start() throws IOException
+ @Override
+ public KafkaServer createKafkaServer(Properties prop)
{
- // Find port
- try {
- ServerSocket serverSocket = new ServerSocket(0);
- BROKERPORT = Integer.toString(serverSocket.getLocalPort());
- serverSocket.close();
- } catch (IOException e) {
- throw Throwables.propagate(e);
- }
-
- // Setup Zookeeper
- zkServer = new EmbeddedZookeeper();
- String zkConnect = BROKERHOST + ":" + zkServer.port();
- zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
- zkUtils = ZkUtils.apply(zkClient, false);
-
- // Setup brokers
- cleanupDir();
- Properties props = new Properties();
- props.setProperty("zookeeper.connect", zkConnect);
- props.setProperty("broker.id", "0");
- props.setProperty("log.dirs", KAFKA_PATH);
- props.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
- KafkaConfig config = new KafkaConfig(props);
+ KafkaConfig config = new KafkaConfig(prop);
Time mock = new MockTime();
- kafkaServer = TestUtils.createServer(config, mock);
- }
-
- public void stop() throws IOException
- {
- kafkaServer.shutdown();
- zkClient.close();
- zkServer.shutdown();
- cleanupDir();
- }
-
- private void cleanupDir() throws IOException
- {
- FileUtils.deleteDirectory(new File(KAFKA_PATH));
+ return TestUtils.createServer(config, mock);
}
- public void createTopic(String topic)
+ @Override
+ public void createTopic(String topic, ZkUtils zkUtils, int noOfPartitions)
{
- AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties());
- List servers = new ArrayList();
- servers.add(kafkaServer);
- TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 30000);
+ AdminUtils.createTopic(zkUtils, topic, noOfPartitions, 1, new Properties());
}
-
- public void publish(String topic, List messages)
- {
- Properties producerProps = new Properties();
- producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
- producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
- producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-
- try (KafkaProducer producer = new KafkaProducer<>(producerProps)) {
- for (String message : messages) {
- ProducerRecord data = new ProducerRecord<>(topic, message.getBytes(StandardCharsets.UTF_8));
- producer.send(data);
- }
- }
-
- List servers = new ArrayList();
- servers.add(kafkaServer);
- TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 30000);
- }
-
- public List consume(String topic, int timeout)
- {
- return consume(topic, timeout, true);
- }
-
- public List consume(String topic, int timeout, boolean earliest)
- {
- Properties consumerProps = new Properties();
- consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
- consumerProps.setProperty("group.id", "group0");
- consumerProps.setProperty("client.id", "consumer0");
- consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
- consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- // to make sure the consumer starts from the beginning of the topic
- consumerProps.put("auto.offset.reset", earliest ? "earliest" : "latest");
- KafkaConsumer consumer = new KafkaConsumer<>(consumerProps);
- consumer.subscribe(Arrays.asList(topic));
-
- List messages = new ArrayList<>();
-
- ConsumerRecords records = consumer.poll(timeout);
- for (ConsumerRecord record : records) {
- messages.add(new String(record.value()));
- }
-
- consumer.close();
-
- return messages;
- }
-
}
diff --git a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index 09d878d25d..dae839e605 100644
--- a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -18,36 +18,14 @@
*/
package org.apache.apex.malhar.kafka;
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
+import java.io.IOException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.slf4j.LoggerFactory;
-
-import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.stram.StramLocalCluster;
/**
* A bunch of test to verify the input operator will be automatically partitioned
@@ -55,343 +33,43 @@
* own Kafka cluster.
*/
@RunWith(Parameterized.class)
-public class KafkaInputOperatorTest extends KafkaOperatorTestBase
+public class KafkaInputOperatorTest extends AbstractKafkaInputOperatorTest
{
-
- private int totalBrokers = 0;
-
- private String partition = null;
-
- private String testName = "";
-
- public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator;
-
- public class KafkaTestInfo extends TestWatcher
+ @BeforeClass
+ public static void beforeClass()
{
- public org.junit.runner.Description desc;
-
- public String getDir()
- {
- String methodName = desc.getMethodName();
- String className = desc.getClassName();
- return "target/" + className + "/" + methodName + "/" + testName;
- }
-
- @Override
- protected void starting(org.junit.runner.Description description)
- {
- this.desc = description;
+ try {
+ kafkaServer = new EmbeddedKafka();
+ kafkaServer.start();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
- @Rule
- public final KafkaTestInfo testInfo = new KafkaTestInfo();
-
- @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
- public static Collection testScenario()
- {
- return Arrays.asList(new Object[][]{
- {true, false, "one_to_one"},// multi cluster with single partition
- {true, false, "one_to_many"},
- {true, true, "one_to_one"},// multi cluster with multi partitions
- {true, true, "one_to_many"},
- {false, true, "one_to_one"}, // single cluster with multi partitions
- {false, true, "one_to_many"},
- {false, false, "one_to_one"}, // single cluster with single partitions
- {false, false, "one_to_many"}
- });
- }
-
- @Before
- public void before()
+ @AfterClass
+ public static void afterClass()
{
- testName = TEST_TOPIC + testCounter++;
- logger.info("before() test case: {}", testName);
- tupleCollection.clear();
- //reset count for next new test case
- k = 0;
-
- createTopic(0, testName);
- if (hasMultiCluster) {
- createTopic(1, testName);
+ try {
+ kafkaServer.stop();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
-
}
public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition)
{
- // This class want to initialize several kafka brokers for multiple partitions
- this.hasMultiCluster = hasMultiCluster;
- this.hasMultiPartition = hasMultiPartition;
- int cluster = 1 + (hasMultiCluster ? 1 : 0);
- totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
- this.partition = partition;
+ super(hasMultiCluster, hasMultiPartition, partition);
}
- private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
- private static List tupleCollection = new LinkedList<>();
-
- /**
- * whether countDown latch count all tuples or just END_TUPLE
- */
- private static final boolean countDownAll = false;
- private static final int scale = 2;
- private static final int totalCount = 10 * scale;
- private static final int failureTrigger = 3 * scale;
- private static final int tuplesPerWindow = 5 * scale;
- private static final int waitTime = 60000 + 300 * scale;
-
- //This latch was used to count the END_TUPLE, but the order of tuple can't be guaranteed,
- //so, count valid tuple instead.
- private static CountDownLatch latch;
- private static boolean hasFailure = false;
- private static int k = 0;
- private static Thread monitorThread;
-
- /**
- * Test Operator to collect tuples from KafkaSingleInputStringOperator.
- *
- * @param
- */
- public static class CollectorModule extends BaseOperator
+ @Override
+ public AbstractKafkaInputOperator createKafkaInputOperator(DAG dag, DefaultInputPort inputPort)
{
- public final transient DefaultInputPort inputPort = new DefaultInputPort()
- {
- @Override
- public void process(byte[] bt)
- {
- processTuple(bt);
- }
- };
-
- long currentWindowId;
-
- long operatorId;
-
- boolean isIdempotentTest = false;
-
- transient List windowTupleCollector = Lists.newArrayList();
- private transient Map> tupleCollectedInWindow = new HashMap<>();
- private int endTuples = 0;
-
- @Override
- public void setup(Context.OperatorContext context)
- {
- super.setup(context);
- operatorId = context.getId();
- }
-
- @Override
- public void beginWindow(long windowId)
- {
- super.beginWindow(windowId);
- currentWindowId = windowId;
- windowTupleCollector.clear();
- endTuples = 0;
- }
-
- public void processTuple(byte[] bt)
- {
- String tuple = new String(bt);
- if (hasFailure && k++ == failureTrigger) {
- //you can only kill yourself once
- hasFailure = false;
- throw new RuntimeException();
- }
- if (tuple.startsWith(KafkaOperatorTestBase.END_TUPLE)) {
- endTuples++;
- }
-
- windowTupleCollector.add(tuple);
- }
-
- @Override
- public void endWindow()
- {
- super.endWindow();
- if (isIdempotentTest) {
- String key = operatorId + "," + currentWindowId;
- List msgsInWin = tupleCollectedInWindow.get(key);
- if (msgsInWin != null) {
- Assert.assertEquals(
- "replay messages should be exactly same as previous window", msgsInWin, windowTupleCollector);
- } else {
- List newList = Lists.newArrayList();
- newList.addAll(windowTupleCollector);
- tupleCollectedInWindow.put(key, newList);
- }
- }
-
- //discard the tuples of this window if except happened
- int tupleSize = windowTupleCollector.size();
- tupleCollection.addAll(windowTupleCollector);
-
- int countDownTupleSize = countDownAll ? tupleSize : endTuples;
-
- if (latch != null) {
- Assert.assertTrue(
- "received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize);
- while (countDownTupleSize > 0) {
- latch.countDown();
- --countDownTupleSize;
- }
- if (latch.getCount() == 0) {
- /**
- * The time before countDown() and the shutdown() of the application
- * will cause fatal error:
- * "Catastrophic Error: Invalid State - the operator blocked forever!"
- * as the activeQueues could be cleared but alive haven't changed yet.
- * throw the ShutdownException to let the engine shutdown;
- */
- try {
- throw new ShutdownException();
- //lc.shutdown();
- } finally {
- /**
- * interrupt the engine thread, let it wake from sleep and handle
- * the shutdown at this time, all payload should be handled. so it
- * should be ok to interrupt
- */
- monitorThread.interrupt();
- }
- }
- }
- }
-
- }
-
- /**
- * Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka consumer). This module receives
- * data from an outside test generator through Kafka message bus and feed that data into Malhar streaming platform.
- *
- * [Generate message and send that to Kafka message bus] ==> [Receive that message through Kafka input adapter(i.e.
- * consumer) and send using emitTuples() interface on output port]
- *
- *
- * @throws Exception
- */
- @Test
- public void testInputOperator() throws Exception
- {
- hasFailure = false;
- testInputOperator(false, false);
- }
-
- @Test
- public void testInputOperatorWithFailure() throws Exception
- {
- hasFailure = true;
- testInputOperator(true, false);
- }
-
- @Test
- public void testIdempotentInputOperatorWithFailure() throws Exception
- {
- hasFailure = true;
- testInputOperator(true, true);
- }
-
- public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exception
- {
- // each broker should get a END_TUPLE message
- latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers);
-
- logger.info(
- "Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {};" +
- " hasMultiPartition: {}, partition: {}",
- testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition);
-
- // Start producer
- KafkaTestProducer p = new KafkaTestProducer(testName, hasMultiPartition, hasMultiCluster);
- p.setSendCount(totalCount);
- Thread t = new Thread(p);
- t.start();
-
- int expectedReceiveCount = totalCount + totalBrokers;
-
- // Create DAG for testing.
- LocalMode lma = LocalMode.newInstance();
- DAG dag = lma.getDAG();
-
- // Create KafkaSinglePortStringInputOperator
KafkaSinglePortInputOperator node = dag.addOperator(
"Kafka input" + testName, KafkaSinglePortInputOperator.class);
- node.setInitialPartitionCount(1);
- // set topic
- node.setTopics(testName);
- node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
- node.setClusters(getClusterConfig());
- node.setStrategy(partition);
- if (idempotent) {
- node.setWindowDataManager(new FSWindowDataManager());
- }
-
- // Create Test tuple collector
- CollectorModule collector = dag.addOperator("TestMessageCollector", CollectorModule.class);
- collector.isIdempotentTest = idempotent;
// Connect ports
- dag.addStream("Kafka message" + testName, node.outputPort, collector.inputPort)
- .setLocality(Locality.CONTAINER_LOCAL);
-
- if (hasFailure) {
- setupHasFailureTest(node, dag);
- }
-
- // Create local cluster
- LocalMode.Controller lc = lma.getController();
- lc.setHeartbeatMonitoringEnabled(false);
-
- //let the Controller to run the inside another thread. It is almost same as call Controller.runAsync(),
- //but Controller.runAsync() don't expose the thread which run it,
- //so we don't know when the thread will be terminated.
- //create this thread and then call join() to make sure the Controller shutdown completely.
- monitorThread = new Thread((StramLocalCluster)lc, "master");
- monitorThread.start();
-
- boolean notTimeout = true;
- try {
- // Wait 60s for consumer finish consuming all the messages
- notTimeout = latch.await(waitTime, TimeUnit.MILLISECONDS);
- lc.shutdown();
-
- //wait until control thread finished.
- monitorThread.join();
- } catch (Exception e) {
- logger.warn(e.getMessage());
- }
-
- t.join();
-
- if (!notTimeout || expectedReceiveCount != tupleCollection.size()) {
- logger.info("Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", tupleCollection.size(),
- expectedReceiveCount, testName, tupleCollection);
- }
- Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: "
- + tupleCollection, notTimeout);
-
- // Check results
- Assert.assertTrue("testName: " + testName + "; Collected tuple size: " + tupleCollection.size()
- + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection,
- expectedReceiveCount == tupleCollection.size());
-
- logger.info("End of test case: {}", testName);
- }
-
- private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag)
- {
- operator.setHoldingBufferSize(5000);
- dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
- //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(
- // APPLICATION_PATH + "failureck", new Configuration()));
- operator.setMaxTuplesPerWindow(tuplesPerWindow);
+ dag.addStream("Kafka message" + testName, node.outputPort, inputPort)
+ .setLocality(Locality.CONTAINER_LOCAL);
+ return node;
}
-
- private String getClusterConfig()
- {
- String l = "localhost:";
- return l + TEST_KAFKA_BROKER_PORT[0] +
- (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1] : "");
- }
-
}
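For readers following the refactoring: the removed getClusterConfig() helper joined the two embedded brokers with ';' so a single input operator could read from both test clusters. Below is a minimal sketch of that convention; the class name and ports are placeholders of mine, and only setters that appear in the removed test are used.

package org.apache.apex.malhar.kafka;

public class ClusterConfigSketch
{
  // Mirrors the removed getClusterConfig(): clusters are separated by ';'.
  static String clusterConfig(int port0, int port1, boolean hasMultiCluster)
  {
    String l = "localhost:";
    return l + port0 + (hasMultiCluster ? ";" + l + port1 : "");
  }

  // Configures an input operator the same way the removed testInputOperator() did.
  static void configure(KafkaSinglePortInputOperator node, String topic, String clusters)
  {
    node.setInitialPartitionCount(1);
    node.setTopics(topic);
    node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
    node.setClusters(clusters);
  }
}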
diff --git a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
deleted file mode 100644
index 235eeba615..0000000000
--- a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.kafka;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.util.Properties;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-
-import kafka.admin.TopicCommand;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-import kafka.utils.ZkUtils;
-
-/**
- * This is a base class setup/clean Kafka testing environment for all the input/output test If it's a multipartition
- * test, this class creates 2 kafka partitions
- */
-public class KafkaOperatorTestBase
-{
-
- public static final String END_TUPLE = "END_TUPLE";
- public static final int[] TEST_ZOOKEEPER_PORT;
- public static final int[] TEST_KAFKA_BROKER_PORT;
- public static final String TEST_TOPIC = "testtopic";
- public static int testCounter = 0;
-
- // get available ports
- static {
- ServerSocket[] listeners = new ServerSocket[6];
- int[] p = new int[6];
-
- try {
- for (int i = 0; i < 6; i++) {
- listeners[i] = new ServerSocket(0);
- p[i] = listeners[i].getLocalPort();
- }
-
- for (int i = 0; i < 6; i++) {
- listeners[i].close();
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- TEST_ZOOKEEPER_PORT = new int[]{p[0], p[1]};
- TEST_KAFKA_BROKER_PORT = new int[]{p[2], p[3]};
- }
-
- static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaOperatorTestBase.class);
- // since Kafka 0.8 use KafkaServerStatble instead of KafkaServer
-
- // multiple brokers in multiple cluster
- private static KafkaServerStartable[] broker = new KafkaServerStartable[2];
-
- // multiple cluster
- private static ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2];
-
- private static ZooKeeperServer[] zkServer = new ZooKeeperServer[2];
-
- public static String baseDir = "target";
-
- private static final String zkBaseDir = "zookeeper-server-data";
- private static final String kafkaBaseDir = "kafka-server-data";
- private static final String[] zkdir = new String[]{"zookeeper-server-data/1", "zookeeper-server-data/2"};
- private static final String[] kafkadir = new String[]{"kafka-server-data/1/1", "kafka-server-data/1/2"};
- protected boolean hasMultiPartition = false;
- protected boolean hasMultiCluster = false;
-
- public static void startZookeeper(final int clusterId)
- {
- try {
-
- int numConnections = 100;
- int tickTime = 2000;
- File dir = new File(baseDir, zkdir[clusterId]);
-
- zkServer[clusterId] = new TestZookeeperServer(dir, dir, tickTime);
- zkFactory[clusterId] = new NIOServerCnxnFactory();
- zkFactory[clusterId].configure(new InetSocketAddress(TEST_ZOOKEEPER_PORT[clusterId]), numConnections);
-
- zkFactory[clusterId].startup(zkServer[clusterId]); // start the zookeeper server.
- Thread.sleep(2000);
- //kserver.startup();
- } catch (Exception ex) {
- logger.error(ex.getLocalizedMessage());
- }
- }
-
- public static void stopZookeeper()
- {
- for (ZooKeeperServer zs : zkServer) {
- if (zs != null) {
- zs.shutdown();
- }
- }
-
- for (ServerCnxnFactory zkf : zkFactory) {
- if (zkf != null) {
- zkf.closeAll();
- zkf.shutdown();
- }
- }
- zkServer = new ZooKeeperServer[2];
- zkFactory = new ServerCnxnFactory[2];
- }
-
- public static void startKafkaServer(int clusterid, int brokerid)
- {
- Properties props = new Properties();
- props.setProperty("broker.id", "" + clusterid * 10 + brokerid);
- props.setProperty("log.dirs", new File(baseDir, kafkadir[clusterid]).toString());
- props.setProperty("zookeeper.connect", "localhost:" + TEST_ZOOKEEPER_PORT[clusterid]);
- props.setProperty("port", "" + TEST_KAFKA_BROKER_PORT[clusterid]);
- props.setProperty("default.replication.factor", "1");
- // set this to 50000 to boost the performance so most test data are in memory before flush to disk
- props.setProperty("log.flush.interval.messages", "50000");
-
- broker[clusterid] = new KafkaServerStartable(new KafkaConfig(props));
- broker[clusterid].startup();
-
- }
-
- public static void startKafkaServer()
- {
-
- FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir));
- //boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition },
- // new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } };
- startKafkaServer(0, 0);
- //startKafkaServer(0, 1);
- startKafkaServer(1, 0);
- //startKafkaServer(1, 1);
-
- // startup is asynch operation. wait 2 sec for server to startup
-
- }
-
- public static void stopKafkaServer()
- {
- for (int i = 0; i < broker.length; i++) {
- if (broker[i] != null) {
- broker[i].shutdown();
- broker[i].awaitShutdown();
- broker[i] = null;
- }
- }
- }
-
- @BeforeClass
- public static void beforeTest()
- {
- try {
- startZookeeper();
- startKafkaServer();
- } catch (java.nio.channels.CancelledKeyException ex) {
- logger.debug("LSHIL {}", ex.getLocalizedMessage());
- }
- }
-
- public static void startZookeeper()
- {
- FileUtils.deleteQuietly(new File(baseDir, zkBaseDir));
- startZookeeper(0);
- startZookeeper(1);
- }
-
- public void createTopic(int clusterid, String topicName)
- {
- String[] args = new String[9];
- args[0] = "--zookeeper";
- args[1] = "localhost:" + TEST_ZOOKEEPER_PORT[clusterid];
- args[2] = "--replication-factor";
- args[3] = "1";
- args[4] = "--partitions";
- if (hasMultiPartition) {
- args[5] = "2";
- } else {
- args[5] = "1";
- }
- args[6] = "--topic";
- args[7] = topicName;
- args[8] = "--create";
-
- ZkUtils zu = ZkUtils.apply("localhost:" + TEST_ZOOKEEPER_PORT[clusterid], 30000, 30000, false);
- TopicCommand.createTopic(zu, new TopicCommand.TopicCommandOptions(args));
-
- }
-
- @AfterClass
- public static void afterTest()
- {
- try {
- stopKafkaServer();
- stopZookeeper();
- } catch (Exception ex) {
- logger.debug("LSHIL {}", ex.getLocalizedMessage());
- }
- }
-
- public void setHasMultiPartition(boolean hasMultiPartition)
- {
- this.hasMultiPartition = hasMultiPartition;
- }
-
- public void setHasMultiCluster(boolean hasMultiCluster)
- {
- this.hasMultiCluster = hasMultiCluster;
- }
-
- public static class TestZookeeperServer extends ZooKeeperServer
- {
-
- public TestZookeeperServer()
- {
- super();
- // TODO Auto-generated constructor stub
- }
-
- public TestZookeeperServer(File snapDir, File logDir, int tickTime) throws IOException
- {
- super(snapDir, logDir, tickTime);
- // TODO Auto-generated constructor stub
- }
-
- public TestZookeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder treeBuilder) throws IOException
- {
- super(txnLogFactory, treeBuilder);
- // TODO Auto-generated constructor stub
- }
-
- public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder)
- throws IOException
- {
- super(txnLogFactory, tickTime, treeBuilder);
- // TODO Auto-generated constructor stub
- }
-
- public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout,
- int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb)
- {
- super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb);
- // TODO Auto-generated constructor stub
- }
-
- @Override
- protected void registerJMX()
- {
- }
-
- @Override
- protected void unregisterJMX()
- {
- }
-
- }
-}
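The deleted base class picked its ZooKeeper and Kafka ports in a static initializer by binding throw-away server sockets. A self-contained sketch of that trick (the helper name is mine, not from the codebase):

package org.apache.apex.malhar.kafka;

import java.io.IOException;
import java.net.ServerSocket;

public final class FreePortsSketch
{
  // Bind to port 0 so the OS assigns a free port, remember it, then close the
  // socket so the embedded ZooKeeper/Kafka test servers can bind to it later.
  static int[] allocate(int count) throws IOException
  {
    ServerSocket[] sockets = new ServerSocket[count];
    int[] ports = new int[count];
    try {
      for (int i = 0; i < count; i++) {
        sockets[i] = new ServerSocket(0);
        ports[i] = sockets[i].getLocalPort();
      }
    } finally {
      for (ServerSocket socket : sockets) {
        if (socket != null) {
          socket.close();
        }
      }
    }
    return ports;
  }
}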
diff --git a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
index 2b4307feaf..2f2c81558b 100644
--- a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
+++ b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
@@ -18,408 +18,50 @@
*/
package org.apache.apex.malhar.kafka;
-import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
-import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
-import org.apache.commons.io.FileUtils;
-import org.apache.kafka.clients.producer.ProducerConfig;
-
-import com.datatorrent.api.Attribute;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.Operator;
-import com.datatorrent.common.util.BaseOperator;
-
-import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
+public class KafkaOutputOperatorTest extends AbstractKafkaOutputOperatorTest
{
- String testName;
- private static List<Person> tupleCollection = new LinkedList<>();
- private final String VALUE_DESERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper";
- private final String VALUE_SERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper";
-
- public static String APPLICATION_PATH = baseDir + File.separator + "MyKafkaApp" + File.separator;
-
- @Before
- public void before()
- {
- FileUtils.deleteQuietly(new File(APPLICATION_PATH));
- testName = TEST_TOPIC + testCounter++;
- createTopic(0, testName);
- if (hasMultiCluster) {
- createTopic(1, testName);
- }
- }
-
- @After
- public void after()
+ @BeforeClass
+ public static void beforeClass()
{
- FileUtils.deleteQuietly(new File(APPLICATION_PATH));
- }
-
- @Test
- public void testExactlyOnceWithFailure() throws Exception
- {
- List<Person> toKafka = GenerateList();
-
- sendDataToKafka(true, toKafka, true, false);
-
- List<Person> fromKafka = ReadFromKafka();
-
- Assert.assertTrue("With Failure", compare(fromKafka, toKafka));
- }
-
- @Test
- public void testExactlyOnceWithNoFailure() throws Exception
- {
- List<Person> toKafka = GenerateList();
-
- sendDataToKafka(true, toKafka, false, false);
-
- List<Person> fromKafka = ReadFromKafka();
-
- Assert.assertTrue("With No Failure", compare(fromKafka, toKafka));
- }
-
- @Test
- public void testExactlyOnceWithDifferentTuplesAfterRecovery() throws Exception
- {
- List<Person> toKafka = GenerateList();
-
try {
- sendDataToKafka(true, toKafka, true, true);
- } catch (RuntimeException ex) {
-
- boolean expectedException = false;
- if (ex.getMessage().contains("Violates")) {
- expectedException = true;
- }
-
- Assert.assertTrue("Different tuples after recovery", expectedException);
- return;
- }
-
- Assert.assertTrue("Wrong tuples during replay, should throw exception", false);
- }
-
- @Test
- public void testKafkaOutput() throws Exception
- {
- List<Person> toKafka = GenerateList();
-
- sendDataToKafka(false, toKafka, false, false);
-
- List<Person> fromKafka = ReadFromKafka();
-
- Assert.assertTrue("No failure", compare(fromKafka, toKafka));
- }
-
- @Test
- public void testKafkaOutputWithFailure() throws Exception
- {
- List<Person> toKafka = GenerateList();
-
- sendDataToKafka(false, toKafka, true, true);
-
- List<Person> fromKafka = ReadFromKafka();
-
- Assert.assertTrue("No failure", fromKafka.size() > toKafka.size());
- }
-
- private void sendDataToKafka(boolean exactlyOnce, List<Person> toKafka, boolean hasFailure,
- boolean differentTuplesAfterRecovery) throws InterruptedException
- {
- Properties props = new Properties();
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER);
- if (!exactlyOnce) {
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaSinglePortExactlyOnceOutputOperator.KEY_SERIALIZER);
- }
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
- props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
-
- Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
- attributeMap.put(Context.DAGContext.APPLICATION_NAME, "MyKafkaApp");
- attributeMap.put(DAG.APPLICATION_PATH, APPLICATION_PATH);
-
- OperatorContext operatorContext = mockOperatorContext(2, attributeMap);
-
- cleanUp(operatorContext);
-
- Operator kafkaOutput;
- DefaultInputPort<Person> inputPort;
-
- if (exactlyOnce) {
- KafkaSinglePortExactlyOnceOutputOperator kafkaOutputTemp =
- ResetKafkaOutput(testName, props, operatorContext);
- inputPort = kafkaOutputTemp.inputPort;
- kafkaOutput = kafkaOutputTemp;
- } else {
- KafkaSinglePortOutputOperator kafkaOutputTemp =
- ResetKafkaSimpleOutput(testName, props, operatorContext);
- inputPort = kafkaOutputTemp.inputPort;
- kafkaOutput = kafkaOutputTemp;
- }
-
- kafkaOutput.beginWindow(1);
- inputPort.getSink().put(toKafka.get(0));
- inputPort.getSink().put(toKafka.get(1));
- inputPort.getSink().put(toKafka.get(2));
- kafkaOutput.endWindow();
- kafkaOutput.beginWindow(2);
- inputPort.getSink().put(toKafka.get(3));
- inputPort.getSink().put(toKafka.get(4));
- inputPort.getSink().put(toKafka.get(5));
- kafkaOutput.endWindow();
- kafkaOutput.beginWindow(3);
- inputPort.getSink().put(toKafka.get(6));
- inputPort.getSink().put(toKafka.get(7));
-
- if (hasFailure) {
- if (exactlyOnce) {
- KafkaSinglePortExactlyOnceOutputOperator kafkaOutputTemp =
- ResetKafkaOutput(testName, props, operatorContext);
- inputPort = kafkaOutputTemp.inputPort;
- kafkaOutput = kafkaOutputTemp;
- } else {
- KafkaSinglePortOutputOperator kafkaOutputTemp =
- ResetKafkaSimpleOutput(testName, props, operatorContext);
- inputPort = kafkaOutputTemp.inputPort;
- kafkaOutput = kafkaOutputTemp;
- }
-
- kafkaOutput.beginWindow(2);
- inputPort.getSink().put(toKafka.get(3));
- inputPort.getSink().put(toKafka.get(4));
- inputPort.getSink().put(toKafka.get(5));
- kafkaOutput.endWindow();
- kafkaOutput.beginWindow(3);
- inputPort.getSink().put(toKafka.get(6));
-
- if (!differentTuplesAfterRecovery) {
- inputPort.getSink().put(toKafka.get(7));
- }
+ kafkaserver = new EmbeddedKafka();
+ kafkaserver.start();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
-
- inputPort.getSink().put(toKafka.get(8));
- inputPort.getSink().put(toKafka.get(9));
- kafkaOutput.endWindow();
- kafkaOutput.beginWindow(4);
- inputPort.getSink().put(toKafka.get(10));
- inputPort.getSink().put(toKafka.get(11));
- kafkaOutput.endWindow();
-
- cleanUp(operatorContext);
- }
-
- private KafkaSinglePortExactlyOnceOutputOperator<Person> ResetKafkaOutput(
- String testName, Properties props, Context.OperatorContext operatorContext)
- {
- KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutput = new KafkaSinglePortExactlyOnceOutputOperator<>();
- kafkaOutput.setTopic(testName);
- kafkaOutput.setProperties(props);
- kafkaOutput.setup(operatorContext);
-
- return kafkaOutput;
- }
-
- private KafkaSinglePortOutputOperator ResetKafkaSimpleOutput(
- String testName, Properties props, Context.OperatorContext operatorContext)
- {
- KafkaSinglePortOutputOperator kafkaOutput = new KafkaSinglePortOutputOperator<>();
- kafkaOutput.setTopic(testName);
- kafkaOutput.setProperties(props);
- kafkaOutput.setup(operatorContext);
-
- return kafkaOutput;
}
- private void cleanUp(Context.OperatorContext operatorContext)
+ @AfterClass
+ public static void afterClass()
{
- FSWindowDataManager windowDataManager = new FSWindowDataManager();
- windowDataManager.setup(operatorContext);
try {
- windowDataManager.committed(windowDataManager.getLargestCompletedWindow());
+ kafkaserver.stop();
} catch (IOException e) {
- e.printStackTrace();
+ throw new RuntimeException(e);
}
}
- private boolean compare(List<Person> fromKafka, List<Person> toKafka)
+ @Override
+ public AbstractKafkaExactlyOnceOutputOperator createExaactlyOnceOutputOperator()
{
- if (fromKafka.size() != toKafka.size()) {
- return false;
- }
-
- for (int i = 0; i < fromKafka.size(); ++i) {
- if (!fromKafka.get(i).equals(toKafka.get(i))) {
- return false;
- }
- }
-
- return true;
+ return new KafkaSinglePortExactlyOnceOutputOperator();
}
- private String getClusterConfig()
+ @Override
+ public AbstractKafkaInputOperator createKafkaInputOperator(DAG dag, DefaultInputPort inputPort)
{
- String l = "localhost:";
- return l + TEST_KAFKA_BROKER_PORT[0] +
- (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1] : "");
- }
-
- private List<Person> GenerateList()
- {
- List<Person> people = new ArrayList<>();
-
- for (Integer i = 0; i < 12; ++i) {
- people.add(new Person(i.toString(), i));
- }
-
- return people;
- }
-
- private List<Person> ReadFromKafka()
- {
- tupleCollection.clear();
-
- // Create KafkaSinglePortStringInputOperator
- Properties props = new Properties();
- props.put(BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
- props.put(KEY_DESERIALIZER_CLASS_CONFIG, KafkaSinglePortExactlyOnceOutputOperator.KEY_DESERIALIZER);
- props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
- props.put(GROUP_ID_CONFIG, "KafkaTest");
-
- LocalMode lma = LocalMode.newInstance();
- DAG dag = lma.getDAG();
-
- // Create KafkaSinglePortStringInputOperator
KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class);
- node.setConsumerProps(props);
- node.setInitialPartitionCount(1);
- // set topic
- node.setTopics(testName);
- node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
- node.setClusters(getClusterConfig());
- node.setStrategy("one_to_one");
-
- // Create Test tuple collector
- CollectorModule collector1 = dag.addOperator("collector", new CollectorModule());
-
// Connect ports
- dag.addStream("Kafka message", node.outputPort, collector1.inputPort);
-
- // Create local cluster
- final LocalMode.Controller lc = lma.getController();
- lc.setHeartbeatMonitoringEnabled(false);
-
- lc.run(30000);
-
- return tupleCollection;
- }
+ dag.addStream("Kafka message", node.outputPort, inputPort);
- public static class CollectorModule extends BaseOperator
- {
- public final transient CollectorInputPort inputPort = new CollectorInputPort(this);
-
- long currentWindowId;
- long operatorId;
-
- @Override
- public void setup(Context.OperatorContext context)
- {
- super.setup(context);
- operatorId = context.getId();
- }
-
- @Override
- public void beginWindow(long windowId)
- {
- super.beginWindow(windowId);
- currentWindowId = windowId;
- }
-
- @Override
- public void endWindow()
- {
- super.endWindow();
- }
- }
-
- public static class CollectorInputPort extends DefaultInputPort<byte[]>
- {
- CollectorModule ownerNode;
-
- CollectorInputPort(CollectorModule node)
- {
- this.ownerNode = node;
- }
-
- @Override
- public void process(byte[] bt)
- {
- tupleCollection.add(new KafkaHelper().deserialize("r", bt));
- }
- }
-
- public static class Person
- {
- public String name;
- public Integer age;
-
- public Person(String name, Integer age)
- {
- this.name = name;
- this.age = age;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- Person person = (Person)o;
-
- if (name != null ? !name.equals(person.name) : person.name != null) {
- return false;
- }
-
- return age != null ? age.equals(person.age) : person.age == null;
- }
-
- @Override
- public int hashCode()
- {
- int result = name != null ? name.hashCode() : 0;
- result = 31 * result + (age != null ? age.hashCode() : 0);
- return result;
- }
-
- @Override
- public String toString()
- {
- return name + age.toString();
- }
+ return node;
}
}
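The window pattern from the removed sendDataToKafka() is what makes the exactly-once test meaningful: tuples are offered through the operator's input port between beginWindow() and endWindow(), and after recovery the operator must not resend what was already written for a completed window. A reduced sketch of that pattern, assuming the refactored operator keeps the public inputPort and the single value-type parameter visible in the deleted test:

package org.apache.apex.malhar.kafka;

public class ExactlyOnceSendSketch
{
  // Pushes one window of tuples through the operator, mirroring the removed
  // sendDataToKafka(); the caller is responsible for setTopic/setProperties/setup.
  static void sendOneWindow(KafkaSinglePortExactlyOnceOutputOperator<String> out,
      long windowId, Iterable<String> tuples)
  {
    out.beginWindow(windowId);
    for (String tuple : tuples) {
      out.inputPort.getSink().put(tuple);
    }
    out.endWindow();
  }
}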
diff --git a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
deleted file mode 100644
index 322f070779..0000000000
--- a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.kafka;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-import com.google.common.collect.Lists;
-
-/**
- * A kafka producer for testing
- */
-public class KafkaTestProducer implements Runnable
-{
- // private static final Logger logger = LoggerFactory.getLogger(KafkaTestProducer.class);
- private final Producer<String, String> producer;
- private final Producer<String, String> producer1;
- private final String topic;
- private int sendCount = 20;
- // to generate a random int as a key for partition
- private final Random rand = new Random();
- private boolean hasPartition = false;
- private boolean hasMultiCluster = false;
- private List<String> messages;
-
- // http://kafka.apache.org/documentation.html#producerconfigs
- private String ackType = "1";
-
- public int getSendCount()
- {
- return sendCount;
- }
-
- public void setSendCount(int sendCount)
- {
- this.sendCount = sendCount;
- }
-
- public void setMessages(List<String> messages)
- {
- this.messages = messages;
- }
-
- private Properties createProducerConfig(int cid)
- {
- Properties props = new Properties();
- props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaTestPartitioner.class.getName());
- String brokerList = "localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid];
- brokerList += hasPartition ? (",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid]) : "";
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
- props.setProperty(ProducerConfig.METADATA_MAX_AGE_CONFIG, "20000");
- props.setProperty(ProducerConfig.ACKS_CONFIG, getAckType());
- props.setProperty(ProducerConfig.RETRIES_CONFIG, "1");
- props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
-
- return props;
- }
-
- public KafkaTestProducer(String topic)
- {
- this(topic, false);
- }
-
- public KafkaTestProducer(String topic, boolean hasPartition, boolean hasMultiCluster)
- {
- // Use random partitioner. Don't need the key type. Just set it to Integer.
- // The message is of type String.
- this.topic = topic;
- this.hasPartition = hasPartition;
- this.hasMultiCluster = hasMultiCluster;
- producer = new KafkaProducer<>(createProducerConfig(0));
- if (hasMultiCluster) {
- producer1 = new KafkaProducer<>(createProducerConfig(1));
- } else {
- producer1 = null;
- }
- }
-
- public KafkaTestProducer(String topic, boolean hasPartition)
- {
- this(topic, hasPartition, false);
- }
-
- private transient List<Future<RecordMetadata>> sendTasks = Lists.newArrayList();
-
- private void generateMessages()
- {
- // Create dummy message
- int messageNo = 1;
- while (messageNo <= sendCount) {
- String messageStr = "_" + messageNo++;
- int k = rand.nextInt(100);
- sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + k, "c1" + messageStr)));
- if (hasMultiCluster && messageNo <= sendCount) {
- messageStr = "_" + messageNo++;
- sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + k, "c2" + messageStr)));
- }
- // logger.debug(String.format("Producing %s", messageStr));
- }
- // produce the end tuple to let the test input operator know it's done produce messages
- for (int i = 0; i < (hasPartition ? 2 : 1); ++i) {
- sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + i, KafkaOperatorTestBase.END_TUPLE)));
- if (hasMultiCluster) {
- sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + i, KafkaOperatorTestBase.END_TUPLE)));
- }
- }
- }
-
- @Override
- public void run()
- {
- if (messages == null) {
- generateMessages();
- } else {
- for (String msg : messages) {
- sendTasks.add(producer.send(new ProducerRecord<>(topic, "", msg)));
- }
- }
-
- producer.flush();
- if (producer1 != null) {
- producer1.flush();
- }
-
- try {
- for (Future<RecordMetadata> task : sendTasks) {
- task.get(30, TimeUnit.SECONDS);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- close();
- }
-
- public void close()
- {
- producer.close();
- if (producer1 != null) {
- producer1.close();
- }
- }
-
- public String getAckType()
- {
- return ackType;
- }
-
- public void setAckType(String ackType)
- {
- this.ackType = ackType;
- }
-} // End of KafkaTestProducer
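The removed producer's shutdown convention is worth keeping in mind when reading the consumer-side tests: after the payload it sends one END_TUPLE marker per partition so the collector knows every broker has delivered its share. A stripped-down sketch with a plain Kafka producer; the broker address and topic are placeholders:

package org.apache.apex.malhar.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class EndTupleProducerSketch
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.setProperty(ProducerConfig.ACKS_CONFIG, "1");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // Payload messages, then the marker the test collector waits for.
      producer.send(new ProducerRecord<>("testtopic", "0", "c1_1"));
      producer.send(new ProducerRecord<>("testtopic", "0", "c1_2"));
      producer.send(new ProducerRecord<>("testtopic", "0", "END_TUPLE"));
      producer.flush();
    }
  }
}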
diff --git a/sql/pom.xml b/sql/pom.xml
index 96ac17816b..6f34037f26 100644
--- a/sql/pom.xml
+++ b/sql/pom.xml
@@ -194,6 +194,19 @@
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-kafka-common</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
org.apache.kafka
kafka_2.11