diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 26628ed9dfa7..53f6468c089e 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -878,7 +878,9 @@ private boolean isSegmentBreachedByRetentionSize(RemoteLogSegmentMetadata metada
                 }
             }
             if (shouldDeleteSegment) {
-                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                if (!logStartOffset.isPresent() || logStartOffset.getAsLong() < metadata.endOffset() + 1) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                }
                 logger.info("About to delete remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
                     metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
             }
@@ -895,7 +897,9 @@ public boolean isSegmentBreachedByRetentionTime(RemoteLogSegmentMetadat
                 remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
                 // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
                 // are ascending with in an epoch.
-                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                if (!logStartOffset.isPresent() || logStartOffset.getAsLong() < metadata.endOffset() + 1) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                }
                 logger.info("About to delete remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
                     metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
             }
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 3cdbb6b347b3..3c15280adf67 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -1920,6 +1920,75 @@ public void testDeletionOnRetentionBreachedSegments(long retentionSize,
         assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
     }
 
+    @ParameterizedTest(name = "testDeletionOnOverlappingRetentionBreachedSegments retentionSize={0} retentionMs={1}")
+    @CsvSource(value = {"0, -1", "-1, 0"})
+    public void testDeletionOnOverlappingRetentionBreachedSegments(long retentionSize,
+                                                                   long retentionMs)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", retentionSize);
+        logProps.put("retention.ms", retentionMs);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(200L);
+
+        RemoteLogSegmentMetadata metadata1 = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 1, 100, 1024,
+                epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED)
+                .get(0);
+        // overlapping segment
+        RemoteLogSegmentMetadata metadata2 = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
+                metadata1.startOffset(), metadata1.endOffset() + 5, metadata1.maxTimestampMs(),
+                metadata1.brokerId() + 1, metadata1.eventTimestampMs(), metadata1.segmentSizeInBytes() + 128,
+                metadata1.customMetadata(), metadata1.state(), metadata1.segmentLeaderEpochs());
+
+        // When there are overlapping/duplicate segments, RemoteLogMetadataManager#listRemoteLogSegments
+        // returns the segments in (valid ++ unreferenced) order:
+        // (e.g.) B0 uploaded segment S0 with offsets 0-100 and B1 uploaded segment S1 with offsets 0-200.
+        // We mark segment S0 as duplicate and add it to unreferencedSegmentIds, so
+        // listRemoteLogSegments returns the segments in the order S1, S0.
+        // While computing the next log-start-offset, take the max of the deleted segments' end-offset + 1.
+        List<RemoteLogSegmentMetadata> metadataList = new ArrayList<>();
+        metadataList.add(metadata2);
+        metadataList.add(metadata1);
+
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                .thenReturn(metadataList.iterator());
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                .thenAnswer(ans -> metadataList.iterator());
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                .thenReturn(CompletableFuture.runAsync(() -> { }));
+
+        // Verify the metrics for remote deletes and failures are zero before attempting to delete segments
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
+        // Verify aggregate metrics
+        assertEquals(0, brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
+        assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(0);
+        task.cleanupExpiredRemoteLogSegments();
+
+        assertEquals(metadata2.endOffset() + 1, currentLogStartOffset.get());
+        verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
+        verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
+
+        // Verify the metric for remote deletes is updated correctly
+        assertEquals(2, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
+        // Verify we did not report any failures for remote deletes
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
+        // Verify aggregate metrics
+        assertEquals(2, brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
+        assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
+    }
+
     @ParameterizedTest(name = "testRemoteDeleteLagsOnRetentionBreachedSegments retentionSize={0} retentionMs={1}")
     @CsvSource(value = {"0, -1", "-1, 0"})
     public void testRemoteDeleteLagsOnRetentionBreachedSegments(long retentionSize,
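For context on the RemoteLogManager change: the retention pass walks segments in (valid ++ unreferenced) order, so a shorter duplicate segment can be deleted after a longer overlapping one, and the old unconditional `logStartOffset = endOffset + 1` assignment could move the log-start-offset backwards. Below is a minimal, self-contained sketch of the max-based update; the Segment record and nextLogStartOffset helper are illustrative stand-ins, not Kafka's own classes.

    import java.util.List;
    import java.util.OptionalLong;

    public class LogStartOffsetSketch {

        // Hypothetical stand-in for RemoteLogSegmentMetadata; only the end offset matters here.
        record Segment(long startOffset, long endOffset) { }

        // Mirrors the patched logic: advance logStartOffset monotonically, never backwards.
        static OptionalLong nextLogStartOffset(List<Segment> deletedInListedOrder) {
            OptionalLong logStartOffset = OptionalLong.empty();
            for (Segment metadata : deletedInListedOrder) {
                long candidate = metadata.endOffset() + 1;
                if (!logStartOffset.isPresent() || logStartOffset.getAsLong() < candidate) {
                    logStartOffset = OptionalLong.of(candidate);
                }
            }
            return logStartOffset;
        }

        public static void main(String[] args) {
            // Matches the scenario in the test comments: S1 (0-200) is listed before the
            // duplicate S0 (0-100), because unreferenced segments come last.
            List<Segment> deleted = List.of(new Segment(0, 200), new Segment(0, 100));
            // The unguarded assignment would end at 101 after deleting S0;
            // the max-based update keeps 201 (= 200 + 1).
            System.out.println(nextLogStartOffset(deleted)); // OptionalLong[201]
        }
    }

Without the guard, deleting S0 after S1 would regress the next log-start-offset from 201 to 101, which is exactly the overlap case the new test pins down.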