From 551b42decffc39e3b4784a161d8eb8173593cfc4 Mon Sep 17 00:00:00 2001 From: Tien Do Nam Date: Sat, 2 Nov 2024 20:32:12 +0100 Subject: [PATCH] feat: improve download bytes performance while tracking progress --- rhttp/README.md | 2 ++ rhttp/lib/src/request.dart | 23 +++++++------- rhttp/lib/src/util/byte_stream_converter.dart | 31 +++++++++++++++++++ rhttp/lib/src/util/progress_notifier.dart | 4 +++ 4 files changed, 49 insertions(+), 11 deletions(-) create mode 100644 rhttp/lib/src/util/byte_stream_converter.dart diff --git a/rhttp/README.md b/rhttp/README.md index b2417b7..8907f74 100644 --- a/rhttp/README.md +++ b/rhttp/README.md @@ -319,6 +319,8 @@ Please note that request and response bodies must be either `Stream` or `Uint8Li The parameter `total` can be `-1` if the total size is unknown. +It always emits the final value with `sent` / `received` and `total` being equal after the request is finished. + ```dart final request = Rhttp.post( 'https://example.com', diff --git a/rhttp/lib/src/request.dart b/rhttp/lib/src/request.dart index 7efdbdd..9e86c31 100644 --- a/rhttp/lib/src/request.dart +++ b/rhttp/lib/src/request.dart @@ -14,6 +14,7 @@ import 'package:rhttp/src/rust/api/error.dart' as rust_error; import 'package:rhttp/src/rust/api/http.dart' as rust; import 'package:rhttp/src/rust/api/stream.dart' as rust_stream; import 'package:rhttp/src/rust/lib.dart' as rust_lib; +import 'package:rhttp/src/util/byte_stream_converter.dart'; import 'package:rhttp/src/util/collection.dart'; import 'package:rhttp/src/util/progress_notifier.dart'; import 'package:rhttp/src/util/stream_listener.dart'; @@ -55,7 +56,7 @@ Future requestInternalGeneric(HttpRequest request) async { // transform to Stream request = request.copyWith( body: HttpBody.stream( - Stream.fromIterable(body.bytes.toList().map((e) => [e])), + body.bytes.toStream(chunkSize: 1024), length: body.bytes.length, ), ); @@ -95,6 +96,7 @@ Future requestInternalGeneric(HttpRequest request) async { await sender.add(data: data); }, onDone: () async { + sendNotifier?.notifyDone(bodyLength); await sender.close(); }); requestBodyStream = receiver; @@ -172,10 +174,14 @@ Future requestInternalGeneric(HttpRequest request) async { ?.$2 ?? '-1'; final contentLength = int.tryParse(contentLengthStr) ?? -1; - stream = stream.map((event) { - receiveNotifier!.notify(event.length, contentLength); - return event; - }); + final backingStream = stream; + stream = () async* { + await for (final chunk in backingStream) { + receiveNotifier!.notify(chunk.length, contentLength); + yield chunk; + } + receiveNotifier!.notifyDone(contentLength); + }(); } HttpResponse response = parseHttpResponse( @@ -185,17 +191,12 @@ Future requestInternalGeneric(HttpRequest request) async { ); if (convertBackToBytes) { - // Using BytesBuilder to efficiently concatenate all bytes - final bytesBuilder = BytesBuilder(copy: false); - await for (final chunk in stream) { - bytesBuilder.add(chunk); - } response = HttpBytesResponse( request: request, version: response.version, statusCode: response.statusCode, headers: response.headers, - body: bytesBuilder.takeBytes(), + body: await stream.toUint8List(), ); } diff --git a/rhttp/lib/src/util/byte_stream_converter.dart b/rhttp/lib/src/util/byte_stream_converter.dart new file mode 100644 index 0000000..699fe51 --- /dev/null +++ b/rhttp/lib/src/util/byte_stream_converter.dart @@ -0,0 +1,31 @@ +import 'dart:typed_data'; + +extension Uint8ListExt on Uint8List { + Stream toStream({required int chunkSize}) async* { + final byteArray = this; + + int offset = 0; + while (offset < byteArray.length) { + int end = offset + chunkSize; + if (end > byteArray.length) end = byteArray.length; + + // Create a view of the data without copying + var chunk = Uint8List.sublistView(byteArray, offset, end); + yield chunk; + + offset = end; + } + } +} + +extension ByteStreamExt on Stream> { + Future toUint8List() async { + // Using BytesBuilder to efficiently concatenate all bytes + final bytesBuilder = BytesBuilder(copy: false); + await for (final chunk in this) { + bytesBuilder.add(chunk); + } + + return bytesBuilder.takeBytes(); + } +} diff --git a/rhttp/lib/src/util/progress_notifier.dart b/rhttp/lib/src/util/progress_notifier.dart index aa57752..9c97bab 100644 --- a/rhttp/lib/src/util/progress_notifier.dart +++ b/rhttp/lib/src/util/progress_notifier.dart @@ -20,4 +20,8 @@ class ProgressNotifier { onProgress(bytes, total); } } + + void notifyDone(int total) { + onProgress(total, total); + } }