Skip to content

Commit

Permalink
KAFKA-16890: Compute valid log-start-offset when deleting overlapping…
Browse files Browse the repository at this point in the history
… remote segments (apache#16237)

The listRemoteLogSegments method returns the metadata list sorted by start-offset. However, the returned list contains metadata for all uploaded segments, including duplicate and overlapping remote log segments. The reason duplicate/overlapping remote-log-segment cases arise is explained [here](https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java#L103).

The list returned by RLMM#listRemoteLogSegments can contain duplicate segment metadata at the end of the list. So, while computing the next log-start-offset, we should take the maximum of the deleted segments' (end-offset + 1) values rather than the value from whichever segment was processed last.

Reviewers: Satish Duggana <[email protected]>
  • Loading branch information
kamalcph authored and jeqo committed Jul 23, 2024
1 parent 2878c2d commit 017e471
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 2 deletions.
8 changes: 6 additions & 2 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,9 @@ private boolean isSegmentBreachedByRetentionSize(RemoteLogSegmentMetadata metada
}
}
if (shouldDeleteSegment) {
logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
if (!logStartOffset.isPresent() || logStartOffset.getAsLong() < metadata.endOffset() + 1) {
logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
}
logger.info("About to delete remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
}
Expand All @@ -895,7 +897,9 @@ public boolean isSegmentBreachedByRetentionTime(RemoteLogSegmentMetadata metadat
remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
// are ascending with in an epoch.
logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
if (!logStartOffset.isPresent() || logStartOffset.getAsLong() < metadata.endOffset() + 1) {
logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
}
logger.info("About to delete remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
}
Expand Down
69 changes: 69 additions & 0 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1920,6 +1920,75 @@ public void testDeletionOnRetentionBreachedSegments(long retentionSize,
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
}

// Verifies the KAFKA-16890 fix: when overlapping/duplicate remote segments are deleted due to
// a retention breach, the next log-start-offset must be the maximum of the deleted segments'
// (endOffset + 1) values — not the value derived from whichever segment happened to be
// processed last in the iteration order returned by RLMM#listRemoteLogSegments.
@ParameterizedTest(name = "testDeletionOnOverlappingRetentionBreachedSegments retentionSize={0} retentionMs={1}")
@CsvSource(value = {"0, -1", "-1, 0"})
public void testDeletionOnOverlappingRetentionBreachedSegments(long retentionSize,
long retentionMs)
throws RemoteStorageException, ExecutionException, InterruptedException {
// Per the CSV parameters, either retention.bytes=0 or retention.ms=0, so every remote
// segment is eligible for deletion on the corresponding retention check.
Map<String, Long> logProps = new HashMap<>();
logProps.put("retention.bytes", retentionSize);
logProps.put("retention.ms", retentionMs);
LogConfig mockLogConfig = new LogConfig(logProps);
when(mockLog.config()).thenReturn(mockLogConfig);

// A single leader epoch so that both segments belong to the same epoch chain.
List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
checkpoint.write(epochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));

when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.logEndOffset()).thenReturn(200L);

// metadata1 covers offsets 1-100.
RemoteLogSegmentMetadata metadata1 = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 1, 100, 1024,
epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED)
.get(0);
// metadata2 overlaps metadata1: same start-offset, but a larger end-offset
// (metadata1.endOffset() + 5), uploaded by a different broker.
RemoteLogSegmentMetadata metadata2 = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
metadata1.startOffset(), metadata1.endOffset() + 5, metadata1.maxTimestampMs(),
metadata1.brokerId() + 1, metadata1.eventTimestampMs(), metadata1.segmentSizeInBytes() + 128,
metadata1.customMetadata(), metadata1.state(), metadata1.segmentLeaderEpochs());

// When there are overlapping/duplicate segments, RemoteLogMetadataManager#listRemoteLogSegments
// returns the segments ordered as (valid ++ unreferenced):
// e.g. broker B0 uploaded segment S0 with offsets 0-100 and broker B1 uploaded segment S1
// with offsets 0-200. S0 is marked as duplicate and added to unreferencedSegmentIds, so
// listRemoteLogSegments returns them in the order S1, S0. Hence, while computing the next
// log-start-offset, we must take the max of the deleted segments' (end-offset + 1) values.
List<RemoteLogSegmentMetadata> metadataList = new ArrayList<>();
metadataList.add(metadata2);
metadataList.add(metadata1);

when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
.thenReturn(metadataList.iterator());
// thenAnswer: a fresh iterator per invocation, since iterators are single-use.
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
.thenAnswer(ans -> metadataList.iterator());
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
.thenReturn(CompletableFuture.runAsync(() -> { }));

// Verify the metrics for remote deletes and for failures is zero before attempt to delete segments
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
// Verify aggregate metrics
assertEquals(0, brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());

// Run the cleanup pass as leader; it should delete both breached segments.
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
task.convertToLeader(0);
task.cleanupExpiredRemoteLogSegments();

// The new log-start-offset must come from metadata2 (the larger end-offset), even though
// metadata1 was iterated after it.
assertEquals(metadata2.endOffset() + 1, currentLogStartOffset.get());
verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));

// Verify the metric for remote delete is updated correctly
assertEquals(2, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
// Verify we did not report any failure for remote deletes
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
// Verify aggregate metrics
assertEquals(2, brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
}

@ParameterizedTest(name = "testRemoteDeleteLagsOnRetentionBreachedSegments retentionSize={0} retentionMs={1}")
@CsvSource(value = {"0, -1", "-1, 0"})
public void testRemoteDeleteLagsOnRetentionBreachedSegments(long retentionSize,
Expand Down

0 comments on commit 017e471

Please sign in to comment.