Skip to content

Commit

Permalink
feat: improve download bytes performance while tracking progress
Browse files Browse the repository at this point in the history
  • Loading branch information
Tienisto committed Nov 2, 2024
1 parent e4f7719 commit 551b42d
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 11 deletions.
2 changes: 2 additions & 0 deletions rhttp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ Please note that request and response bodies must be either `Stream` or `Uint8Li

The parameter `total` can be `-1` if the total size is unknown.

It always emits the final value with `sent` / `received` and `total` being equal after the request is finished.

```dart
final request = Rhttp.post(
'https://example.com',
Expand Down
23 changes: 12 additions & 11 deletions rhttp/lib/src/request.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import 'package:rhttp/src/rust/api/error.dart' as rust_error;
import 'package:rhttp/src/rust/api/http.dart' as rust;
import 'package:rhttp/src/rust/api/stream.dart' as rust_stream;
import 'package:rhttp/src/rust/lib.dart' as rust_lib;
import 'package:rhttp/src/util/byte_stream_converter.dart';
import 'package:rhttp/src/util/collection.dart';
import 'package:rhttp/src/util/progress_notifier.dart';
import 'package:rhttp/src/util/stream_listener.dart';
Expand Down Expand Up @@ -55,7 +56,7 @@ Future<HttpResponse> requestInternalGeneric(HttpRequest request) async {
// transform to Stream
request = request.copyWith(
body: HttpBody.stream(
Stream.fromIterable(body.bytes.toList().map((e) => [e])),
body.bytes.toStream(chunkSize: 1024),
length: body.bytes.length,
),
);
Expand Down Expand Up @@ -95,6 +96,7 @@ Future<HttpResponse> requestInternalGeneric(HttpRequest request) async {
await sender.add(data: data);
},
onDone: () async {
sendNotifier?.notifyDone(bodyLength);
await sender.close();
});
requestBodyStream = receiver;
Expand Down Expand Up @@ -172,10 +174,14 @@ Future<HttpResponse> requestInternalGeneric(HttpRequest request) async {
?.$2 ??
'-1';
final contentLength = int.tryParse(contentLengthStr) ?? -1;
stream = stream.map((event) {
receiveNotifier!.notify(event.length, contentLength);
return event;
});
final backingStream = stream;
stream = () async* {
await for (final chunk in backingStream) {
receiveNotifier!.notify(chunk.length, contentLength);
yield chunk;
}
receiveNotifier!.notifyDone(contentLength);
}();
}

HttpResponse response = parseHttpResponse(
Expand All @@ -185,17 +191,12 @@ Future<HttpResponse> requestInternalGeneric(HttpRequest request) async {
);

if (convertBackToBytes) {
// Using BytesBuilder to efficiently concatenate all bytes
final bytesBuilder = BytesBuilder(copy: false);
await for (final chunk in stream) {
bytesBuilder.add(chunk);
}
response = HttpBytesResponse(
request: request,
version: response.version,
statusCode: response.statusCode,
headers: response.headers,
body: bytesBuilder.takeBytes(),
body: await stream.toUint8List(),
);
}

Expand Down
31 changes: 31 additions & 0 deletions rhttp/lib/src/util/byte_stream_converter.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import 'dart:typed_data';

extension Uint8ListExt on Uint8List {
Stream<Uint8List> toStream({required int chunkSize}) async* {
final byteArray = this;

int offset = 0;
while (offset < byteArray.length) {
int end = offset + chunkSize;
if (end > byteArray.length) end = byteArray.length;

// Create a view of the data without copying
var chunk = Uint8List.sublistView(byteArray, offset, end);
yield chunk;

offset = end;
}
}
}

extension ByteStreamExt on Stream<List<int>> {
Future<Uint8List> toUint8List() async {
// Using BytesBuilder to efficiently concatenate all bytes
final bytesBuilder = BytesBuilder(copy: false);
await for (final chunk in this) {
bytesBuilder.add(chunk);
}

return bytesBuilder.takeBytes();
}
}
4 changes: 4 additions & 0 deletions rhttp/lib/src/util/progress_notifier.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ class ProgressNotifier {
onProgress(bytes, total);
}
}

void notifyDone(int total) {
onProgress(total, total);
}
}

0 comments on commit 551b42d

Please sign in to comment.