From 6e665ebfe5747d8e653b1f78e5d048cc23e251b8 Mon Sep 17 00:00:00 2001 From: Attila Gazso <230163+agazso@users.noreply.github.com> Date: Wed, 13 Sep 2023 15:36:33 +0200 Subject: [PATCH] feat: waku improvements --- src/lib/adapters/waku/index.ts | 13 +++++-- src/lib/adapters/waku/waku.ts | 12 +++++-- src/lib/adapters/waku/wakustore.ts | 58 +++++++++++++++++------------- 3 files changed, 54 insertions(+), 29 deletions(-) diff --git a/src/lib/adapters/waku/index.ts b/src/lib/adapters/waku/index.ts index 09da88ac..fcff87eb 100644 --- a/src/lib/adapters/waku/index.ts +++ b/src/lib/adapters/waku/index.ts @@ -33,6 +33,8 @@ import type { StorageChat, StorageChatEntry, StorageObjectEntry, StorageProfile import { genRandomHex } from '$lib/utils' import { walletStore } from '$lib/stores/wallet' +const MAX_MESSAGES = 100 + interface QueuedMessage { message: Message address: string @@ -96,7 +98,7 @@ async function addMessageToChat( const unread = message.fromAddress !== address && message.type === 'user' ? 1 : 0 chats.updateChat(chatId, (chat) => ({ ...chat, - messages: [...chat.messages, message], + messages: [...chat.messages.slice(-MAX_MESSAGES), message], unread: chat.unread + unread, })) } @@ -165,7 +167,14 @@ export default class WakuAdapter implements Adapter { async onLogIn(wallet: BaseWallet): Promise { const address = wallet.address - this.waku = await connectWaku() + this.waku = await connectWaku({ + onDisconnect: () => { + console.debug('❌ disconnected from waku') + }, + onConnect: () => { + console.debug('✅ connected to waku') + }, + }) const wakuObjectAdapter = makeWakuObjectAdapter(this, wallet) diff --git a/src/lib/adapters/waku/waku.ts b/src/lib/adapters/waku/waku.ts index e4a6bdbc..2b72a543 100644 --- a/src/lib/adapters/waku/waku.ts +++ b/src/lib/adapters/waku/waku.ts @@ -37,6 +37,7 @@ function getTopic(contentTopic: ContentTopic, id: string | '' = '') { interface ConnectWakuOptions { onDisconnect?: () => void + onConnect?: (connections: unknown[]) => void } export async function connectWaku(options?: ConnectWakuOptions) { @@ -48,6 +49,13 @@ export async function connectWaku(options?: ConnectWakuOptions) { } }) + waku.libp2p.addEventListener('peer:connect', () => { + if (options?.onConnect && waku.libp2p.getConnections().length > 0) { + const connections = waku.libp2p.getConnections() + options.onConnect(connections) + } + }) + await waku.start() await waku.dial(peerMultiaddr) await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush, Protocols.Store]) @@ -107,7 +115,5 @@ export async function sendMessage(waku: LightNode, id: string, message: unknown) const encoder = createEncoder({ contentTopic }) const { error } = await waku.lightPush.send(encoder, { payload }) - if (error) { - console.error(error) - } + return error } diff --git a/src/lib/adapters/waku/wakustore.ts b/src/lib/adapters/waku/wakustore.ts index b4995bc4..c77c19a4 100644 --- a/src/lib/adapters/waku/wakustore.ts +++ b/src/lib/adapters/waku/wakustore.ts @@ -43,6 +43,27 @@ export function makeWakustore(waku: LightNode) { return makeQuery(contentTopic, id, queryOptions) } + function decodedMessageToTypedResult(message: DecodedMessage): T { + const decodedPayload = decodeMessagePayload(message) + const typedPayload = JSON.parse(decodedPayload) as T & { timestamp?: number } + + // HACK to use waku timestamp instead of the type T's + if ( + typedPayload && + typeof typedPayload === 'object' && + !Array.isArray(typedPayload) && + typedPayload.timestamp + ) { + return { + ...typedPayload, + timestamp: Number(message.timestamp), + origTimestamp: typedPayload.timestamp, + } + } else { + return typedPayload + } + } + async function parseQueryResults( results: QueryResult, queryOptions?: QueryOptions, @@ -52,24 +73,8 @@ export function makeWakustore(waku: LightNode) { for (const messagePromise of messagePromises) { const message = await messagePromise if (message) { - const decodedPayload = decodeMessagePayload(message) - const typedPayload = JSON.parse(decodedPayload) as T & { timestamp?: number } - - // HACK to use waku timestamp instead of the type T's - if ( - typedPayload && - typeof typedPayload === 'object' && - !Array.isArray(typedPayload) && - typedPayload.timestamp - ) { - typedResults.push({ - ...typedPayload, - timestamp: Number(message.timestamp), - origTimestamp: typedPayload.timestamp, - }) - } else { - typedResults.push(typedPayload) - } + const typedResult = decodedMessageToTypedResult(message) + typedResults.push(typedResult) // reached the limit if ( @@ -104,6 +109,16 @@ export function makeWakustore(waku: LightNode) { } async function onSnapshot(query: Query, callback: (value: T) => void): Promise { + const subscription = await subscribe( + waku, + query.contentTopic, + query.id, + (msg: DecodedMessage) => { + const typedResult = decodedMessageToTypedResult(msg) + callback(typedResult) + }, + ) + const queryOptions = { ...query.queryOptions, pageSize: query.queryOptions?.pageSize ?? query.queryOptions?.limit, @@ -115,12 +130,7 @@ export function makeWakustore(waku: LightNode) { callback(value) } - return await subscribe(waku, query.contentTopic, query.id, (msg: DecodedMessage) => { - const decodedPayload = decodeMessagePayload(msg) - const decodedValue = JSON.parse(decodedPayload) as T - - callback(decodedValue) - }) + return subscription } return {