Merge remote-tracking branch 'origin/release/2.2' into develop
chtyim committed Oct 17, 2019
2 parents dd846f4 + 99d7c8a commit d6b932f
Showing 21 changed files with 1,181 additions and 710 deletions.
9 changes: 7 additions & 2 deletions kafka-plugins-0.10/pom.xml
@@ -19,6 +19,11 @@
<artifactId>kafka-plugins-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
@@ -182,8 +187,8 @@
<version>1.1.0</version>
<configuration>
<cdapArtifacts>
<parent>system:cdap-data-pipeline[6.0.0,7.0.0-SNAPSHOT)</parent>
<parent>system:cdap-data-streams[6.0.0,7.0.0-SNAPSHOT)</parent>
<parent>system:cdap-data-pipeline[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
<parent>system:cdap-data-streams[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
</cdapArtifacts>
</configuration>
<executions>
KafkaAlertPublisher.java
@@ -27,6 +27,7 @@
import io.cdap.cdap.etl.api.Alert;
import io.cdap.cdap.etl.api.AlertPublisher;
import io.cdap.cdap.etl.api.AlertPublisherContext;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.plugin.common.KafkaHelpers;
import io.cdap.plugin.common.KeyValueListParser;
@@ -62,13 +63,14 @@ public KafkaAlertPublisher(Config config) {

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
config.validate();
config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
}

@Override
public void initialize(AlertPublisherContext context) throws Exception {
super.initialize(context);
config.validate();
config.validate(context.getFailureCollector());
context.getFailureCollector().getOrThrowException();
Properties props = new Properties();
// Add client id property with stage name as value.
props.put(ProducerConfig.CLIENT_ID_CONFIG, context.getStageName());
@@ -112,6 +114,7 @@ public void destroy() {
*/
public static class Config extends PluginConfig {

public static final String TOPIC = "topic";
@Name("brokers")
@Description("Specifies the connection string where Producer can find one or more brokers to " +
"determine the leader for each topic.")
@@ -157,7 +160,7 @@ private Map<String, String> getProducerProperties() {
return producerProps;
}

private void validate() {
private void validate(FailureCollector collector) {
// If the topic or brokers are macros they would not be available at config time. So do not perform
// validations yet.
if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(brokers)) {
@@ -167,11 +170,13 @@ private void validate() {
try {
Topic.validate(topic);
} catch (InvalidTopicException e) {
throw new IllegalArgumentException(String.format("Topic name %s is not a valid kafka topic. Please provide " +
"valid kafka topic name. %s", topic, e.getMessage()));
collector.addFailure(String.format(
"Topic name %s is not a valid kafka topic. Please provide valid kafka topic name. %s", topic,
e.getMessage()), null)
.withConfigProperty(TOPIC);
}

KafkaHelpers.validateKerberosSetting(principal, keytabLocation);
KafkaHelpers.validateKerberosSetting(principal, keytabLocation, collector);
}
}
}
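
For orientation, the validation pattern these files move to can be condensed into the sketch below. Only the FailureCollector calls mirror the diff above; the class, field, and property names are placeholders, not part of this commit.

import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;

public class SampleKafkaStageConfig {
  private static final String TOPIC = "topic";
  private final String topic;

  public SampleKafkaStageConfig(String topic) {
    this.topic = topic;
  }

  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
    validate(collector);
    // Surfaces all collected failures in a single exception instead of failing on the first one.
    collector.getOrThrowException();
  }

  private void validate(FailureCollector collector) {
    if (topic == null || topic.isEmpty()) {
      // Each failure is tied to the config property it concerns, so the UI can highlight it.
      collector.addFailure("Topic must be specified.", null).withConfigProperty(TOPIC);
    }
  }
}
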
KafkaBatchSource.java
@@ -29,7 +29,9 @@
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.common.io.ByteBuffers;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
@@ -53,6 +55,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

@@ -122,9 +125,9 @@ public Map<String, String> getKafkaProperties() {
return conf;
}

public void validate() {
super.validate();
KafkaHelpers.validateKerberosSetting(principal, keytabLocation);
public void validate(FailureCollector collector) {
super.validate(collector);
KafkaHelpers.validateKerberosSetting(principal, keytabLocation, collector);
}
}

@@ -134,16 +137,23 @@ public KafkaBatchSource(Kafka10BatchConfig config) {

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
config.validate();
pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
FailureCollector failureCollector = stageConfigurer.getFailureCollector();
config.validate(failureCollector);
stageConfigurer.setOutputSchema(config.getSchema(failureCollector));
failureCollector.getOrThrowException();
}

@Override
public void prepareRun(BatchSourceContext context) throws Exception {
Job job = JobUtils.createInstance();
Configuration conf = job.getConfiguration();

KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets();
FailureCollector failureCollector = context.getFailureCollector();
KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets(failureCollector);
Schema schema = config.getSchema(failureCollector);
Set<Integer> partitions = config.getPartitions(failureCollector);
failureCollector.getOrThrowException();

// If the offset directory is provided, try to load the file
if (!context.isPreviewEnabled() && config.getOffsetDir() != null) {
@@ -167,10 +177,11 @@ public void prepareRun(BatchSourceContext context) throws Exception {
KafkaHelpers.setupKerberosLogin(kafkaConf, config.getPrincipal(), config.getKeytabLocation());
kafkaConf.putAll(config.getKafkaProperties());
kafkaRequests = KafkaInputFormat.saveKafkaRequests(conf, config.getTopic(), kafkaConf,
config.getPartitions(), config.getMaxNumberRecords(),
partitions,
config.getMaxNumberRecords(),
partitionOffsets);
LineageRecorder lineageRecorder = new LineageRecorder(context, config.referenceName);
Schema schema = config.getSchema();

if (schema != null) {
lineageRecorder.createExternalDataset(schema);
if (schema.getFields() != null && !schema.getFields().isEmpty()) {
@@ -203,8 +214,11 @@ public void onRunFinish(boolean succeeded, BatchSourceContext context) {
public void initialize(BatchRuntimeContext context) throws Exception {
super.initialize(context);

schema = config.getSchema();
Schema messageSchema = config.getMessageSchema();
schema = config.getSchema(context.getFailureCollector());
Schema messageSchema = config.getMessageSchema(context.getFailureCollector());
if (schema == null || messageSchema == null) {
return;
}
for (Schema.Field field : schema.getFields()) {
String name = field.getName();
if (!name.equals(config.getKeyField()) && !name.equals(config.getPartitionField()) &&
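
The getSchema, getPartitions, and getInitialPartitionOffsets calls above now take the collector as well. A rough sketch of what such a getter looks like under this pattern, assuming the schema is held as a JSON string and parsed with CDAP's Schema.parseJson (class and field names are placeholders):

import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import java.io.IOException;
import javax.annotation.Nullable;

public class SampleSchemaConfig {
  private static final String SCHEMA = "schema";

  @Nullable
  private final String schema;   // schema as JSON; may be unset or a macro at configure time

  public SampleSchemaConfig(@Nullable String schema) {
    this.schema = schema;
  }

  @Nullable
  public Schema getSchema(FailureCollector collector) {
    if (schema == null || schema.isEmpty()) {
      // Nothing to parse yet (for example, the value arrives later through a macro).
      return null;
    }
    try {
      return Schema.parseJson(schema);
    } catch (IOException e) {
      // Report the problem against the "schema" property and let the caller decide when to throw.
      collector.addFailure("Invalid schema: " + e.getMessage(), null).withConfigProperty(SCHEMA);
      return null;
    }
  }
}

Callers such as prepareRun above then invoke failureCollector.getOrThrowException() once, after all getters have run, so every problem is reported together.
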
KafkaHelpers.java
@@ -17,6 +17,7 @@
package io.cdap.plugin.common;

import com.google.common.base.Strings;
import io.cdap.cdap.etl.api.FailureCollector;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
@@ -33,6 +34,8 @@
public final class KafkaHelpers {
private static final Logger LOG = LoggerFactory.getLogger(KafkaHelpers.class);
public static final String SASL_JAAS_CONFIG = "sasl.jaas.config";
public static final String PRINCIPAL = "principal";
public static final String KEYTAB = "keytab";

// This class cannot be instantiated
private KafkaHelpers() {
@@ -113,11 +116,28 @@ public static void setupKerberosLogin(Map<String, ? super String> conf, @Nullabl
*/
public static void validateKerberosSetting(@Nullable String principal, @Nullable String keytab) {
if (Strings.isNullOrEmpty(principal) != Strings.isNullOrEmpty(keytab)) {
String emptyField = Strings.isNullOrEmpty(principal) ? "principal" : "keytab";
String emptyField = Strings.isNullOrEmpty(principal) ? PRINCIPAL : KEYTAB;
String message = emptyField + " is empty. When Kerberos security is enabled for Kafka, " +
"then both the principal and the keytab have " +
"to be specified. If Kerberos is not enabled, then both should be empty.";
throw new IllegalArgumentException(message);
}
}

/**
* Validates whether the principal and keytab are both set or both of them are null/empty.
* Stores the result in the provided failureCollector.
*
* @param principal Kerberos principal
* @param keytab Kerberos keytab for the principal
* @param collector input failureCollector into which the error will be added if present
*/
public static void validateKerberosSetting(@Nullable String principal, @Nullable String keytab,
FailureCollector collector) {
if (Strings.isNullOrEmpty(principal) != Strings.isNullOrEmpty(keytab)) {
String emptyField = Strings.isNullOrEmpty(principal) ? PRINCIPAL : KEYTAB;
String message = "Field " + emptyField + " must be specified.";
collector.addFailure(message, null).withConfigProperty(emptyField);
}
}
}
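
A usage sketch for the new overload: a plugin config's validate method simply forwards its collector. The class and field names below are placeholders; only the helper call comes from this commit.

import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.common.KafkaHelpers;
import javax.annotation.Nullable;

public class SampleKerberosConfig {
  @Nullable
  private final String principal;       // Kerberos principal; empty when Kerberos is disabled
  @Nullable
  private final String keytabLocation;  // path to the keytab for the principal

  public SampleKerberosConfig(@Nullable String principal, @Nullable String keytabLocation) {
    this.principal = principal;
    this.keytabLocation = keytabLocation;
  }

  public void validate(FailureCollector collector) {
    // Adds a failure on "principal" or "keytab" if only one of the two is provided.
    KafkaHelpers.validateKerberosSetting(principal, keytabLocation, collector);
  }
}
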
KafkaBatchSink.java
@@ -28,6 +28,7 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
@@ -58,6 +59,8 @@
@Description("KafkaSink to write events to kafka")
public class KafkaBatchSink extends ReferenceBatchSink<StructuredRecord, Text, Text> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaBatchSink.class);
private static final String ASYNC = "async";
private static final String TOPIC = "topic";

// Configuration for the plugin.
private final Config producerConfig;
@@ -77,7 +80,8 @@ public KafkaBatchSink(Config producerConfig) {
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
super.configurePipeline(pipelineConfigurer);

producerConfig.validate();
producerConfig.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
pipelineConfigurer.getStageConfigurer().getFailureCollector().getOrThrowException();
}

@Override
@@ -135,35 +139,42 @@ public void destroy() {
*/
public static class Config extends ReferencePluginConfig {

@Name("brokers")
private static final String BROKERS = "brokers";
private static final String KEY = "key";
private static final String TOPIC = KafkaBatchSink.TOPIC;
private static final String FORMAT = "format";
private static final String KAFKA_PROPERTIES = "kafkaProperties";
private static final String COMPRESSION_TYPE = "compressionType";

@Name(BROKERS)
@Description("Specifies the connection string where Producer can find one or more brokers to " +
"determine the leader for each topic")
@Macro
private String brokers;

@Name("async")
@Name(ASYNC)
@Description("Specifies whether an acknowledgment is required from broker that message was received. " +
"Default is FALSE")
@Macro
private String async;

@Name("key")
@Name(KEY)
@Description("Specify the key field to be used in the message. Only String Partitioner is supported.")
@Macro
@Nullable
private String key;

@Name("topic")
@Name(TOPIC)
@Description("Topic to which message needs to be published")
@Macro
private String topic;

@Name("format")
@Name(FORMAT)
@Description("Format a structured record should be converted to")
@Macro
private String format;

@Name("kafkaProperties")
@Name(KAFKA_PROPERTIES)
@Description("Additional kafka producer properties to set")
@Macro
@Nullable
@@ -179,7 +190,7 @@ public static class Config extends ReferencePluginConfig {
@Nullable
private String keytabLocation;

@Name("compressionType")
@Name(COMPRESSION_TYPE)
@Description("Compression type to be applied on message")
@Macro
private String compressionType;
@@ -196,21 +207,23 @@ public Config(String brokers, String async, String key, String topic, String for
this.compressionType = compressionType;
}

private void validate() {
private void validate(FailureCollector collector) {
if (!async.equalsIgnoreCase("true") && !async.equalsIgnoreCase("false")) {
throw new IllegalArgumentException("Async flag has to be either TRUE or FALSE.");
collector.addFailure("Async flag has to be either TRUE or FALSE.", null)
.withConfigProperty(ASYNC);
}

KafkaHelpers.validateKerberosSetting(principal, keytabLocation);
KafkaHelpers.validateKerberosSetting(principal, keytabLocation, collector);
}
}

private static class KafkaOutputFormatProvider implements OutputFormatProvider {
public static final String HAS_KEY = "hasKey";
private final Map<String, String> conf;

KafkaOutputFormatProvider(Config kafkaSinkConfig) {
this.conf = new HashMap<>();
conf.put("topic", kafkaSinkConfig.topic);
conf.put(TOPIC, kafkaSinkConfig.topic);

conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkConfig.brokers);
conf.put("compression.type", kafkaSinkConfig.compressionType);
@@ -220,13 +233,13 @@ private static class KafkaOutputFormatProvider implements OutputFormatProvider {
KafkaHelpers.setupKerberosLogin(conf, kafkaSinkConfig.principal, kafkaSinkConfig.keytabLocation);
addKafkaProperties(kafkaSinkConfig.kafkaProperties);

conf.put("async", kafkaSinkConfig.async);
conf.put(ASYNC, kafkaSinkConfig.async);
if (kafkaSinkConfig.async.equalsIgnoreCase("true")) {
conf.put(ACKS_REQUIRED, "1");
}

if (!Strings.isNullOrEmpty(kafkaSinkConfig.key)) {
conf.put("hasKey", kafkaSinkConfig.key);
conf.put(HAS_KEY, kafkaSinkConfig.key);
}
}

(Diffs for the remaining changed files are not shown.)