diff --git a/packages/netstring/package.json b/packages/netstring/package.json index ca9843feff..37adadd892 100644 --- a/packages/netstring/package.json +++ b/packages/netstring/package.json @@ -35,6 +35,7 @@ }, "dependencies": { "@endo/init": "^0.5.60", + "@endo/promise-kit": "^0.2.60", "@endo/stream": "^0.3.29", "ses": "^0.18.8" }, diff --git a/packages/netstring/test/test-netstring.js b/packages/netstring/test/test-netstring.js index b2e702f271..4a3309ee9f 100644 --- a/packages/netstring/test/test-netstring.js +++ b/packages/netstring/test/test-netstring.js @@ -187,6 +187,47 @@ const chunkedWrite = async (t, opts) => { test('chunked write', chunkedWrite); test('chunked write (chunked)', chunkedWrite, { chunked: true }); +test('concurrent chunked writes', async t => { + const { array, writer } = makeArrayWriter({ chunked: true }); + const concurrentChunkedMessages = [ + [], + [''], + ['A'], + ['hello', ' ', 'world'], + ['Hello', ', ', 'World', '!\n'], + ]; + await Promise.all( + concurrentChunkedMessages.flatMap(strChunks => [ + writer.next(strChunks.map(strChunk => encoder.encode(strChunk))), + writer.return(), + ]), + ); + + t.deepEqual( + concurrentChunkedMessages.map(strChunks => + encoder.encode(strChunks.join('')), + ), + await read(makeNetstringReader(array)), + ); +}); + +test('writer closes anywhere within chunk', async t => { + for (let count = 0; count < 4; count += 1) { + const pipe = makePipe(); + const writer = makeNetstringWriter(pipe[1], { chunked: true }); + for (let i = 0; i < count; i += 1) { + pipe[0].next(); + } + // close the writer: + pipe[0].return(); + // eslint-disable-next-line no-await-in-loop + const { done } = await writer.next( + ['Hello, ', 'World!\n'].map(str => encoder.encode(str)), + ); + t.assert(done); + } +}); + const varyingMessages = async (t, opts) => { const array = ['', 'A', 'hello']; diff --git a/packages/netstring/writer.js b/packages/netstring/writer.js index d85fe29588..9e28061761 100644 --- a/packages/netstring/writer.js +++ b/packages/netstring/writer.js @@ -1,6 +1,8 @@ // @ts-check /// +import { makePromiseKit } from '@endo/promise-kit'; + const COMMA_BUFFER = new Uint8Array([','.charCodeAt(0)]); /** @param {number} length */ @@ -39,14 +41,32 @@ export const makeNetstringWriter = (output, { chunked = false } = {}) => { const prefix = getLengthPrefixCharCodes(messageLength); if (chunked) { - return Promise.all([ + const ack = makePromiseKit(); + + const partsWritten = [ output.next(new Uint8Array(prefix)), - ...messageChunks.map(async chunk => output.next(chunk)), + ...messageChunks.map(chunk => output.next(chunk)), output.next(COMMA_BUFFER), - ]).then(([r1, r2, r3]) => ({ - done: !!(r1.done || r2.done || r3.done), - value: undefined, - })); + ]; + + // Resolve early if the output writer closes early. + for (const promise of partsWritten) { + promise.then(partWritten => { + if (partWritten.done) { + ack.resolve(partWritten); + } + }); + } + + Promise.all(partsWritten).then(results => { + // Redundant resolution is safe and clean. + ack.resolve({ + done: results.some(({ done }) => done), + value: undefined, + }); + }, ack.reject); + + return ack.promise; } else { const buffer = new Uint8Array(prefix.length + messageLength + 1); buffer.set(prefix, 0);