Skip to content

Commit

Permalink
Fixed incorrect I/O
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Apr 15, 2024
1 parent c1fb3e1 commit 8a76367
Showing 1 changed file with 65 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ private protected abstract class Partition : ConcurrentStorageAccess
internal readonly long FirstIndex, PartitionNumber, LastIndex;
private Partition? previous, next;
protected MemoryOwner<CacheRecord> entryCache;
protected int runningIndex;

protected Partition(DirectoryInfo location, int offset, int bufferSize, int recordsPerPartition, long partitionNumber, in BufferManager manager, int readersCount, WriteMode writeMode, long initialSize, FileAttributes attributes = FileAttributes.NotContentIndexed)
: base(Path.Combine(location.FullName, partitionNumber.ToString(InvariantCulture)), offset, bufferSize, manager.BufferAllocator, readersCount, writeMode, initialSize, attributes)
Expand Down Expand Up @@ -142,14 +143,16 @@ internal ValueTask PersistCachedEntryAsync(long absoluteIndex, bool removeFromMe
? ValueTask.CompletedTask
: removeFromMemory
? PersistAndDeleteAsync(cachedEntry.Content.Memory, index, offset)
: PersistAsync(cachedEntry.Content.Memory, offset);
: PersistAsync(cachedEntry.Content.Memory, index, offset);
}

[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
private async ValueTask PersistAsync(ReadOnlyMemory<byte> content, long offset)
private async ValueTask PersistAsync(ReadOnlyMemory<byte> content, int index, long offset)
{
await SetWritePositionAsync(offset).ConfigureAwait(false);
await writer.WriteAsync(content).ConfigureAwait(false);

runningIndex = index;
}

[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
Expand All @@ -165,6 +168,8 @@ private async ValueTask PersistAndDeleteAsync(ReadOnlyMemory<byte> content, int
{
entryCache[index].Dispose();
}

runningIndex = index;
}

internal long GetTerm(long absoluteIndex)
Expand Down Expand Up @@ -213,13 +218,13 @@ internal ValueTask WriteAsync<TEntry>(TEntry entry, long absoluteIndex, Cancella
// Perf: we can skip FileWriter internal buffer and write cached log entry directly to the disk
switch (cachedEntry.PersistenceMode)
{
case CachedLogEntryPersistenceMode.CopyToBuffer:
goto exit;
case CachedLogEntryPersistenceMode.SkipBuffer:
case CachedLogEntryPersistenceMode.SkipBuffer when !writer.HasBufferedData:
return WriteThroughAsync(cachedEntry, relativeIndex, token);
default:
case CachedLogEntryPersistenceMode.None:
OnCached(in cachedEntry, relativeIndex);
return ValueTask.CompletedTask;
default:
goto exit;
}
}

Expand Down Expand Up @@ -331,6 +336,8 @@ protected override async ValueTask PersistAsync<TEntry>(TEntry entry, int index,
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int index, CancellationToken token)
{
Debug.Assert(writer.HasBufferedData is false);

if (entry.Length > maxLogEntrySize)
throw new InvalidOperationException(ExceptionMessages.LogEntryPayloadTooLarge);

Expand All @@ -341,6 +348,8 @@ protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int i
await RandomAccess.WriteAsync(Handle, this, GetMetadataOffset(index), token).ConfigureAwait(false);
payloadBuffer = default;
metadataTable[index].Metadata = metadata;

writer.FilePosition = metadata.End;
}

ReadOnlyMemory<byte> IReadOnlyList<ReadOnlyMemory<byte>>.this[int index] => index switch
Expand Down Expand Up @@ -507,32 +516,14 @@ protected override async ValueTask PersistAsync<TEntry>(TEntry entry, int index,
}

metadata.Format(GetMetadataSpan(index));

if (index == LastIndex)
{
// write footer with metadata table
await RandomAccess.WriteAsync(Handle, footer.Memory, metadata.End, token).ConfigureAwait(false);
RandomAccess.SetLength(Handle, metadata.End + footer.Length);

// seal the partition
IsSealed = true;
}
else if (IsSealed)
{
// unseal
IsSealed = false;
}
else
{
return;
}

await RandomAccess.WriteAsync(Handle, header.Memory, fileOffset: 0L, token).ConfigureAwait(false);
runningIndex = index;
}

[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int index, CancellationToken token)
{
Debug.Assert(writer.HasBufferedData is false);

var writeAddress = GetWriteAddress(index);
var startPos = writeAddress + LogEntryMetadata.Size;
var metadata = LogEntryMetadata.Create(entry, startPos, entry.Length);
Expand All @@ -543,27 +534,9 @@ protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int i
payloadBuffer = default;

metadata.Format(GetMetadataSpan(index));
runningIndex = index;

if (index == LastIndex)
{
// write footer with metadata table
await RandomAccess.WriteAsync(Handle, footer.Memory, metadata.End, token).ConfigureAwait(false);
RandomAccess.SetLength(Handle, metadata.End + footer.Length);

// seal the partition
IsSealed = true;
}
else if (IsSealed)
{
// unseal
IsSealed = false;
}
else
{
return;
}

await RandomAccess.WriteAsync(Handle, header.Memory, fileOffset: 0L, token).ConfigureAwait(false);
writer.FilePosition = metadata.End;
}

protected override void OnCached(in CachedLogEntry cachedEntry, int index)
Expand All @@ -573,6 +546,45 @@ protected override void OnCached(in CachedLogEntry cachedEntry, int index)
metadata.Format(GetMetadataSpan(index));
}

public override ValueTask FlushAsync(CancellationToken token = default)
{
return runningIndex == LastIndex
? FlushAndSealAsync(token)
: IsSealed
? FlushAndUnsealAsync(token)
: base.FlushAsync(token);
}

private async ValueTask FlushAndSealAsync(CancellationToken token)
{
await writer.WriteAsync(token).ConfigureAwait(false);

// write footer with metadata table
await WriteFooterAsync(token).ConfigureAwait(false);
RandomAccess.SetLength(Handle, writer.FilePosition + footer.Length);

IsSealed = true;
await WriteHeaderAsync(token).ConfigureAwait(false);

writer.FlushToDisk();
}

private async ValueTask FlushAndUnsealAsync(CancellationToken token)
{
await writer.WriteAsync(token).ConfigureAwait(false);

IsSealed = false;
await WriteHeaderAsync(token).ConfigureAwait(false);

writer.FlushToDisk();
}

private ValueTask WriteHeaderAsync(CancellationToken token)
=> RandomAccess.WriteAsync(Handle, header.Memory, fileOffset: 0L, token);

private ValueTask WriteFooterAsync(CancellationToken token)
=> RandomAccess.WriteAsync(Handle, footer.Memory, writer.FilePosition, token);

ReadOnlyMemory<byte> IReadOnlyList<ReadOnlyMemory<byte>>.this[int index] => index switch
{
0 => metadataBuffer.Memory,
Expand Down Expand Up @@ -655,7 +667,7 @@ private void GetOrCreatePartition(long recordIndex, [NotNull] ref Partition? par
Debug.Assert(FirstPartition is null);
Debug.Assert(partition is null);
FirstPartition = LastPartition = partition = CreatePartition(partitionNumber);
goto exit;
return;
}

Debug.Assert(FirstPartition is not null);
Expand All @@ -669,14 +681,14 @@ private void GetOrCreatePartition(long recordIndex, [NotNull] ref Partition? par
if (previous < 0)
{
partition = Append(partitionNumber, partition);
goto exit;
return;
}

// nothing on the right side, create new tail
if (partition.IsLast)
{
LastPartition = partition = Append(partitionNumber, partition);
goto exit;
return;
}

partition = partition.Next;
Expand All @@ -685,28 +697,25 @@ private void GetOrCreatePartition(long recordIndex, [NotNull] ref Partition? par
if (previous > 0)
{
partition = Prepend(partitionNumber, partition);
goto exit;
return;
}

// nothing on the left side, create new head
if (partition.IsFirst)
{
FirstPartition = partition = Prepend(partitionNumber, partition);
goto exit;
return;
}

partition = partition.Previous;
break;
default:
goto exit;
return;
}

Debug.Assert(partition is not null);
}

exit:
return;

Partition Prepend(long partitionNumber, Partition partition)
{
var tmp = CreatePartition(partitionNumber);
Expand Down

0 comments on commit 8a76367

Please sign in to comment.