Skip to content

Commit

Permalink
feat: implement content-length and fix status text behavior (#180)
Browse files Browse the repository at this point in the history
* feat: implement content-length and fix status text behavior

* fix: add request streaming tests along with its fixes

* fix: add more tests and fixes
  • Loading branch information
cyco130 authored Nov 6, 2024
1 parent 5f4e686 commit accbe24
Show file tree
Hide file tree
Showing 15 changed files with 1,987 additions and 196 deletions.
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ jobs:
deno-version: v1.x

- name: Install Bun
if: matrix.os != 'windows-latest'
uses: oven-sh/setup-bun@v2
with:
bun-version: latest
Expand Down
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"typescript.tsdk": "node_modules/typescript/lib",
"deno.enablePaths": ["./_deno"]
"deno.enablePaths": ["./_deno"],
"deno.enable": false
}
3 changes: 3 additions & 0 deletions packages/adapter/adapter-aws-lambda/src/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,16 @@ export default function awsLambdaAdapter(
const readable = Readable.fromWeb(response.body as any);
readable.pipe(responseStream);
} else {
// Lambda always seems to return 200 if we don't call write first
responseStream.write("");
responseStream.end();
}

await new Promise((resolve, reject) => {
responseStream.on("finish", resolve);
responseStream.on("error", reject);
// This never seems to fire
// responseStream.on("close", () => {});
});
},
);
Expand Down
10 changes: 6 additions & 4 deletions packages/adapter/adapter-netlify-functions/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ export default function netlifyFunctionsAdapter(
handler: HattipHandler<NetlifyFunctionsPlatformInfo>,
): NetlifyFunction {
return async (event, netlifyContext) => {
const requestBody = event.body;

const ip =
event.headers["x-nf-client-connection-ip"] ||
event.headers["client-ip"] ||
Expand All @@ -36,13 +38,13 @@ export default function netlifyFunctionsAdapter(
method: event.httpMethod,

body:
!event.body ||
event.httpMethod === "GET" ||
event.httpMethod === "HEAD"
event.httpMethod === "HEAD" ||
!requestBody
? undefined
: event.isBase64Encoded
? Buffer.from(event.body, "base64")
: event.body,
? Buffer.from(requestBody, "base64")
: requestBody,

headers: event.headers,
}),
Expand Down
17 changes: 5 additions & 12 deletions packages/adapter/adapter-node/src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ import type { IncomingMessage, ServerResponse } from "node:http";
import type { Socket } from "node:net";
import process from "node:process";
import { Buffer } from "node:buffer";
import { Readable } from "node:stream";

// @ts-ignore
const deno = typeof Deno !== "undefined";
// @ts-ignore
const bun = typeof Bun !== "undefined";
const isDeno = typeof Deno !== "undefined";

interface PossiblyEncryptedSocket extends Socket {
encrypted?: boolean;
Expand Down Expand Up @@ -143,16 +142,10 @@ function convertBody(req: DecoratedRequest): BodyInit | null | undefined {
return req.rawBody;
}

if (!bun && !deno) {
// Real Node can handle ReadableStream
if (!isDeno) {
// Bun and real Node can handle Readable as request body
return req as any;
}

return new ReadableStream({
start(controller) {
req.on("data", (chunk) => controller.enqueue(chunk));
req.on("end", () => controller.close());
req.on("error", (err) => controller.error(err));
},
});
return Readable.toWeb(req) as any;
}
138 changes: 99 additions & 39 deletions packages/adapter/adapter-node/src/response.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/* eslint-disable @typescript-eslint/ban-ts-comment */
import { ServerResponse } from "node:http";
import { Readable } from "node:stream";
import { rawBodySymbol } from "./raw-body-symbol";
import { DecoratedRequest } from "./common";

Expand Down Expand Up @@ -28,58 +27,88 @@ export async function sendResponse(
res: ServerResponse,
fetchResponse: Response,
): Promise<void> {
const controller = new AbortController();
const signal = controller.signal;

req.once("close", () => {
controller.abort();
});

res.once("close", () => {
controller.abort();
});

const hasContentLength = fetchResponse.headers.has("Content-Length");

if ((fetchResponse as any)[rawBodySymbol]) {
writeHead(fetchResponse, res);
res.end((fetchResponse as any)[rawBodySymbol]);
return;
}

const { body: fetchBody } = fetchResponse;
const body = fetchResponse.body;
if (!body) {
// Deno doesn't handle Content-Length automatically
if (!hasContentLength) {
res.setHeader("Content-Length", "0");
}
writeHead(fetchResponse, res);
res.end();
return;
}

let body: Readable | null = null;
if (!deno && fetchBody instanceof Readable) {
body = fetchBody;
} else if (fetchBody instanceof ReadableStream) {
if (!deno && Readable.fromWeb) {
// Available in Node.js 17+
body = Readable.fromWeb(fetchBody as any);
let setImmediateFired = false;
setImmediate(() => {
setImmediateFired = true;
});

const chunks: Uint8Array[] = [];
let bufferWritten = false;
for await (const chunk of body) {
if (signal.aborted) {
body.cancel().catch(() => {});
return;
}
if (setImmediateFired) {
if (!bufferWritten) {
writeHead(fetchResponse, res);
for (const chunk of chunks) {
await writeAndAwait(chunk, res, signal);
if (signal.aborted) {
body.cancel().catch(() => {});
return;
}
}

bufferWritten = true;
}

await writeAndAwait(chunk, res, signal);
if (signal.aborted) {
body.cancel().catch(() => {});
return;
}
} else {
const reader = fetchBody.getReader();
body = new Readable({
async read() {
const { done, value } = await reader.read();
this.push(done ? null : value);
},
});
chunks.push(chunk);
}
} else if (fetchBody) {
body = Readable.from(fetchBody as any);
}

writeHead(fetchResponse, res);
if (signal.aborted) return;

if (body) {
body.pipe(res);
await new Promise<void>((resolve, reject) => {
body!.once("error", reject);
res.once("finish", resolve);
res.once("error", () => {
if (!res.writableEnded) {
body.destroy();
}
reject();
});
req.once("close", () => {
if (!res.writableEnded) {
body.destroy();
resolve();
}
});
});
} else {
res.setHeader("content-length", "0");
if (setImmediateFired) {
res.end();
return;
}

// We were able to read the whole body. Write at once.
const buffer = Buffer.concat(chunks);

// Deno doesn't handle Content-Length automatically
if (!hasContentLength) {
res.setHeader("Content-Length", buffer.length);
}
writeHead(fetchResponse, res);
res.end(buffer);
}

function writeHead(fetchResponse: Response, nodeResponse: ServerResponse) {
Expand All @@ -99,3 +128,34 @@ function writeHead(fetchResponse: Response, nodeResponse: ServerResponse) {
}
}
}

async function writeAndAwait(
chunk: Uint8Array,
res: ServerResponse,
signal: AbortSignal,
) {
const written = res.write(chunk);
if (!written) {
await new Promise<void>((resolve, reject) => {
function cleanup() {
res.off("drain", success);
res.off("error", failure);
signal.removeEventListener("abort", success);
}

function success() {
cleanup();
resolve();
}

function failure(reason: unknown) {
cleanup();
reject(reason);
}

res.once("drain", success);
res.once("error", reject);
signal.addEventListener("abort", success);
});
}
}
Loading

0 comments on commit accbe24

Please sign in to comment.