From 1d2892300a8cae338ecaf5277531e6d202c6925b Mon Sep 17 00:00:00 2001 From: Julien Saguet Date: Mon, 25 Nov 2024 18:57:55 +0100 Subject: [PATCH] fix: always initialize iterator state in RpcOutputStreamController --- .../runtime-rpc/spec/server-streaming-call.spec.ts | 6 +++--- packages/runtime-rpc/src/rpc-output-stream.ts | 14 +++----------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/packages/runtime-rpc/spec/server-streaming-call.spec.ts b/packages/runtime-rpc/spec/server-streaming-call.spec.ts index 88920823..252ef849 100644 --- a/packages/runtime-rpc/spec/server-streaming-call.spec.ts +++ b/packages/runtime-rpc/spec/server-streaming-call.spec.ts @@ -34,7 +34,7 @@ describe('ServerStreamingCall', () => { ); }); - it('should provide correct data', async function () { + it('should provide correct data when getting the values first', async function () { expect(call.requestHeaders).toBe(requestHeaders); expect(call.request).toBe(request); expect(call.responses).toBe(stream); @@ -48,7 +48,7 @@ describe('ServerStreamingCall', () => { }); - it('should run through with wrong order', async function () { + it('should provide correct data when getting the values last', async function () { expect(call.requestHeaders).toBe(requestHeaders); expect(call.request).toBe(request); expect(call.responses).toBe(stream); @@ -58,7 +58,7 @@ describe('ServerStreamingCall', () => { for await (let x of call.responses) { ids.push(x.id); } - expect(ids.length).toBe(0); + expect(ids).toEqual(["one", "two", "three"]); }); it('should provide correct data when finished', async function () { diff --git a/packages/runtime-rpc/src/rpc-output-stream.ts b/packages/runtime-rpc/src/rpc-output-stream.ts index 08452539..187697a0 100644 --- a/packages/runtime-rpc/src/rpc-output-stream.ts +++ b/packages/runtime-rpc/src/rpc-output-stream.ts @@ -61,7 +61,7 @@ type RemoveListenerFn = () => void; /** * A `RpcOutputStream` that you control. */ -export class RpcOutputStreamController { +export class RpcOutputStreamController implements RpcOutputStream { constructor() { @@ -181,14 +181,14 @@ export class RpcOutputStreamController { // iterator state. // is undefined when no iterator has been acquired yet. - private _itState: undefined | { + private _itState: { // a pending result. we yielded that because we were // waiting for messages at the time. p?: Deferred>, // a queue of results that we produced faster that the iterator consumed q: Array | Error>, - }; + } = {q: []}; /** @@ -205,12 +205,6 @@ export class RpcOutputStreamController { * messages are queued. */ [Symbol.asyncIterator](): AsyncIterator { - - // init the iterator state, enabling pushIt() - if (!this._itState) { - this._itState = {q: []}; - } - // if we are closed, we are definitely not receiving any more messages. // but we can't let the iterator get stuck. we want to either: // a) finish the new iterator immediately, because we are completed @@ -249,8 +243,6 @@ export class RpcOutputStreamController { // this either resolves a pending promise, or enqueues the result. private pushIt(result: IteratorResult | Error): void { let state = this._itState; - if (!state) - return; // is the consumer waiting for us? if (state.p) {