Skip to content

Commit

Permalink
Add initial support for ReadableStream
Browse files Browse the repository at this point in the history
  • Loading branch information
lxsmnsyc committed Sep 9, 2023
1 parent 1bd9931 commit 2fb64a9
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 21 deletions.
1 change: 1 addition & 0 deletions packages/seroval/src/core/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export const enum SerovalNodeType {
ReadableStreamConstructor = 31,
ReadableStreamEnqueue = 32,
ReadableStreamClose = 33,
ReadableStreamError = 34,
}

export const enum SerovalObjectFlags {
Expand Down
50 changes: 46 additions & 4 deletions packages/seroval/src/core/cross/serialize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ import type {
SerovalPromiseConstructorNode,
SerovalPromiseResolveNode,
SerovalPromiseRejectNode,
SerovalReadableStreamCloseNode,
SerovalReadableStreamEnqueueNode,
SerovalReadableStreamErrorNode,
SerovalReadableStreamConstructorNode,
} from '../types';
import {
SerovalObjectRecordSpecialKey,
Expand All @@ -48,11 +52,15 @@ import {
import type { Assignment } from '../assignments';
import { resolveAssignments, resolveFlags } from '../assignments';
import {
GLOBAL_CONTEXT_PROMISE_REJECT,
GLOBAL_CONTEXT_PROMISE_RESOLVE,
GLOBAL_CONTEXT_PROMISE_CONSTRUCTOR,
GLOBAL_CONTEXT_REFERENCES,
GLOBAL_CONTEXT_REJECT,
GLOBAL_CONTEXT_RESOLVE,
REFERENCES_KEY,
GLOBAL_CONTEXT_STREAM_CLOSE,
GLOBAL_CONTEXT_STREAM_ENQUEUE,
GLOBAL_CONTEXT_STREAM_ERROR,
GLOBAL_CONTEXT_STREAM_CONSTRUCTOR,
} from '../keys';

export function getRefExpr(id: number): string {
Expand Down Expand Up @@ -718,14 +726,14 @@ function serializePromiseResolve(
ctx: CrossSerializerContext,
node: SerovalPromiseResolveNode,
): string {
return GLOBAL_CONTEXT_REFERENCES + '[' + node.i + '].' + GLOBAL_CONTEXT_RESOLVE + '(' + crossSerializeTree(ctx, node.f) + ')';
return GLOBAL_CONTEXT_PROMISE_RESOLVE + '(' + node.i + ',' + crossSerializeTree(ctx, node.f) + ')';
}

function serializePromiseReject(
ctx: CrossSerializerContext,
node: SerovalPromiseRejectNode,
): string {
return GLOBAL_CONTEXT_REFERENCES + '[' + node.i + '].' + GLOBAL_CONTEXT_REJECT + '(' + crossSerializeTree(ctx, node.f) + ')';
return GLOBAL_CONTEXT_PROMISE_REJECT + '(' + node.i + ',' + crossSerializeTree(ctx, node.f) + ')';
}

function serializePromiseConstructor(
Expand All @@ -734,6 +742,32 @@ function serializePromiseConstructor(
return assignIndexedValue(node.i, GLOBAL_CONTEXT_PROMISE_CONSTRUCTOR + '()');
}

function serializeReadableStreamClose(
node: SerovalReadableStreamCloseNode,
): string {
return GLOBAL_CONTEXT_STREAM_CLOSE + '(' + node.i + ')';
}

function serializeReadableStreamEnqueue(
ctx: CrossSerializerContext,
node: SerovalReadableStreamEnqueueNode,
): string {
return GLOBAL_CONTEXT_STREAM_ENQUEUE + '(' + node.i + ',' + crossSerializeTree(ctx, node.f) + ')';
}

function serializeReadableStreamError(
ctx: CrossSerializerContext,
node: SerovalReadableStreamErrorNode,
): string {
return GLOBAL_CONTEXT_STREAM_ERROR + '(' + node.i + ',' + crossSerializeTree(ctx, node.f) + ')';
}

function serializeReadableStreamConstructor(
node: SerovalReadableStreamConstructorNode,
): string {
return assignIndexedValue(node.i, GLOBAL_CONTEXT_STREAM_CONSTRUCTOR + '()');
}

export default function crossSerializeTree(
ctx: CrossSerializerContext,
node: SerovalNode,
Expand Down Expand Up @@ -800,6 +834,14 @@ export default function crossSerializeTree(
return serializePromiseReject(ctx, node);
case SerovalNodeType.PromiseConstructor:
return serializePromiseConstructor(node);
case SerovalNodeType.ReadableStreamClose:
return serializeReadableStreamClose(node);
case SerovalNodeType.ReadableStreamConstructor:
return serializeReadableStreamConstructor(node);
case SerovalNodeType.ReadableStreamEnqueue:
return serializeReadableStreamEnqueue(ctx, node);
case SerovalNodeType.ReadableStreamError:
return serializeReadableStreamError(ctx, node);
default:
throw new Error('invariant');
}
Expand Down
88 changes: 88 additions & 0 deletions packages/seroval/src/core/cross/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import type {
SerovalPromiseNode,
SerovalPromiseConstructorNode,
SerovalSetNode,
SerovalReadableStreamConstructorNode,
} from '../types';
import {
SerovalObjectRecordSpecialKey,
Expand Down Expand Up @@ -490,6 +491,91 @@ function generateBoxedNode(
};
}

function generateReadableStreamNode(
ctx: StreamingCrossParserContext,
id: number,
current: ReadableStream<unknown>,
): SerovalReadableStreamConstructorNode {
assert(ctx.features & Feature.WebAPI, new UnsupportedTypeError(current));
const reader = current.getReader();

function push(): void {
reader.read().then(
(data) => {
if (ctx.alive) {
if (data.done) {
ctx.onParse({
t: SerovalNodeType.ReadableStreamClose,
i: id,
s: undefined,
l: undefined,
c: undefined,
m: undefined,
// Parse options first before the items
d: undefined,
a: undefined,
f: undefined,
b: undefined,
o: undefined,
}, false);
} else {
ctx.onParse({
t: SerovalNodeType.ReadableStreamEnqueue,
i: id,
s: undefined,
l: undefined,
c: undefined,
m: undefined,
// Parse options first before the items
d: undefined,
a: undefined,
f: crossParseStream(ctx, data.value),
b: undefined,
o: undefined,
}, false);
push();
}
}
},
(value) => {
if (ctx.alive) {
ctx.onParse({
t: SerovalNodeType.ReadableStreamError,
i: id,
s: undefined,
l: undefined,
c: undefined,
m: undefined,
// Parse options first before the items
d: undefined,
a: undefined,
f: crossParseStream(ctx, value),
b: undefined,
o: undefined,
}, false);
push();
}
},
);
}

push();

return {
t: SerovalNodeType.ReadableStreamConstructor,
i: id,
s: undefined,
l: undefined,
c: undefined,
m: undefined,
d: undefined,
a: undefined,
f: undefined,
b: undefined,
o: undefined,
};
}

function parseObject(
ctx: StreamingCrossParserContext,
current: object | null,
Expand Down Expand Up @@ -593,6 +679,8 @@ function parseObject(
return generateHeadersNode(ctx, id, current as unknown as Headers);
case FormData:
return generateFormDataNode(ctx, id, current as unknown as FormData);
case ReadableStream:
return generateReadableStreamNode(ctx, id, current as unknown as ReadableStream);
default:
break;
}
Expand Down
38 changes: 30 additions & 8 deletions packages/seroval/src/core/keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,42 @@ export const REFERENCES_KEY = '__SEROVAL_REFS__';

export const GLOBAL_CONTEXT_REFERENCES = '$R';

export const GLOBAL_CONTEXT_RESOLVE = 's';
export const LOCAL_CONTEXT_PROMISE_RESOLVE = 's';

export const GLOBAL_CONTEXT_REJECT = 'f';
export const LOCAL_CONTEXT_PROMISE_REJECT = 'f';

export const GLOBAL_CONTEXT_PROMISE_CONSTRUCTOR = '$P';

export const ROOT_REFERENCE = 't';
export const GLOBAL_CONTEXT_PROMISE_RESOLVE = '$Ps';

const GLOBAL_CONTEXT_PROMISE_CONSTRUCTOR_FUNCTION_BODY = `(s,f,p){return p=new Promise(function(a,b){s=a,f=b}),p.${GLOBAL_CONTEXT_RESOLVE}=s,p.${GLOBAL_CONTEXT_REJECT}=f,p}`;
export const GLOBAL_CONTEXT_PROMISE_REJECT = '$Pf';

function getPromiseConstructor(): string {
return `function ${GLOBAL_CONTEXT_PROMISE_CONSTRUCTOR}${GLOBAL_CONTEXT_PROMISE_CONSTRUCTOR_FUNCTION_BODY}`;
}
export const LOCAL_CONTEXT_STREAM_ENQUEUE = 'q';

export const LOCAL_CONTEXT_STREAM_CLOSE = 'c';

export const LOCAL_CONTEXT_STREAM_ERROR = 'e';

export const GLOBAL_CONTEXT_STREAM_CONSTRUCTOR = '$S';

export const GLOBAL_CONTEXT_STREAM_ENQUEUE = '$Sq';

export const GLOBAL_CONTEXT_STREAM_CLOSE = '$Sc';

export const GLOBAL_CONTEXT_STREAM_ERROR = '$Se';

export const ROOT_REFERENCE = 't';

const GLOBAL_HEADER = `function ${GLOBAL_CONTEXT_PROMISE_CONSTRUCTOR}(s,f,p){return p=(new Promise(function(a,b){s=a,f=b})),p.${LOCAL_CONTEXT_PROMISE_RESOLVE}=s,p.${LOCAL_CONTEXT_PROMISE_REJECT}=f,p}
function $uP(p){delete p.${LOCAL_CONTEXT_PROMISE_RESOLVE};delete p.${LOCAL_CONTEXT_PROMISE_REJECT}}
function ${GLOBAL_CONTEXT_PROMISE_RESOLVE}(i,d){$R[i].${LOCAL_CONTEXT_PROMISE_RESOLVE}(d);$uP($R[i])}
function ${GLOBAL_CONTEXT_PROMISE_REJECT}(i,d){$R[i].${LOCAL_CONTEXT_PROMISE_REJECT}(d);$uP($R[i])}
function ${GLOBAL_CONTEXT_STREAM_CONSTRUCTOR}(n,e,t){function u(n,e){switch(e[0]){case 0:return n.enqueue(e[1]);case 1:return n.error(e[1]);case 2:return n.close()}}function r(t,u,r,c){for(n.push([t,u]),r=0,c=e.length;r<c;r++)e[r]([t,u])}return n=[],e=[],(t=new ReadableStream({start:function(t){!function(e,t,r){for(t=0,r=n.length;t<r;t++)u(e,n[t])}(t),e.push((function(n){u(t,n)}))}})).${LOCAL_CONTEXT_STREAM_ENQUEUE}=function(n){r(0,n)},t.${LOCAL_CONTEXT_STREAM_ERROR}=function(n){r(1,n)},t.${LOCAL_CONTEXT_STREAM_CLOSE}=function(n){r(2);delete t.${LOCAL_CONTEXT_STREAM_ENQUEUE};delete t.${LOCAL_CONTEXT_STREAM_ERROR};delete t.${LOCAL_CONTEXT_STREAM_CLOSE}},t}
function ${GLOBAL_CONTEXT_STREAM_ENQUEUE}(i,d){$R[i].${LOCAL_CONTEXT_STREAM_ENQUEUE}(d)}
function ${GLOBAL_CONTEXT_STREAM_ERROR}(i,d){$R[i].${LOCAL_CONTEXT_STREAM_ERROR}(d)}
function ${GLOBAL_CONTEXT_STREAM_CLOSE}(i){$R[i].${LOCAL_CONTEXT_STREAM_CLOSE}()}
`;

export function getCrossReferenceHeader(): string {
return `${GLOBAL_CONTEXT_REFERENCES}=[];${getPromiseConstructor()};`;
return `${GLOBAL_CONTEXT_REFERENCES}=[];${GLOBAL_HEADER}`;
}
28 changes: 27 additions & 1 deletion packages/seroval/src/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,28 @@ export interface SerovalPromiseRejectNode extends SerovalBaseNode {
f: SerovalNode;
}

export interface SerovalReadableStreamConstructorNode extends SerovalBaseNode {
t: SerovalNodeType.ReadableStreamConstructor;
i: number;
}

export interface SerovalReadableStreamEnqueueNode extends SerovalBaseNode {
t: SerovalNodeType.ReadableStreamEnqueue;
i: number;
f: SerovalNode;
}

export interface SerovalReadableStreamCloseNode extends SerovalBaseNode {
t: SerovalNodeType.ReadableStreamClose;
i: number;
}

export interface SerovalReadableStreamErrorNode extends SerovalBaseNode {
t: SerovalNodeType.ReadableStreamError;
i: number;
f: SerovalNode;
}

export type SerovalSyncNode =
| SerovalPrimitiveNode
| SerovalIndexedValueNode
Expand Down Expand Up @@ -346,7 +368,11 @@ export type SerovalAsyncNode =
| SerovalFileNode
| SerovalPromiseConstructorNode
| SerovalPromiseResolveNode
| SerovalPromiseRejectNode;
| SerovalPromiseRejectNode
| SerovalReadableStreamConstructorNode
| SerovalReadableStreamEnqueueNode
| SerovalReadableStreamCloseNode
| SerovalReadableStreamErrorNode;

export type SerovalNode =
| SerovalSyncNode
Expand Down
19 changes: 12 additions & 7 deletions packages/seroval/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@ const serializer = new Serializer({

console.log('HEADER', [serializer.getHeader()]);

const delay = (value, ms) => new Promise(r => setTimeout(r, ms, value));
const source = new ReadableStream({
start(controller) {
let i = 0;

const source = ({
a: delay('A', 300),
b: delay('B', 200),
c: delay('C', 100),
const interval = setInterval(() => {
if (i > 10) {
controller.close();
clearInterval(interval);
} else {
controller.enqueue('Count: ' + i++);
}
}, 1000);
},
});

source.d = delay(source, 400);

serializer.write('1', source);
16 changes: 16 additions & 0 deletions packages/seroval/test2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

import { crossSerializeStream } from './dist/esm/development/index.mjs';

const source = new ReadableStream({
start(controller) {
controller.enqueue('Hello');
controller.enqueue('World');
controller.close();
}
})

crossSerializeStream(source, {
onSerialize(data) {
console.log([data]);
}
});
2 changes: 1 addition & 1 deletion packages/seroval/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"exclude": ["node_modules"],
"include": ["src", "types", "test"],
"include": ["src", "types", "test", "hehe.js"],
"compilerOptions": {
"module": "ESNext",
"lib": ["ESNext", "DOM"],
Expand Down

0 comments on commit 2fb64a9

Please sign in to comment.