Feature/use-kafka9-with-groupid #50

Open · wants to merge 11 commits into develop
pom.xml (30 changes: 24 additions & 6 deletions)
@@ -6,7 +6,7 @@

<groupId>co.cask.hydrator</groupId>
<artifactId>kafka-plugins</artifactId>
<version>1.8.0-SNAPSHOT-0.8.2.2</version>
<version>1.9.0-SNAPSHOT-0.10.2.0</version>

<licenses>
<license>
@@ -54,19 +54,25 @@
<id>sonatype-snapshots</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</repository>
<repository>
<id>conjars</id>
<name>Conjars</name>
<url>http://conjars.org/repo</url>
<layout>default</layout>
</repository>
</repositories>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<cdap.version>4.3.0-SNAPSHOT</cdap.version>
<hydrator.version>1.8.0-SNAPSHOT</hydrator.version>
<cdap.version>5.1.0</cdap.version>
<hydrator.version>1.8.1</hydrator.version>
<spark.version>1.6.1</spark.version>
<widgets.dir>widgets</widgets.dir>
<docs.dir>docs</docs.dir>
<kafka.version>0.8.2.2</kafka.version>
<kafka.version>0.9.0.1</kafka.version>
<hadoop.version>2.3.0</hadoop.version>
<!-- properties for script build step that creates the config files for the artifacts -->
<app.parents>system:cdap-data-streams[4.3.0-SNAPSHOT,5.0.0),system:cdap-data-pipeline[4.3.0-SNAPSHOT,5.0.0)</app.parents>
<app.parents>system:cdap-data-streams[5.1.0,6.0.0-SNAPSHOT),system:cdap-data-pipeline[5.1.0,6.0.0-SNAPSHOT)</app.parents>
<main.basedir>${project.basedir}</main.basedir>
</properties>

@@ -236,6 +242,12 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.pentaho</groupId>
<artifactId>pentaho-aggdesigner-algorithm</artifactId>
<version>5.1.5-jhyde</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -306,6 +318,12 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
<version>3.2</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -328,7 +346,7 @@
<configuration>
<instructions>
<_exportcontents>co.cask.hydrator.plugin.*;org.apache.spark.streaming.kafka.*;
kafka.serializer.*;kafka.common;</_exportcontents>
kafka.serializer.*;kafka.common;org.apache.kafka.common.serialization.*;org.apache.kafka.clients.consumer.*</_exportcontents>
<Embed-Dependency>*;inline=false;scope=compile</Embed-Dependency>
<Embed-Transitive>true</Embed-Transitive>
<Embed-Directory>lib</Embed-Directory>
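Note on the widened _exportcontents above: the bundle now also exports the org.apache.kafka.common.serialization and org.apache.kafka.clients.consumer packages needed by the new (0.9+) consumer API. A minimal sketch of that API follows, for orientation only; it is not code from this PR, and the class name, broker address, group id, topic, and deserializer choice are placeholders:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class NewConsumerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                  // placeholder group id
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    // The new consumer is created from plain Properties, subscribes to topics, and polls for records.
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("my-topic"));            // placeholder topic
      ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);        // 0.9/0.10-style poll(long)
      for (ConsumerRecord<byte[], byte[]> record : records) {
        System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset());
      }
    }
  }
}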
@@ -13,11 +13,11 @@
import co.cask.hydrator.common.KeyValueListParser;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import kafka.common.InvalidTopicException;
import kafka.common.Topic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

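The import change above drops the old kafka.common.Topic and kafka.common.InvalidTopicException classes in favor of the public org.apache.kafka.common.errors.InvalidTopicException from the new clients API. For orientation, a minimal sketch using the producer classes this file imports (KafkaProducer, ProducerConfig, ProducerRecord); illustrative only, not code from this PR, with placeholder class name, broker, topic, payload, and serializer choice:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class NewProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    // The producer is configured from plain Properties and sends records to a topic.
    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("my-topic", "key".getBytes(), "value".getBytes())); // placeholder topic
    }
  }
}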
@@ -16,15 +16,6 @@

package co.cask.hydrator.plugin.batchSource;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Macro;
import co.cask.cdap.api.annotation.Name;
@@ -56,6 +47,17 @@
import kafka.common.TopicAndPartition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;

/**
* Kafka batch source.
@@ -105,6 +107,13 @@ public static class KafkaBatchConfig extends ReferencePluginConfig {
@Macro
private String initialPartitionOffsets;

@Description("The maximum of messages the source will read from each topic partition. If the current topic " +
"partition does not have this number of messages, the source will read to the latest offset. " +
"Note that this is an estimation, the acutal number of messages the source read may be smaller than this number.")
@Nullable
@Macro
private Long maxNumberRecords;
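// Example (illustrative, not from this PR): if maxNumberRecords is 1000 but a partition only has
// about 800 messages after its start offset, the source reads those ~800 messages and stops at the latest offset.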

@Description("Output schema of the source, including the timeField and keyField. " +
"The fields excluding keyField are used in conjunction with the format " +
"to parse Kafka payloads.")
@@ -116,6 +125,10 @@ public static class KafkaBatchConfig extends ReferencePluginConfig {
@Nullable
private String format;

@Description("Optional Consumer Group Id of the Kafka Consumer Group.")
@Nullable
private String consumerGroupId;

@Description("Optional name of the field containing the message key. " +
"If this is not set, no key field will be added to output records. " +
"If set, this field must be present in the schema property and must be bytes.")
@@ -134,6 +147,21 @@ public static class KafkaBatchConfig extends ReferencePluginConfig {
@Nullable
private String offsetField;

@Description("Additional kafka producer properties to set")
@Macro
@Nullable
private String kafkaProperties;

@Description("The kerberos principal used for the source.")
@Macro
@Nullable
private String principal;

@Description("The keytab location for the kerberos principal when kerberos security is enabled for kafka.")
@Macro
@Nullable
private String keytabLocation;

public KafkaBatchConfig() {
super("");
}
@@ -160,6 +188,16 @@ public String getTableName() {
return tableName;
}

@Nullable
public String getPrincipal() {
return principal;
}

@Nullable
public String getKeytabLocation() {
return keytabLocation;
}

public Set<Integer> getPartitions() {
Set<Integer> partitionSet = new HashSet<>();
if (partitions == null) {
@@ -191,11 +229,20 @@ public String getOffsetField() {
return Strings.isNullOrEmpty(offsetField) ? null : offsetField;
}

public long getMaxNumberRecords() {
return maxNumberRecords == null ? -1 : maxNumberRecords;
}

@Nullable
public String getFormat() {
return Strings.isNullOrEmpty(format) ? null : format;
}

@Nullable
public String getConsumerGroupId() {
return Strings.isNullOrEmpty(consumerGroupId) ? null : consumerGroupId;
}

public Schema getSchema() {
try {
return Strings.isNullOrEmpty(schema) ? null : Schema.parseJson(schema);
@@ -306,6 +353,17 @@ public Map<TopicAndPartition, Long> getInitialPartitionOffsets() {
return partitionOffsets;
}

public Map<String, String> getKafkaProperties() {
KeyValueListParser kvParser = new KeyValueListParser("\\s*,\\s*", ":");
Map<String, String> conf = new HashMap<>();
if (!Strings.isNullOrEmpty(kafkaProperties)) {
for (KeyValue<String, String> keyVal : kvParser.parse(kafkaProperties)) {
conf.put(keyVal.getKey(), keyVal.getValue());
}
}
return conf;
}
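// Example (illustrative, not from this PR): with kafkaProperties set to
// "max.partition.fetch.bytes:1048576,request.timeout.ms:30000", getKafkaProperties() above returns
// a map containing max.partition.fetch.bytes=1048576 and request.timeout.ms=30000. The property
// names here are placeholders; any consumer property can be passed through this way.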

public void validate() {
// brokers can be null since it is macro enabled.
if (kafkaBrokers != null) {
@@ -374,9 +432,24 @@ public void prepareRun(BatchSourceContext context) throws Exception {
context.createDataset(tableName, KeyValueTable.class.getName(), DatasetProperties.EMPTY);
}
table = context.getDataset(tableName);
kafkaRequests = KafkaInputFormat.saveKafkaRequests(conf, config.getTopic(), config.getBrokerMap(),
Map<String, String> kafkaConf = new HashMap<>();
kafkaConf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBrokers());
kafkaConf.putAll(config.getKafkaProperties());
if (StringUtils.isNotEmpty(config.getConsumerGroupId())) {
kafkaConf.put(ConsumerConfig.GROUP_ID_CONFIG, config.getConsumerGroupId());
}
if (config.getKeytabLocation() != null && config.getPrincipal() != null) {
kafkaConf.put("sasl.jaas.config", String.format("com.sun.security.auth.module.Krb5LoginModule required \n" +
" useKeyTab=true \n" +
" storeKey=true \n" +
" useTicketCache=false \n" +
" keyTab=\"%s\" \n" +
" principal=\"%s\";", config.getKeytabLocation(),
config.getPrincipal()));
}
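// Example (illustrative values): with keytabLocation "/etc/security/keytabs/kafka.keytab" and
// principal "user@EXAMPLE.COM", the String.format above renders sasl.jaas.config as:
//   com.sun.security.auth.module.Krb5LoginModule required
//     useKeyTab=true storeKey=true useTicketCache=false
//     keyTab="/etc/security/keytabs/kafka.keytab" principal="user@EXAMPLE.COM";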
kafkaRequests = KafkaInputFormat.saveKafkaRequests(conf, config.getTopic(), kafkaConf,
config.getPartitions(), config.getInitialPartitionOffsets(),
table);
config.getMaxNumberRecords(), table);
context.setInput(Input.of(config.referenceName,
new SourceInputFormatProvider(KafkaInputFormat.class, conf)));
}