Skip to content

Commit

Permalink
Fixed TarFile stream reader
Browse files Browse the repository at this point in the history
* Streams that do not always return full buffers were not properly handled
* Also added async version of TarFile.EnumerateFiles
  • Loading branch information
LTRData committed Aug 8, 2024
1 parent 146943c commit d2dfc1b
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 3 deletions.
106 changes: 105 additions & 1 deletion Library/DiscUtils.Core/Archives/TarFile.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
// DEALINGS IN THE SOFTWARE.
//

using LTRData.Extensions.Buffers;
using DiscUtils.Streams;
using LTRData.Extensions.Buffers;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;

namespace DiscUtils.Archives;

Expand Down Expand Up @@ -292,6 +294,108 @@ public static IEnumerable<TarFileData> EnumerateFiles(Stream archive)
}
}

#if NETSTANDARD2_1_OR_GREATER || NETCOREAPP
public static async IAsyncEnumerable<TarFileData> EnumerateFilesAsync(Stream archive, [EnumeratorCancellation] CancellationToken cancellationToken)
{
var hdrBuf = new byte[512];

string long_path = null;

for (; ; )
{
if (await archive.ReadMaximumAsync(hdrBuf.AsMemory(0, 512), cancellationToken).ConfigureAwait(false) < 512)
{
break;
}

var hdr = new TarHeader(hdrBuf);

if (long_path is not null)
{
hdr.FileName = long_path;
long_path = null;
}

if (hdr.FileLength == 0 && string.IsNullOrEmpty(hdr.FileName))
{
break;
}

if (hdr.FileLength == 0)
{
yield return new(hdr, source: null);
}
else if (hdr.FileType == UnixFileType.TarEntryLongLink &&
hdr.FileName == "././@LongLink")
{
var data = ArrayPool<byte>.Shared.Rent(checked((int)hdr.FileLength));
try
{
await archive.ReadExactlyAsync(data.AsMemory(0, (int)hdr.FileLength), cancellationToken).ConfigureAwait(false);

long_path = EncodingUtilities
.GetLatin1Encoding()
.GetString(TarHeader.ReadNullTerminatedString(data.AsSpan(0, (int)hdr.FileLength)));
}
finally
{
ArrayPool<byte>.Shared.Return(data);
}

var moveForward = (int)(-(hdr.FileLength & 511) & 511);

if (await archive.ReadMaximumAsync(hdrBuf.AsMemory(0, moveForward), cancellationToken).ConfigureAwait(false) < moveForward)
{
break;
}
}
else if (archive.CanSeek)
{
var location = archive.Position;

var datastream = new SubStream(archive, location, hdr.FileLength);

yield return new(hdr, datastream);

archive.Position = location + hdr.FileLength + ((-datastream.Length) & 511);
}
else
{
Stream datastream;

if (hdr.FileLength >= FileDatabufferChunkSize)
{
var data = new SparseMemoryBuffer(FileDatabufferChunkSize);

if (await data.WriteFromStreamAsync(0, archive, hdr.FileLength, cancellationToken).ConfigureAwait(false) < hdr.FileLength)
{
throw new EndOfStreamException("Unexpected end of tar stream");
}

datastream = new SparseMemoryStream(data, FileAccess.Read);
}
else
{
var data = new byte[hdr.FileLength];

await archive.ReadExactlyAsync(data, cancellationToken).ConfigureAwait(false);

datastream = new MemoryStream(data, writable: false);
}

var moveForward = (int)((-datastream.Length) & 511);

yield return new(hdr, datastream);

if (await archive.ReadMaximumAsync(hdrBuf.AsMemory(0, moveForward), cancellationToken).ConfigureAwait(false) < moveForward)
{
break;
}
}
}
}
#endif

protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
Expand Down
50 changes: 48 additions & 2 deletions Library/DiscUtils.Streams/SparseMemoryBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace DiscUtils.Streams;

Expand Down Expand Up @@ -320,7 +322,7 @@ public long WriteFromStream(long pos, Stream source, long count)
_buffers[chunk] = chunkBuffer;
}

var numRead = source.Read(chunkBuffer, chunkOffset, numToWrite);
var numRead = source.ReadMaximum(chunkBuffer, chunkOffset, numToWrite);

if (numRead <= 0)
{
Expand All @@ -341,4 +343,48 @@ public long WriteFromStream(long pos, Stream source, long count)
return totalWritten;
}

}
/// <summary>
/// Writes from a stream into the sparse buffer.
/// </summary>
/// <param name="pos">The start offset within the sparse buffer.</param>
/// <param name="source">The stream to get data from.</param>
/// <param name="count">The number of bytes to write.</param>
/// <param name="cancellationToken"></param>
public async ValueTask<long> WriteFromStreamAsync(long pos, Stream source, long count, CancellationToken cancellationToken)
{
long totalWritten = 0;

while (totalWritten < count)
{
var chunk = (int)(pos / ChunkSize);
var chunkOffset = (int)(pos % ChunkSize);
var numToWrite = (int)Math.Min(ChunkSize - chunkOffset, count - totalWritten);

if (!_buffers.TryGetValue(chunk, out var chunkBuffer))
{
chunkBuffer = new byte[ChunkSize];
_buffers[chunk] = chunkBuffer;
}

var numRead = await source.ReadMaximumAsync(chunkBuffer.AsMemory(chunkOffset, numToWrite), cancellationToken).ConfigureAwait(false);

if (numRead <= 0)
{
break;
}

totalWritten += numRead;
pos += numRead;

if (numRead < numToWrite)
{
break;
}
}

_capacity = Math.Max(_capacity, pos);

return totalWritten;
}

}

0 comments on commit d2dfc1b

Please sign in to comment.