Skip to content

Commit

Permalink
Improved lifetime management of log entry context
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Jun 9, 2024
1 parent 62a2f11 commit 8f7d060
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,24 @@ protected DiskBasedStateMachine(string path, int recordsPerPartition, Options? c
var entry = partition.Read(sessionId, startIndex);
var snapshotLength = await ApplyCoreAsync(entry).ConfigureAwait(false);
Volatile.Write(ref lastTerm, entry.Term);
partition.ClearContext(startIndex);

// Remove log entry from the cache according to eviction policy
var lastEntryInPartition = startIndex == commitIndex || startIndex == partition.LastIndex;
var completedPartition = startIndex == partition.LastIndex;
if (!entry.IsPersisted)
{
await partition.PersistCachedEntryAsync(startIndex, snapshotLength.HasValue).ConfigureAwait(false);

// Flush partition if we are finished or at the last entry in it
if (lastEntryInPartition)
if (startIndex == commitIndex | completedPartition)
{
await partition.FlushAsync(token).ConfigureAwait(false);
partition.ClearContext(startIndex);
}
}
else if (lastEntryInPartition)

if (completedPartition)
{
partition.ClearContext(startIndex);
partition.ClearContext();
}

if (snapshotLength.HasValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,23 +692,24 @@ private async ValueTask ApplyAsync(int sessionId, long startIndex, CancellationT
var entry = partition.Read(sessionId, startIndex);
await ApplyCoreAsync(entry).ConfigureAwait(false);
Volatile.Write(ref lastTerm, entry.Term);
partition.ClearContext(startIndex);

// Remove log entry from the cache according to eviction policy
var lastEntryInPartition = startIndex == commitIndex || startIndex == partition.LastIndex;
var completedPartition = startIndex == partition.LastIndex;
if (!entry.IsPersisted)
{
await partition.PersistCachedEntryAsync(startIndex, evictOnCommit).ConfigureAwait(false);

// Flush partition if we are finished or at the last entry in it
if (lastEntryInPartition)
if (startIndex == commitIndex | completedPartition)
{
await partition.FlushAsync(token).ConfigureAwait(false);
partition.ClearContext(startIndex);
}
}
else if (lastEntryInPartition)

if (completedPartition)
{
partition.ClearContext(startIndex);
partition.ClearContext();
}
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,19 +104,12 @@ internal void ClearContext(long absoluteIndex)
if (context is not null)
{
var relativeIndex = ToRelativeIndex(absoluteIndex);

if (relativeIndex == context.Length - 1)
{
Array.Clear(context);
context = null;
}
else
{
Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(context), relativeIndex) = null;
}
Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(context), relativeIndex) = null;
}
}

internal void ClearContext() => context = null;

internal LogEntry Read(int sessionId, long absoluteIndex, bool metadataOnly = false)
{
Debug.Assert(absoluteIndex >= FirstIndex && absoluteIndex <= LastIndex, $"Invalid index value {absoluteIndex}, offset {FirstIndex}");
Expand Down

0 comments on commit 8f7d060

Please sign in to comment.