Skip to content

Commit

Permalink
[Java] Make RecordingLog.invalidateEntry an O(1) operation and non-…
Browse files Browse the repository at this point in the history
…public.
  • Loading branch information
vyazelenko committed Nov 27, 2024
1 parent 653137d commit c9567be
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 43 deletions.
42 changes: 9 additions & 33 deletions aeron-cluster/src/main/java/io/aeron/cluster/RecordingLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public static final class Entry
* @param type of the entry as a log of a term or a snapshot.
* @param archiveEndpoint archive where the snapshot is located, if
* <code>entryType == ENTRY_TYPE_STANDBY_SNAPSHOT</code>.
* @param isValid indicates if the entry is valid, {@link RecordingLog#invalidateEntry(long, int)}
* @param isValid indicates if the entry is valid, {@link RecordingLog#invalidateEntry(int)}
* marks it invalid.
* @param position of the entry on disk.
* @param entryIndex of the entry on disk.
Expand Down Expand Up @@ -1021,7 +1021,7 @@ public boolean invalidateLatestSnapshot()
final Entry entry = entriesCache.get(i);
if (isValidSnapshot(entry) && entry.serviceId == serviceId)
{
invalidateEntry(entry.leadershipTermId, entry.entryIndex);
invalidateEntry(i);
serviceId++;
}
else
Expand Down Expand Up @@ -1307,42 +1307,18 @@ public void commitLogPosition(final long leadershipTermId, final long logPositio
}
}

/**
* Invalidate an entry in the log, so it is no longer valid. Be careful that the recording log is not left in an
* invalid state for recovery.
*
* @param leadershipTermId to match for validation.
* @param entryIndex reached in the leadership term.
* @see #invalidateLatestSnapshot()
*/
public void invalidateEntry(final long leadershipTermId, final int entryIndex)
void invalidateEntry(final int index)
{
Entry invalidEntry = null;
final Entry invalidEntry = entriesCache.get(index).invalidate();
entriesCache.set(index, invalidEntry);

for (int i = entriesCache.size() - 1; i >= 0; i--)
if (ENTRY_TYPE_TERM == invalidEntry.type)
{
final Entry entry = entriesCache.get(i);
if (entry.leadershipTermId == leadershipTermId && entry.entryIndex == entryIndex)
{
invalidEntry = entry.invalidate();
entriesCache.set(i, invalidEntry);

if (ENTRY_TYPE_TERM == entry.type)
{
cacheIndexByLeadershipTermIdMap.remove(leadershipTermId);
}
else if (ENTRY_TYPE_SNAPSHOT == entry.type || ENTRY_TYPE_STANDBY_SNAPSHOT == entry.type)
{
invalidSnapshots.add(i);
}

break;
}
cacheIndexByLeadershipTermIdMap.remove(invalidEntry.leadershipTermId);
}

if (null == invalidEntry)
else if (ENTRY_TYPE_SNAPSHOT == invalidEntry.type || ENTRY_TYPE_STANDBY_SNAPSHOT == invalidEntry.type)
{
throw new ClusterException("unknown entry index: " + entryIndex);
invalidSnapshots.add(index);
}

final int invalidEntryType = ENTRY_TYPE_INVALID_FLAG | invalidEntry.type;
Expand Down
20 changes: 10 additions & 10 deletions aeron-cluster/src/test/java/io/aeron/cluster/RecordingLogTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ void shouldIgnoreInvalidMidSnapshotInRecoveryPlan()
recordingLog.appendSnapshot(5, 1L, 0, 999L, 0, 0);
recordingLog.appendSnapshot(6, 1L, 0, 999L, 0, SERVICE_ID);

recordingLog.invalidateEntry(1L, 2);
recordingLog.invalidateEntry(1L, 3);
recordingLog.invalidateEntry(2);
recordingLog.invalidateEntry(3);
}

try (RecordingLog recordingLog = new RecordingLog(tempDir, true))
Expand Down Expand Up @@ -168,7 +168,7 @@ void shouldIgnoreInvalidTermInRecoveryPlan()
assertNotNull(lastTerm);
assertEquals(999L, lastTerm.termBaseLogPosition);

recordingLog.invalidateEntry(removedLeadershipTerm, 6);
recordingLog.invalidateEntry(6);
}

try (RecordingLog recordingLog = new RecordingLog(tempDir, true))
Expand Down Expand Up @@ -350,8 +350,8 @@ void shouldRecoverSnapshotsMidLogMarkedInvalid()
recordingLog.appendSnapshot(5L, 1L, 10, 888L, 0, 0);
recordingLog.appendSnapshot(6L, 1L, 10, 888L, 0, SERVICE_ID);

recordingLog.invalidateEntry(1L, 2);
recordingLog.invalidateEntry(1L, 3);
recordingLog.invalidateEntry(2);
recordingLog.invalidateEntry(3);
}

try (RecordingLog recordingLog = new RecordingLog(tempDir, true))
Expand Down Expand Up @@ -529,7 +529,7 @@ void shouldAppendSnapshotWithLeadershipTermIdOutOfOrder()
{
recordingLog.appendTerm(3, 1, 0, 0);
recordingLog.appendSnapshot(10, 1, 0, 56, 42, SERVICE_ID);
recordingLog.invalidateEntry(1, 1);
recordingLog.invalidateEntry(1);

recordingLog.commitLogPosition(1, 200);
recordingLog.appendTerm(3, 2, 200, 555);
Expand Down Expand Up @@ -612,7 +612,7 @@ void appendTermShouldOnlyAllowASingleValidTermForTheSameLeadershipTermId()
recordingLog.appendTerm(8, 0, 0, 0);
recordingLog.appendTerm(8, 1, 1, 1);

recordingLog.invalidateEntry(0, 0);
recordingLog.invalidateEntry(0);
recordingLog.appendTerm(8, 0, 100, 100);

final ClusterException exception = assertThrows(ClusterException.class,
Expand Down Expand Up @@ -658,8 +658,8 @@ void entriesInTheRecordingLogShouldBeSorted()
recordingLog.appendTerm(0, 3, 500, 30);
recordingLog.appendTerm(0, 2, 1_000_000, 1_000_000);

recordingLog.invalidateEntry(1, 1);
recordingLog.invalidateEntry(2, 6);
recordingLog.invalidateEntry(1);
recordingLog.invalidateEntry(5);

recordingLog.appendTerm(0, 2, 400, 20);
recordingLog.appendTerm(0, 1, 90, 9);
Expand All @@ -671,7 +671,7 @@ void entriesInTheRecordingLogShouldBeSorted()
recordingLog.appendSnapshot(0, 1, 0, 777, 42, 2);
recordingLog.appendSnapshot(0, 2, 400, 1400, 200, 0);

recordingLog.invalidateEntry(2, 10);
recordingLog.invalidateEntry(8);

recordingLog.appendStandbySnapshot(0, 2, 400, 1400, 200, SERVICE_ID, archiveEndpoint);
recordingLog.appendStandbySnapshot(0, 2, 400, 1400, 200, 1, archiveEndpoint);
Expand Down

0 comments on commit c9567be

Please sign in to comment.