Skip to content

Commit

Permalink
Merge pull request #155 from jiftechnify/handle-closed
Browse files Browse the repository at this point in the history
Handle CLOSED message from relays
  • Loading branch information
jiftechnify authored Dec 13, 2023
2 parents 79d7917 + 193bfa5 commit 9b0aae1
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 10 deletions.
5 changes: 4 additions & 1 deletion .eslintrc.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ const config = {
plugins: ["@typescript-eslint"],
extends: ["eslint:recommended", "plugin:@typescript-eslint/recommended", "prettier"],
rules: {
"@typescript-eslint/no-unused-vars": ["warn", { argsIgnorePattern: "^_" }],
"@typescript-eslint/no-unused-vars": [
"warn",
{ argsIgnorePattern: "^_", varsIgnorePattern: "^_" },
],
},
},
],
Expand Down
28 changes: 28 additions & 0 deletions packages/kernel/src/nostr.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@ describe("parseR2CMessage", () => {
expect(subId).toBe("sub_id");
});

test("parses CLOSED message", () => {
const closedJSON = `["CLOSED", "sub_id", "reason"]`;

const parsed = parseR2CMessage(closedJSON);
if (parsed === undefined) {
fail("parsing failed unexpectedly");
}
if (parsed[0] !== "CLOSED") {
fail(`unexpected message type: ${parsed[0]}`);
}
const [, subId, reason] = parsed;
expect(subId).toBe("sub_id");
expect(reason).toBe("reason");
});

test("parses NOTICE message", () => {
const noticeJSON = `["NOTICE", "error!"]`;

Expand Down Expand Up @@ -94,6 +109,19 @@ describe("parseR2CMessage", () => {
}
});

test("fails on malformed CLOSED message", () => {
const malformedClosedJSON = [
`["CLOSED"]`,
`["CLOSED", 42]`,
`["CLOSED", null]`,
`["CLOSED", "sub_id", null]`,
];

for (const msgJSON of malformedClosedJSON) {
expect(parseR2CMessage(msgJSON)).toBeUndefined();
}
});

test("fails on malformed NOTICE message", () => {
const malformedNoticeJSON = [`["NOTICE"]`];

Expand Down
22 changes: 14 additions & 8 deletions packages/kernel/src/nostr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,18 @@ export type C2RMessageType = C2RMessage[0];
// relay to client messages
type R2CEvent = [type: "EVENT", subId: string, event: NostrEvent];
type R2CEose = [type: "EOSE", subId: string];
type R2CClosed = [type: "CLOSED", subId: string, message: string];
type R2CNotice = [type: "NOTICE", notice: string];

export type R2CMessage = R2CEvent | R2CEose | R2CNotice;
export type R2CMessage = R2CEvent | R2CEose | R2CClosed | R2CNotice;
export type R2CMessageType = R2CMessage[0];

export type R2CSubMessage = R2CEvent | R2CEose;
export type R2CSubMessageType = R2CSubMessage[0];

const msgTypeNames: string[] = ["EVENT", "EOSE", "NOTICE", "OK", "AUTH", "COUNT"];
const supportedR2CMsgTypes: R2CMessageType[] = ["EVENT", "EOSE", "CLOSED", "NOTICE"];
const isSupportedR2CMsgType = (s: string): s is R2CMessageType =>
(supportedR2CMsgTypes as string[]).includes(s);

export const parseR2CMessage = (rawMsg: string): R2CMessage | undefined => {
let parsed: unknown;
Expand All @@ -129,12 +132,8 @@ export const parseR2CMessage = (rawMsg: string): R2CMessage | undefined => {
}

const msgType = parsed[0] as string;
if (!msgTypeNames.includes(msgType)) {
console.error("unknown R2C message type:", parsed[0]);
return undefined;
}
if (msgType === "OK" || msgType === "AUTH" || msgType === "COUNT") {
console.warn("ignoring R2C OK/AUTH/COUNT message");
if (!isSupportedR2CMsgType(msgType)) {
console.error("unsupported R2C message type:", parsed[0]);
return undefined;
}
switch (msgType) {
Expand All @@ -161,6 +160,13 @@ export const parseR2CMessage = (rawMsg: string): R2CMessage | undefined => {
}
return parsed as R2CEose;
}
case "CLOSED": {
if (parsed.length !== 3 || typeof parsed[1] !== "string" || typeof parsed[2] !== "string") {
console.error("malformed R2C CLOSED");
return undefined;
}
return parsed as R2CClosed;
}
case "NOTICE": {
if (parsed.length !== 2) {
console.error("malformed R2C NOTICE");
Expand Down
25 changes: 24 additions & 1 deletion packages/nostr-fetch/src/fetcherBackend.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { FetchTillEoseOptions, NostrFetcherBackend } from "@nostr-fetch/kernel/fetcherBackend";
import {
FetchTillEoseOptions,
NostrFetcherBackend,
isFetchTillEoseFailedSignal,
} from "@nostr-fetch/kernel/fetcherBackend";
import { setupMockRelayServer } from "@nostr-fetch/testutil/mockRelayServer";
import { DefaultFetcherBackend } from "./fetcherBackend";

Expand Down Expand Up @@ -57,6 +61,25 @@ describe("DefaultFetcherBackend", () => {
await expect(wsServer).toReceiveMessage(["CLOSE", "test"]);
});

test("handles subscription close by relay w/ CLOSED message", async () => {
setupMockRelayServer(wsServer, [{ type: "closed", message: "invalid: malformed filter" }]);

await backend.ensureRelays([url], { connectTimeoutMs: 1000 });
const iter = backend.fetchTillEose(url, {}, defaultOpts);
try {
for await (const _ of iter) {
// do nothing
}
expect.unreachable("should throw FetchTillEoseFailedSignal");
} catch (err) {
if (isFetchTillEoseFailedSignal(err)) {
expect(err.message).toMatch("invalid: malformed filter");
} else {
expect.unreachable("should throw FetchTillEoseFailedSignal");
}
}
});

test("aborts subscription on NOTICE", async () => {
setupMockRelayServer(wsServer, [
{ type: "events", eventsSpec: { content: "test", n: 9 } },
Expand Down
5 changes: 5 additions & 0 deletions packages/nostr-fetch/src/fetcherBackend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ export class DefaultFetcherBackend implements NostrFetcherBackend {
tx.close();
}
});
sub.on("closed", (msg) => {
tx.error(
new FetchTillEoseFailedSignal(`subscription (id: ${sub.subId}) closed by relay: ${msg}`),
);
});

// common process to close subscription
const closeSub = () => {
Expand Down
32 changes: 32 additions & 0 deletions packages/nostr-fetch/src/relay.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type {
RelayDisconnectCb,
RelayErrorCb,
RelayNoticeCb,
SubClosedCb,
SubEoseCb,
SubEventCb,
WSCloseEvent,
Expand Down Expand Up @@ -149,13 +150,15 @@ describe("Relay", () => {
let spyCbs: {
event: SubEventCb;
eose: SubEoseCb;
closed: SubClosedCb;
};

beforeEach(() => {
server = new WS(rurl, { jsonProtocol: true });
spyCbs = {
event: vi.fn((_) => {}),
eose: vi.fn((_) => {}),
closed: vi.fn((_) => {}),
};
});
afterEach(() => {
Expand All @@ -177,6 +180,7 @@ describe("Relay", () => {
sub.on("event", spyCbs.event);
sub.on("eose", spyCbs.eose);
sub.on("eose", () => waitEose.resolve());
sub.on("closed", spyCbs.closed);

sub.req();
await expect(server).toReceiveMessage(["REQ", "normal", {}]);
Expand All @@ -189,6 +193,34 @@ describe("Relay", () => {
// mock relay sends: 5 EVENTs then EOSE
expect(spyCbs.event).toBeCalledTimes(5);
expect(spyCbs.eose).toBeCalledTimes(1);
expect(spyCbs.closed).not.toBeCalled();
});

test("CLOSED by relay", async () => {
const r = initRelay(rurl, { connectTimeoutMs: 5000 });
setupMockRelayServer(server, [{ type: "closed", message: "closed by relay" }]);

await r.connect();

const waitClosed = new Deferred<void>();

const sub = r.prepareSub([{}], {
skipVerification: false,
abortSubBeforeEoseTimeoutMs: 1000,
subId: "malformed_sub",
});
sub.on("event", spyCbs.event);
sub.on("eose", spyCbs.eose);
sub.on("closed", spyCbs.closed);
sub.on("closed", () => waitClosed.resolve());

sub.req();
await waitClosed.promise;

expect(spyCbs.closed).toBeCalledTimes(1);
expect(spyCbs.closed).toBeCalledWith("closed by relay");
expect(spyCbs.event).not.toBeCalled();
expect(spyCbs.eose).not.toBeCalled();
});

test("aborts before EOSE if relay doesn't return events for a while", async () => {
Expand Down
16 changes: 16 additions & 0 deletions packages/nostr-fetch/src/relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ class RelayImpl implements Relay {
this.forwardToSub(subId, (sub) => sub._forwardEose());
break;
}
case "CLOSED": {
const [, subId, msg] = parsed;
this.forwardToSub(subId, (sub) => sub._forwardClosed(msg));
break;
}
case "NOTICE": {
const [, notice] = parsed;
this.#listeners.notice.forEach((cb) => cb(notice));
Expand Down Expand Up @@ -213,10 +218,12 @@ type EoseEventPayload = {

export type SubEventCb = Callback<NostrEvent>;
export type SubEoseCb = Callback<EoseEventPayload>;
export type SubClosedCb = Callback<string>;

export type SubEventCbTypes = {
event: SubEventCb;
eose: SubEoseCb;
closed: SubClosedCb;
};

export type SubEventTypes = keyof SubEventCbTypes;
Expand Down Expand Up @@ -248,6 +255,7 @@ class RelaySubscription implements Subscription {
#listeners: SubListenersTable = {
event: new Set(),
eose: new Set(),
closed: new Set(),
};

#abortSubTimer: NodeJS.Timeout | undefined;
Expand Down Expand Up @@ -315,4 +323,12 @@ class RelaySubscription implements Subscription {
}
this.#listeners.eose.forEach((cb) => cb({ aborted: false }));
}

_forwardClosed(msg: string) {
this.#listeners.closed.forEach((cb) => cb(msg));

// subscription has been closed by the relay -> clean up things
this.#clearListeners();
this.#relay._removeSub(this.#subId);
}
}
10 changes: 10 additions & 0 deletions packages/testutil/src/mockRelayServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ type MockRelaySubResponseAction =
eventsSpec: FakeEventsSpec;
intervalMs?: number;
}
| {
type: "closed";
message: string;
}
| {
type: "notice";
notice: string;
Expand All @@ -31,6 +35,7 @@ type MockRelaySubResponseScenario = MockRelaySubResponseAction[];

const r2cEventMsg = (subId: string, ev: NostrEvent) => JSON.stringify(["EVENT", subId, ev]);
const r2cEoseMsg = (subId: string) => JSON.stringify(["EOSE", subId]);
const r2cClosedMsg = (subId: string, message: string) => JSON.stringify(["CLOSED", subId, message]);
const r2cNoticeMsg = (notice: string) => JSON.stringify(["NOTICE", notice]);

// play the "subscription response scenario" on subscription request
Expand All @@ -41,6 +46,7 @@ const playSubScenario = async (
subId: string,
) => {
for (const action of scenario) {
console.log("playSubScenario", action);
switch (action.type) {
case "events":
for (const ev of generateFakeEvents(action.eventsSpec)) {
Expand All @@ -52,6 +58,10 @@ const playSubScenario = async (
}
break;

case "closed":
socket.send(r2cClosedMsg(subId, action.message));
break;

case "notice":
socket.send(r2cNoticeMsg(action.notice));
break;
Expand Down

0 comments on commit 9b0aae1

Please sign in to comment.