Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle CLOSED message from relays #155

Merged
merged 3 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .eslintrc.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ const config = {
plugins: ["@typescript-eslint"],
extends: ["eslint:recommended", "plugin:@typescript-eslint/recommended", "prettier"],
rules: {
"@typescript-eslint/no-unused-vars": ["warn", { argsIgnorePattern: "^_" }],
"@typescript-eslint/no-unused-vars": [
"warn",
{ argsIgnorePattern: "^_", varsIgnorePattern: "^_" },
],
},
},
],
Expand Down
28 changes: 28 additions & 0 deletions packages/kernel/src/nostr.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@ describe("parseR2CMessage", () => {
expect(subId).toBe("sub_id");
});

test("parses CLOSED message", () => {
const closedJSON = `["CLOSED", "sub_id", "reason"]`;

const parsed = parseR2CMessage(closedJSON);
if (parsed === undefined) {
fail("parsing failed unexpectedly");
}
if (parsed[0] !== "CLOSED") {
fail(`unexpected message type: ${parsed[0]}`);
}
const [, subId, reason] = parsed;
expect(subId).toBe("sub_id");
expect(reason).toBe("reason");
});

test("parses NOTICE message", () => {
const noticeJSON = `["NOTICE", "error!"]`;

Expand Down Expand Up @@ -94,6 +109,19 @@ describe("parseR2CMessage", () => {
}
});

test("fails on malformed CLOSED message", () => {
const malformedClosedJSON = [
`["CLOSED"]`,
`["CLOSED", 42]`,
`["CLOSED", null]`,
`["CLOSED", "sub_id", null]`,
];

for (const msgJSON of malformedClosedJSON) {
expect(parseR2CMessage(msgJSON)).toBeUndefined();
}
});

test("fails on malformed NOTICE message", () => {
const malformedNoticeJSON = [`["NOTICE"]`];

Expand Down
22 changes: 14 additions & 8 deletions packages/kernel/src/nostr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,18 @@ export type C2RMessageType = C2RMessage[0];
// relay to client messages
type R2CEvent = [type: "EVENT", subId: string, event: NostrEvent];
type R2CEose = [type: "EOSE", subId: string];
type R2CClosed = [type: "CLOSED", subId: string, message: string];
type R2CNotice = [type: "NOTICE", notice: string];

export type R2CMessage = R2CEvent | R2CEose | R2CNotice;
export type R2CMessage = R2CEvent | R2CEose | R2CClosed | R2CNotice;
export type R2CMessageType = R2CMessage[0];

export type R2CSubMessage = R2CEvent | R2CEose;
export type R2CSubMessageType = R2CSubMessage[0];

const msgTypeNames: string[] = ["EVENT", "EOSE", "NOTICE", "OK", "AUTH", "COUNT"];
const supportedR2CMsgTypes: R2CMessageType[] = ["EVENT", "EOSE", "CLOSED", "NOTICE"];
const isSupportedR2CMsgType = (s: string): s is R2CMessageType =>
(supportedR2CMsgTypes as string[]).includes(s);

export const parseR2CMessage = (rawMsg: string): R2CMessage | undefined => {
let parsed: unknown;
Expand All @@ -129,12 +132,8 @@ export const parseR2CMessage = (rawMsg: string): R2CMessage | undefined => {
}

const msgType = parsed[0] as string;
if (!msgTypeNames.includes(msgType)) {
console.error("unknown R2C message type:", parsed[0]);
return undefined;
}
if (msgType === "OK" || msgType === "AUTH" || msgType === "COUNT") {
console.warn("ignoring R2C OK/AUTH/COUNT message");
if (!isSupportedR2CMsgType(msgType)) {
console.error("unsupported R2C message type:", parsed[0]);
return undefined;
}
switch (msgType) {
Expand All @@ -161,6 +160,13 @@ export const parseR2CMessage = (rawMsg: string): R2CMessage | undefined => {
}
return parsed as R2CEose;
}
case "CLOSED": {
if (parsed.length !== 3 || typeof parsed[1] !== "string" || typeof parsed[2] !== "string") {
console.error("malformed R2C CLOSED");
return undefined;
}
return parsed as R2CClosed;
}
case "NOTICE": {
if (parsed.length !== 2) {
console.error("malformed R2C NOTICE");
Expand Down
25 changes: 24 additions & 1 deletion packages/nostr-fetch/src/fetcherBackend.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { FetchTillEoseOptions, NostrFetcherBackend } from "@nostr-fetch/kernel/fetcherBackend";
import {
FetchTillEoseOptions,
NostrFetcherBackend,
isFetchTillEoseFailedSignal,
} from "@nostr-fetch/kernel/fetcherBackend";
import { setupMockRelayServer } from "@nostr-fetch/testutil/mockRelayServer";
import { DefaultFetcherBackend } from "./fetcherBackend";

Expand Down Expand Up @@ -57,6 +61,25 @@ describe("DefaultFetcherBackend", () => {
await expect(wsServer).toReceiveMessage(["CLOSE", "test"]);
});

test("handles subscription close by relay w/ CLOSED message", async () => {
setupMockRelayServer(wsServer, [{ type: "closed", message: "invalid: malformed filter" }]);

await backend.ensureRelays([url], { connectTimeoutMs: 1000 });
const iter = backend.fetchTillEose(url, {}, defaultOpts);
try {
for await (const _ of iter) {
// do nothing
}
expect.unreachable("should throw FetchTillEoseFailedSignal");
} catch (err) {
if (isFetchTillEoseFailedSignal(err)) {
expect(err.message).toMatch("invalid: malformed filter");
} else {
expect.unreachable("should throw FetchTillEoseFailedSignal");
}
}
});

test("aborts subscription on NOTICE", async () => {
setupMockRelayServer(wsServer, [
{ type: "events", eventsSpec: { content: "test", n: 9 } },
Expand Down
5 changes: 5 additions & 0 deletions packages/nostr-fetch/src/fetcherBackend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ export class DefaultFetcherBackend implements NostrFetcherBackend {
tx.close();
}
});
sub.on("closed", (msg) => {
tx.error(
new FetchTillEoseFailedSignal(`subscription (id: ${sub.subId}) closed by relay: ${msg}`),
);
});

// common process to close subscription
const closeSub = () => {
Expand Down
32 changes: 32 additions & 0 deletions packages/nostr-fetch/src/relay.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type {
RelayDisconnectCb,
RelayErrorCb,
RelayNoticeCb,
SubClosedCb,
SubEoseCb,
SubEventCb,
WSCloseEvent,
Expand Down Expand Up @@ -149,13 +150,15 @@ describe("Relay", () => {
let spyCbs: {
event: SubEventCb;
eose: SubEoseCb;
closed: SubClosedCb;
};

beforeEach(() => {
server = new WS(rurl, { jsonProtocol: true });
spyCbs = {
event: vi.fn((_) => {}),
eose: vi.fn((_) => {}),
closed: vi.fn((_) => {}),
};
});
afterEach(() => {
Expand All @@ -177,6 +180,7 @@ describe("Relay", () => {
sub.on("event", spyCbs.event);
sub.on("eose", spyCbs.eose);
sub.on("eose", () => waitEose.resolve());
sub.on("closed", spyCbs.closed);

sub.req();
await expect(server).toReceiveMessage(["REQ", "normal", {}]);
Expand All @@ -189,6 +193,34 @@ describe("Relay", () => {
// mock relay sends: 5 EVENTs then EOSE
expect(spyCbs.event).toBeCalledTimes(5);
expect(spyCbs.eose).toBeCalledTimes(1);
expect(spyCbs.closed).not.toBeCalled();
});

test("CLOSED by relay", async () => {
const r = initRelay(rurl, { connectTimeoutMs: 5000 });
setupMockRelayServer(server, [{ type: "closed", message: "closed by relay" }]);

await r.connect();

const waitClosed = new Deferred<void>();

const sub = r.prepareSub([{}], {
skipVerification: false,
abortSubBeforeEoseTimeoutMs: 1000,
subId: "malformed_sub",
});
sub.on("event", spyCbs.event);
sub.on("eose", spyCbs.eose);
sub.on("closed", spyCbs.closed);
sub.on("closed", () => waitClosed.resolve());

sub.req();
await waitClosed.promise;

expect(spyCbs.closed).toBeCalledTimes(1);
expect(spyCbs.closed).toBeCalledWith("closed by relay");
expect(spyCbs.event).not.toBeCalled();
expect(spyCbs.eose).not.toBeCalled();
});

test("aborts before EOSE if relay doesn't return events for a while", async () => {
Expand Down
16 changes: 16 additions & 0 deletions packages/nostr-fetch/src/relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ class RelayImpl implements Relay {
this.forwardToSub(subId, (sub) => sub._forwardEose());
break;
}
case "CLOSED": {
const [, subId, msg] = parsed;
this.forwardToSub(subId, (sub) => sub._forwardClosed(msg));
break;
}
case "NOTICE": {
const [, notice] = parsed;
this.#listeners.notice.forEach((cb) => cb(notice));
Expand Down Expand Up @@ -213,10 +218,12 @@ type EoseEventPayload = {

export type SubEventCb = Callback<NostrEvent>;
export type SubEoseCb = Callback<EoseEventPayload>;
export type SubClosedCb = Callback<string>;

export type SubEventCbTypes = {
event: SubEventCb;
eose: SubEoseCb;
closed: SubClosedCb;
};

export type SubEventTypes = keyof SubEventCbTypes;
Expand Down Expand Up @@ -248,6 +255,7 @@ class RelaySubscription implements Subscription {
#listeners: SubListenersTable = {
event: new Set(),
eose: new Set(),
closed: new Set(),
};

#abortSubTimer: NodeJS.Timeout | undefined;
Expand Down Expand Up @@ -315,4 +323,12 @@ class RelaySubscription implements Subscription {
}
this.#listeners.eose.forEach((cb) => cb({ aborted: false }));
}

_forwardClosed(msg: string) {
this.#listeners.closed.forEach((cb) => cb(msg));

// subscription has been closed by the relay -> clean up things
this.#clearListeners();
this.#relay._removeSub(this.#subId);
}
}
10 changes: 10 additions & 0 deletions packages/testutil/src/mockRelayServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ type MockRelaySubResponseAction =
eventsSpec: FakeEventsSpec;
intervalMs?: number;
}
| {
type: "closed";
message: string;
}
| {
type: "notice";
notice: string;
Expand All @@ -31,6 +35,7 @@ type MockRelaySubResponseScenario = MockRelaySubResponseAction[];

const r2cEventMsg = (subId: string, ev: NostrEvent) => JSON.stringify(["EVENT", subId, ev]);
const r2cEoseMsg = (subId: string) => JSON.stringify(["EOSE", subId]);
const r2cClosedMsg = (subId: string, message: string) => JSON.stringify(["CLOSED", subId, message]);
const r2cNoticeMsg = (notice: string) => JSON.stringify(["NOTICE", notice]);

// play the "subscription response scenario" on subscription request
Expand All @@ -41,6 +46,7 @@ const playSubScenario = async (
subId: string,
) => {
for (const action of scenario) {
console.log("playSubScenario", action);
switch (action.type) {
case "events":
for (const ev of generateFakeEvents(action.eventsSpec)) {
Expand All @@ -52,6 +58,10 @@ const playSubScenario = async (
}
break;

case "closed":
socket.send(r2cClosedMsg(subId, action.message));
break;

case "notice":
socket.send(r2cNoticeMsg(action.notice));
break;
Expand Down
Loading