diff --git a/gax/src/streamingCalls/streaming.ts b/gax/src/streamingCalls/streaming.ts index ba2cbe31d..c74facaa3 100644 --- a/gax/src/streamingCalls/streaming.ts +++ b/gax/src/streamingCalls/streaming.ts @@ -271,11 +271,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult { */ streamHandoffHelper(stream: CancellableStream, retry: RetryOptions): void { let enteredError = false; - const eventsToForward = ['metadata', 'response', 'status']; - - eventsToForward.forEach(event => { - stream.on(event, this.emit.bind(this, event)); - }); + this.eventForwardHelper(stream); stream.on('error', error => { enteredError = true; @@ -296,20 +292,14 @@ export class StreamProxy extends duplexify implements GRPCCallResult { }); } - /** - * Forward events from an API request stream to the user's stream. - * @param {Stream} stream - The API request stream. - * @param {RetryOptions} retry - Configures the exceptions upon which the - * function should retry, and the parameters to the exponential backoff retry - * algorithm. - */ - - forwardEvents(stream: Stream) { + eventForwardHelper(stream: Stream) { const eventsToForward = ['metadata', 'response', 'status']; eventsToForward.forEach(event => { stream.on(event, this.emit.bind(this, event)); }); + } + statusMetadataHelper(stream: Stream) { // gRPC is guaranteed emit the 'status' event but not 'metadata', and 'status' is the last event to emit. // Emit the 'response' event if stream has no 'metadata' event. // This avoids the stream swallowing the other events, such as 'end'. @@ -340,6 +330,18 @@ export class StreamProxy extends duplexify implements GRPCCallResult { }); this._responseHasSent = true; }); + } + /** + * Forward events from an API request stream to the user's stream. + * @param {Stream} stream - The API request stream. + * @param {RetryOptions} retry - Configures the exceptions upon which the + * function should retry, and the parameters to the exponential backoff retry + * algorithm. + */ + forwardEvents(stream: Stream) { + this.eventForwardHelper(stream); + this.statusMetadataHelper(stream); + stream.on('error', error => { GoogleError.parseGRPCStatusDetails(error); }); @@ -368,40 +370,8 @@ export class StreamProxy extends duplexify implements GRPCCallResult { retry: RetryOptions ): CancellableStream | undefined { let retryStream = this.stream; - const eventsToForward = ['metadata', 'response', 'status']; - eventsToForward.forEach(event => { - stream.on(event, this.emit.bind(this, event)); - }); - // gRPC is guaranteed emit the 'status' event but not 'metadata', and 'status' is the last event to emit. - // Emit the 'response' event if stream has no 'metadata' event. - // This avoids the stream swallowing the other events, such as 'end'. - stream.on('status', () => { - if (!this._responseHasSent) { - stream.emit('response', { - code: 200, - details: '', - message: 'OK', - }); - } - }); - - // We also want to supply the status data as 'response' event to support - // the behavior of google-cloud-node expects. - // see: - // https://github.com/GoogleCloudPlatform/google-cloud-node/pull/1775#issuecomment-259141029 - // https://github.com/GoogleCloudPlatform/google-cloud-node/blob/116436fa789d8b0f7fc5100b19b424e3ec63e6bf/packages/common/src/grpc-service.js#L355 - stream.on('metadata', metadata => { - // Create a response object with succeeds. - // TODO: unify this logic with the decoration of gRPC response when it's - // added. see: https://github.com/googleapis/gax-nodejs/issues/65 - stream.emit('response', { - code: 200, - details: '', - message: 'OK', - metadata, - }); - this._responseHasSent = true; - }); + this.eventForwardHelper(stream); + this.statusMetadataHelper(stream); stream.on('error', error => { const timeout = retry.backoffSettings.totalTimeoutMillis; diff --git a/gax/src/streamingRetryRequest.ts b/gax/src/streamingRetryRequest.ts index 3972a26ed..83414f7b7 100644 --- a/gax/src/streamingRetryRequest.ts +++ b/gax/src/streamingRetryRequest.ts @@ -30,7 +30,7 @@ const requestOps = null; const objectMode = true; // we don't support objectMode being false interface streamingRetryRequestOptions { - request?: Function; + request: Function; maxRetries?: number; } /** @@ -40,14 +40,8 @@ interface streamingRetryRequestOptions { */ export function streamingRetryRequest(opts: streamingRetryRequestOptions) { opts = Object.assign({}, DEFAULTS, opts); - if (opts.request === undefined) { - try { - // eslint-disable-next-line node/no-unpublished-require - opts.request = require('request'); - } catch (e) { - throw new Error('A request library must be provided to retry-request.'); - } + throw new Error('A request function must be provided'); } let numNoResponseAttempts = 0; diff --git a/gax/test/unit/streamingRetryRequest.ts b/gax/test/unit/streamingRetryRequest.ts index 5dca3ef32..fb760916a 100644 --- a/gax/test/unit/streamingRetryRequest.ts +++ b/gax/test/unit/streamingRetryRequest.ts @@ -96,14 +96,14 @@ describe('retry-request', () => { }); assert.strictEqual(retryStream._readableState.objectMode, true); }); - it('throws request error', done => { try { const opts = {}; + //@ts-expect-error streamingRetryRequest(opts); } catch (err) { assert(err instanceof Error); - assert.match(err.message, /A request library must be provided/); + assert.match(err.message, /A request function must be provided/); done(); } });