From d60ed055260d423ca0f2d7ac3be1716678f6e549 Mon Sep 17 00:00:00 2001 From: 101arrowz Date: Tue, 6 Feb 2024 17:29:25 -0500 Subject: [PATCH] Improve streaming API --- CHANGELOG.md | 9 ++ README.md | 4 +- docs/README.md | 23 ++++ docs/classes/AsyncDecompress.md | 18 +++ docs/classes/AsyncDeflate.md | 32 +++++ docs/classes/AsyncGunzip.md | 18 +++ docs/classes/AsyncGzip.md | 32 +++++ docs/classes/AsyncInflate.md | 18 +++ docs/classes/AsyncUnzlib.md | 18 +++ docs/classes/AsyncZlib.md | 32 +++++ docs/classes/Deflate.md | 14 ++ docs/classes/Gzip.md | 14 ++ docs/classes/Zlib.md | 14 ++ scripts/rewriteBuilds.ts | 2 +- src/index.ts | 232 +++++++++++++++++++++++++++----- 15 files changed, 445 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb0abe0..f493fca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +## 0.8.2 +- Fixed broken UMD build +- Fixed edge-case causing skipped data during streaming compression +- Fixed bug in GZIP streaming on member boundary +- Improved streaming performance on inconsistent chunk sizes +- Improved `unzip` performance on undercompressed archives +- Added flushing support into streaming API +- Added backpressure support into async streaming API + - Use new `ondrain` handler and `queuedSize` ## 0.8.1 - Fixed reallocating on pre-supplied buffer in `inflateSync` and `unzlibSync` - Minor documentation fixes diff --git a/README.md b/README.md index 996a863..2dd3adc 100644 --- a/README.md +++ b/README.md @@ -536,9 +536,9 @@ Before you decide that `fflate` is the end-all compression library, you should n ## What about `CompressionStream`? Like `fflate`, the [Compression Streams API](https://developer.mozilla.org/en-US/docs/Web/API/Compression_Streams_API) provides DEFLATE, GZIP, and Zlib compression and decompression support. 
It's a good option if you'd like to compress or decompress data without installing any third-party libraries, and it wraps native Zlib bindings to achieve better performance than what most JavaScript programs can achieve. -However, browsers do not offer any native non-streaming compression API, and `CompressionStream` has surprisingly poor performance on data already loaded into memory; `fflate` tends to be faster even for files that are dozens of megabytes large. Similarly, `fflate` is much faster for files under a megabyte because it avoids marshalling overheads. Even when streaming hundreds of megabytes of data, the native API usually only performs between 5% slower and 10% faster than `fflate`. And Compression Streams have many other disadvantages - no ability to control compression level, poor support for older browsers, no ZIP support, etc. +However, browsers do not offer any native non-streaming compression API, and `CompressionStream` has surprisingly poor performance on data already loaded into memory; `fflate` tends to be faster even for files that are dozens of megabytes large. Similarly, `fflate` is much faster for files under a megabyte because it avoids marshalling overheads. Even when streaming hundreds of megabytes of data, the native API usually performs between 30% faster and 10% slower than `fflate`. And Compression Streams have many other disadvantages - no ability to control compression level, poor support for older browsers, no ZIP support, etc. -If you'd still prefer to depend upon a native browser API, you can use an `fflate`-based [Compression Streams ponyfill](https://github.com/101arrowz/compression-streams-polyfill) for a seamless transition to `CompressionStream` and `DecompressionStream` if ever they become substantially faster than `fflate`. 
+If you'd still prefer to depend upon a native browser API but want to support older browsers, you can use an `fflate`-based [Compression Streams ponyfill](https://github.com/101arrowz/compression-streams-polyfill). ## Browser support `fflate` makes heavy use of typed arrays (`Uint8Array`, `Uint16Array`, etc.). Typed arrays can be polyfilled at the cost of performance, but the most recent browser that doesn't support them [is from 2011](https://caniuse.com/typedarrays), so I wouldn't bother. diff --git a/docs/README.md b/docs/README.md index 31cb1de..e9520a4 100644 --- a/docs/README.md +++ b/docs/README.md @@ -71,6 +71,7 @@ ### Type Aliases +- [AsyncFlateDrainHandler](README.md#asyncflatedrainhandler) - [AsyncFlateStreamHandler](README.md#asyncflatestreamhandler) - [AsyncZippableFile](README.md#asynczippablefile) - [FlateCallback](README.md#flatecallback) @@ -135,6 +136,28 @@ Renames and re-exports [gzipSync](README.md#gzipsync) ## Type Aliases +### AsyncFlateDrainHandler + +Ƭ **AsyncFlateDrainHandler**: (`size`: `number`) => `void` + +#### Type declaration + +▸ (`size`): `void` + +Handler for the asynchronous completion of (de)compression for a data chunk + +##### Parameters + +| Name | Type | Description | +| :------ | :------ | :------ | +| `size` | `number` | The number of bytes that were processed. This is measured in terms of the input (i.e. compressed bytes for decompression, uncompressed bytes for compression.) 
| + +##### Returns + +`void` + +___ + ### AsyncFlateStreamHandler Ƭ **AsyncFlateStreamHandler**: (`err`: [`FlateError`](interfaces/FlateError.md) \| ``null``, `data`: `Uint8Array`, `final`: `boolean`) => `void` diff --git a/docs/classes/AsyncDecompress.md b/docs/classes/AsyncDecompress.md index 6b5a0ae..1337f8a 100644 --- a/docs/classes/AsyncDecompress.md +++ b/docs/classes/AsyncDecompress.md @@ -11,6 +11,8 @@ Asynchronous streaming GZIP, Zlib, or raw DEFLATE decompression ### Properties - [ondata](AsyncDecompress.md#ondata) +- [ondrain](AsyncDecompress.md#ondrain) +- [queuedSize](AsyncDecompress.md#queuedsize) ### Methods @@ -49,6 +51,22 @@ Creates an asynchronous decompression stream The handler to call whenever data is available +___ + +### ondrain + +• `Optional` **ondrain**: [`AsyncFlateDrainHandler`](../README.md#asyncflatedrainhandler) + +The handler to call whenever buffered source data is processed (i.e. `queuedSize` updates) + +___ + +### queuedSize + +• **queuedSize**: `number` + +The number of compressed bytes buffered in the stream + ## Methods ### push diff --git a/docs/classes/AsyncDeflate.md b/docs/classes/AsyncDeflate.md index 7b570e1..35dbb23 100644 --- a/docs/classes/AsyncDeflate.md +++ b/docs/classes/AsyncDeflate.md @@ -11,10 +11,13 @@ Asynchronous streaming DEFLATE compression ### Properties - [ondata](AsyncDeflate.md#ondata) +- [ondrain](AsyncDeflate.md#ondrain) +- [queuedSize](AsyncDeflate.md#queuedsize) - [terminate](AsyncDeflate.md#terminate) ### Methods +- [flush](AsyncDeflate.md#flush) - [push](AsyncDeflate.md#push) ## Constructors @@ -52,6 +55,22 @@ The handler to call whenever data is available ___ +### ondrain + +• `Optional` **ondrain**: [`AsyncFlateDrainHandler`](../README.md#asyncflatedrainhandler) + +The handler to call whenever buffered source data is processed (i.e. 
`queuedSize` updates) + +___ + +### queuedSize + +• **queuedSize**: `number` + +The number of uncompressed bytes buffered in the stream + +___ + ### terminate • **terminate**: [`AsyncTerminable`](../interfaces/AsyncTerminable.md) @@ -61,6 +80,19 @@ push() will silently fail. ## Methods +### flush + +▸ **flush**(): `void` + +Flushes buffered uncompressed data. Useful to immediately retrieve the +deflated output for small inputs. + +#### Returns + +`void` + +___ + ### push ▸ **push**(`chunk`, `final?`): `void` diff --git a/docs/classes/AsyncGunzip.md b/docs/classes/AsyncGunzip.md index 06cbc29..040c26e 100644 --- a/docs/classes/AsyncGunzip.md +++ b/docs/classes/AsyncGunzip.md @@ -11,7 +11,9 @@ Asynchronous streaming single or multi-member GZIP decompression ### Properties - [ondata](AsyncGunzip.md#ondata) +- [ondrain](AsyncGunzip.md#ondrain) - [onmember](AsyncGunzip.md#onmember) +- [queuedSize](AsyncGunzip.md#queuedsize) - [terminate](AsyncGunzip.md#terminate) ### Methods @@ -53,6 +55,14 @@ The handler to call whenever data is available ___ +### ondrain + +• `Optional` **ondrain**: [`AsyncFlateDrainHandler`](../README.md#asyncflatedrainhandler) + +The handler to call whenever buffered source data is processed (i.e. 
`queuedSize` updates) + +___ + ### onmember • `Optional` **onmember**: [`GunzipMemberHandler`](../README.md#gunzipmemberhandler) @@ -61,6 +71,14 @@ The handler to call whenever a new GZIP member is found ___ +### queuedSize + +• **queuedSize**: `number` + +The number of compressed bytes buffered in the stream + +___ + ### terminate • **terminate**: [`AsyncTerminable`](../interfaces/AsyncTerminable.md) diff --git a/docs/classes/AsyncGzip.md b/docs/classes/AsyncGzip.md index d34da0d..1cbe5d8 100644 --- a/docs/classes/AsyncGzip.md +++ b/docs/classes/AsyncGzip.md @@ -11,10 +11,13 @@ Asynchronous streaming GZIP compression ### Properties - [ondata](AsyncGzip.md#ondata) +- [ondrain](AsyncGzip.md#ondrain) +- [queuedSize](AsyncGzip.md#queuedsize) - [terminate](AsyncGzip.md#terminate) ### Methods +- [flush](AsyncGzip.md#flush) - [push](AsyncGzip.md#push) ## Constructors @@ -52,6 +55,22 @@ The handler to call whenever data is available ___ +### ondrain + +• `Optional` **ondrain**: [`AsyncFlateDrainHandler`](../README.md#asyncflatedrainhandler) + +The handler to call whenever buffered source data is processed (i.e. `queuedSize` updates) + +___ + +### queuedSize + +• **queuedSize**: `number` + +The number of uncompressed bytes buffered in the stream + +___ + ### terminate • **terminate**: [`AsyncTerminable`](../interfaces/AsyncTerminable.md) @@ -61,6 +80,19 @@ push() will silently fail. ## Methods +### flush + +▸ **flush**(): `void` + +Flushes buffered uncompressed data. Useful to immediately retrieve the +GZIPped output for small inputs. 
+ +#### Returns + +`void` + +___ + ### push ▸ **push**(`chunk`, `final?`): `void` diff --git a/docs/classes/AsyncInflate.md b/docs/classes/AsyncInflate.md index 1607d98..01fa233 100644 --- a/docs/classes/AsyncInflate.md +++ b/docs/classes/AsyncInflate.md @@ -11,6 +11,8 @@ Asynchronous streaming DEFLATE decompression ### Properties - [ondata](AsyncInflate.md#ondata) +- [ondrain](AsyncInflate.md#ondrain) +- [queuedSize](AsyncInflate.md#queuedsize) - [terminate](AsyncInflate.md#terminate) ### Methods @@ -52,6 +54,22 @@ The handler to call whenever data is available ___ +### ondrain + +• `Optional` **ondrain**: [`AsyncFlateDrainHandler`](../README.md#asyncflatedrainhandler) + +The handler to call whenever buffered source data is processed (i.e. `queuedSize` updates) + +___ + +### queuedSize + +• **queuedSize**: `number` + +The number of compressed bytes buffered in the stream + +___ + ### terminate • **terminate**: [`AsyncTerminable`](../interfaces/AsyncTerminable.md) diff --git a/docs/classes/AsyncUnzlib.md b/docs/classes/AsyncUnzlib.md index e5dbfc8..6e5317b 100644 --- a/docs/classes/AsyncUnzlib.md +++ b/docs/classes/AsyncUnzlib.md @@ -11,6 +11,8 @@ Asynchronous streaming Zlib decompression ### Properties - [ondata](AsyncUnzlib.md#ondata) +- [ondrain](AsyncUnzlib.md#ondrain) +- [queuedSize](AsyncUnzlib.md#queuedsize) - [terminate](AsyncUnzlib.md#terminate) ### Methods @@ -52,6 +54,22 @@ The handler to call whenever data is available ___ +### ondrain + +• `Optional` **ondrain**: [`AsyncFlateDrainHandler`](../README.md#asyncflatedrainhandler) + +The handler to call whenever buffered source data is processed (i.e. 
`queuedSize` updates) + +___ + +### queuedSize + +• **queuedSize**: `number` + +The number of compressed bytes buffered in the stream + +___ + ### terminate • **terminate**: [`AsyncTerminable`](../interfaces/AsyncTerminable.md) diff --git a/docs/classes/AsyncZlib.md b/docs/classes/AsyncZlib.md index 3752f40..eec3bea 100644 --- a/docs/classes/AsyncZlib.md +++ b/docs/classes/AsyncZlib.md @@ -11,10 +11,13 @@ Asynchronous streaming Zlib compression ### Properties - [ondata](AsyncZlib.md#ondata) +- [ondrain](AsyncZlib.md#ondrain) +- [queuedSize](AsyncZlib.md#queuedsize) - [terminate](AsyncZlib.md#terminate) ### Methods +- [flush](AsyncZlib.md#flush) - [push](AsyncZlib.md#push) ## Constructors @@ -52,6 +55,22 @@ The handler to call whenever data is available ___ +### ondrain + +• `Optional` **ondrain**: [`AsyncFlateDrainHandler`](../README.md#asyncflatedrainhandler) + +The handler to call whenever buffered source data is processed (i.e. `queuedSize` updates) + +___ + +### queuedSize + +• **queuedSize**: `number` + +The number of uncompressed bytes buffered in the stream + +___ + ### terminate • **terminate**: [`AsyncTerminable`](../interfaces/AsyncTerminable.md) @@ -61,6 +80,19 @@ push() will silently fail. ## Methods +### flush + +▸ **flush**(): `void` + +Flushes buffered uncompressed data. Useful to immediately retrieve the +zlibbed output for small inputs. + +#### Returns + +`void` + +___ + ### push ▸ **push**(`chunk`, `final?`): `void` diff --git a/docs/classes/Deflate.md b/docs/classes/Deflate.md index a33526c..0e2816d 100644 --- a/docs/classes/Deflate.md +++ b/docs/classes/Deflate.md @@ -14,6 +14,7 @@ Streaming DEFLATE compression ### Methods +- [flush](Deflate.md#flush) - [push](Deflate.md#push) ## Constructors @@ -51,6 +52,19 @@ The handler to call whenever data is available ## Methods +### flush + +▸ **flush**(): `void` + +Flushes buffered uncompressed data. Useful to immediately retrieve the +deflated output for small inputs. 
+ +#### Returns + +`void` + +___ + ### push ▸ **push**(`chunk`, `final?`): `void` diff --git a/docs/classes/Gzip.md b/docs/classes/Gzip.md index b0838f6..975a041 100644 --- a/docs/classes/Gzip.md +++ b/docs/classes/Gzip.md @@ -14,6 +14,7 @@ Streaming GZIP compression ### Methods +- [flush](Gzip.md#flush) - [push](Gzip.md#push) ## Constructors @@ -51,6 +52,19 @@ The handler to call whenever data is available ## Methods +### flush + +▸ **flush**(): `void` + +Flushes buffered uncompressed data. Useful to immediately retrieve the +GZIPped output for small inputs. + +#### Returns + +`void` + +___ + ### push ▸ **push**(`chunk`, `final?`): `void` diff --git a/docs/classes/Zlib.md b/docs/classes/Zlib.md index 19ad62c..d48dcbd 100644 --- a/docs/classes/Zlib.md +++ b/docs/classes/Zlib.md @@ -14,6 +14,7 @@ Streaming Zlib compression ### Methods +- [flush](Zlib.md#flush) - [push](Zlib.md#push) ## Constructors @@ -51,6 +52,19 @@ The handler to call whenever data is available ## Methods +### flush + +▸ **flush**(): `void` + +Flushes buffered uncompressed data. Useful to immediately retrieve the +zlibbed output for small inputs. 
+ +#### Returns + +`void` + +___ + ### push ▸ **push**(`chunk`, `final?`): `void` diff --git a/scripts/rewriteBuilds.ts b/scripts/rewriteBuilds.ts index 8a02c5d..18d6483 100644 --- a/scripts/rewriteBuilds.ts +++ b/scripts/rewriteBuilds.ts @@ -1,7 +1,7 @@ import { readFileSync, writeFileSync, unlinkSync } from 'fs'; import { join } from 'path'; const atClass = /\/\*\* \@class \*\//g, pure = '/*#__PURE__*/'; -const esModule = /exports\.__esModule = true;\n/; +const esModule = /(exports\.__esModule = true;|Object\.defineProperty\(exports, "__esModule", { value: true }\);)\n/; const libDir = join(__dirname, '..', 'lib'); const libIndex = join(libDir, 'index.js'); const lib = readFileSync(libIndex, 'utf-8') diff --git a/src/index.ts b/src/index.ts index 40799b4..4ebb372 100644 --- a/src/index.ts +++ b/src/index.ts @@ -931,6 +931,13 @@ export type FlateStreamHandler = (data: Uint8Array, final: boolean) => void; */ export type AsyncFlateStreamHandler = (err: FlateError | null, data: Uint8Array, final: boolean) => void; +/** + * Handler for the asynchronous completion of (de)compression for a data chunk + * @param size The number of bytes that were processed. This is measured in terms of the input + * (i.e. compressed bytes for decompression, uncompressed bytes for compression.) + */ +export type AsyncFlateDrainHandler = (size: number) => void; + /** * Callback for asynchronous (de)compression methods * @param err Any error that occurred @@ -1007,7 +1014,7 @@ const dopt = (dat: Uint8Array, opt: DeflateOptions, pre: number, post: number, s st.w = dict.length; } } - return dflt(dat, opt.level == null ? 6 : opt.level, opt.mem == null ? Math.ceil(Math.max(8, Math.min(13, Math.log(dat.length))) * 1.5) : (12 + opt.mem), pre, post, st); + return dflt(dat, opt.level == null ? 6 : opt.level, opt.mem == null ? (st.l ? 
Math.ceil(Math.max(8, Math.min(13, Math.log(dat.length))) * 1.5) : 20) : (12 + opt.mem), pre, post, st); } @@ -1125,34 +1132,47 @@ type CmpDecmpStrm = Inflate | Deflate | Gzip | Gunzip | Zlib | Unzlib; // auto stream const astrm = (strm: CmpDecmpStrm) => { strm.ondata = (dat, final) => (postMessage as Worker['postMessage'])([dat, final], [dat.buffer]); - return (ev: MessageEvent<[Uint8Array, boolean]>) => strm.push(ev.data[0], ev.data[1]); + return (ev: MessageEvent<[Uint8Array, boolean] | []>) => { + if (ev.data.length) { + strm.push(ev.data[0], ev.data[1]); + (postMessage as Worker['postMessage'])([ev.data[0].length]); + } else (strm as Deflate | Gzip | Zlib).flush() + } } -type Astrm = { ondata: AsyncFlateStreamHandler; push: (d: Uint8Array, f?: boolean) => void; terminate: AsyncTerminable; }; +type Astrm = { ondata: AsyncFlateStreamHandler; push: (d: Uint8Array, f?: boolean) => void; terminate: AsyncTerminable; flush?: () => void; ondrain?: AsyncFlateDrainHandler; queuedSize: number; }; // async stream attach -const astrmify = (fns: (() => unknown[])[], strm: Astrm, opts: T | 0, init: (ev: MessageEvent) => void, id: number, ext?: (msg: unknown) => unknown) => { +const astrmify = (fns: (() => unknown[])[], strm: Astrm, opts: T | 0, init: (ev: MessageEvent) => void, id: number, flush: 0 | 1, ext?: (msg: unknown) => unknown) => { let t: boolean; - const w = wrkr( + const w = wrkr( fns, init, id, (err, dat) => { if (err) w.terminate(), strm.ondata.call(strm, err); else if (!Array.isArray(dat)) ext(dat); - else { + else if (dat.length == 1) { + strm.queuedSize -= dat[0]; + if (strm.ondrain) strm.ondrain(dat[0]); + } else { if (dat[1]) w.terminate(); strm.ondata.call(strm, err, dat[0], dat[1]); } } ) w.postMessage(opts); + strm.queuedSize = 0; strm.push = (d, f) => { if (!strm.ondata) err(5); if (t) strm.ondata(err(4, 0, 1), null, !!f); + strm.queuedSize += d.length; w.postMessage([d, t = f], [d.buffer]); }; strm.terminate = () => { w.terminate(); }; + if (flush) { + 
strm.flush = () => { w.postMessage([]); }; + } } // read 2 bytes @@ -1284,12 +1304,12 @@ export class Deflate { newBuf.set(this.b.subarray(0, this.s.z)); this.b = newBuf; } + const split = this.b.length - this.s.z; - if (split) { - this.b.set(chunk.subarray(0, split), this.s.z); - this.s.z = this.b.length; - this.p(this.b, false); - } + this.b.set(chunk.subarray(0, split), this.s.z); + this.s.z = this.b.length; + this.p(this.b, false); + this.b.set(this.b.subarray(-32768)); this.b.set(chunk.subarray(split), 32768); this.s.z = chunk.length - split + 32768; @@ -1304,6 +1324,17 @@ export class Deflate { this.s.w = this.s.i, this.s.i -= 2; } } + + /** + * Flushes buffered uncompressed data. Useful to immediately retrieve the + * deflated output for small inputs. + */ + flush() { + if (!this.ondata) err(5); + if (this.s.l) err(4); + this.p(this.b, false); + this.s.w = this.s.i, this.s.i -= 2; + } } /** @@ -1315,6 +1346,16 @@ export class AsyncDeflate { */ ondata: AsyncFlateStreamHandler; + /** + * The handler to call whenever buffered source data is processed (i.e. `queuedSize` updates) + */ + ondrain?: AsyncFlateDrainHandler; + + /** + * The number of uncompressed bytes buffered in the stream + */ + queuedSize: number; + /** * Creates an asynchronous DEFLATE stream * @param opts The compression options @@ -1333,7 +1374,7 @@ export class AsyncDeflate { ], this as unknown as Astrm, StrmOpt.call(this, opts, cb), ev => { const strm = new Deflate(ev.data); onmessage = astrm(strm); - }, 6); + }, 6, 1); } /** @@ -1343,6 +1384,13 @@ export class AsyncDeflate { */ // @ts-ignore push(chunk: Uint8Array, final?: boolean): void; + + /** + * Flushes buffered uncompressed data. Useful to immediately retrieve the + * deflated output for small inputs. + */ + // @ts-ignore + flush(): void; /** * A method to terminate the stream's internal worker. 
Subsequent calls to @@ -1456,6 +1504,16 @@ export class AsyncInflate { */ ondata: AsyncFlateStreamHandler; + /** + * The handler to call whenever buffered source data is processed (i.e. `queuedSize` updates) + */ + ondrain?: AsyncFlateDrainHandler; + + /** + * The number of compressed bytes buffered in the stream + */ + queuedSize: number; + /** * Creates an asynchronous DEFLATE decompression stream * @param opts The decompression options @@ -1474,7 +1532,7 @@ export class AsyncInflate { ], this as unknown as Astrm, StrmOpt.call(this, opts, cb), ev => { const strm = new Inflate(ev.data); onmessage = astrm(strm); - }, 7); + }, 7, 0); } /** @@ -1573,6 +1631,14 @@ export class Gzip { if (f) wbytes(raw, raw.length - 8, this.c.d()), wbytes(raw, raw.length - 4, this.l); this.ondata(raw, f); } + + /** + * Flushes buffered uncompressed data. Useful to immediately retrieve the + * GZIPped output for small inputs. + */ + flush() { + Deflate.prototype.flush.call(this); + } } /** @@ -1584,6 +1650,16 @@ export class AsyncGzip { */ ondata: AsyncFlateStreamHandler; + /** + * The handler to call whenever buffered source data is processed (i.e. `queuedSize` updates) + */ + ondrain?: AsyncFlateDrainHandler; + + /** + * The number of uncompressed bytes buffered in the stream + */ + queuedSize: number; + /** * Creates an asynchronous GZIP stream * @param opts The compression options @@ -1603,7 +1679,7 @@ export class AsyncGzip { ], this as unknown as Astrm, StrmOpt.call(this, opts, cb), ev => { const strm = new Gzip(ev.data); onmessage = astrm(strm); - }, 8); + }, 8, 1); } /** @@ -1614,6 +1690,13 @@ export class AsyncGzip { // @ts-ignore push(chunk: Uint8Array, final?: boolean): void; + /** + * Flushes buffered uncompressed data. Useful to immediately retrieve the + * GZIPped output for small inputs. + */ + // @ts-ignore + flush(): void; + /** * A method to terminate the stream's internal worker. Subsequent calls to * push() will silently fail. 
@@ -1721,11 +1804,11 @@ export class Gunzip { // This allows for workerization to function correctly (Inflate.prototype as unknown as { c: typeof Inflate.prototype['c'] }).c.call(this, final); // process concatenated GZIP - if (this.s.f && !this.s.l) { + if (this.s.f && !this.s.l && !final) { this.v = shft(this.s.p) + 9; this.s = { i: 0 }; this.o = new u8(0); - if (this.p.length) this.push(new u8(0), final); + this.push(new u8(0), final); } } } @@ -1738,6 +1821,17 @@ export class AsyncGunzip { * The handler to call whenever data is available */ ondata: AsyncFlateStreamHandler; + + /** + * The handler to call whenever buffered source data is processed (i.e. `queuedSize` updates) + */ + ondrain?: AsyncFlateDrainHandler; + + /** + * The number of compressed bytes buffered in the stream + */ + queuedSize: number; + /** * The handler to call whenever a new GZIP member is found */ @@ -1763,7 +1857,7 @@ export class AsyncGunzip { const strm = new Gunzip(ev.data); strm.onmember = (offset) => (postMessage as Worker['postMessage'])(offset); onmessage = astrm(strm); - }, 9, offset => this.onmember && this.onmember(offset as number)); + }, 9, 0, offset => this.onmember && this.onmember(offset as number)); } /** @@ -1862,6 +1956,14 @@ export class Zlib { if (f) wbytes(raw, raw.length - 4, this.c.d()); this.ondata(raw, f); } + + /** + * Flushes buffered uncompressed data. Useful to immediately retrieve the + * zlibbed output for small inputs. + */ + flush() { + Deflate.prototype.flush.call(this); + } } /** @@ -1873,6 +1975,16 @@ export class AsyncZlib { */ ondata: AsyncFlateStreamHandler; + /** + * The handler to call whenever buffered source data is processed (i.e. 
`queuedSize` updates) + */ + ondrain?: AsyncFlateDrainHandler; + + /** + * The number of uncompressed bytes buffered in the stream + */ + queuedSize: number; + /** * Creates an asynchronous Zlib stream * @param opts The compression options @@ -1892,7 +2004,7 @@ export class AsyncZlib { ], this as unknown as Astrm, StrmOpt.call(this, opts, cb), ev => { const strm = new Zlib(ev.data); onmessage = astrm(strm); - }, 10); + }, 10, 1); } /** @@ -1903,6 +2015,13 @@ export class AsyncZlib { // @ts-ignore push(chunk: Uint8Array, final?: boolean): void; + /** + * Flushes buffered uncompressed data. Useful to immediately retrieve the + * zlibbed output for small inputs. + */ + // @ts-ignore + flush(): void; + /** * A method to terminate the stream's internal worker. Subsequent calls to * push() will silently fail. @@ -2005,6 +2124,16 @@ export class AsyncUnzlib { */ ondata: AsyncFlateStreamHandler; + /** + * The handler to call whenever buffered source data is processed (i.e. `queuedSize` updates) + */ + ondrain?: AsyncFlateDrainHandler; + + /** + * The number of compressed bytes buffered in the stream + */ + queuedSize: number; + /** * Creates an asynchronous Zlib decompression stream * @param opts The decompression options @@ -2024,7 +2153,7 @@ export class AsyncUnzlib { ], this as unknown as Astrm, StrmOpt.call(this, opts, cb), ev => { const strm = new Unzlib(ev.data); onmessage = astrm(strm); - }, 11); + }, 11, 0); } /** @@ -2085,12 +2214,13 @@ export { gzipSync as compressSync, Gzip as Compress } * Streaming GZIP, Zlib, or raw DEFLATE decompression */ export class Decompress { - private G = Gunzip; - private I = Inflate; - private Z = Unzlib; + private G: typeof Gunzip; + private I: typeof Inflate; + private Z: typeof Unzlib; private o: InflateOptions; private s: Inflate | Gunzip | Unzlib; private p: Uint8Array; + /** * The handler to call whenever data is available */ @@ -2109,6 +2239,17 @@ export class Decompress { constructor(cb?: FlateStreamHandler); 
constructor(opts?: InflateStreamOptions | FlateStreamHandler, cb?: FlateStreamHandler) { this.o = StrmOpt.call(this, opts, cb) || {}; + this.G = Gunzip; + this.I = Inflate; + this.Z = Unzlib; + } + + // init substream + // overridden by AsyncDecompress + private i() { + this.s.ondata = (dat, final) => { + this.ondata(dat, final); + } } /** @@ -2124,33 +2265,44 @@ export class Decompress { n.set(this.p), n.set(chunk, this.p.length); } else this.p = chunk; if (this.p.length > 2) { - const _this = this; - // enables reuse of this method by AsyncDecompress - const cb: FlateStreamHandler = function() { _this.ondata.apply(_this, arguments); } this.s = (this.p[0] == 31 && this.p[1] == 139 && this.p[2] == 8) - ? new this.G(this.o, cb) + ? new this.G(this.o) : ((this.p[0] & 15) != 8 || (this.p[0] >> 4) > 7 || ((this.p[0] << 8 | this.p[1]) % 31)) - ? new this.I(this.o, cb) - : new this.Z(this.o, cb); + ? new this.I(this.o) + : new this.Z(this.o); + this.i(); this.s.push(this.p, final); this.p = null; } } else this.s.push(chunk, final); } + + } /** * Asynchronous streaming GZIP, Zlib, or raw DEFLATE decompression */ export class AsyncDecompress { - private G = AsyncGunzip; - private I = AsyncInflate; - private Z = AsyncUnzlib; + private G: typeof AsyncGunzip; + private I: typeof AsyncInflate; + private Z: typeof AsyncUnzlib; + /** * The handler to call whenever data is available */ ondata: AsyncFlateStreamHandler; + /** + * The handler to call whenever buffered source data is processed (i.e. 
`queuedSize` updates) + */ + ondrain?: AsyncFlateDrainHandler; + + /** + * The number of compressed bytes buffered in the stream + */ + queuedSize: number; + /** * Creates an asynchronous decompression stream * @param opts The decompression options @@ -2164,6 +2316,20 @@ export class AsyncDecompress { constructor(cb?: AsyncFlateStreamHandler); constructor(opts?: InflateStreamOptions | AsyncFlateStreamHandler, cb?: AsyncFlateStreamHandler) { Decompress.call(this, opts, cb); + this.queuedSize = 0; + this.G = AsyncGunzip; + this.I = AsyncInflate; + this.Z = AsyncUnzlib; + } + + private i() { + (this as unknown as { s: AsyncInflate }).s.ondata = (err, dat, final) => { + this.ondata(err, dat, final); + } + (this as unknown as { s: AsyncInflate }).s.ondrain = size => { + this.queuedSize -= size; + if (this.ondrain) this.ondrain(size); + } } /** @@ -2172,6 +2338,7 @@ export class AsyncDecompress { * @param final Whether this is the last chunk */ push(chunk: Uint8Array, final?: boolean) { + this.queuedSize += chunk.length; Decompress.prototype.push.call(this, chunk, final); } } @@ -3549,7 +3716,8 @@ export function unzip(data: Uint8Array, opts: AsyncUnzipOptions | UnzipCallback, if (!c) cbl(null, slc(data, b, b + sc)) else if (c == 8) { const infl = data.subarray(b, b + sc); - if (sc < 320000) { + // Synchronously decompress under 512KB, or barely-compressed data + if (su < 524288 || sc > 0.8 * su) { try { cbl(null, inflateSync(infl, { out: new u8(su) })); } catch(e) {