Skip to content

Commit

Permalink
Feat readable body (#486)
Browse files Browse the repository at this point in the history
* make body a readableStream instead of a string

* force non empty when necessry

* fix error lambda streaming

* fix some issue

* changeset
  • Loading branch information
conico974 committed Aug 21, 2024
1 parent 1b91708 commit b88ae13
Show file tree
Hide file tree
Showing 16 changed files with 146 additions and 70 deletions.
5 changes: 5 additions & 0 deletions .changeset/lucky-dots-obey.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"open-next": minor
---

Replace InternalResult body from string to ReadableStream
22 changes: 12 additions & 10 deletions packages/open-next/src/adapters/edge-adapter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import { InternalEvent, InternalResult } from "types/open-next";
import type { ReadableStream } from "node:stream/web";

import type { InternalEvent, InternalResult } from "types/open-next";
import { emptyReadableStream } from "utils/stream";

// We import it like that so that the edge plugin can replace it
import { NextConfig } from "../adapters/config";
Expand All @@ -8,11 +11,14 @@ import {
convertToQueryString,
} from "../core/routing/util";

declare global {
var isEdgeRuntime: true;
}

const defaultHandler = async (
internalEvent: InternalEvent,
): Promise<InternalResult> => {
// TODO: We need to handle splitted function here
// We should probably create an host resolver to redirect correctly
globalThis.isEdgeRuntime = true;

const host = internalEvent.headers.host
? `https://${internalEvent.headers.host}`
Expand All @@ -35,10 +41,6 @@ const defaultHandler = async (
url,
body: convertBodyToReadableStream(internalEvent.method, internalEvent.body),
});

const arrayBuffer = await response.arrayBuffer();
const buffer = Buffer.from(arrayBuffer);

const responseHeaders: Record<string, string | string[]> = {};
response.headers.forEach((value, key) => {
if (key.toLowerCase() === "set-cookie") {
Expand All @@ -49,9 +51,9 @@ const defaultHandler = async (
responseHeaders[key] = value;
}
});
// console.log("responseHeaders", responseHeaders);
const body = buffer.toString();
// console.log("body", body);

const body =
(response.body as ReadableStream<Uint8Array>) ?? emptyReadableStream();

return {
type: "core",
Expand Down
9 changes: 5 additions & 4 deletions packages/open-next/src/adapters/image-optimization-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
// @ts-ignore
import type { NextUrlWithParsedQuery } from "next/dist/server/request-meta";
import { InternalEvent, InternalResult } from "types/open-next.js";
import { emptyReadableStream, toReadableStream } from "utils/stream.js";

import { createGenericHandler } from "../core/createGenericHandler.js";
import { resolveImageLoader } from "../core/resolve.js";
Expand Down Expand Up @@ -82,7 +83,7 @@ export async function defaultHandler(
return {
statusCode: 304,
headers: {},
body: "",
body: emptyReadableStream(),
isBase64Encoded: false,
type: "core",
};
Expand Down Expand Up @@ -169,7 +170,7 @@ function buildSuccessResponse(
return {
type: "core",
statusCode: 200,
body: result.buffer.toString("base64"),
body: toReadableStream(result.buffer, true),
isBase64Encoded: true,
headers,
};
Expand All @@ -191,7 +192,7 @@ function buildFailureResponse(
"Cache-Control": `public,max-age=60,immutable`,
"Content-Type": "application/json",
});
response.end(e?.message || e?.toString() || e);
response.end(e?.message || e?.toString() || "An error occurred");
}
return {
type: "core",
Expand All @@ -203,7 +204,7 @@ function buildFailureResponse(
"Cache-Control": `public,max-age=60,immutable`,
"Content-Type": "application/json",
},
body: e?.message || e?.toString() || e,
body: toReadableStream(e?.message || e?.toString() || "An error occurred"),
};
}

Expand Down
9 changes: 6 additions & 3 deletions packages/open-next/src/converters/aws-apigw-v1.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { APIGatewayProxyEvent, APIGatewayProxyResult } from "aws-lambda";
import type { Converter, InternalEvent, InternalResult } from "types/open-next";
import { fromReadableStream } from "utils/stream";

import { debug } from "../adapters/logger";
import { removeUndefinedFromQuery } from "./utils";
Expand Down Expand Up @@ -74,9 +75,9 @@ async function convertFromAPIGatewayProxyEvent(
};
}

function convertToApiGatewayProxyResult(
async function convertToApiGatewayProxyResult(
result: InternalResult,
): APIGatewayProxyResult {
): Promise<APIGatewayProxyResult> {
const headers: Record<string, string> = {};
const multiValueHeaders: Record<string, string[]> = {};
Object.entries(result.headers).forEach(([key, value]) => {
Expand All @@ -91,10 +92,12 @@ function convertToApiGatewayProxyResult(
}
});

const body = await fromReadableStream(result.body, result.isBase64Encoded);

const response: APIGatewayProxyResult = {
statusCode: result.statusCode,
headers,
body: result.body,
body,
isBase64Encoded: result.isBase64Encoded,
multiValueHeaders,
};
Expand Down
9 changes: 6 additions & 3 deletions packages/open-next/src/converters/aws-apigw-v2.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { APIGatewayProxyEventV2, APIGatewayProxyResultV2 } from "aws-lambda";
import { parseCookies } from "http/util";
import type { Converter, InternalEvent, InternalResult } from "types/open-next";
import { fromReadableStream } from "utils/stream";

import { debug } from "../adapters/logger";
import { convertToQuery } from "../core/routing/util";
Expand Down Expand Up @@ -85,9 +86,9 @@ async function convertFromAPIGatewayProxyEventV2(
};
}

function convertToApiGatewayProxyResultV2(
async function convertToApiGatewayProxyResultV2(
result: InternalResult,
): APIGatewayProxyResultV2 {
): Promise<APIGatewayProxyResultV2> {
const headers: Record<string, string> = {};
Object.entries(result.headers)
.filter(
Expand All @@ -104,11 +105,13 @@ function convertToApiGatewayProxyResultV2(
headers[key] = Array.isArray(value) ? value.join(", ") : `${value}`;
});

const body = await fromReadableStream(result.body, result.isBase64Encoded);

const response: APIGatewayProxyResultV2 = {
statusCode: result.statusCode,
headers,
cookies: parseCookies(result.headers["set-cookie"]),
body: result.body,
body,
isBase64Encoded: result.isBase64Encoded,
};
debug(response);
Expand Down
11 changes: 9 additions & 2 deletions packages/open-next/src/converters/aws-cloudfront.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
import { OutgoingHttpHeader } from "http";
import { parseCookies } from "http/util";
import type { Converter, InternalEvent, InternalResult } from "types/open-next";
import { fromReadableStream } from "utils/stream";

import { debug } from "../adapters/logger";
import {
Expand Down Expand Up @@ -159,14 +160,18 @@ async function convertToCloudFrontRequestResult(
const serverResponse = createServerResponse(result.internalEvent, {});
await proxyRequest(result.internalEvent, serverResponse);
const externalResult = convertRes(serverResponse);
const body = await fromReadableStream(
externalResult.body,
externalResult.isBase64Encoded,
);
const cloudfrontResult = {
status: externalResult.statusCode.toString(),
statusDescription: "OK",
headers: convertToCloudfrontHeaders(externalResult.headers, true),
bodyEncoding: externalResult.isBase64Encoded
? ("base64" as const)
: ("text" as const),
body: externalResult.body,
body,
};
debug("externalResult", cloudfrontResult);
return cloudfrontResult;
Expand Down Expand Up @@ -208,12 +213,14 @@ async function convertToCloudFrontRequestResult(
return response;
}

const body = await fromReadableStream(result.body, result.isBase64Encoded);

const response: CloudFrontRequestResult = {
status: result.statusCode.toString(),
statusDescription: "OK",
headers: convertToCloudfrontHeaders(responseHeaders, true),
bodyEncoding: result.isBase64Encoded ? "base64" : "text",
body: result.body,
body,
};
debug(response);
return response;
Expand Down
2 changes: 1 addition & 1 deletion packages/open-next/src/converters/edge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ const converter: Converter<
for (const [key, value] of Object.entries(result.headers)) {
headers.set(key, Array.isArray(value) ? value.join(",") : value);
}
return new Response(result.body, {
return new Response(result.body as ReadableStream, {
status: result.statusCode,
headers: headers,
});
Expand Down
2 changes: 1 addition & 1 deletion packages/open-next/src/converters/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const converter: Converter = {
};
},
// Nothing to do here, it's streaming
convertTo: (internalResult: InternalResult) => ({
convertTo: async (internalResult: InternalResult) => ({
body: internalResult.body,
headers: internalResult.headers,
statusCode: internalResult.statusCode,
Expand Down
23 changes: 17 additions & 6 deletions packages/open-next/src/core/requestHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
} from "http/index.js";
import { InternalEvent, InternalResult } from "types/open-next";
import { DetachedPromiseRunner } from "utils/promise";
import { fromReadableStream } from "utils/stream";

import { debug, error, warn } from "../adapters/logger";
import { convertRes, createServerResponse, proxyRequest } from "./routing/util";
Expand Down Expand Up @@ -63,12 +64,22 @@ export async function openNextHandler(
}, {});

if ("type" in preprocessResult) {
// res is used only in the streaming case
const res = createServerResponse(internalEvent, headers, responseStreaming);
res.statusCode = preprocessResult.statusCode;
res.flushHeaders();
res.write(preprocessResult.body);
res.end();
// // res is used only in the streaming case
if (responseStreaming) {
const res = createServerResponse(
internalEvent,
headers,
responseStreaming,
);
res.statusCode = preprocessResult.statusCode;
res.flushHeaders();
const body = await fromReadableStream(
preprocessResult.body,
preprocessResult.isBase64Encoded,
);
res.write(body);
res.end();
}
return preprocessResult;
} else {
const preprocessedEvent = preprocessResult.internalEvent;
Expand Down
14 changes: 9 additions & 5 deletions packages/open-next/src/core/routing/matcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import type {
RouteHas,
} from "types/next-types";
import { InternalEvent, InternalResult } from "types/open-next";
import { emptyReadableStream, toReadableStream } from "utils/stream";

import { debug } from "../../adapters/logger";
import { localizePath } from "./i18n";
Expand Down Expand Up @@ -243,8 +244,11 @@ export function handleRewrites<T extends RewriteDefinition>(
};
}

function handleTrailingSlashRedirect(event: InternalEvent) {
function handleTrailingSlashRedirect(
event: InternalEvent,
): false | InternalResult {
const url = new URL(event.url, "http://localhost");
const emptyBody = emptyReadableStream();

if (
// Someone is trying to redirect to a different origin, let's not do that
Expand All @@ -270,7 +274,7 @@ function handleTrailingSlashRedirect(event: InternalEvent) {
headersLocation[1] ? `?${headersLocation[1]}` : ""
}`,
},
body: "",
body: emptyBody,
isBase64Encoded: false,
};
// eslint-disable-next-line sonarjs/elseif-without-else
Expand All @@ -288,7 +292,7 @@ function handleTrailingSlashRedirect(event: InternalEvent) {
headersLocation[1] ? `?${headersLocation[1]}` : ""
}`,
},
body: "",
body: emptyBody,
isBase64Encoded: false,
};
} else return false;
Expand All @@ -311,7 +315,7 @@ export function handleRedirects(
headers: {
Location: internalEvent.url,
},
body: "",
body: emptyReadableStream(),
isBase64Encoded: false,
};
}
Expand All @@ -328,7 +332,7 @@ export function fixDataPage(
return {
type: internalEvent.type,
statusCode: 404,
body: "{}",
body: toReadableStream("{}"),
headers: {
"Content-Type": "application/json",
},
Expand Down
11 changes: 6 additions & 5 deletions packages/open-next/src/core/routing/middleware.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { ReadableStream } from "node:stream/web";

import { MiddlewareManifest, NextConfig } from "config/index.js";
import { InternalEvent, InternalResult } from "types/open-next.js";
import { emptyReadableStream } from "utils/stream.js";

//NOTE: we should try to avoid importing stuff from next as much as possible
// every release of next could break this
Expand Down Expand Up @@ -137,7 +140,7 @@ export async function handleMiddleware(
) ?? resHeaders.location;
// res.setHeader("Location", location);
return {
body: "",
body: emptyReadableStream(),
type: internalEvent.type,
statusCode: statusCode,
headers: resHeaders,
Expand Down Expand Up @@ -182,16 +185,14 @@ export async function handleMiddleware(
// the body immediately to the client.
if (result.body) {
// transfer response body to res
const arrayBuffer = await result.arrayBuffer();
const buffer = Buffer.from(arrayBuffer);
// res.end(buffer);
const body = result.body as ReadableStream<Uint8Array>;

// await pipeReadable(result.response.body, res);
return {
type: internalEvent.type,
statusCode: statusCode,
headers: resHeaders,
body: buffer.toString(),
body,
isBase64Encoded: false,
};
}
Expand Down
9 changes: 5 additions & 4 deletions packages/open-next/src/core/routing/util.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import crypto from "node:crypto";
import { OutgoingHttpHeaders } from "node:http";
import { Readable } from "node:stream";

import { BuildId, HtmlPages } from "config/index.js";
import type { IncomingMessage, StreamCreator } from "http/index.js";
import { OpenNextNodeResponse } from "http/openNextResponse.js";
import { parseHeaders } from "http/util.js";
import type { MiddlewareManifest } from "types/next-types";
import { InternalEvent } from "types/open-next.js";
import { InternalEvent, InternalResult } from "types/open-next.js";

import { isBinaryContentType } from "../../adapters/binary.js";
import { debug, error } from "../../adapters/logger.js";
Expand Down Expand Up @@ -69,7 +70,7 @@ export function getUrlParts(url: string, isExternal: boolean) {
*
* @__PURE__
*/
export function convertRes(res: OpenNextNodeResponse) {
export function convertRes(res: OpenNextNodeResponse): InternalResult {
// Format Next.js response to Lambda response
const statusCode = res.statusCode || 200;
// When using HEAD requests, it seems that flushHeaders is not called, not sure why
Expand All @@ -80,9 +81,9 @@ export function convertRes(res: OpenNextNodeResponse) {
? headers["content-type"][0]
: headers["content-type"],
);
const encoding = isBase64Encoded ? "base64" : "utf8";
const body = res.getBody().toString(encoding);
const body = Readable.toWeb(res);
return {
type: "core",
statusCode,
headers,
body,
Expand Down
Loading

0 comments on commit b88ae13

Please sign in to comment.