Skip to content

Commit

Permalink
Merge pull request #3 from attentive-mobile/alpha/from-3.0-FLINK-28820
Browse files Browse the repository at this point in the history
[FLINK-28820] Improve the writing performance of PulsarSink
  • Loading branch information
dialalpha authored Apr 24, 2023
2 parents 91b479f + 15b2781 commit f3d0969
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 70 deletions.
1 change: 1 addition & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ private PulsarSinkOptions() {
"The allowed transaction recommit times if we meet some retryable exception."
+ " This is used in Pulsar Transaction.");

/** @deprecated This config option is no longer used; the pending-message limit was removed for better performance. */
@Deprecated
public static final ConfigOption<Integer> PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM =
ConfigOptions.key(SINK_CONFIG_PREFIX + "maxPendingMessages")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,6 @@ public static <T> ProducerBuilder<T> createProducerBuilder(
PULSAR_SEND_TIMEOUT_MS,
Math::toIntExact,
ms -> builder.sendTimeout(ms, MILLISECONDS));
configuration.useOption(PULSAR_MAX_PENDING_MESSAGES, builder::maxPendingMessages);
configuration.useOption(
PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS,
builder::maxPendingMessagesAcrossPartitions);
configuration.useOption(
PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS,
s -> builder.batchingMaxPublishDelay(s, MICROSECONDS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.Objects;

import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_RECOMMIT_TIMES;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL;
Expand All @@ -51,7 +50,6 @@ public class SinkConfiguration extends PulsarConfiguration {
private final int partitionSwitchSize;
private final MessageKeyHash messageKeyHash;
private final boolean enableSchemaEvolution;
private final int maxPendingMessages;
private final int maxRecommitTimes;

public SinkConfiguration(Configuration configuration) {
Expand All @@ -63,7 +61,6 @@ public SinkConfiguration(Configuration configuration) {
this.partitionSwitchSize = getInteger(PULSAR_BATCHING_MAX_MESSAGES);
this.messageKeyHash = get(PULSAR_MESSAGE_KEY_HASH);
this.enableSchemaEvolution = get(PULSAR_WRITE_SCHEMA_EVOLUTION);
this.maxPendingMessages = get(PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM);
this.maxRecommitTimes = get(PULSAR_MAX_RECOMMIT_TIMES);
}

Expand Down Expand Up @@ -111,14 +108,6 @@ public boolean isEnableSchemaEvolution() {
return enableSchemaEvolution;
}

/**
 * Returns the maximum number of messages that may be pending in a single Pulsar writer
 * instance.
 *
 * <p>Pulsar messages are sent asynchronously, so this limit bounds how many sends may be
 * outstanding at once. The value is read from
 * {@code PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM} when this configuration is constructed.
 *
 * @return the configured pending-message limit
 */
public int getMaxPendingMessages() {
return maxPendingMessages;
}

/** The maximum number of times a Pulsar transaction may be recommitted. */
public int getMaxRecommitTimes() {
return maxRecommitTimes;
Expand All @@ -141,7 +130,6 @@ public boolean equals(Object o) {
&& partitionSwitchSize == that.partitionSwitchSize
&& enableSchemaEvolution == that.enableSchemaEvolution
&& messageKeyHash == that.messageKeyHash
&& maxPendingMessages == that.maxPendingMessages
&& maxRecommitTimes == that.maxRecommitTimes;
}

Expand All @@ -154,7 +142,6 @@ public int hashCode() {
partitionSwitchSize,
messageKeyHash,
enableSchemaEvolution,
maxPendingMessages,
maxRecommitTimes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,18 @@
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.flink.shaded.guava30.com.google.common.base.Strings;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.shade.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.Collections.emptyList;
import static org.apache.flink.util.IOUtils.closeAll;
Expand All @@ -66,18 +65,16 @@
public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommittable> {
private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class);

private final SinkConfiguration sinkConfiguration;
private final PulsarSerializationSchema<IN> serializationSchema;
private final TopicMetadataListener metadataListener;
private final TopicRouter<IN> topicRouter;
private final MessageDelayer<IN> messageDelayer;
private final SinkUserCallback<IN> userCallback;
private final DeliveryGuarantee deliveryGuarantee;
private final PulsarSinkContext sinkContext;
private final MailboxExecutor mailboxExecutor;
private final TopicProducerRegister producerRegister;

private long pendingMessages = 0;
private final MailboxExecutor mailboxExecutor;
private final AtomicLong pendingMessages = new AtomicLong(0);

/**
* Constructor creating a Pulsar writer.
Expand All @@ -101,7 +98,7 @@ public PulsarWriter(
MessageDelayer<IN> messageDelayer,
InitContext initContext,
SinkUserCallback<IN> userCallback) {
this.sinkConfiguration = checkNotNull(sinkConfiguration);
checkNotNull(sinkConfiguration);
this.serializationSchema = checkNotNull(serializationSchema);
this.metadataListener = checkNotNull(metadataListener);
this.topicRouter = checkNotNull(topicRouter);
Expand All @@ -111,7 +108,6 @@ public PulsarWriter(

this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
this.sinkContext = new PulsarSinkContextImpl(initContext, sinkConfiguration);
this.mailboxExecutor = initContext.getMailboxExecutor();

// Initialize topic metadata listener.
LOG.debug("Initialize topic metadata after creating Pulsar writer.");
Expand All @@ -132,6 +128,7 @@ public PulsarWriter(

// Create this producer register after opening serialization schema!
this.producerRegister = new TopicProducerRegister(sinkConfiguration);
this.mailboxExecutor = initContext.getMailboxExecutor();
}

@Override
Expand Down Expand Up @@ -164,44 +161,29 @@ public void write(IN element, Context context) throws IOException, InterruptedEx
invokeUserCallbackAfterSend(element, message, topic, id, ex);
});
} else {
// Waiting for permits to write message.
requirePermits();
mailboxExecutor.execute(
() -> enqueueMessageSending(topic, builder, element, message),
"Failed to send message to Pulsar");
}
}

/**
 * Sends one message to Pulsar asynchronously and then waits for the send to finish.
 *
 * <p>The completion callback first releases the permit held for this message. If the send
 * failed, it throws a {@link FlinkRuntimeException} wrapping the cause, which means the user
 * callback is not invoked for failed sends. If the send succeeded, it logs the message id at
 * debug level and then invokes the user callback.
 *
 * @param topic the topic name, used in the error and log messages
 * @param builder the message builder whose {@code sendAsync()} performs the send
 * @param element the original element, passed on to the user callback
 * @param message the Pulsar message, passed on to the user callback
 * @throws ExecutionException if the awaited future completed exceptionally
 * @throws InterruptedException if the thread is interrupted while waiting on the future
 */
private void enqueueMessageSending(
String topic, TypedMessageBuilder<?> builder, IN element, PulsarMessage<?> message)
throws ExecutionException, InterruptedException {
// The trailing get() blocks the caller until the send and its callback have completed.
builder.sendAsync()
.whenComplete(
(id, ex) -> {
// Always give the permit back, whether the send succeeded or failed.
this.releasePermits();
if (ex != null) {
// Throwing here skips the user callback below.
throw new FlinkRuntimeException(
"Failed to send data to Pulsar " + topic, ex);
} else {
LOG.debug(
"Sent message to Pulsar {} with message id {}", topic, id);
}
invokeUserCallbackAfterSend(element, message, topic, id, ex);
})
.get();
}

private void requirePermits() throws InterruptedException {
while (pendingMessages >= sinkConfiguration.getMaxPendingMessages()) {
LOG.info("Waiting for the available permits.");
mailboxExecutor.yield();
// Increase the pending message count.
pendingMessages.incrementAndGet();
builder.sendAsync()
.whenComplete(
(id, ex) -> {
pendingMessages.decrementAndGet();
if (ex != null) {
mailboxExecutor.execute(
() -> {
throw new FlinkRuntimeException(
"Failed to send data to Pulsar " + topic,
ex);
},
"Failed to send data to Pulsar");
} else {
LOG.debug(
"Sent message to Pulsar {} with message id {}",
topic,
id);
}
invokeUserCallbackAfterSend(element, message, topic, id, ex);
});
}
pendingMessages++;
}

private void releasePermits() {
this.pendingMessages -= 1;
}

@SuppressWarnings({"rawtypes", "unchecked"})
Expand Down Expand Up @@ -292,15 +274,15 @@ private void invokeUserCallbackAfterSend(
}

@Override
public void flush(boolean endOfInput) throws IOException, InterruptedException {
if (endOfInput) {
// Try flush only once when we meet the end of the input.
public void flush(boolean endOfInput) throws IOException {
if (endOfInput || deliveryGuarantee != DeliveryGuarantee.NONE) {
LOG.info("Flush the pending messages to Pulsar.");

// Try to flush pending messages.
producerRegister.flush();
} else {
while (pendingMessages != 0 && deliveryGuarantee != DeliveryGuarantee.NONE) {
// Make sure all the pending messages are flushed to Pulsar.
while (pendingMessages.longValue() > 0) {
producerRegister.flush();
LOG.info("Flush the pending messages to Pulsar.");
mailboxExecutor.yield();
}
}
}
Expand Down

0 comments on commit f3d0969

Please sign in to comment.