Skip to content

Commit

Permalink
Add payloadAsStream option (#308)
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Collina <[email protected]>
  • Loading branch information
mcollina authored Oct 25, 2024
1 parent 069dfc7 commit 587cd2f
Show file tree
Hide file tree
Showing 6 changed files with 392 additions and 23 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ Injects a fake request into an HTTP server.
- `signal` - An `AbortSignal` that may be used to abort an ongoing request. Requires Node v16+.
- `Request` - Optional type from which the `request` object should inherit
instead of `stream.Readable`
- `payloadAsStream` - if set to `true`, the response will be streamed and not accumulated; in this case `res.payload`, `res.rawPayload` will be undefined.
- `callback` - the callback function using the signature `function (err, res)` where:
- `err` - error object
- `res` - a response object where:
Expand Down
4 changes: 3 additions & 1 deletion lib/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ function Request (options) {
this._lightMyRequest = {
payload,
isDone: false,
simulate: options.simulate || {}
simulate: options.simulate || {},
payloadAsStream: options.payloadAsStream,
signal: options.signal
}

const signal = options.signal
Expand Down
87 changes: 65 additions & 22 deletions lib/response.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
'use strict'

const http = require('node:http')
const { Writable, Readable } = require('node:stream')
const { Writable, Readable, addAbortSignal } = require('node:stream')
const util = require('node:util')

const setCookie = require('set-cookie-parser')

function Response (req, onEnd, reject) {
http.ServerResponse.call(this, req)

this._lightMyRequest = { headers: null, trailers: {}, payloadChunks: [] }
if (req._lightMyRequest?.payloadAsStream) {
this._lightMyRequest = { headers: null, trailers: {}, stream: new Readable({ read () {} }) }
const signal = req._lightMyRequest.signal

if (signal) {
addAbortSignal(signal, this._lightMyRequest.stream)
}
} else {
this._lightMyRequest = { headers: null, trailers: {}, payloadChunks: [] }
}
// This forces node@8 to always render the headers
this.setHeader('foo', 'bar'); this.removeHeader('foo')

Expand All @@ -26,21 +35,37 @@ function Response (req, onEnd, reject) {
}
process.nextTick(() => onEnd(null, payload))
}
this._lightMyRequest.onEndSuccess = onEndSuccess

const onEndFailure = (err) => {
if (called) return
called = true
if (this._promiseCallback) {
return process.nextTick(() => reject(err))
if (this._lightMyRequest.stream) {
const res = generatePayload(this)
res.raw.req = req
this._lightMyRequest.stream._read = function () {
this.destroy(err || new Error('premature close'))
}
onEndSuccess(res)
} else {
if (this._promiseCallback) {
return process.nextTick(() => reject(err))
}
process.nextTick(() => onEnd(err, null))
}
process.nextTick(() => onEnd(err, null))
}

this.once('finish', () => {
const res = generatePayload(this)
res.raw.req = req
onEndSuccess(res)
})
if (this._lightMyRequest.stream) {
this.once('finish', () => {
this._lightMyRequest.stream.push(null)
})
} else {
this.once('finish', () => {
const res = generatePayload(this)
res.raw.req = req
onEndSuccess(res)
})
}

this.connection.once('error', onEndFailure)

Expand All @@ -64,6 +89,10 @@ Response.prototype.writeHead = function () {

copyHeaders(this)

if (this._lightMyRequest.stream) {
this._lightMyRequest.onEndSuccess(generatePayload(this))
}

return result
}

Expand All @@ -72,7 +101,11 @@ Response.prototype.write = function (data, encoding, callback) {
clearTimeout(this.timeoutHandle)
}
http.ServerResponse.prototype.write.call(this, data, encoding, callback)
this._lightMyRequest.payloadChunks.push(Buffer.from(data, encoding))
if (this._lightMyRequest.stream) {
this._lightMyRequest.stream.push(Buffer.from(data, encoding))
} else {
this._lightMyRequest.payloadChunks.push(Buffer.from(data, encoding))
}
return true
}

Expand Down Expand Up @@ -129,22 +162,32 @@ function generatePayload (response) {
}
}

// Prepare payload and trailers
const rawBuffer = Buffer.concat(response._lightMyRequest.payloadChunks)
res.rawPayload = rawBuffer

// we keep both of them for compatibility reasons
res.payload = rawBuffer.toString()
res.body = res.payload
res.trailers = response._lightMyRequest.trailers

// Prepare payload parsers
res.json = function parseJsonPayload () {
return JSON.parse(res.payload)
if (response._lightMyRequest.payloadChunks) {
// Prepare payload and trailers
const rawBuffer = Buffer.concat(response._lightMyRequest.payloadChunks)
res.rawPayload = rawBuffer

// we keep both of them for compatibility reasons
res.payload = rawBuffer.toString()
res.body = res.payload

// Prepare payload parsers
res.json = function parseJsonPayload () {
return JSON.parse(res.payload)
}
} else {
res.json = function () {
throw new Error('Response payload is not available with payloadAsStream: true')
}
}

// Provide stream Readable for advanced user
res.stream = function streamPayload () {
if (response._lightMyRequest.stream) {
return response._lightMyRequest.stream
}
return Readable.from(response._lightMyRequest.payloadChunks)
}

Expand Down Expand Up @@ -179,7 +222,7 @@ function copyHeaders (response) {
// Add raw headers
;['Date', 'Connection', 'Transfer-Encoding'].forEach((name) => {
const regex = new RegExp('\\r\\n' + name + ': ([^\\r]*)\\r\\n')
const field = response._header.match(regex)
const field = response._header?.match(regex)
if (field) {
response._lightMyRequest.headers[name.toLowerCase()] = field[1]
}
Expand Down
Loading

0 comments on commit 587cd2f

Please sign in to comment.