Skip to content

Commit

Permalink
Fixed detection of ephemeral entry
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Apr 15, 2024
1 parent 8a76367 commit b3adac3
Showing 1 changed file with 55 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,19 @@ private sealed class Table : Partition, IReadOnlyList<ReadOnlyMemory<byte>>
{
private const int HeaderSize = 512;

private static readonly ReadOnlyMemory<byte> EmptyMetadata;
private static readonly ReadOnlyMemory<byte> EphemeralMetadata;

static Table()
{
EmptyMetadata = new byte[LogEntryMetadata.Size]; // all zeroes

var ephemeral = LogEntryMetadata.Create(LogEntry.Initial, HeaderSize + LogEntryMetadata.Size, length: 0L);
var buffer = new byte[LogEntryMetadata.Size];
ephemeral.Format(buffer);
EphemeralMetadata = buffer;
}

// metadata management
private MemoryOwner<byte> header, footer, metadataBuffer;
private ReadOnlyMemory<byte> payloadBuffer;
Expand All @@ -429,8 +442,7 @@ internal Table(DirectoryInfo location, int bufferSize, int recordsPerPartition,
// init ephemeral 0 entry
if (PartitionNumber is 0L)
{
var metadata = LogEntryMetadata.Create(LogEntry.Initial, HeaderSize + LogEntryMetadata.Size, length: 0L);
metadata.Format(footer.Span);
EphemeralMetadata.CopyTo(footer.Memory);
}
}

Expand Down Expand Up @@ -458,17 +470,28 @@ internal override void Initialize()
else
{
// read sequentially every log entry
var metadataBuffer = this.metadataBuffer.Span;
var metadataTable = footer.Span;
int footerOffset = 0;
for (long fileOffset = HeaderSize; ; footerOffset += LogEntryMetadata.Size)
int footerOffset;
long fileOffset;

if (PartitionNumber is 0L)
{
footerOffset = LogEntryMetadata.Size;
fileOffset = HeaderSize + LogEntryMetadata.Size;
}
else
{
footerOffset = 0;
fileOffset = HeaderSize;
}

for (Span<byte> metadataBuffer = this.metadataBuffer.Span, metadataTable = footer.Span; ; footerOffset += LogEntryMetadata.Size)
{
var count = RandomAccess.Read(Handle, metadataBuffer, fileOffset);
if (count < LogEntryMetadata.Size)
break;

fileOffset = LogEntryMetadata.GetEndOfLogEntry(metadataBuffer);
if (fileOffset is 0L)
if (fileOffset <= 0L)
break;

metadataBuffer.CopyTo(metadataTable.Slice(footerOffset, LogEntryMetadata.Size));
Expand Down Expand Up @@ -571,14 +594,38 @@ private async ValueTask FlushAndSealAsync(CancellationToken token)

private async ValueTask FlushAndUnsealAsync(CancellationToken token)
{
await writer.WriteAsync(token).ConfigureAwait(false);
await FlushAndEraseNextEntryAsync(token).ConfigureAwait(false);

IsSealed = false;
await WriteHeaderAsync(token).ConfigureAwait(false);

writer.FlushToDisk();
}

private ValueTask FlushAndEraseNextEntryAsync(CancellationToken token)
{
ValueTask task;

// write the rest of the entry,
// then cleanup next entry header to indicate that the current entry is the last entry
if (!writer.HasBufferedData)
{
task = RandomAccess.WriteAsync(Handle, EmptyMetadata, writer.FilePosition, token);
}
else if (writer.Buffer is { Length: >= LogEntryMetadata.Size } emptyMetadataStub)
{
emptyMetadataStub.Span.Slice(0, LogEntryMetadata.Size).Clear();
writer.Produce(LogEntryMetadata.Size);
task = writer.WriteAsync(token);
}
else
{
task = writer.WriteAsync(EmptyMetadata, token);
}

return task;
}

private ValueTask WriteHeaderAsync(CancellationToken token)
=> RandomAccess.WriteAsync(Handle, header.Memory, fileOffset: 0L, token);

Expand Down

0 comments on commit b3adac3

Please sign in to comment.