diff --git a/.eslintrc.cjs b/.eslintrc.cjs index d630dde..ba5e3f6 100644 --- a/.eslintrc.cjs +++ b/.eslintrc.cjs @@ -16,7 +16,10 @@ const config = { plugins: ["@typescript-eslint"], extends: ["eslint:recommended", "plugin:@typescript-eslint/recommended", "prettier"], rules: { - "@typescript-eslint/no-unused-vars": ["warn", { argsIgnorePattern: "^_" }], + "@typescript-eslint/no-unused-vars": [ + "warn", + { argsIgnorePattern: "^_", varsIgnorePattern: "^_" }, + ], }, }, ], diff --git a/packages/kernel/src/nostr.spec.ts b/packages/kernel/src/nostr.spec.ts index f1de3a1..92e94aa 100644 --- a/packages/kernel/src/nostr.spec.ts +++ b/packages/kernel/src/nostr.spec.ts @@ -42,6 +42,21 @@ describe("parseR2CMessage", () => { expect(subId).toBe("sub_id"); }); + test("parses CLOSED message", () => { + const closedJSON = `["CLOSED", "sub_id", "reason"]`; + + const parsed = parseR2CMessage(closedJSON); + if (parsed === undefined) { + fail("parsing failed unexpectedly"); + } + if (parsed[0] !== "CLOSED") { + fail(`unexpected message type: ${parsed[0]}`); + } + const [, subId, reason] = parsed; + expect(subId).toBe("sub_id"); + expect(reason).toBe("reason"); + }); + test("parses NOTICE message", () => { const noticeJSON = `["NOTICE", "error!"]`; @@ -94,6 +109,19 @@ describe("parseR2CMessage", () => { } }); + test("fails on malformed CLOSED message", () => { + const malformedClosedJSON = [ + `["CLOSED"]`, + `["CLOSED", 42]`, + `["CLOSED", null]`, + `["CLOSED", "sub_id", null]`, + ]; + + for (const msgJSON of malformedClosedJSON) { + expect(parseR2CMessage(msgJSON)).toBeUndefined(); + } + }); + test("fails on malformed NOTICE message", () => { const malformedNoticeJSON = [`["NOTICE"]`]; diff --git a/packages/kernel/src/nostr.ts b/packages/kernel/src/nostr.ts index 7c77704..b1b45dd 100644 --- a/packages/kernel/src/nostr.ts +++ b/packages/kernel/src/nostr.ts @@ -104,15 +104,18 @@ export type C2RMessageType = C2RMessage[0]; // relay to client messages type R2CEvent = [type: "EVENT", subId: string, event: NostrEvent]; type R2CEose = [type: "EOSE", subId: string]; +type R2CClosed = [type: "CLOSED", subId: string, message: string]; type R2CNotice = [type: "NOTICE", notice: string]; -export type R2CMessage = R2CEvent | R2CEose | R2CNotice; +export type R2CMessage = R2CEvent | R2CEose | R2CClosed | R2CNotice; export type R2CMessageType = R2CMessage[0]; export type R2CSubMessage = R2CEvent | R2CEose; export type R2CSubMessageType = R2CSubMessage[0]; -const msgTypeNames: string[] = ["EVENT", "EOSE", "NOTICE", "OK", "AUTH", "COUNT"]; +const supportedR2CMsgTypes: R2CMessageType[] = ["EVENT", "EOSE", "CLOSED", "NOTICE"]; +const isSupportedR2CMsgType = (s: string): s is R2CMessageType => + (supportedR2CMsgTypes as string[]).includes(s); export const parseR2CMessage = (rawMsg: string): R2CMessage | undefined => { let parsed: unknown; @@ -129,12 +132,8 @@ export const parseR2CMessage = (rawMsg: string): R2CMessage | undefined => { } const msgType = parsed[0] as string; - if (!msgTypeNames.includes(msgType)) { - console.error("unknown R2C message type:", parsed[0]); - return undefined; - } - if (msgType === "OK" || msgType === "AUTH" || msgType === "COUNT") { - console.warn("ignoring R2C OK/AUTH/COUNT message"); + if (!isSupportedR2CMsgType(msgType)) { + console.error("unsupported R2C message type:", parsed[0]); return undefined; } switch (msgType) { @@ -161,6 +160,13 @@ export const parseR2CMessage = (rawMsg: string): R2CMessage | undefined => { } return parsed as R2CEose; } + case "CLOSED": { + if (parsed.length !== 3 || typeof parsed[1] !== "string" || typeof parsed[2] !== "string") { + console.error("malformed R2C CLOSED"); + return undefined; + } + return parsed as R2CClosed; + } case "NOTICE": { if (parsed.length !== 2) { console.error("malformed R2C NOTICE"); diff --git a/packages/nostr-fetch/src/fetcherBackend.spec.ts b/packages/nostr-fetch/src/fetcherBackend.spec.ts index 3d4f533..0f5a091 100644 --- a/packages/nostr-fetch/src/fetcherBackend.spec.ts +++ b/packages/nostr-fetch/src/fetcherBackend.spec.ts @@ -1,4 +1,8 @@ -import { FetchTillEoseOptions, NostrFetcherBackend } from "@nostr-fetch/kernel/fetcherBackend"; +import { + FetchTillEoseOptions, + NostrFetcherBackend, + isFetchTillEoseFailedSignal, +} from "@nostr-fetch/kernel/fetcherBackend"; import { setupMockRelayServer } from "@nostr-fetch/testutil/mockRelayServer"; import { DefaultFetcherBackend } from "./fetcherBackend"; @@ -57,6 +61,25 @@ describe("DefaultFetcherBackend", () => { await expect(wsServer).toReceiveMessage(["CLOSE", "test"]); }); + test("handles subscription close by relay w/ CLOSED message", async () => { + setupMockRelayServer(wsServer, [{ type: "closed", message: "invalid: malformed filter" }]); + + await backend.ensureRelays([url], { connectTimeoutMs: 1000 }); + const iter = backend.fetchTillEose(url, {}, defaultOpts); + try { + for await (const _ of iter) { + // do nothing + } + expect.unreachable("should throw FetchTillEoseFailedSignal"); + } catch (err) { + if (isFetchTillEoseFailedSignal(err)) { + expect(err.message).toMatch("invalid: malformed filter"); + } else { + expect.unreachable("should throw FetchTillEoseFailedSignal"); + } + } + }); + test("aborts subscription on NOTICE", async () => { setupMockRelayServer(wsServer, [ { type: "events", eventsSpec: { content: "test", n: 9 } }, diff --git a/packages/nostr-fetch/src/fetcherBackend.ts b/packages/nostr-fetch/src/fetcherBackend.ts index d52c3db..3869135 100644 --- a/packages/nostr-fetch/src/fetcherBackend.ts +++ b/packages/nostr-fetch/src/fetcherBackend.ts @@ -121,6 +121,11 @@ export class DefaultFetcherBackend implements NostrFetcherBackend { tx.close(); } }); + sub.on("closed", (msg) => { + tx.error( + new FetchTillEoseFailedSignal(`subscription (id: ${sub.subId}) closed by relay: ${msg}`), + ); + }); // common process to close subscription const closeSub = () => { diff --git a/packages/nostr-fetch/src/relay.spec.ts b/packages/nostr-fetch/src/relay.spec.ts index ffcf82c..36547db 100644 --- a/packages/nostr-fetch/src/relay.spec.ts +++ b/packages/nostr-fetch/src/relay.spec.ts @@ -6,6 +6,7 @@ import type { RelayDisconnectCb, RelayErrorCb, RelayNoticeCb, + SubClosedCb, SubEoseCb, SubEventCb, WSCloseEvent, @@ -149,6 +150,7 @@ describe("Relay", () => { let spyCbs: { event: SubEventCb; eose: SubEoseCb; + closed: SubClosedCb; }; beforeEach(() => { @@ -156,6 +158,7 @@ describe("Relay", () => { spyCbs = { event: vi.fn((_) => {}), eose: vi.fn((_) => {}), + closed: vi.fn((_) => {}), }; }); afterEach(() => { @@ -177,6 +180,7 @@ describe("Relay", () => { sub.on("event", spyCbs.event); sub.on("eose", spyCbs.eose); sub.on("eose", () => waitEose.resolve()); + sub.on("closed", spyCbs.closed); sub.req(); await expect(server).toReceiveMessage(["REQ", "normal", {}]); @@ -189,6 +193,34 @@ describe("Relay", () => { // mock relay sends: 5 EVENTs then EOSE expect(spyCbs.event).toBeCalledTimes(5); expect(spyCbs.eose).toBeCalledTimes(1); + expect(spyCbs.closed).not.toBeCalled(); + }); + + test("CLOSED by relay", async () => { + const r = initRelay(rurl, { connectTimeoutMs: 5000 }); + setupMockRelayServer(server, [{ type: "closed", message: "closed by relay" }]); + + await r.connect(); + + const waitClosed = new Deferred(); + + const sub = r.prepareSub([{}], { + skipVerification: false, + abortSubBeforeEoseTimeoutMs: 1000, + subId: "malformed_sub", + }); + sub.on("event", spyCbs.event); + sub.on("eose", spyCbs.eose); + sub.on("closed", spyCbs.closed); + sub.on("closed", () => waitClosed.resolve()); + + sub.req(); + await waitClosed.promise; + + expect(spyCbs.closed).toBeCalledTimes(1); + expect(spyCbs.closed).toBeCalledWith("closed by relay"); + expect(spyCbs.event).not.toBeCalled(); + expect(spyCbs.eose).not.toBeCalled(); }); test("aborts before EOSE if relay doesn't return events for a while", async () => { diff --git a/packages/nostr-fetch/src/relay.ts b/packages/nostr-fetch/src/relay.ts index 9cab87a..69b67f6 100644 --- a/packages/nostr-fetch/src/relay.ts +++ b/packages/nostr-fetch/src/relay.ts @@ -113,6 +113,11 @@ class RelayImpl implements Relay { this.forwardToSub(subId, (sub) => sub._forwardEose()); break; } + case "CLOSED": { + const [, subId, msg] = parsed; + this.forwardToSub(subId, (sub) => sub._forwardClosed(msg)); + break; + } case "NOTICE": { const [, notice] = parsed; this.#listeners.notice.forEach((cb) => cb(notice)); @@ -213,10 +218,12 @@ type EoseEventPayload = { export type SubEventCb = Callback; export type SubEoseCb = Callback; +export type SubClosedCb = Callback; export type SubEventCbTypes = { event: SubEventCb; eose: SubEoseCb; + closed: SubClosedCb; }; export type SubEventTypes = keyof SubEventCbTypes; @@ -248,6 +255,7 @@ class RelaySubscription implements Subscription { #listeners: SubListenersTable = { event: new Set(), eose: new Set(), + closed: new Set(), }; #abortSubTimer: NodeJS.Timeout | undefined; @@ -315,4 +323,12 @@ class RelaySubscription implements Subscription { } this.#listeners.eose.forEach((cb) => cb({ aborted: false })); } + + _forwardClosed(msg: string) { + this.#listeners.closed.forEach((cb) => cb(msg)); + + // subscription has been closed by the relay -> clean up things + this.#clearListeners(); + this.#relay._removeSub(this.#subId); + } } diff --git a/packages/testutil/src/mockRelayServer.ts b/packages/testutil/src/mockRelayServer.ts index d763a8b..40c87de 100644 --- a/packages/testutil/src/mockRelayServer.ts +++ b/packages/testutil/src/mockRelayServer.ts @@ -16,6 +16,10 @@ type MockRelaySubResponseAction = eventsSpec: FakeEventsSpec; intervalMs?: number; } + | { + type: "closed"; + message: string; + } | { type: "notice"; notice: string; @@ -31,6 +35,7 @@ type MockRelaySubResponseScenario = MockRelaySubResponseAction[]; const r2cEventMsg = (subId: string, ev: NostrEvent) => JSON.stringify(["EVENT", subId, ev]); const r2cEoseMsg = (subId: string) => JSON.stringify(["EOSE", subId]); +const r2cClosedMsg = (subId: string, message: string) => JSON.stringify(["CLOSED", subId, message]); const r2cNoticeMsg = (notice: string) => JSON.stringify(["NOTICE", notice]); // play the "subscription response scenario" on subscription request @@ -41,6 +46,7 @@ const playSubScenario = async ( subId: string, ) => { for (const action of scenario) { + console.log("playSubScenario", action); switch (action.type) { case "events": for (const ev of generateFakeEvents(action.eventsSpec)) { @@ -52,6 +58,10 @@ const playSubScenario = async ( } break; + case "closed": + socket.send(r2cClosedMsg(subId, action.message)); + break; + case "notice": socket.send(r2cNoticeMsg(action.notice)); break;