From 431a9690a22a20801c1e8ae3865e8ebaf62fed9c Mon Sep 17 00:00:00 2001 From: poorna Date: Wed, 8 Nov 2017 19:01:10 -0800 Subject: [PATCH 01/11] First cut of re-write to use Kafka 0.10 APIs --- pom.xml | 4 +- .../plugin/batchSource/KafkaBatchSource.java | 23 +- .../plugin/batchSource/KafkaInputFormat.java | 272 ++++++++---------- .../hydrator/plugin/batchSource/KafkaKey.java | 18 +- .../plugin/batchSource/KafkaReader.java | 84 ++---- .../plugin/batchSource/KafkaRecordReader.java | 2 +- .../plugin/batchSource/KafkaRequest.java | 74 +---- .../plugin/batchSource/KafkaSplit.java | 60 ++-- .../co/cask/hydrator/EmbeddedKafkaServer.java | 133 +++++++++ .../java/co/cask/hydrator/PipelineTest.java | 1 - 10 files changed, 345 insertions(+), 326 deletions(-) create mode 100644 src/test/java/co/cask/hydrator/EmbeddedKafkaServer.java diff --git a/pom.xml b/pom.xml index 60caa5e..cb4a3dd 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ co.cask.hydrator kafka-plugins - 1.8.0-SNAPSHOT-0.8.2.2 + 1.9.0-SNAPSHOT-0.10.2.0 @@ -63,7 +63,7 @@ 1.6.1 widgets docs - 0.8.2.2 + 0.10.2.0 2.3.0 system:cdap-data-streams[4.3.0-SNAPSHOT,5.0.0),system:cdap-data-pipeline[4.3.0-SNAPSHOT,5.0.0) diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java index 49f162a..998be68 100644 --- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java +++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java @@ -16,15 +16,6 @@ package co.cask.hydrator.plugin.batchSource; -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - import co.cask.cdap.api.annotation.Description; import co.cask.cdap.api.annotation.Macro; import co.cask.cdap.api.annotation.Name; @@ -56,6 +47,16 @@ import kafka.common.TopicAndPartition; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; +import org.apache.kafka.clients.consumer.ConsumerConfig; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; /** * Kafka batch source. 
@@ -374,7 +375,9 @@ public void prepareRun(BatchSourceContext context) throws Exception { context.createDataset(tableName, KeyValueTable.class.getName(), DatasetProperties.EMPTY); } table = context.getDataset(tableName); - kafkaRequests = KafkaInputFormat.saveKafkaRequests(conf, config.getTopic(), config.getBrokerMap(), + Map kafkaConf = new HashMap<>(); + kafkaConf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBrokers()); + kafkaRequests = KafkaInputFormat.saveKafkaRequests(conf, config.getTopic(), kafkaConf, config.getPartitions(), config.getInitialPartitionOffsets(), table); context.setInput(Input.of(config.referenceName, diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java index 032b6b6..3c39e4a 100644 --- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java +++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java @@ -18,35 +18,37 @@ import co.cask.cdap.api.common.Bytes; import co.cask.cdap.api.dataset.lib.KeyValueTable; +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; -import kafka.api.PartitionOffsetRequestInfo; -import kafka.common.ErrorMapping; import kafka.common.TopicAndPartition; -import kafka.javaapi.OffsetResponse; -import kafka.javaapi.PartitionMetadata; -import kafka.javaapi.TopicMetadata; -import kafka.javaapi.TopicMetadataRequest; -import kafka.javaapi.consumer.SimpleConsumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.lang.reflect.Type; -import java.net.URI; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; @@ -80,180 +82,130 @@ public List getSplits(JobContext context) throws IOException, Interr return kafkaSplits; } - public static List saveKafkaRequests(Configuration conf, String topic, Map brokers, - Set partitions, - Map initOffsets, - KeyValueTable table) throws Exception { - ArrayList finalRequests; - HashMap> offsetRequestInfo = new HashMap<>(); - - // Get Metadata for all topics - List topicMetadataList = getKafkaMetadata(brokers, topic); - - for (TopicMetadata topicMetadata : topicMetadataList) { - for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) { - LeaderInfo leader = - new LeaderInfo(new URI("tcp://" + partitionMetadata.leader().connectionString()), - partitionMetadata.leader().id()); - if (partitions.isEmpty() || partitions.contains(partitionMetadata.partitionId())) { - if (offsetRequestInfo.containsKey(leader)) { - ArrayList topicAndPartitions = offsetRequestInfo.get(leader); - 
topicAndPartitions.add(new TopicAndPartition(topicMetadata.topic(), partitionMetadata.partitionId())); - offsetRequestInfo.put(leader, topicAndPartitions); - } else { - ArrayList topicAndPartitions = new ArrayList<>(); - topicAndPartitions.add(new TopicAndPartition(topicMetadata.topic(), partitionMetadata.partitionId())); - offsetRequestInfo.put(leader, topicAndPartitions); - } - } + static List saveKafkaRequests(Configuration conf, String topic, Map kafkaConf, + final Set partitions, + Map initOffsets, + KeyValueTable table) throws Exception { + Properties properties = new Properties(); + properties.putAll(kafkaConf); + try (Consumer consumer = new KafkaConsumer<>(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + // Get Metadata for all topics + @SuppressWarnings("unchecked") List partitionInfos = consumer.partitionsFor(topic); + if (!partitions.isEmpty()) { + Collection filteredPartitionInfos = + Collections2.filter(partitionInfos, + new Predicate() { + @Override + public boolean apply(PartitionInfo input) { + return partitions.contains(input.partition()); + } + }); + partitionInfos = ImmutableList.copyOf(filteredPartitionInfos); } - } - // Get the latest offsets and generate the KafkaRequests - finalRequests = fetchLatestOffsetAndCreateKafkaRequests(offsetRequestInfo, initOffsets, table); + // Get the latest offsets and generate the KafkaRequests + List finalRequests = createKafkaRequests(consumer, kafkaConf, partitionInfos, initOffsets, table); - Collections.sort(finalRequests, new Comparator() { - @Override - public int compare(KafkaRequest r1, KafkaRequest r2) { - return r1.getTopic().compareTo(r2.getTopic()); - } - }); + Collections.sort(finalRequests, new Comparator() { + @Override + public int compare(KafkaRequest r1, KafkaRequest r2) { + return r1.getTopic().compareTo(r2.getTopic()); + } + }); - Map offsetKeys = new HashMap<>(); - for (KafkaRequest request : finalRequests) { - KafkaKey key = offsetKeys.get(request); + // TODO: Understand this logic + Map offsetKeys = new HashMap<>(); + for (KafkaRequest request : finalRequests) { + KafkaKey key = offsetKeys.get(request); - if (key != null) { - request.setOffset(key.getOffset()); - request.setAvgMsgSize(key.getMessageSize()); - } + if (key != null) { + request.setOffset(key.getOffset()); + request.setAvgMsgSize(key.getMessageSize()); + } - if (request.getEarliestOffset() > request.getOffset() || request.getOffset() > request.getLastOffset()) { + if (request.getEarliestOffset() > request.getOffset() || request.getOffset() > request.getLastOffset()) { - boolean offsetUnset = request.getOffset() == KafkaRequest.DEFAULT_OFFSET; - // When the offset is unset, it means it's a new topic/partition, we also need to consume the earliest offset - if (offsetUnset) { - request.setOffset(request.getEarliestOffset()); - offsetKeys.put( - request, - new KafkaKey(request.getTopic(), request.getLeaderId(), request.getPartition(), 0, request.getOffset())); + boolean offsetUnset = request.getOffset() == KafkaRequest.DEFAULT_OFFSET; + // When the offset is unset, it means it's a new topic/partition, we also need to consume the earliest offset + if (offsetUnset) { + request.setOffset(request.getEarliestOffset()); + offsetKeys.put( + request, + new KafkaKey(request.getTopic(), request.getPartition(), + 0, request.getOffset())); + } } } + conf.set(KAFKA_REQUEST, new Gson().toJson(finalRequests)); + return finalRequests; } - conf.set(KAFKA_REQUEST, new Gson().toJson(finalRequests)); - return finalRequests; } - private static List 
getKafkaMetadata(Map brokers, String topic) { - List topicMetadataList = new ArrayList<>(); - - for (Map.Entry entry : brokers.entrySet()) { - SimpleConsumer consumer = createSimpleConsumer(entry.getKey(), entry.getValue()); - LOG.debug("Fetching metadata from broker {}: {} with client id {} for topic {}", entry.getKey(), - entry.getValue(), consumer.clientId(), topic); - try { - topicMetadataList = - consumer.send(new TopicMetadataRequest(ImmutableList.of(topic))).topicsMetadata(); - break; - } catch (Exception e) { - // No-op just query next broker - } finally { - consumer.close(); + private static List createKafkaRequests(Consumer consumer, Map kafkaConf, + List partitionInfos, + Map offsets, + KeyValueTable table) { + Collection topicPartitions = + Collections2.transform(partitionInfos, + new Function() { + @Override + public TopicPartition apply(PartitionInfo input) { + return new TopicPartition(input.topic(), input.partition()); + } + }); + Map latestOffsets = getLatestOffsets(consumer, topicPartitions); + Map earliestOffsets = getEarliestOffsets(consumer, topicPartitions); + + List requests = new ArrayList<>(); + for (PartitionInfo partitionInfo : partitionInfos) { + TopicAndPartition topicAndPartition = new TopicAndPartition(partitionInfo.topic(), partitionInfo.partition()); + TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); + long latestOffset = latestOffsets.get(topicPartition); + Long start; + byte[] tableStart = table.read(topicAndPartition.toString()); + if (tableStart != null) { + start = Bytes.toLong(tableStart); + } else { + start = offsets.containsKey(topicAndPartition) ? offsets.get(topicAndPartition) - 1 : null; } - } - if (topicMetadataList.isEmpty()) { - throw new IllegalArgumentException( - String.format("Failed to get any information for topic: %s from the given brokers: %s", topic, - brokers.toString())); + long earliestOffset = start == null || start == -2 ? 
earliestOffsets.get(topicPartition) : start; + if (earliestOffset == -1) { + earliestOffset = latestOffset; + } + LOG.debug("Getting kafka messages from topic {}, partition {}, with earlistOffset {}, latest offset {}", + topicAndPartition.topic(), topicAndPartition.partition(), earliestOffset, latestOffset); + KafkaRequest KafkaRequest = new KafkaRequest(kafkaConf, topicAndPartition.topic(), topicAndPartition.partition()); + KafkaRequest.setLatestOffset(latestOffset); + KafkaRequest.setEarliestOffset(earliestOffset); + requests.add(KafkaRequest); } - - return topicMetadataList; + return requests; } - private static SimpleConsumer createSimpleConsumer(String host, int port) { - return new SimpleConsumer(host, port, 20 * 1000, 128 * 1024, "client"); - } - - /** - * Gets the latest offsets and create the requests as needed - */ - private static ArrayList fetchLatestOffsetAndCreateKafkaRequests( - Map> offsetRequestInfo, - Map offsets, - KeyValueTable table) { - ArrayList finalRequests = new ArrayList<>(); - for (LeaderInfo leader : offsetRequestInfo.keySet()) { - Long latestTime = kafka.api.OffsetRequest.LatestTime(); - Long earliestTime = kafka.api.OffsetRequest.EarliestTime(); - - SimpleConsumer consumer = createSimpleConsumer(leader.getUri().getHost(), leader.getUri().getPort()); - // Latest Offset - PartitionOffsetRequestInfo partitionLatestOffsetRequestInfo = new PartitionOffsetRequestInfo(latestTime, 1); - // Earliest Offset - PartitionOffsetRequestInfo partitionEarliestOffsetRequestInfo = new PartitionOffsetRequestInfo(earliestTime, 1); - Map latestOffsetInfo = new HashMap<>(); - Map earliestOffsetInfo = new HashMap<>(); - ArrayList topicAndPartitions = offsetRequestInfo.get(leader); - for (TopicAndPartition topicAndPartition : topicAndPartitions) { - latestOffsetInfo.put(topicAndPartition, partitionLatestOffsetRequestInfo); - earliestOffsetInfo.put(topicAndPartition, partitionEarliestOffsetRequestInfo); - } - - OffsetResponse latestOffsetResponse = getLatestOffsetResponse(consumer, latestOffsetInfo); - OffsetResponse earliestOffsetResponse = null; - if (latestOffsetResponse != null) { - earliestOffsetResponse = getLatestOffsetResponse(consumer, earliestOffsetInfo); - } - consumer.close(); - if (earliestOffsetResponse == null) { - continue; - } + private static Map getLatestOffsets(Consumer consumer, + Collection topicAndPartitions) { + consumer.assign(topicAndPartitions); + consumer.seekToEnd(topicAndPartitions); - for (TopicAndPartition topicAndPartition : topicAndPartitions) { - long latestOffset = latestOffsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0]; - Long start; - byte[] tableStart = table.read(topicAndPartition.toString()); - if (tableStart != null) { - start = Bytes.toLong(tableStart); - } else { - start = offsets.containsKey(topicAndPartition) ? offsets.get(topicAndPartition) - 1 : null; - } - - long earliestOffset = start == null || start == -2 - ? 
earliestOffsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0] : start; - if (earliestOffset == -1) { - earliestOffset = latestOffset; - } - LOG.debug("Getting kafka messages from topic {}, partition {}, with earlistOffset {}, latest offset {}", - topicAndPartition.topic(), topicAndPartition.partition(), earliestOffset, latestOffset); - KafkaRequest KafkaRequest = - new KafkaRequest(topicAndPartition.topic(), Integer.toString(leader.getLeaderId()), - topicAndPartition.partition(), leader.getUri()); - KafkaRequest.setLatestOffset(latestOffset); - KafkaRequest.setEarliestOffset(earliestOffset); - finalRequests.add(KafkaRequest); - } + Map offsets = new HashMap<>(); + for (TopicPartition topicAndPartition : topicAndPartitions) { + long offset = consumer.position(topicAndPartition); + offsets.put(topicAndPartition, offset); } - return finalRequests; + return offsets; } - private static OffsetResponse getLatestOffsetResponse(SimpleConsumer consumer, - Map offsetInfo) { + private static Map getEarliestOffsets(Consumer consumer, + Collection topicAndPartitions) { + consumer.assign(topicAndPartitions); + consumer.seekToBeginning(topicAndPartitions); - OffsetResponse offsetResponse = - consumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(offsetInfo, kafka.api.OffsetRequest.CurrentVersion(), - "client")); - if (offsetResponse.hasError()) { - for (TopicAndPartition key : offsetInfo.keySet()) { - short errorCode = offsetResponse.errorCode(key.topic(), key.partition()); - if (errorCode != ErrorMapping.NoError()) { - throw new RuntimeException( - String.format("Error happens when getting the offset for topic %s and partition %d with error code %d", - key.topic(), key.partition(),errorCode)); - } - } + Map offsets = new HashMap<>(); + for (TopicPartition topicAndPartition : topicAndPartitions) { + long offset = consumer.position(topicAndPartition); + offsets.put(topicAndPartition, offset); } - return offsetResponse; + return offsets; } } diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaKey.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaKey.java index 4d0747b..acf9155 100644 --- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaKey.java +++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaKey.java @@ -19,7 +19,6 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -34,7 +33,6 @@ */ public class KafkaKey implements WritableComparable { - private String leaderId = ""; private int partition = 0; private long beginOffset = 0; private long offset = 0; @@ -46,19 +44,18 @@ public class KafkaKey implements WritableComparable { * dummy empty constructor */ public KafkaKey() { - this("dummy", "0", 0, 0, 0, 0); + this("dummy", 0, 0, 0, 0); } - public KafkaKey(String topic, String leaderId, int partition, long beginOffset, long offset) { - this(topic, leaderId, partition, beginOffset, offset, 0); + public KafkaKey(String topic, int partition, long beginOffset, long offset) { + this(topic, partition, beginOffset, offset, 0); } - public KafkaKey(String topic, String leaderId, int partition, long beginOffset, long offset, long checksum) { - this.set(topic, leaderId, partition, beginOffset, offset, checksum); + public KafkaKey(String topic, int partition, long beginOffset, long offset, long checksum) { + this.set(topic, partition, beginOffset, offset, 
checksum); } - public void set(String topic, String leaderId, int partition, long beginOffset, long offset, long checksum) { - this.leaderId = leaderId; + public void set(String topic, int partition, long beginOffset, long offset, long checksum) { this.partition = partition; this.beginOffset = beginOffset; this.offset = offset; @@ -67,7 +64,6 @@ public void set(String topic, String leaderId, int partition, long beginOffset, } public void clear() { - leaderId = ""; partition = 0; beginOffset = 0; offset = 0; @@ -108,7 +104,6 @@ public void put(Writable key, Writable value) { @Override public void readFields(DataInput in) throws IOException { - this.leaderId = in.readUTF(); this.partition = in.readInt(); this.beginOffset = in.readLong(); this.offset = in.readLong(); @@ -120,7 +115,6 @@ public void readFields(DataInput in) throws IOException { @Override public void write(DataOutput out) throws IOException { - UTF8.writeString(out, this.leaderId); out.writeInt(this.partition); out.writeLong(this.beginOffset); out.writeLong(this.offset); diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaReader.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaReader.java index 2bf958b..8e87d96 100644 --- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaReader.java +++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaReader.java @@ -16,23 +16,21 @@ package co.cask.hydrator.plugin.batchSource; -import kafka.api.PartitionFetchInfo; -import kafka.common.TopicAndPartition; -import kafka.javaapi.FetchRequest; -import kafka.javaapi.FetchResponse; -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.Message; -import kafka.message.MessageAndOffset; +import com.google.common.collect.Lists; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.URI; import java.nio.ByteBuffer; -import java.util.HashMap; import java.util.Iterator; -import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; /** @@ -42,13 +40,12 @@ public class KafkaReader { private static final Logger LOG = LoggerFactory.getLogger(KafkaReader.class); // index of context - private static final int fetchBufferSize = 1024 * 1024; private final KafkaRequest kafkaRequest; - private final SimpleConsumer simpleConsumer; + private final Consumer consumer; private long currentOffset; private long lastOffset; - private Iterator messageIter; + private Iterator> messageIter; /** @@ -60,8 +57,9 @@ public KafkaReader(KafkaRequest request) { lastOffset = request.getLastOffset(); // read data from queue - URI uri = kafkaRequest.getURI(); - simpleConsumer = new SimpleConsumer(uri.getHost(), uri.getPort(), 20 * 1000, fetchBufferSize, "client"); + Properties properties = new Properties(); + properties.putAll(request.getConf()); + consumer = new KafkaConsumer<>(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer()); fetch(); } @@ -82,23 +80,22 @@ public boolean hasNext() throws IOException { public KafkaMessage getNext(KafkaKey kafkaKey) throws IOException { if (hasNext()) { - MessageAndOffset msgAndOffset = messageIter.next(); - 
Message message = msgAndOffset.message(); + ConsumerRecord consumerRecord = messageIter.next(); - ByteBuffer payload = message.payload(); - ByteBuffer key = message.key(); - - if (payload == null) { + byte[] value = consumerRecord.value(); + if (value == null) { LOG.warn("Received message with null message.payload with topic {} and partition {}", kafkaKey.getTopic(), kafkaKey.getPartition()); - } + ByteBuffer payload = value == null ? ByteBuffer.wrap(new byte[0]) : ByteBuffer.wrap(value); + ByteBuffer key = ByteBuffer.wrap(consumerRecord.key()); + kafkaKey.clear(); - kafkaKey.set(kafkaRequest.getTopic(), kafkaRequest.getLeaderId(), kafkaRequest.getPartition(), currentOffset, - msgAndOffset.offset() + 1, message.checksum()); - kafkaKey.setMessageSize(msgAndOffset.message().size()); - currentOffset = msgAndOffset.offset() + 1; // increase offset + kafkaKey.set(kafkaRequest.getTopic(), kafkaRequest.getPartition(), currentOffset, + consumerRecord.offset() + 1, consumerRecord.checksum()); + kafkaKey.setMessageSize(consumerRecord.serializedValueSize()); + currentOffset = consumerRecord.offset() + 1; // increase offset return new KafkaMessage(payload, key); } else { return null; @@ -112,31 +109,12 @@ private boolean fetch() { if (currentOffset >= lastOffset) { return false; } - TopicAndPartition topicAndPartition = new TopicAndPartition(kafkaRequest.getTopic(), kafkaRequest.getPartition()); - PartitionFetchInfo partitionFetchInfo = new PartitionFetchInfo(currentOffset, fetchBufferSize); - - Map fetchInfo = new HashMap<>(); - fetchInfo.put(topicAndPartition, partitionFetchInfo); - - FetchRequest fetchRequest = new FetchRequest(-1, "client", 1000, 1024, fetchInfo); - - FetchResponse fetchResponse; - try { - fetchResponse = simpleConsumer.fetch(fetchRequest); - if (fetchResponse.hasError()) { - String message = - "Error Code generated : " + fetchResponse.errorCode(kafkaRequest.getTopic(), kafkaRequest.getPartition()); - throw new RuntimeException(message); - } - return processFetchResponse(fetchResponse); - } catch (Exception e) { - return false; - } - } - private boolean processFetchResponse(FetchResponse fetchResponse) { - ByteBufferMessageSet messageBuffer = fetchResponse.messageSet(kafkaRequest.getTopic(), kafkaRequest.getPartition()); - messageIter = messageBuffer.iterator(); + TopicPartition topicPartition = new TopicPartition(kafkaRequest.getTopic(), kafkaRequest.getPartition()); + this.consumer.assign(Lists.newArrayList(topicPartition)); + this.consumer.seek(topicPartition, currentOffset); + ConsumerRecords consumerRecords = consumer.poll(TimeUnit.SECONDS.toMillis(30)); + messageIter = consumerRecords.iterator(); if (!messageIter.hasNext()) { messageIter = null; return false; @@ -148,8 +126,8 @@ private boolean processFetchResponse(FetchResponse fetchResponse) { * Closes this context */ public void close() throws IOException { - if (simpleConsumer != null) { - simpleConsumer.close(); + if (consumer != null) { + consumer.close(); } } } diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRecordReader.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRecordReader.java index a6d87cb..98a097e 100644 --- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRecordReader.java +++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRecordReader.java @@ -81,7 +81,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException { return false; } - key.set(request.getTopic(), request.getLeaderId(), request.getPartition(), request.getOffset(), + 
key.set(request.getTopic(), request.getPartition(), request.getOffset(), request.getOffset(), 0); value = null; diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRequest.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRequest.java index c00920e..02b2853 100644 --- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRequest.java +++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRequest.java @@ -16,46 +16,37 @@ package co.cask.hydrator.plugin.batchSource; -import kafka.api.PartitionOffsetRequestInfo; -import kafka.common.TopicAndPartition; -import kafka.javaapi.OffsetRequest; -import kafka.javaapi.OffsetResponse; -import kafka.javaapi.consumer.SimpleConsumer; - -import java.net.URI; -import java.util.HashMap; +import com.google.common.collect.ImmutableMap; + import java.util.Map; /** * A class that represents the kafka pull request. - * + *

* The class is a container for topic, leaderId, partition, uri and offset. It is * used in reading and writing the sequence files used for the extraction job. - * */ public class KafkaRequest { public static final long DEFAULT_OFFSET = 0; + private Map conf; private String topic = ""; - private String leaderId = ""; private int partition = 0; - private URI uri = null; private long offset = DEFAULT_OFFSET; private long latestOffset = -1; private long earliestOffset = -2; private long avgMsgSize = 1024; - public KafkaRequest(String topic, String leaderId, int partition, URI brokerUri) { - this(topic, leaderId, partition, brokerUri, DEFAULT_OFFSET, -1); + public KafkaRequest(Map conf, String topic, int partition) { + this(conf, topic, partition, DEFAULT_OFFSET, -1); } - public KafkaRequest(String topic, String leaderId, int partition, URI brokerUri, long offset, long latestOffset) { + public KafkaRequest(Map conf, String topic, int partition, long offset, long latestOffset) { + this.conf = ImmutableMap.copyOf(conf); this.topic = topic; - this.leaderId = leaderId; - this.uri = brokerUri; this.partition = partition; this.latestOffset = latestOffset; setOffset(offset); @@ -77,18 +68,14 @@ public void setOffset(long offset) { this.offset = offset; } - public String getLeaderId() { - return this.leaderId; + public Map getConf() { + return conf; } public String getTopic() { return this.topic; } - public URI getURI() { - return this.uri; - } - public int getPartition() { return this.partition; } @@ -98,48 +85,11 @@ public long getOffset() { } public long getEarliestOffset() { - if (this.earliestOffset == -2 && uri != null) { - SimpleConsumer consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), 20000, 1024 * 1024, "client"); - Map offsetInfo = new HashMap<>(); - offsetInfo.put(new TopicAndPartition(topic, partition), - new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1)); - OffsetResponse response = - consumer.getOffsetsBefore(new OffsetRequest(offsetInfo, kafka.api.OffsetRequest.CurrentVersion(), "client")); - long[] endOffset = response.offsets(topic, partition); - if (endOffset.length == 0) { - throw new RuntimeException("Could not find earliest offset for topic: " + topic + - " and partition: " + partition); - } - consumer.close(); - this.earliestOffset = endOffset[0]; - return endOffset[0]; - } else { - return this.earliestOffset; - } + return this.earliestOffset; } public long getLastOffset() { - if (this.latestOffset == -1 && uri != null) - return getLastOffset(kafka.api.OffsetRequest.LatestTime()); - else { - return this.latestOffset; - } - } - - private long getLastOffset(long time) { - SimpleConsumer consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), 60000, 1024 * 1024, "client"); - Map offsetInfo = new HashMap<>(); - offsetInfo.put(new TopicAndPartition(topic, partition), new PartitionOffsetRequestInfo(time, 1)); - OffsetResponse response = - consumer.getOffsetsBefore(new OffsetRequest(offsetInfo, kafka.api.OffsetRequest.CurrentVersion(), "client")); - long[] endOffset = response.offsets(topic, partition); - consumer.close(); - if (endOffset.length == 0) { - throw new RuntimeException("Could not find latest offset for topic: " + topic + - " and partition: " + partition); - } - this.latestOffset = endOffset[0]; - return endOffset[0]; + return this.latestOffset; } public long estimateDataSize() { diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaSplit.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaSplit.java index 
ce56ad5..b7313ff 100644 --- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaSplit.java +++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaSplit.java @@ -17,16 +17,16 @@ package co.cask.hydrator.plugin.batchSource; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; +import java.util.Map; import javax.annotation.Nullable; /** @@ -46,34 +46,28 @@ public KafkaSplit(KafkaRequest request) { @Override public void readFields(DataInput in) throws IOException { - String topic = in.readUTF(); - String leaderId = in.readUTF(); - String str = in.readUTF(); - URI uri = null; - if (!str.isEmpty()) - try { - uri = new URI(str); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - int partition = in.readInt(); - long offset = in.readLong(); - long latestOffset = in.readLong(); - request = new KafkaRequest(topic, leaderId, partition, uri, offset, latestOffset); - length = request.estimateDataSize(); + MapWritable mapWritable = new MapWritable(); + mapWritable.readFields(in); + String topic = in.readUTF(); + int partition = in.readInt(); + long offset = in.readLong(); + long latestOffset = in.readLong(); + long earliestOffset = in.readLong(); + Map conf = writableToConf(mapWritable); + request = new KafkaRequest(conf, topic, partition, offset, latestOffset); + request.setEarliestOffset(earliestOffset); + length = request.estimateDataSize(); } @Override public void write(DataOutput out) throws IOException { - out.writeUTF(request.getTopic()); - out.writeUTF(request.getLeaderId()); - if (request.getURI() != null) - out.writeUTF(request.getURI().toString()); - else - out.writeUTF(""); + MapWritable conf = confToWritable(request.getConf()); + conf.write(out); + out.writeUTF(request.getTopic()); out.writeInt(request.getPartition()); out.writeLong(request.getOffset()); out.writeLong(request.getLastOffset()); + out.writeLong(request.getEarliestOffset()); } @Override @@ -92,4 +86,20 @@ public KafkaRequest popRequest() { request = null; return result; } + + private MapWritable confToWritable(Map conf) { + MapWritable mapWritable = new MapWritable(); + for (Map.Entry entry : conf.entrySet()) { + mapWritable.put(new Text(entry.getKey()), new Text(entry.getValue())); + } + return mapWritable; + } + + private Map writableToConf(MapWritable mapWritable) { + Map conf = new HashMap<>(); + for (Map.Entry entry : mapWritable.entrySet()) { + conf.put(entry.getKey().toString(), entry.getValue().toString()); + } + return conf; + } } diff --git a/src/test/java/co/cask/hydrator/EmbeddedKafkaServer.java b/src/test/java/co/cask/hydrator/EmbeddedKafkaServer.java new file mode 100644 index 0000000..c32a973 --- /dev/null +++ b/src/test/java/co/cask/hydrator/EmbeddedKafkaServer.java @@ -0,0 +1,133 @@ +package co.cask.hydrator; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.AbstractIdleService; +import kafka.metrics.KafkaMetricsReporter; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import org.I0Itec.zkclient.exception.ZkTimeoutException; +import org.apache.kafka.common.utils.Time; +import org.apache.twill.internal.utils.Networks; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; +import scala.Option; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import java.net.BindException; +import java.util.Collections; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +/** + * A {@link com.google.common.util.concurrent.Service} implementation for running an instance of Kafka server in + * the same process. + */ +public final class EmbeddedKafkaServer extends AbstractIdleService { + + public static final String START_RETRIES = "twill.kafka.start.timeout.retries"; + + private static final Logger LOG = LoggerFactory.getLogger(org.apache.twill.internal.kafka.EmbeddedKafkaServer.class); + private static final String DEFAULT_START_RETRIES = "5"; + + private final int startTimeoutRetries; + private final Properties properties; + private KafkaServer server; + + public EmbeddedKafkaServer(Properties properties) { + this.startTimeoutRetries = Integer.parseInt(properties.getProperty(START_RETRIES, + DEFAULT_START_RETRIES)); + this.properties = new Properties(); + this.properties.putAll(properties); + } + + @Override + protected void startUp() throws Exception { + int tries = 0; + do { + KafkaConfig kafkaConfig = createKafkaConfig(properties); + KafkaServer kafkaServer = createKafkaServer(kafkaConfig); + try { + kafkaServer.startup(); + server = kafkaServer; + } catch (Exception e) { + kafkaServer.shutdown(); + kafkaServer.awaitShutdown(); + + Throwable rootCause = Throwables.getRootCause(e); + if (rootCause instanceof ZkTimeoutException) { + // Potentially caused by race condition bug described in TWILL-139. + LOG.warn("Timeout when connecting to ZooKeeper from KafkaServer. Attempt number {}.", tries, rootCause); + } else if (rootCause instanceof BindException) { + LOG.warn("Kafka failed to bind to port {}. Attempt number {}.", kafkaConfig.port(), tries, rootCause); + } else { + throw e; + } + + // Do a random sleep of < 200ms + TimeUnit.MILLISECONDS.sleep(new Random().nextInt(200) + 1L); + } + } while (server == null && ++tries < startTimeoutRetries); + + if (server == null) { + throw new IllegalStateException("Failed to start Kafka server after " + tries + " attempts."); + } + } + + @Override + protected void shutDown() throws Exception { + if (server != null) { + server.shutdown(); + server.awaitShutdown(); + } + } + + private KafkaServer createKafkaServer(KafkaConfig kafkaConfig) { + return new KafkaServer(kafkaConfig, new Time() { + + @Override + public long milliseconds() { + return System.currentTimeMillis(); + } + + @Override + public long nanoseconds() { + return System.nanoTime(); + } + + @Override + public long hiResClockMs() { + return System.currentTimeMillis(); + } + + @Override + public void sleep(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + Thread.interrupted(); + } + } + }, Option.apply("embedded-server"), (Seq) JavaConverters.asScalaBufferConverter(Collections.EMPTY_LIST).asScala()); + } + + /** + * Creates a new {@link KafkaConfig} from the given {@link Properties}. If the {@code "port"} property is missing + * or is equals to {@code "0"}, a random port will be generated. 
+ */ + private KafkaConfig createKafkaConfig(Properties properties) { + Properties prop = new Properties(); + prop.putAll(properties); + + String port = prop.getProperty("port"); + if (port == null || "0".equals(port)) { + int randomPort = Networks.getRandomPort(); + Preconditions.checkState(randomPort > 0, "Failed to get random port."); + prop.setProperty("port", Integer.toString(randomPort)); + } + + return new KafkaConfig(prop); + } +} diff --git a/src/test/java/co/cask/hydrator/PipelineTest.java b/src/test/java/co/cask/hydrator/PipelineTest.java index 2409d26..48e5f78 100644 --- a/src/test/java/co/cask/hydrator/PipelineTest.java +++ b/src/test/java/co/cask/hydrator/PipelineTest.java @@ -44,7 +44,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import kafka.common.TopicAndPartition; -import org.apache.twill.internal.kafka.EmbeddedKafkaServer; import org.apache.twill.internal.kafka.client.ZKKafkaClientService; import org.apache.twill.internal.utils.Networks; import org.apache.twill.internal.zookeeper.InMemoryZKServer; From 88d80129fa874769d30794d84b4508e4e4057e15 Mon Sep 17 00:00:00 2001 From: poorna Date: Wed, 8 Nov 2017 20:53:17 -0800 Subject: [PATCH 02/11] Fix alert publisher and pom.xml for cdap version --- pom.xml | 10 ++++++++-- .../plugin/alertpublisher/KafkaAlertPublisher.java | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index cb4a3dd..72f9d12 100644 --- a/pom.xml +++ b/pom.xml @@ -58,8 +58,8 @@ UTF-8 - 4.3.0-SNAPSHOT - 1.8.0-SNAPSHOT + 4.3.1 + 1.8.1 1.6.1 widgets docs @@ -306,6 +306,12 @@ + + net.sf.jopt-simple + jopt-simple + 3.2 + test + diff --git a/src/main/java/co/cask/hydrator/plugin/alertpublisher/KafkaAlertPublisher.java b/src/main/java/co/cask/hydrator/plugin/alertpublisher/KafkaAlertPublisher.java index 340181a..01df012 100644 --- a/src/main/java/co/cask/hydrator/plugin/alertpublisher/KafkaAlertPublisher.java +++ b/src/main/java/co/cask/hydrator/plugin/alertpublisher/KafkaAlertPublisher.java @@ -13,11 +13,11 @@ import co.cask.hydrator.common.KeyValueListParser; import com.google.common.base.Strings; import com.google.gson.Gson; -import kafka.common.InvalidTopicException; import kafka.common.Topic; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.InvalidTopicException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From 1effaed14255c51bf2c72e5478c1549d05fbc012 Mon Sep 17 00:00:00 2001 From: poorna Date: Wed, 8 Nov 2017 22:17:06 -0800 Subject: [PATCH 03/11] Fix null pointer for key --- .../co/cask/hydrator/plugin/batchSource/KafkaReader.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaReader.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaReader.java index 8e87d96..182d024 100644 --- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaReader.java +++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaReader.java @@ -38,6 +38,7 @@ */ public class KafkaReader { private static final Logger LOG = LoggerFactory.getLogger(KafkaReader.class); + private static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; // index of context private final KafkaRequest kafkaRequest; @@ -79,17 +80,17 @@ public boolean hasNext() throws IOException { */ public KafkaMessage getNext(KafkaKey kafkaKey) throws IOException { if 
(hasNext()) { - ConsumerRecord consumerRecord = messageIter.next(); + byte[] keyBytes = consumerRecord.key(); byte[] value = consumerRecord.value(); if (value == null) { LOG.warn("Received message with null message.payload with topic {} and partition {}", kafkaKey.getTopic(), kafkaKey.getPartition()); } - ByteBuffer payload = value == null ? ByteBuffer.wrap(new byte[0]) : ByteBuffer.wrap(value); - ByteBuffer key = ByteBuffer.wrap(consumerRecord.key()); + ByteBuffer payload = value == null ? ByteBuffer.wrap(EMPTY_BYTE_ARRAY) : ByteBuffer.wrap(value); + ByteBuffer key = keyBytes == null ? ByteBuffer.wrap(EMPTY_BYTE_ARRAY) : ByteBuffer.wrap(keyBytes); kafkaKey.clear(); kafkaKey.set(kafkaRequest.getTopic(), kafkaRequest.getPartition(), currentOffset, From ebc5c56202452b3738dbc7943444777710c64daa Mon Sep 17 00:00:00 2001 From: poorna Date: Wed, 8 Nov 2017 22:17:21 -0800 Subject: [PATCH 04/11] Secure kafka settings --- .../hydrator/plugin/batchSource/KafkaBatchSource.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java index 998be68..2915060 100644 --- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java +++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java @@ -377,6 +377,14 @@ public void prepareRun(BatchSourceContext context) throws Exception { table = context.getDataset(tableName); Map kafkaConf = new HashMap<>(); kafkaConf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBrokers()); + kafkaConf.put("security.protocol", "SASL_PLAINTEXT"); + kafkaConf.put("sasl.mechanism", "GSSAPI"); + kafkaConf.put("sasl.kerberos.service.name", "kafka"); + kafkaConf.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required \n" + + " useKeyTab=true \n" + + " storeKey=true \n" + + " keyTab=\"/etc/security/keytabs/cdap.service.keytab\" \n" + + " principal=\"cdap/hdp-2529066-1000.dev.continuuity.net@CONTINUUITY.NET\";"); kafkaRequests = KafkaInputFormat.saveKafkaRequests(conf, config.getTopic(), kafkaConf, config.getPartitions(), config.getInitialPartitionOffsets(), table); From e2bd10d24a28918d618276bf204f58c31a95845d Mon Sep 17 00:00:00 2001 From: yaojiefeng Date: Wed, 8 Nov 2017 22:47:08 -0800 Subject: [PATCH 05/11] add additional fields in kafka batch config to let user input kerberos information --- .../plugin/batchSource/KafkaBatchSource.java | 55 ++++++++++++++++--- widgets/Kafka-batchsource.json | 20 +++++++ 2 files changed, 67 insertions(+), 8 deletions(-) diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java index 2915060..7e9fdb1 100644 --- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java +++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java @@ -135,6 +135,21 @@ public static class KafkaBatchConfig extends ReferencePluginConfig { @Nullable private String offsetField; + @Description("Additional kafka producer properties to set") + @Macro + @Nullable + private String kafkaProperties; + + @Description("The kerberos principal used for the source.") + @Macro + @Nullable + private String principal; + + @Description("The keytab location for the kerberos principal when kerberos security is enabled for kafka.") + @Macro + @Nullable + private String keytabLocation; + public KafkaBatchConfig() { super(""); } @@ -161,6 +176,17 @@ public 
String getTableName() { return tableName; } + @Nullable + public String getPrincipal() { + return principal; + } + + @Nullable + public String getKeytabLocation() { + + return keytabLocation; + } + public Set getPartitions() { Set partitionSet = new HashSet<>(); if (partitions == null) { @@ -307,6 +333,17 @@ public Map getInitialPartitionOffsets() { return partitionOffsets; } + public Map getKafkaProperties() { + KeyValueListParser kvParser = new KeyValueListParser("\\s*,\\s*", ":"); + Map conf = new HashMap<>(); + if (!Strings.isNullOrEmpty(kafkaProperties)) { + for (KeyValue keyVal : kvParser.parse(kafkaProperties)) { + conf.put(keyVal.getKey(), keyVal.getValue()); + } + } + return conf; + } + public void validate() { // brokers can be null since it is macro enabled. if (kafkaBrokers != null) { @@ -377,14 +414,16 @@ public void prepareRun(BatchSourceContext context) throws Exception { table = context.getDataset(tableName); Map kafkaConf = new HashMap<>(); kafkaConf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBrokers()); - kafkaConf.put("security.protocol", "SASL_PLAINTEXT"); - kafkaConf.put("sasl.mechanism", "GSSAPI"); - kafkaConf.put("sasl.kerberos.service.name", "kafka"); - kafkaConf.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required \n" + - " useKeyTab=true \n" + - " storeKey=true \n" + - " keyTab=\"/etc/security/keytabs/cdap.service.keytab\" \n" + - " principal=\"cdap/hdp-2529066-1000.dev.continuuity.net@CONTINUUITY.NET\";"); + kafkaConf.putAll(config.getKafkaProperties()); + if (config.getKeytabLocation() != null && config.getPrincipal() != null) { + kafkaConf.put("sasl.jaas.config", String.format("com.sun.security.auth.module.Krb5LoginModule required \n" + + " useKeyTab=true \n" + + " storeKey=true \n" + + " useTicketCache=false \n" + + " keyTab=\"%s\" \n" + + " principal=\"%s\";", config.getKeytabLocation(), + config.getPrincipal())); + } kafkaRequests = KafkaInputFormat.saveKafkaRequests(conf, config.getTopic(), kafkaConf, config.getPartitions(), config.getInitialPartitionOffsets(), table); diff --git a/widgets/Kafka-batchsource.json b/widgets/Kafka-batchsource.json index 9c563e4..f452bf4 100644 --- a/widgets/Kafka-batchsource.json +++ b/widgets/Kafka-batchsource.json @@ -62,6 +62,26 @@ "widget-type": "textbox", "label": "Offset Field", "name": "offsetField" + }, + { + "widget-type": "textbox", + "label": "Kerberos Principal", + "name": "principal" + }, + { + "widget-type": "textbox", + "label": "Keytab Location", + "name": "keytabLocation" + }, + { + "widget-type": "keyvalue", + "label": "Additional Kafka Consumer Properties", + "name": "kafkaProperties", + "widget-attributes": { + "showDelimiter": "false", + "key-placeholder": "Kafka consumer property", + "value-placeholder": "Kafka consumer property value" + } } ] }, From 84a23863257e14145b75d36281cb915e0bc6784e Mon Sep 17 00:00:00 2001 From: yaojiefeng Date: Wed, 8 Nov 2017 23:36:52 -0800 Subject: [PATCH 06/11] update kafka producer to allow access to kerberos enabled kafka --- .../co/cask/hydrator/plugin/sink/Kafka.java | 25 ++++++++++++++++--- .../plugin/sink/KafkaOutputFormat.java | 6 ++--- .../plugin/sink/StringPartitioner.java | 20 +++++++++++---- widgets/Kafka-batchsink.json | 10 ++++++++ 4 files changed, 50 insertions(+), 11 deletions(-) diff --git a/src/main/java/co/cask/hydrator/plugin/sink/Kafka.java b/src/main/java/co/cask/hydrator/plugin/sink/Kafka.java index 253d8c8..238873d 100644 --- a/src/main/java/co/cask/hydrator/plugin/sink/Kafka.java +++ 
b/src/main/java/co/cask/hydrator/plugin/sink/Kafka.java @@ -19,7 +19,6 @@ import co.cask.hydrator.common.ReferencePluginConfig; import com.google.common.base.Strings; import com.google.common.collect.Lists; -import org.apache.avro.reflect.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.Text; import org.slf4j.Logger; @@ -28,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; /** * Kafka sink to write to Kafka @@ -144,6 +144,16 @@ public static class Config extends ReferencePluginConfig { @Nullable private String kafkaProperties; + @Description("The kerberos principal used for the source.") + @Macro + @Nullable + private String principal; + + @Description("The keytab location for the kerberos principal when kerberos security is enabled for kafka.") + @Macro + @Nullable + private String keytabLocation; + @Name("compressionType") @Description("Additional kafka producer properties to set") @Macro @@ -171,10 +181,19 @@ private static class KafkaOutputFormatProvider implements OutputFormatProvider { conf.put(BROKER_LIST, kafkaSinkConfig.brokers); conf.put("compression.type", kafkaSinkConfig.compressionType); - conf.put(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"); - conf.put(VAL_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"); + addKafkaProperties(kafkaSinkConfig.kafkaProperties); + if (kafkaSinkConfig.principal != null && kafkaSinkConfig.keytabLocation != null) { + conf.put("additional." + "sasl.jaas.config", + String.format("com.sun.security.auth.module.Krb5LoginModule required \n" + + " useKeyTab=true \n" + + " storeKey=true \n" + + " useTicketCache=false \n" + + " keyTab=\"%s\" \n" + + " principal=\"%s\";", kafkaSinkConfig.keytabLocation, + kafkaSinkConfig.principal)); + } conf.put("async", kafkaSinkConfig.async); if (kafkaSinkConfig.async.equalsIgnoreCase("true")) { diff --git a/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java b/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java index efc43c7..3f4451e 100644 --- a/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java +++ b/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java @@ -10,6 +10,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,8 +77,6 @@ public RecordWriter getRecordWriter(TaskAttemptContext context) Properties props = new Properties(); // Configure the properties for kafka. 
props.put(BROKER_LIST, configuration.get(BROKER_LIST)); - props.put(KEY_SERIALIZER, configuration.get(KEY_SERIALIZER)); - props.put(VAL_SERIALIZER, configuration.get(VAL_SERIALIZER)); props.put("compression.type", configuration.get("compression.type")); if (!Strings.isNullOrEmpty(configuration.get("hasKey"))) { @@ -102,7 +101,8 @@ public RecordWriter getRecordWriter(TaskAttemptContext context) // CDAP-9178: cached the producer object to avoid being created on every batch interval if (producer == null) { - producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props, new StringSerializer(), + new StringSerializer()); } return new KafkaRecordWriter(producer, topic); diff --git a/src/main/java/co/cask/hydrator/plugin/sink/StringPartitioner.java b/src/main/java/co/cask/hydrator/plugin/sink/StringPartitioner.java index 5cf38eb..6014a18 100644 --- a/src/main/java/co/cask/hydrator/plugin/sink/StringPartitioner.java +++ b/src/main/java/co/cask/hydrator/plugin/sink/StringPartitioner.java @@ -1,8 +1,12 @@ package co.cask.hydrator.plugin.sink; import com.google.common.hash.Hashing; -import kafka.producer.Partitioner; -import kafka.utils.VerifiableProperties; +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; + +import java.util.List; +import java.util.Map; /** * String partitioner for kafka @@ -10,12 +14,18 @@ @SuppressWarnings("UnusedDeclaration") public final class StringPartitioner implements Partitioner { - public StringPartitioner(VerifiableProperties props) { + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + List partitions = cluster.partitionsForTopic(topic); + int numPartitions = partitions.size(); + return Math.abs(Hashing.md5().hashString(key.toString()).asInt()) % numPartitions; + } + @Override + public void close() { } @Override - public int partition(Object key, int numPartitions) { - return Math.abs(Hashing.md5().hashString(key.toString()).asInt()) % numPartitions; + public void configure(Map map) { } } diff --git a/widgets/Kafka-batchsink.json b/widgets/Kafka-batchsink.json index ee33e85..4e8d141 100644 --- a/widgets/Kafka-batchsink.json +++ b/widgets/Kafka-batchsink.json @@ -50,6 +50,16 @@ "default": "none" } }, + { + "widget-type": "textbox", + "label": "Kerberos Principal", + "name": "principal" + }, + { + "widget-type": "textbox", + "label": "Keytab Location", + "name": "keytabLocation" + }, { "widget-type": "keyvalue", "label": "Additional Kafka Producer Properties", From 4487005f72734f46b910c89fccf8741275bf616f Mon Sep 17 00:00:00 2001 From: yaojiefeng Date: Wed, 6 Dec 2017 14:23:28 -0800 Subject: [PATCH 07/11] use kafka 9 --- pom.xml | 4 ++-- .../plugin/batchSource/KafkaInputFormat.java | 19 ++++++++++------- .../hydrator/plugin/batchSource/KafkaKey.java | 21 ++++--------------- .../plugin/batchSource/KafkaReader.java | 4 ++-- .../plugin/batchSource/KafkaRecordReader.java | 2 +- .../co/cask/hydrator/EmbeddedKafkaServer.java | 13 ++---------- 6 files changed, 23 insertions(+), 40 deletions(-) diff --git a/pom.xml b/pom.xml index 72f9d12..d61f813 100644 --- a/pom.xml +++ b/pom.xml @@ -63,7 +63,7 @@ 1.6.1 widgets docs - 0.10.2.0 + 0.9.0.1 2.3.0 system:cdap-data-streams[4.3.0-SNAPSHOT,5.0.0),system:cdap-data-pipeline[4.3.0-SNAPSHOT,5.0.0) @@ -334,7 +334,7 @@ 
<_exportcontents>co.cask.hydrator.plugin.*;org.apache.spark.streaming.kafka.*; - kafka.serializer.*;kafka.common; + kafka.serializer.*;kafka.common;org.apache.kafka.common.serialization.*;org.apache.kafka.clients.consumer.* *;inline=false;scope=compile true lib diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java index 3c39e4a..776b552 100644 --- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java +++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java @@ -22,6 +22,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import kafka.common.TopicAndPartition; @@ -145,9 +146,9 @@ private static List createKafkaRequests(Consumer consumer, Map partitionInfos, Map offsets, KeyValueTable table) { - Collection topicPartitions = - Collections2.transform(partitionInfos, - new Function() { + List topicPartitions = + Lists.transform(partitionInfos, + new Function() { @Override public TopicPartition apply(PartitionInfo input) { return new TopicPartition(input.topic(), input.partition()); @@ -184,9 +185,11 @@ public TopicPartition apply(PartitionInfo input) { } private static Map getLatestOffsets(Consumer consumer, - Collection topicAndPartitions) { + List topicAndPartitions) { consumer.assign(topicAndPartitions); - consumer.seekToEnd(topicAndPartitions); + for (TopicPartition topicPartition : topicAndPartitions) { + consumer.seekToEnd(topicPartition); + } Map offsets = new HashMap<>(); for (TopicPartition topicAndPartition : topicAndPartitions) { @@ -197,9 +200,11 @@ private static Map getLatestOffsets(Consumer consumer, } private static Map getEarliestOffsets(Consumer consumer, - Collection topicAndPartitions) { + List topicAndPartitions) { consumer.assign(topicAndPartitions); - consumer.seekToBeginning(topicAndPartitions); + for (TopicPartition topicPartition : topicAndPartitions) { + consumer.seekToBeginning(topicPartition); + } Map offsets = new HashMap<>(); for (TopicPartition topicAndPartition : topicAndPartitions) { diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaKey.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaKey.java index acf9155..01dfdec 100644 --- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaKey.java +++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaKey.java @@ -36,7 +36,6 @@ public class KafkaKey implements WritableComparable { private int partition = 0; private long beginOffset = 0; private long offset = 0; - private long checksum = 0; private String topic = ""; private MapWritable partitionMap = new MapWritable(); @@ -44,22 +43,17 @@ public class KafkaKey implements WritableComparable { * dummy empty constructor */ public KafkaKey() { - this("dummy", 0, 0, 0, 0); + this("dummy", 0, 0, 0); } public KafkaKey(String topic, int partition, long beginOffset, long offset) { - this(topic, partition, beginOffset, offset, 0); + this.set(topic, partition, beginOffset, offset); } - public KafkaKey(String topic, int partition, long beginOffset, long offset, long checksum) { - this.set(topic, partition, beginOffset, offset, checksum); - } - - public void set(String topic, int partition, long beginOffset, long offset, long checksum) { + public void set(String topic, int partition, long beginOffset, 
long offset) { this.partition = partition; this.beginOffset = beginOffset; this.offset = offset; - this.checksum = checksum; this.topic = topic; } @@ -67,7 +61,6 @@ public void clear() { partition = 0; beginOffset = 0; offset = 0; - checksum = 0; topic = ""; partitionMap = new MapWritable(); } @@ -107,7 +100,6 @@ public void readFields(DataInput in) throws IOException { this.partition = in.readInt(); this.beginOffset = in.readLong(); this.offset = in.readLong(); - this.checksum = in.readLong(); this.topic = in.readUTF(); this.partitionMap = new MapWritable(); this.partitionMap.readFields(in); @@ -118,7 +110,6 @@ public void write(DataOutput out) throws IOException { out.writeInt(this.partition); out.writeLong(this.beginOffset); out.writeLong(this.offset); - out.writeLong(this.checksum); out.writeUTF(this.topic); this.partitionMap.write(out); } @@ -129,10 +120,6 @@ public int compareTo(KafkaKey o) { if (comp != 0) { return comp; } - comp = Long.compare(offset, o.offset); - if (comp != 0) { - return comp; - } - return Long.compare(checksum, o.checksum); + return Long.compare(offset, o.offset); } } diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaReader.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaReader.java index 182d024..2bfebcb 100644 --- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaReader.java +++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaReader.java @@ -94,8 +94,8 @@ public KafkaMessage getNext(KafkaKey kafkaKey) throws IOException { kafkaKey.clear(); kafkaKey.set(kafkaRequest.getTopic(), kafkaRequest.getPartition(), currentOffset, - consumerRecord.offset() + 1, consumerRecord.checksum()); - kafkaKey.setMessageSize(consumerRecord.serializedValueSize()); + consumerRecord.offset() + 1); + kafkaKey.setMessageSize(value == null ? 
-1 : value.length); currentOffset = consumerRecord.offset() + 1; // increase offset return new KafkaMessage(payload, key); } else { diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRecordReader.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRecordReader.java index 98a097e..1f2a43e 100644 --- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRecordReader.java +++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRecordReader.java @@ -82,7 +82,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException { } key.set(request.getTopic(), request.getPartition(), request.getOffset(), - request.getOffset(), 0); + request.getOffset()); value = null; if (reader != null) { diff --git a/src/test/java/co/cask/hydrator/EmbeddedKafkaServer.java b/src/test/java/co/cask/hydrator/EmbeddedKafkaServer.java index c32a973..e03a301 100644 --- a/src/test/java/co/cask/hydrator/EmbeddedKafkaServer.java +++ b/src/test/java/co/cask/hydrator/EmbeddedKafkaServer.java @@ -3,20 +3,16 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.util.concurrent.AbstractIdleService; -import kafka.metrics.KafkaMetricsReporter; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; +import kafka.utils.Time; import org.I0Itec.zkclient.exception.ZkTimeoutException; -import org.apache.kafka.common.utils.Time; import org.apache.twill.internal.utils.Networks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; -import scala.collection.JavaConverters; -import scala.collection.Seq; import java.net.BindException; -import java.util.Collections; import java.util.Properties; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -97,11 +93,6 @@ public long nanoseconds() { return System.nanoTime(); } - @Override - public long hiResClockMs() { - return System.currentTimeMillis(); - } - @Override public void sleep(long ms) { try { @@ -110,7 +101,7 @@ public void sleep(long ms) { Thread.interrupted(); } } - }, Option.apply("embedded-server"), (Seq) JavaConverters.asScalaBufferConverter(Collections.EMPTY_LIST).asScala()); + }, Option.apply("embedded-server")); } /** From 4428dbf9c25a7b58dc11d4b1102ff5cbc7a3d5d5 Mon Sep 17 00:00:00 2001 From: yaojiefeng Date: Wed, 6 Dec 2017 14:28:58 -0800 Subject: [PATCH 08/11] fix unit test --- src/test/java/co/cask/hydrator/PipelineTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/test/java/co/cask/hydrator/PipelineTest.java b/src/test/java/co/cask/hydrator/PipelineTest.java index 48e5f78..58dae6e 100644 --- a/src/test/java/co/cask/hydrator/PipelineTest.java +++ b/src/test/java/co/cask/hydrator/PipelineTest.java @@ -44,6 +44,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import kafka.common.TopicAndPartition; +import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.twill.internal.kafka.client.ZKKafkaClientService; import org.apache.twill.internal.utils.Networks; import org.apache.twill.internal.zookeeper.InMemoryZKServer; @@ -89,7 +90,8 @@ public static void setupTestClass() throws Exception { // this will make our plugins available to data-pipeline. 
    addPluginArtifact(NamespaceId.DEFAULT.artifact("example-plugins", "1.0.0"), parentArtifact,
-                      KafkaBatchSource.class);
+                      KafkaBatchSource.class,
+                      RangeAssignor.class);
 
     zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
     zkServer.startAndWait();

From 758ec57d04701ec8c0278abbb84d185f3c7fd0d2 Mon Sep 17 00:00:00 2001
From: yaojiefeng
Date: Fri, 8 Dec 2017 14:49:55 -0800
Subject: [PATCH 09/11] limit max

---
 .../plugin/batchSource/KafkaBatchSource.java | 14 +++++++++--
 .../plugin/batchSource/KafkaInputFormat.java | 18 +++++++-------
 .../java/co/cask/hydrator/PipelineTest.java  | 24 +++++++++++--------
 widgets/Kafka-batchsource.json               |  5 ++++
 4 files changed, 39 insertions(+), 22 deletions(-)

diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java
index 7e9fdb1..01d1179 100644
--- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java
+++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java
@@ -106,6 +106,13 @@ public static class KafkaBatchConfig extends ReferencePluginConfig {
   @Macro
   private String initialPartitionOffsets;
 
+  @Description("The maximum number of messages the source will read from each topic partition. If the current topic " +
+    "partition does not have this number of messages, the source will read to the latest offset. " +
+    "Note that this is an estimate; the actual number of messages the source reads may be smaller than this number.")
+  @Nullable
+  @Macro
+  private Long maxNumberRecords;
+
   @Description("Output schema of the source, including the timeField and keyField. " +
     "The fields excluding keyField are used in conjunction with the format " +
     "to parse Kafka payloads.")
@@ -183,7 +190,6 @@ public String getPrincipal() {
 
   @Nullable
   public String getKeytabLocation() {
-
     return keytabLocation;
   }
 
@@ -218,6 +224,10 @@ public String getOffsetField() {
     return Strings.isNullOrEmpty(offsetField) ? null : offsetField;
   }
 
+  public long getMaxNumberRecords() {
+    return maxNumberRecords == null ? -1 : maxNumberRecords;
+  }
+
   @Nullable
   public String getFormat() {
     return Strings.isNullOrEmpty(format) ?
null : format; @@ -426,7 +436,7 @@ public void prepareRun(BatchSourceContext context) throws Exception { } kafkaRequests = KafkaInputFormat.saveKafkaRequests(conf, config.getTopic(), kafkaConf, config.getPartitions(), config.getInitialPartitionOffsets(), - table); + config.getMaxNumberRecords(), table); context.setInput(Input.of(config.referenceName, new SourceInputFormatProvider(KafkaInputFormat.class, conf))); } diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java index 776b552..f0a6b50 100644 --- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java +++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java @@ -86,7 +86,7 @@ public List getSplits(JobContext context) throws IOException, Interr static List saveKafkaRequests(Configuration conf, String topic, Map kafkaConf, final Set partitions, Map initOffsets, - KeyValueTable table) throws Exception { + long maxNumberRecords, KeyValueTable table) throws Exception { Properties properties = new Properties(); properties.putAll(kafkaConf); try (Consumer consumer = new KafkaConsumer<>(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { @@ -105,14 +105,8 @@ public boolean apply(PartitionInfo input) { } // Get the latest offsets and generate the KafkaRequests - List finalRequests = createKafkaRequests(consumer, kafkaConf, partitionInfos, initOffsets, table); - - Collections.sort(finalRequests, new Comparator() { - @Override - public int compare(KafkaRequest r1, KafkaRequest r2) { - return r1.getTopic().compareTo(r2.getTopic()); - } - }); + List finalRequests = createKafkaRequests(consumer, kafkaConf, partitionInfos, initOffsets, + maxNumberRecords, table); // TODO: Understand this logic Map offsetKeys = new HashMap<>(); @@ -145,7 +139,7 @@ public int compare(KafkaRequest r1, KafkaRequest r2) { private static List createKafkaRequests(Consumer consumer, Map kafkaConf, List partitionInfos, Map offsets, - KeyValueTable table) { + long maxNumberRecords, KeyValueTable table) { List topicPartitions = Lists.transform(partitionInfos, new Function() { @@ -174,6 +168,10 @@ public TopicPartition apply(PartitionInfo input) { if (earliestOffset == -1) { earliestOffset = latestOffset; } + if (maxNumberRecords > 0) { + latestOffset = + (latestOffset - earliestOffset) <= maxNumberRecords ? 
latestOffset : (earliestOffset + maxNumberRecords); + } LOG.debug("Getting kafka messages from topic {}, partition {}, with earlistOffset {}, latest offset {}", topicAndPartition.topic(), topicAndPartition.partition(), earliestOffset, latestOffset); KafkaRequest KafkaRequest = new KafkaRequest(kafkaConf, topicAndPartition.topic(), topicAndPartition.partition()); diff --git a/src/test/java/co/cask/hydrator/PipelineTest.java b/src/test/java/co/cask/hydrator/PipelineTest.java index 58dae6e..bc42363 100644 --- a/src/test/java/co/cask/hydrator/PipelineTest.java +++ b/src/test/java/co/cask/hydrator/PipelineTest.java @@ -125,6 +125,7 @@ public void testKafkaSource() throws Exception { sourceProperties.put("referenceName", "kafkaTest"); sourceProperties.put("tableName", "testKafkaSource"); sourceProperties.put("topic", "users"); + sourceProperties.put("maxNumberRecords", "3"); sourceProperties.put("schema", schema.toString()); sourceProperties.put("format", "csv"); ETLStage source = @@ -146,6 +147,7 @@ public void testKafkaSource() throws Exception { messages.put("a", "1,samuel,jackson"); messages.put("b", "2,dwayne,johnson"); messages.put("c", "3,christopher,walken"); + messages.put("d", "4,donald,trump"); sendKafkaMessage("users", messages); @@ -178,19 +180,21 @@ public void testKafkaSource() throws Exception { Assert.assertEquals(3, Bytes.toLong(offset)); messages = new HashMap<>(); - messages.put("d", "4,samuel,jackson"); - messages.put("e", "5,dwayne,johnson"); + messages.put("d", "5,samuel,jackson"); + messages.put("e", "6,dwayne,johnson"); sendKafkaMessage("users", messages); workflowManager.start(); TimeUnit.SECONDS.sleep(10); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 1, TimeUnit.MINUTES); - final Map expected2 = ImmutableMap.of( - 1L, "samuel jackson", - 2L, "dwayne johnson", - 3L, "christopher walken", - 4L, "samuel jackson", - 5L, "dwayne johnson" - ); + workflowManager.waitForRuns(ProgramRunStatus.COMPLETED, 2, 1, TimeUnit.MINUTES); + final Map expected2 = ImmutableMap.builder() + .put(1L, "samuel jackson") + .put(2L, "dwayne johnson") + .put(3L, "christopher walken") + .put(4L, "donald trump") + .put(5L, "samuel jackson") + .put(6L, "dwayne johnson") + .build(); outputRecords = new HashSet<>(); outputRecords.addAll(MockSink.readOutput(outputManager)); @@ -198,7 +202,7 @@ public void testKafkaSource() throws Exception { for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) { actual2.put((Long) outputRecord.get("id"), outputRecord.get("first") + " " + outputRecord.get("last")); } - Assert.assertEquals(5, outputRecords.size()); + Assert.assertEquals(6, outputRecords.size()); Assert.assertEquals(expected2, actual2); } diff --git a/widgets/Kafka-batchsource.json b/widgets/Kafka-batchsource.json index f452bf4..71abc4d 100644 --- a/widgets/Kafka-batchsource.json +++ b/widgets/Kafka-batchsource.json @@ -48,6 +48,11 @@ "value-placeholder": "Offset" } }, + { + "widget-type": "textbox", + "label": "Max Number Records", + "name": "maxNumberRecords" + }, { "widget-type": "textbox", "label": "Key Field", From f7aa95014e05573f5b333b37aa558c876a86e1a9 Mon Sep 17 00:00:00 2001 From: Yakir Roket Date: Tue, 20 Nov 2018 14:23:37 +0200 Subject: [PATCH 10/11] adding batch source support with consumer group id variable --- .../plugin/batchSource/KafkaBatchSource.java | 13 +++++++++++++ widgets/Kafka-batchsource.json | 5 +++++ 2 files changed, 18 insertions(+) diff --git a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java 
b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java index 01d1179..c7ceb3a 100644 --- a/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java +++ b/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java @@ -48,6 +48,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.commons.lang3.StringUtils; import java.io.IOException; import java.util.ArrayList; @@ -124,6 +125,10 @@ public static class KafkaBatchConfig extends ReferencePluginConfig { @Nullable private String format; + @Description("Optional Consumer Group Id of the Kafka Consumer Group.") + @Nullable + private String consumerGroupId; + @Description("Optional name of the field containing the message key. " + "If this is not set, no key field will be added to output records. " + "If set, this field must be present in the schema property and must be bytes.") @@ -233,6 +238,11 @@ public String getFormat() { return Strings.isNullOrEmpty(format) ? null : format; } + @Nullable + public String getConsumerGroupId() { + return Strings.isNullOrEmpty(consumerGroupId) ? null : consumerGroupId; + } + public Schema getSchema() { try { return Strings.isNullOrEmpty(schema) ? null : Schema.parseJson(schema); @@ -425,6 +435,9 @@ public void prepareRun(BatchSourceContext context) throws Exception { Map kafkaConf = new HashMap<>(); kafkaConf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBrokers()); kafkaConf.putAll(config.getKafkaProperties()); + if(StringUtils.isNotEmpty(config.getConsumerGroupId())){ + kafkaConf.put(ConsumerConfig.GROUP_ID_CONFIG, config.getConsumerGroupId()); + } if (config.getKeytabLocation() != null && config.getPrincipal() != null) { kafkaConf.put("sasl.jaas.config", String.format("com.sun.security.auth.module.Krb5LoginModule required \n" + " useKeyTab=true \n" + diff --git a/widgets/Kafka-batchsource.json b/widgets/Kafka-batchsource.json index 71abc4d..254a88b 100644 --- a/widgets/Kafka-batchsource.json +++ b/widgets/Kafka-batchsource.json @@ -30,6 +30,11 @@ "label": "Offset Table Name", "name": "tableName" }, + { + "widget-type": "textbox", + "label": "Consumer Group Id", + "name": "consumerGroupId" + }, { "widget-type": "csv", "label": "Topic Partitions", From b31f83b3bada1c7d464d5769d8bba67553016df7 Mon Sep 17 00:00:00 2001 From: Yakir Roket Date: Thu, 22 Nov 2018 13:15:47 +0200 Subject: [PATCH 11/11] using cdap version 5.1.0 --- pom.xml | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index d61f813..6d398a6 100644 --- a/pom.xml +++ b/pom.xml @@ -54,11 +54,17 @@ sonatype-snapshots https://oss.sonatype.org/content/repositories/snapshots + + conjars + Conjars + http://conjars.org/repo + default + UTF-8 - 4.3.1 + 5.1.0 1.8.1 1.6.1 widgets @@ -66,7 +72,7 @@ 0.9.0.1 2.3.0 - system:cdap-data-streams[4.3.0-SNAPSHOT,5.0.0),system:cdap-data-pipeline[4.3.0-SNAPSHOT,5.0.0) + system:cdap-data-streams[5.1.0,6.0.0-SNAPSHOT),system:cdap-data-pipeline[5.1.0,6.0.0-SNAPSHOT) ${project.basedir} @@ -236,6 +242,12 @@ + + org.pentaho + pentaho-aggdesigner-algorithm + 5.1.5-jhyde + test + org.apache.commons commons-lang3
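Editor's note on PATCH 09 ("limit max"): the per-partition cap works by clamping the latest offset before each KafkaRequest is built. The sketch below is illustrative only and is not part of the patch series; the class and method names are made up, while earliestOffset, latestOffset, and maxNumberRecords mirror the variables used in the patched createKafkaRequests. A maxNumberRecords of -1, which getMaxNumberRecords() returns when the field is unset, leaves the range uncapped.

// Illustrative sketch of the offset clamping introduced in PATCH 09 (not plugin code).
public class OffsetClampSketch {

  // Returns the latest offset to read up to for one partition in one batch run.
  static long clampLatestOffset(long earliestOffset, long latestOffset, long maxNumberRecords) {
    if (maxNumberRecords > 0 && (latestOffset - earliestOffset) > maxNumberRecords) {
      // More messages are available than the cap allows: read at most maxNumberRecords.
      return earliestOffset + maxNumberRecords;
    }
    // Cap disabled (-1) or fewer messages than the cap: read to the latest offset.
    return latestOffset;
  }

  public static void main(String[] args) {
    System.out.println(clampLatestOffset(100L, 110L, 3L));  // 103: only 3 of the 10 available messages
    System.out.println(clampLatestOffset(100L, 102L, 3L));  // 102: fewer messages than the cap
    System.out.println(clampLatestOffset(100L, 110L, -1L)); // 110: cap disabled
  }
}

This also explains why the PipelineTest change above still expects the stored offset to be 3 after four messages are published with maxNumberRecords set to 3.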
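Editor's note on PATCH 10: the consumer group id only reaches the Kafka consumer configuration when the user supplies one. A minimal sketch, assuming a Map of String to String for the config (the plugin builds a plain map in prepareRun); buildKafkaConf and its parameters are illustrative names, while ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG and ConsumerConfig.GROUP_ID_CONFIG are the Kafka client constants the patch uses.

import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the consumer config assembly after PATCH 10 (not plugin code).
public class KafkaConfSketch {

  static Map<String, String> buildKafkaConf(String brokers, String consumerGroupId) {
    Map<String, String> kafkaConf = new HashMap<>();
    // Broker list from the plugin config, e.g. "host1:9092,host2:9092".
    kafkaConf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    // The group id is optional; it is only added when the user supplies a non-empty value.
    if (consumerGroupId != null && !consumerGroupId.isEmpty()) {
      kafkaConf.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
    }
    return kafkaConf;
  }

  public static void main(String[] args) {
    System.out.println(buildKafkaConf("localhost:9092", "my-batch-consumers"));
  }
}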