Skip to content

Commit

Permalink
KAFKA-15695: Update the local log start offset of a log after rebuild…
Browse files Browse the repository at this point in the history
…ing the auxiliary state (apache#14649)

Reviewers: Satish Duggana <[email protected]>, Luke Chen <[email protected]>,  Divij Vaidya <[email protected]>, Kamal Chandraprakash<[email protected]>, Alexandre Dupriez <[email protected]>
  • Loading branch information
nikramakrishnan authored and AnatolyPopov committed Dec 21, 2023
1 parent 552fd11 commit 72662ef
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,11 @@ private Long buildRemoteLogAuxState(TopicPartition topicPartition,
Partition partition = replicaMgr.getPartitionOrException(topicPartition);
partition.truncateFullyAndStartAt(nextOffset, false, Option.apply(leaderLogStartOffset));

// Build leader epoch cache.
// Increment start offsets
unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented);
unifiedLog.maybeIncrementLocalLogStartOffset(nextOffset, LeaderOffsetIncremented);

// Build leader epoch cache.
List<EpochEntry> epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata);
if (unifiedLog.leaderEpochCache().isDefined()) {
unifiedLog.leaderEpochCache().get().assign(epochs);
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
}

private def maybeIncrementLocalLogStartOffset(newLocalLogStartOffset: Long, reason: LogStartOffsetIncrementReason): Unit = {
def maybeIncrementLocalLogStartOffset(newLocalLogStartOffset: Long, reason: LogStartOffsetIncrementReason): Unit = {
lock synchronized {
if (newLocalLogStartOffset > localLogStartOffset()) {
_localLogStartOffset = newLocalLogStartOffset
Expand Down Expand Up @@ -1756,6 +1756,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
leaderEpochCache.foreach(_.clearAndFlush())
producerStateManager.truncateFullyAndStartAt(newOffset)
logStartOffset = logStartOffsetOpt.getOrElse(newOffset)
if (remoteLogEnabled()) _localLogStartOffset = newOffset
rebuildProducerState(newOffset, producerStateManager)
updateHighWatermark(localLog.logEndOffsetMetadata)
}
Expand Down
6 changes: 6 additions & 0 deletions core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3642,6 +3642,12 @@ class UnifiedLogTest {
log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
assertEquals(newLogStartOffset, log.logStartOffset)
assertEquals(log.logStartOffset, log.localLogStartOffset())

// Truncate the local log and verify that the offsets are updated to expected values
val newLocalLogStartOffset = 60L;
log.truncateFullyAndStartAt(newLocalLogStartOffset, Option.apply(newLogStartOffset))
assertEquals(newLogStartOffset, log.logStartOffset)
assertEquals(newLocalLogStartOffset, log.localLogStartOffset())
}

private class MockLogOffsetsListener extends LogOffsetsListener {
Expand Down

0 comments on commit 72662ef

Please sign in to comment.