Avoid late records preemptively rotating/committing S3 output files
When late data arrives on a Kafka partition (e.g. data for the previous
hourly encodedPartition), the following check triggers an immediate rotation
and commit of files:

https://github.com/confluentinc/kafka-connect-storage-cloud/blob/918730d011dcd199e810ec3a68a03ab01c927f62/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java#L410

When late data is interleaved with up-to-date data, the problem is
exacerbated: a quick succession of rotations causes a large number of small
files to be committed to S3.
This hurts both the performance/throughput of Kafka Connect and the
downstream consumers, which have to deal with the many small file fragments.
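
To illustrate the effect, here is a hypothetical sketch (not part of this
change; the encodedPartition paths and record sequence are invented) of how
the pre-existing single-open-file rule turns interleaved late records into
one small committed file per switch:

// Hypothetical illustration only: counts how often the pre-existing
// "encodedPartition changed" check would force an immediate rotation/commit.
import java.util.Arrays;
import java.util.List;

public class LateDataRotationSketch {
  public static void main(String[] args) {
    // Illustrative encodedPartition values; real values depend on the partitioner.
    List<String> encodedPartitions = Arrays.asList(
        "dt=2022-10-21/hour=08", "dt=2022-10-21/hour=07",
        "dt=2022-10-21/hour=08", "dt=2022-10-21/hour=07",
        "dt=2022-10-21/hour=08");
    String current = null;
    int rotations = 0;
    for (String ep : encodedPartitions) {
      if (current != null && !ep.equals(current)) {
        rotations++; // old rule: encodedPartition changed => rotate and commit now
      }
      current = ep;
    }
    // 5 interleaved records yield 4 rotations, i.e. 4 tiny files plus the one still open.
    System.out.println("rotations = " + rotations);
  }
}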

This PR adds a new `max.open.files.per.partition` S3SinkConnectorConfig setting.
It defaults to 1, which preserves the existing behavior.

If set to a value > 1, the following behavior is enabled (a sketch
configuration follows this list):

- A separate commit file is kept open for each encodedPartition target
   up to a maximum of `max.open.files.per.partition`

- Rotation occurs only when one of the open encodedPartition targets hits its
   rotation condition (`flush.size`, `rotate.interval.ms`); at that point all
   open files are committed. Committing all files ensures that the S3 sink's
   pre-commit hook reports a high-watermark offset to the Kafka consumer group
   with no gaps of still-buffered, in-flight data behind it.
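
For example, a sketch of a sink configuration that enables the new behavior
(all values are illustrative assumptions, not taken from this change; only
`max.open.files.per.partition` is introduced by this PR):

// Hypothetical configuration fragment, expressed as a Java map; values are examples only.
import java.util.Map;

public class S3SinkConfigSketch {
  public static void main(String[] args) {
    Map<String, String> props = Map.of(
        "connector.class", "io.confluent.connect.s3.S3SinkConnector",
        "flush.size", "100000",                // size-based rotation threshold (records per file)
        "rotate.interval.ms", "3600000",       // time-based rotation threshold (1 hour)
        "max.open.files.per.partition", "3");  // new: keep up to 3 encodedPartition files open
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}

With such a setting, a late hour-07 record no longer forces the open hour-08
file to rotate; both files stay open until one of them hits `flush.size` or
`rotate.interval.ms`.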

It's worth noting that this issue/limitation was encountered before and is
well described in:
"CC-2313 Handle late arriving records in storage cloud sink connectors"
#187

However, that feature was subsequently reverted:
a2ce6fc
confluentinc/kafka-connect-storage-common#87

N.B. Unlike the solution proposed in CC-2313, we do not opt to write late
data to an incorrect encodedPartition; i.e. late data for hour 7 will not
land in a path/file for hour 8.
frankgrimes97 committed Oct 21, 2022
1 parent 918730d commit 9351a51
Showing 3 changed files with 254 additions and 37 deletions.
@@ -190,6 +190,9 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
public static final String ELASTIC_BUFFER_INIT_CAPACITY = "s3.elastic.buffer.init.capacity";
public static final int ELASTIC_BUFFER_INIT_CAPACITY_DEFAULT = 128 * 1024; // 128KB

public static final String MAX_OPEN_FILES_PER_PARTITION_CONFIG = "max.open.files.per.partition";
public static final int MAX_OPEN_FILES_PER_PARTITION_DEFAULT = 1;

private final String name;

private final Map<String, ComposableConfig> propertyToConfig = new HashMap<>();
@@ -717,6 +720,19 @@ public static ConfigDef newConfigDef() {
"Elastic buffer initial capacity"
);

configDef.define(
MAX_OPEN_FILES_PER_PARTITION_CONFIG,
Type.INT,
MAX_OPEN_FILES_PER_PARTITION_DEFAULT,
atLeast(1),
Importance.LOW,
"Max open files per partition.",
group,
++orderInGroup,
Width.LONG,
"Max open files per partition"
);

}
return configDef;
}
@@ -56,6 +56,9 @@

import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_PART_RETRIES_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_RETRY_BACKOFF_CONFIG;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;

public class TopicPartitionWriter {
private static final Logger log = LoggerFactory.getLogger(TopicPartitionWriter.class);
@@ -73,20 +76,23 @@ public class TopicPartitionWriter {
private final SinkTaskContext context;
private final boolean isTaggingEnabled;
private final boolean ignoreTaggingErrors;
private int recordCount;
private long recordCount;
private final int flushSize;
private final long rotateIntervalMs;
private final long rotateScheduleIntervalMs;
private long nextScheduledRotation;
private long currentOffset;
private final int maxOpenFilesPerPartition;
private Long currentTimestamp;
private String currentEncodedPartition;
private Long baseRecordTimestamp;
private final Map<String, Long> currentTimestamps;
private final Set<String> openEncodedPartitions;
private final Map<String, Long> baseRecordTimestamps;
private Long offsetToCommit;
private final RecordWriterProvider<S3SinkConnectorConfig> writerProvider;
private final Map<String, Long> startOffsets;
private final Map<String, Long> endOffsets;
private final Map<String, Long> recordCounts;
private final Map<String, LongAdder> encodedPartitionRecordCounts;
private long timeoutMs;
private long failureTime;
private final StorageSchemaCompatibility compatibility;
@@ -156,13 +162,19 @@ public TopicPartitionWriter(TopicPartition tp,
compatibility = StorageSchemaCompatibility.getCompatibility(
connectorConfig.getString(StorageSinkConnectorConfig.SCHEMA_COMPATIBILITY_CONFIG));

maxOpenFilesPerPartition = connectorConfig.getInt(
S3SinkConnectorConfig.MAX_OPEN_FILES_PER_PARTITION_CONFIG);

buffer = new LinkedList<>();
commitFiles = new HashMap<>();
writers = new HashMap<>();
currentSchemas = new HashMap<>();
currentTimestamps = new HashMap<>();
openEncodedPartitions = new HashSet<>();
baseRecordTimestamps = new HashMap<>();
startOffsets = new HashMap<>();
endOffsets = new HashMap<>();
recordCounts = new HashMap<>();
encodedPartitionRecordCounts = new HashMap<>();
state = State.WRITE_STARTED;
failureTime = -1L;
currentOffset = -1L;
@@ -226,13 +238,6 @@ private void executeState(long now) {
// fallthrough
case WRITE_PARTITION_PAUSED:
SinkRecord record = buffer.peek();
if (timestampExtractor != null) {
currentTimestamp = timestampExtractor.extract(record, now);
if (baseRecordTimestamp == null) {
baseRecordTimestamp = currentTimestamp;
}
}
Schema valueSchema = record.valueSchema();
String encodedPartition;
try {
encodedPartition = partitioner.encodePartition(record, now);
@@ -245,6 +250,17 @@
throw e;
}
}

if (timestampExtractor != null) {
final long currentRecordTimestamp = timestampExtractor.extract(record, now);
currentTimestamp = currentRecordTimestamp;
currentTimestamps.put(encodedPartition, currentRecordTimestamp);
if (!baseRecordTimestamps.containsKey(encodedPartition)) {
baseRecordTimestamps.put(encodedPartition, currentRecordTimestamp);
}
}

Schema valueSchema = record.valueSchema();
Schema currentValueSchema = currentSchemas.get(encodedPartition);
if (currentValueSchema == null) {
currentSchemas.put(encodedPartition, valueSchema);
@@ -284,9 +300,14 @@ private boolean checkRotationOrAppend(
String encodedPartition,
long now
) {

// rotateOnTime is safe to go before writeRecord, because it is acceptable
// even for a faulty record to trigger time-based rotation if it applies
if (rotateOnTime(encodedPartition, currentTimestamp, now)) {
if (rotateOnTime(
encodedPartition,
currentTimestamps.getOrDefault(encodedPartition, currentTimestamp),
now)) {

setNextScheduledRotation();
nextState();
return true;
@@ -315,11 +336,11 @@
return false;
}

if (rotateOnSize()) {
if (rotateOnSize(encodedPartition)) {
log.info(
"Starting commit and rotation for topic partition {} with start offset {}",
tp,
startOffsets
"Starting commit and rotation for encodedPartition {} with start offset {}",
encodedPartition,
startOffsets.getOrDefault(encodedPartition, 0L)
);
nextState();
return true;
@@ -330,18 +351,46 @@

private void commitOnTimeIfNoData(long now) {
if (buffer.isEmpty()) {
boolean shouldCommitFiles = false;
// committing files after waiting for rotateIntervalMs time but less than flush.size
// records available
if (recordCount > 0 && rotateOnTime(currentEncodedPartition, currentTimestamp, now)) {
if (maxOpenFilesPerPartition == 1
&& recordCount > 0
&& rotateOnTime(currentEncodedPartition, currentTimestamp, now)) {

log.info(
"Committing files after waiting for rotateIntervalMs time but less than flush.size "
+ "records available."
);
setNextScheduledRotation();

commitFiles();
shouldCommitFiles = true;
} else if (maxOpenFilesPerPartition > 1) {
for (String encodedPartition : openEncodedPartitions) {
if (rotateOnTime(
encodedPartition,
currentTimestamps.getOrDefault(encodedPartition, now),
now)) {

log.info(
"Committing files after waiting for rotateIntervalMs time for encodedPartition "
+ "'{}' but less than flush.size records available.",
encodedPartition);

setNextScheduledRotation();

// At least one encodedPartition needs committing, so commit all so that
// the S3SinkTask's preCommit contains no buffered offsets, only
// those written to S3
shouldCommitFiles = true;
break;
}
}
}

if (shouldCommitFiles) {
commitFiles();
}
resume();
setState(State.WRITE_STARTED);
}
@@ -398,23 +447,53 @@ private void setState(State state) {
this.state = state;
}

private static final boolean recordTimestampExceedsRotationInterval(
final Long recordTimestamp,
final Long baseRecordTimestamp,
final long rotateIntervalMs) {

return recordTimestamp - baseRecordTimestamp >= rotateIntervalMs;
}

private static final boolean encodedPartitionChangeNecessitatesRotation(
final String encodedPartition,
final String currentEncodedPartition,
final int maxOpenFilesPerPartition) {

return !encodedPartition.equals(currentEncodedPartition)
&& maxOpenFilesPerPartition == 1;
}

private boolean rotateOnTime(String encodedPartition, Long recordTimestamp, long now) {
if (recordCount <= 0) {
return false;
}

final Long baseRecordTimestamp = baseRecordTimestamps.getOrDefault(encodedPartition, -1L);
// rotateIntervalMs > 0 implies timestampExtractor != null
boolean periodicRotation = rotateIntervalMs > 0
&& timestampExtractor != null
&& (
recordTimestamp - baseRecordTimestamp >= rotateIntervalMs
|| !encodedPartition.equals(currentEncodedPartition)
);
final boolean hasTimestampExtractor = rotateIntervalMs > 0 && timestampExtractor != null;
boolean periodicRotation = hasTimestampExtractor
&& (recordTimestampExceedsRotationInterval(
recordTimestamp,
baseRecordTimestamp,
rotateIntervalMs)
|| encodedPartitionChangeNecessitatesRotation(
encodedPartition,
currentEncodedPartition,
maxOpenFilesPerPartition)
|| baseRecordTimestamps.size() > maxOpenFilesPerPartition
);

log.trace(
"Checking rotation on time for topic-partition '{}' "
+ "with recordCount '{}' and encodedPartition '{}'",
tp,
recordCount,
maxOpenFilesPerPartition == 1
? recordCount
: encodedPartitionRecordCounts.getOrDefault(
encodedPartition,
new LongAdder()
).longValue(),
encodedPartition
);

Expand Down Expand Up @@ -471,12 +550,25 @@ private void setNextScheduledRotation() {
}
}

private boolean rotateOnSize() {
boolean messageSizeRotation = recordCount >= flushSize;
private static final long getRecordCount(
final String encodedPartition,
final Map<String, LongAdder> recordCountsPerEncodedPartition) {

return recordCountsPerEncodedPartition
.getOrDefault(encodedPartition, new LongAdder())
.longValue();
}

private boolean rotateOnSize(final String encodedPartition) {
long encodedPartitionRecordCount = maxOpenFilesPerPartition == 1 ? recordCount : getRecordCount(
encodedPartition,
encodedPartitionRecordCounts
);
boolean messageSizeRotation = encodedPartitionRecordCount >= flushSize;
log.trace("Should apply size-based rotation for topic-partition '{}':"
+ " (count {} >= flush size {})? {}",
tp,
recordCount,
encodedPartitionRecordCount,
flushSize,
messageSizeRotation
);
@@ -580,10 +672,11 @@ private boolean writeRecord(SinkRecord record, String encodedPartition) {
}

currentEncodedPartition = encodedPartition;
openEncodedPartitions.add(encodedPartition);
currentOffset = record.kafkaOffset();
if (shouldRemoveStartOffset) {
log.trace(
"Setting writer's start offset for '{}' to {}",
"Setting writer's current offset for '{}' to {}",
currentEncodedPartition,
currentOffset
);
@@ -592,12 +685,15 @@ private boolean writeRecord(SinkRecord record, String encodedPartition) {
// value, we know that we have at least one record. This allows us
// to initialize all our maps at the same time, and saves future
// checks on the existence of keys
recordCounts.put(currentEncodedPartition, 0L);
endOffsets.put(currentEncodedPartition, 0L);
encodedPartitionRecordCounts.remove(currentEncodedPartition);
endOffsets.remove(currentEncodedPartition);
}
++recordCount;

recordCounts.put(currentEncodedPartition, recordCounts.get(currentEncodedPartition) + 1);
encodedPartitionRecordCounts.computeIfAbsent(
currentEncodedPartition,
k -> new LongAdder()
).increment();
endOffsets.put(currentEncodedPartition, currentOffset);
return true;
}
@@ -615,15 +711,15 @@ private void commitFiles() {
}
startOffsets.remove(encodedPartition);
endOffsets.remove(encodedPartition);
recordCounts.remove(encodedPartition);
encodedPartitionRecordCounts.remove(encodedPartition);
log.debug("Committed {} for {}", entry.getValue(), tp);
baseRecordTimestamps.remove(encodedPartition);
}

offsetToCommit = currentOffset + 1;
commitFiles.clear();
currentSchemas.clear();
recordCount = 0;
baseRecordTimestamp = null;
log.info("Files committed to S3. Target commit offset for {} is {}", tp, offsetToCommit);
}

@@ -645,8 +741,8 @@ private void commitFile(String encodedPartition) {
private void tagFile(String encodedPartition, String s3ObjectPath) {
Long startOffset = startOffsets.get(encodedPartition);
Long endOffset = endOffsets.get(encodedPartition);
Long recordCount = recordCounts.get(encodedPartition);
if (startOffset == null || endOffset == null || recordCount == null) {
Long recordCount = getRecordCount(encodedPartition, encodedPartitionRecordCounts);
if (startOffset == null || endOffset == null || recordCount == 0L) {
log.warn(
"Missing tags when attempting to tag file {}. "
+ "Starting offset tag: {}, "
Expand All @@ -655,7 +751,7 @@ private void tagFile(String encodedPartition, String s3ObjectPath) {
encodedPartition,
startOffset == null ? "missing" : startOffset,
endOffset == null ? "missing" : endOffset,
recordCount == null ? "missing" : recordCount
recordCount == 0L ? "missing" : recordCount
);
return;
}