From 13731553237a5f29d7d17743320c7b6f273da5df Mon Sep 17 00:00:00 2001 From: Hamish Peebles Date: Wed, 13 Sep 2023 15:25:02 +0100 Subject: [PATCH] Optimise `CachePrimer` to skip chats where the cache is already up to date --- .../src/services/openchatAgent.ts | 12 ++- .../src/services/user/mappers.ts | 2 + frontend/openchat-agent/src/utils/caching.ts | 75 +++++++++++++------ frontend/openchat-agent/src/utils/chat.ts | 1 + frontend/openchat-client/src/openchat.ts | 24 +++--- frontend/openchat-client/src/stores/chat.ts | 1 + .../openchat-client/src/utils/cachePrimer.ts | 29 ++++--- .../openchat-shared/src/domain/chat/chat.ts | 18 +---- frontend/openchat-shared/src/domain/worker.ts | 24 ++++-- frontend/openchat-shared/src/utils/chat.ts | 9 +++ frontend/openchat-worker/src/worker.ts | 14 ++++ 11 files changed, 141 insertions(+), 68 deletions(-) diff --git a/frontend/openchat-agent/src/services/openchatAgent.ts b/frontend/openchat-agent/src/services/openchatAgent.ts index caf79f5c7d..245c39c1c2 100644 --- a/frontend/openchat-agent/src/services/openchatAgent.ts +++ b/frontend/openchat-agent/src/services/openchatAgent.ts @@ -3,11 +3,13 @@ import type { Identity } from "@dfinity/agent"; import { type Database, getCachedChats, + getCachePrimerTimestamps, initDb, loadFailedMessages, removeFailedMessage, setCachedChats, setCachedMessageIfNotExists, + setCachePrimerTimestamp, } from "../utils/caching"; import { getAllUsers } from "../utils/userCache"; import { getCachedRegistry, setCachedRegistry } from "../utils/registryCache"; @@ -2465,7 +2467,7 @@ export class OpenChatAgent extends EventTarget { ): Promise { return this.communityClient(communityId).updateUserGroup(userGroupId, name, usersToAdd, usersToRemove); } - + setMemberDisplayName(communityId: string, display_name: string | undefined): Promise { return this.communityClient(communityId).setMemberDisplayName(display_name); } @@ -2476,4 +2478,12 @@ export class OpenChatAgent extends EventTarget { ): Promise { return this.communityClient(communityId).deleteUserGroups(userGroupIds); } + + getCachePrimerTimestamps(): Promise> { + return getCachePrimerTimestamps(this.db); + } + + setCachePrimerTimestamp(chatIdentifierString: string, timestamp: bigint): Promise { + return setCachePrimerTimestamp(this.db, chatIdentifierString, timestamp); + } } diff --git a/frontend/openchat-agent/src/services/user/mappers.ts b/frontend/openchat-agent/src/services/user/mappers.ts index 957abe2c5a..45dd348fc2 100644 --- a/frontend/openchat-agent/src/services/user/mappers.ts +++ b/frontend/openchat-agent/src/services/user/mappers.ts @@ -704,6 +704,7 @@ function directChatSummaryUpdates(candid: ApiDirectChatSummaryUpdates): DirectCh id: { kind: "direct_chat", userId: candid.chat_id.toString() }, readByMeUpTo: optional(candid.read_by_me_up_to, identity), readByThemUpTo: optional(candid.read_by_them_up_to, identity), + lastUpdated: candid.last_updated, latestMessage: optional(candid.latest_message, (ev) => ({ index: ev.index, timestamp: ev.timestamp, @@ -819,6 +820,7 @@ function directChatSummary(candid: ApiDirectChatSummary): DirectChatSummary { }, them: { kind: "direct_chat", userId: candid.them.toString() }, latestEventIndex: candid.latest_event_index, + lastUpdated: candid.last_updated, readByThemUpTo: optional(candid.read_by_them_up_to, identity), dateCreated: candid.date_created, metrics: chatMetrics(candid.metrics), diff --git a/frontend/openchat-agent/src/utils/caching.ts b/frontend/openchat-agent/src/utils/caching.ts index 06a071b154..40902b1964 100644 --- a/frontend/openchat-agent/src/utils/caching.ts +++ b/frontend/openchat-agent/src/utils/caching.ts @@ -1,5 +1,11 @@ import { MAX_EVENTS, MAX_MESSAGES } from "../constants"; -import { openDB, type DBSchema, type IDBPDatabase } from "idb"; +import { + openDB, + type DBSchema, + type IDBPDatabase, + type StoreNames, + type StoreValue +} from "idb"; import type { ChatEvent, ChatIdentifier, @@ -21,14 +27,14 @@ import type { } from "openchat-shared"; import { chatIdentifiersEqual, + chatIdentifierToString, ChatMap, - UnsupportedValueError, MessageContextMap, } from "openchat-shared"; import type { Principal } from "@dfinity/principal"; import { toRecord } from "./list"; -const CACHE_VERSION = 82; +const CACHE_VERSION = 83; export type Database = Promise>; @@ -78,6 +84,11 @@ export interface ChatSchema extends DBSchema { key: string; value: EnhancedWrapper; }; + + cachePrimer: { + key: string; + value: bigint; + } } function padMessageIndex(i: number): string { @@ -95,27 +106,14 @@ export function createFailedCacheKey(context: MessageContext, messageId: bigint) function messageContextToString({ chatId, threadRootMessageIndex }: MessageContext): string { return threadRootMessageIndex === undefined - ? chatIdentiferToString(chatId) - : `${chatIdentiferToString(chatId)}_${threadRootMessageIndex}`; + ? chatIdentifierToString(chatId) + : `${chatIdentifierToString(chatId)}_${threadRootMessageIndex}`; } export function createCacheKey(context: MessageContext, index: number): string { return `${messageContextToString(context)}_${padMessageIndex(index)}`; } -function chatIdentiferToString(chatId: ChatIdentifier): string { - if (chatId.kind === "channel") { - return `${chatId.communityId}_${chatId.channelId}`; - } - if (chatId.kind === "direct_chat") { - return chatId.userId; - } - if (chatId.kind === "group_chat") { - return chatId.groupId; - } - throw new UnsupportedValueError("Unknown chatId kind", chatId); -} - export function openCache(principal: Principal): Database { return openDB(`openchat_db_${principal}`, CACHE_VERSION, { upgrade(db, _oldVersion, _newVersion) { @@ -140,6 +138,9 @@ export function openCache(principal: Principal): Database { if (db.objectStoreNames.contains("failed_thread_messages")) { db.deleteObjectStore("failed_thread_messages"); } + if (db.objectStoreNames.contains("cachePrimer")) { + db.deleteObjectStore("cachePrimer"); + } const chatEvents = db.createObjectStore("chat_events"); chatEvents.createIndex("messageIdx", "messageKey"); const threadEvents = db.createObjectStore("thread_events"); @@ -149,6 +150,7 @@ export function openCache(principal: Principal): Database { db.createObjectStore("community_details"); db.createObjectStore("failed_chat_messages"); db.createObjectStore("failed_thread_messages"); + db.createObjectStore("cachePrimer"); }, }); } @@ -593,7 +595,7 @@ export async function setCachedEvents( db: Database, chatId: ChatIdentifier, resp: EventsResponse, - threadRootMessageIndex?: number, + threadRootMessageIndex: number | undefined, ): Promise { if (resp === "events_failed") return; const store = threadRootMessageIndex !== undefined ? "thread_events" : "chat_events"; @@ -603,8 +605,8 @@ export async function setCachedEvents( }); const eventStore = tx.objectStore(store); await Promise.all( - resp.events.map(async (event) => { - await eventStore.put( + resp.events.map((event) => { + eventStore.put( makeSerialisable(event, chatId, true, threadRootMessageIndex), createCacheKey({ chatId, threadRootMessageIndex }, event.index), ); @@ -654,6 +656,18 @@ export async function setCachedMessageIfNotExists( await tx.done; } +export function getCachePrimerTimestamps(db: Database): Promise> { + return readAll(db, "cachePrimer"); +} + +export async function setCachePrimerTimestamp( + db: Database, + chatIdentifierString: string, + timestamp: bigint, +): Promise { + await (await db).put("cachePrimer", timestamp, chatIdentifierString); +} + function messageToEvent(message: Message, resp: SendMessageSuccess): EventWrapper { return { event: { @@ -752,3 +766,22 @@ function makeChatSummarySerializable(chat: T): T { latestMessage: makeSerialisable(chat.latestMessage, chat.id, true), }; } + +async function readAll>( + db: Database, + storeName: Name +): Promise>> { + const transaction = (await db).transaction([storeName]); + const store = transaction.objectStore(storeName); + const cursor = await store.openCursor(); + const values: Record> = {}; + while (cursor?.key !== undefined) { + values[cursor.key as string] = cursor.value; + try { + await cursor.continue(); + } catch { + break; + } + } + return values; +} diff --git a/frontend/openchat-agent/src/utils/chat.ts b/frontend/openchat-agent/src/utils/chat.ts index aed2078dc8..cac304ef07 100644 --- a/frontend/openchat-agent/src/utils/chat.ts +++ b/frontend/openchat-agent/src/utils/chat.ts @@ -323,6 +323,7 @@ export function mergeDirectChatUpdates( them: c.them, readByThemUpTo: u.readByThemUpTo ?? c.readByThemUpTo, dateCreated: c.dateCreated, + lastUpdated: u.lastUpdated, latestEventIndex: u.latestEventIndex ?? c.latestEventIndex, latestMessage: u.latestMessage ?? c.latestMessage, metrics: u.metrics ?? c.metrics, diff --git a/frontend/openchat-client/src/openchat.ts b/frontend/openchat-client/src/openchat.ts index 0f36fa28de..0692715942 100644 --- a/frontend/openchat-client/src/openchat.ts +++ b/frontend/openchat-client/src/openchat.ts @@ -390,7 +390,6 @@ import { unreadFavouriteChats, unreadCommunityChannels, globalUnreadCount, - getAllServerChats, } from "./stores/global"; import { localCommunitySummaryUpdates } from "./stores/localCommunitySummaryUpdates"; import { hasFlag, moderationFlags } from "./stores/flagStore"; @@ -4404,16 +4403,15 @@ export class OpenChat extends OpenChatAgentWorker { await updateRegistryTask; } - const updatedChats = (chatsResponse.state.directChats as ChatSummary[]) + const chats = (chatsResponse.state.directChats as ChatSummary[]) .concat(chatsResponse.state.groupChats) .concat(chatsResponse.state.communities.flatMap((c) => c.channels)); - this.updateReadUpToStore(updatedChats); - const chats = getAllServerChats(this._liveState.globalState).values(); + this.updateReadUpToStore(chats); - this._cachePrimer?.processChatUpdates(chats, updatedChats); + this._cachePrimer?.processChats(chats); - const userIds = this.userIdsFromChatSummaries(updatedChats); + const userIds = this.userIdsFromChatSummaries(chats); if (!init) { for (const userId of this.user.referrals) { userIds.add(userId); @@ -4451,7 +4449,7 @@ export class OpenChat extends OpenChatAgentWorker { } if (this._liveState.uninitializedDirectChats.size > 0) { - for (const chat of updatedChats) { + for (const chat of chats) { if (this._liveState.uninitializedDirectChats.has(chat.id)) { removeUninitializedDirectChat(chat.id); } @@ -4460,7 +4458,7 @@ export class OpenChat extends OpenChatAgentWorker { setGlobalState( chatsResponse.state.communities, - updatedChats, + chats, chatsResponse.state.favouriteChats, { group_chat: chatsResponse.state.pinnedGroupChats, @@ -4507,7 +4505,7 @@ export class OpenChat extends OpenChatAgentWorker { // If the latest message in a chat is sent by the current user, then we know they must have read up to // that message, so we mark the chat as read up to that message if it isn't already. This happens when a // user sends a message on one device then looks at OpenChat on another. - for (const chat of updatedChats) { + for (const chat of chats) { const latestMessage = chat.latestMessage?.event; if ( latestMessage !== undefined && @@ -4914,6 +4912,14 @@ export class OpenChat extends OpenChatAgentWorker { } } + getCachePrimerTimestamps(): Promise> { + return this.sendRequest({ kind: "getCachePrimerTimestamps" }); + } + + setCachePrimerTimestamp(chatIdentifierString: string, timestamp: bigint): Promise { + return this.sendRequest({ kind: "setCachePrimerTimestamp", chatIdentifierString, timestamp }); + } + // **** Communities Stuff // takes a list of communities that may contain communities that we are a member of and/or preview communities diff --git a/frontend/openchat-client/src/stores/chat.ts b/frontend/openchat-client/src/stores/chat.ts index 591f933b37..f148cc4f6e 100644 --- a/frontend/openchat-client/src/stores/chat.ts +++ b/frontend/openchat-client/src/stores/chat.ts @@ -579,6 +579,7 @@ export function createDirectChat(chatId: DirectChatIdentifier): void { readByThemUpTo: undefined, latestMessage: undefined, latestEventIndex: 0, + lastUpdated: BigInt(Date.now()), dateCreated: BigInt(Date.now()), metrics: emptyChatMetrics(), membership: { diff --git a/frontend/openchat-client/src/utils/cachePrimer.ts b/frontend/openchat-client/src/utils/cachePrimer.ts index 513ec703a0..ef583f2a34 100644 --- a/frontend/openchat-client/src/utils/cachePrimer.ts +++ b/frontend/openchat-client/src/utils/cachePrimer.ts @@ -1,5 +1,5 @@ import type { ChatEvent, ChatSummary, EventsResponse, IndexRange } from "openchat-shared"; -import { ChatMap, compareChats, missingUserIds, userIdsFromEvents } from "openchat-shared"; +import { ChatMap, compareChats, missingUserIds, userIdsFromEvents, chatIdentifierToString } from "openchat-shared"; import { Poller } from "./poller"; import { boolFromLS } from "../stores/localStorageSetting"; import { messagesRead } from "../stores/markRead"; @@ -16,19 +16,19 @@ export class CachePrimer { debug("initialized"); } - processChatUpdates(previous: ChatSummary[], next: ChatSummary[]): void { - const record = ChatMap.fromList(previous); - const updated = next.filter( - (c) => !c.membership.archived && hasBeenUpdated(record.get(c.id), c) - ); - - if (updated.length > 0) { - for (const chat of updated) { - this.pending.set(chat.id, chat); - debug("enqueued " + chat.id); + async processChats(chats: ChatSummary[]): Promise { + if (chats.length > 0) { + const lastUpdatedTimestamps = await this.api.getCachePrimerTimestamps(); + for (const chat of chats) { + if (chat.membership.archived) continue; + const lastUpdated = lastUpdatedTimestamps[chatIdentifierToString(chat.id)]; + if (lastUpdated === undefined || lastUpdated < chat.lastUpdated) { + this.pending.set(chat.id, chat); + debug("enqueued " + chat.id); + } } - if (this.runner === undefined) { + if (this.pending.size > 0 && this.runner === undefined) { this.runner = new Poller(() => runOnceIdle(() => this.processNext()), 0); debug("runner started"); } @@ -76,6 +76,7 @@ export class CachePrimer { ); } } + await this.api.setCachePrimerTimestamp(chatIdentifierToString(chat.id), chat.lastUpdated); debug(chat.id + " completed"); } finally { if (this.pending.size === 0) { @@ -120,10 +121,6 @@ export class CachePrimer { } } -function hasBeenUpdated(previous: ChatSummary | undefined, next: ChatSummary): boolean { - return previous === undefined || next.latestEventIndex > previous.latestEventIndex; -} - function debug(message: string) { if (boolFromLS("openchat_cache_primer_debug_enabled", false)) { console.debug("CachePrimer - " + message); diff --git a/frontend/openchat-shared/src/domain/chat/chat.ts b/frontend/openchat-shared/src/domain/chat/chat.ts index d750149e33..313dccc01c 100644 --- a/frontend/openchat-shared/src/domain/chat/chat.ts +++ b/frontend/openchat-shared/src/domain/chat/chat.ts @@ -823,19 +823,6 @@ export type DirectChatsInitial = { export type ChatIdentifier = ChannelIdentifier | DirectChatIdentifier | GroupChatIdentifier; export type MultiUserChatIdentifier = ChannelIdentifier | GroupChatIdentifier; -export function chatIdentifierToString(id: ChatIdentifier): string { - switch (id.kind) { - case "direct_chat": - return id.userId; - case "group_chat": - return id.groupId; - default: - throw new Error( - "TODO Channel chat identifiers should not serialised - get rid of the calling code", - ); - } -} - export function messageContextsEqual( a: MessageContext | undefined, b: MessageContext | undefined, @@ -1027,6 +1014,7 @@ export type ChatSummaryUpdates = DirectChatSummaryUpdates | GroupChatSummaryUpda type ChatSummaryUpdatesCommon = { readByMeUpTo?: number; + lastUpdated: bigint; latestEventIndex?: number; latestMessage?: EventWrapper; notificationsMuted?: boolean; @@ -1045,7 +1033,6 @@ export type DirectChatSummaryUpdates = ChatSummaryUpdatesCommon & { export type GroupChatSummaryUpdates = ChatSummaryUpdatesCommon & { id: GroupChatIdentifier; kind: "group_chat"; - lastUpdated: bigint; name?: string; description?: string; avatarBlobReferenceUpdate?: OptionUpdate; @@ -1143,6 +1130,7 @@ export type MultiUserChat = GroupChatSummary | ChannelSummary; export type ChatType = ChatSummary["kind"]; type ChatSummaryCommon = HasMembershipRole & { + lastUpdated: bigint; latestEventIndex: number; latestMessage?: EventWrapper; metrics: Metrics; @@ -1161,7 +1149,6 @@ export type ChannelSummary = DataContent & description: string; minVisibleEventIndex: number; minVisibleMessageIndex: number; - lastUpdated: bigint; memberCount: number; dateLastPinned: bigint | undefined; dateReadPinned: bigint | undefined; @@ -1186,7 +1173,6 @@ export type GroupChatSummary = DataContent & description: string; minVisibleEventIndex: number; minVisibleMessageIndex: number; - lastUpdated: bigint; memberCount: number; subtype: GroupSubtype; previewed: boolean; diff --git a/frontend/openchat-shared/src/domain/worker.ts b/frontend/openchat-shared/src/domain/worker.ts index b5ed5b9ca7..a3dc7ee957 100644 --- a/frontend/openchat-shared/src/domain/worker.ts +++ b/frontend/openchat-shared/src/domain/worker.ts @@ -275,7 +275,9 @@ export type WorkerRequest = | CreateUserGroup | UpdateUserGroup | DeleteUserGroups - | SetMemberDisplayName; + | SetMemberDisplayName + | GetCachePrimerTimestamps + | SetCachePrimerTimestamp; type SetCommunityIndexes = { kind: "setCommunityIndexes"; @@ -921,6 +923,16 @@ type DeleteUserGroups = { kind: "deleteUserGroups"; }; +type GetCachePrimerTimestamps = { + kind: "getCachePrimerTimestamps"; +} + +type SetCachePrimerTimestamp = { + chatIdentifierString: string; + timestamp: bigint; + kind: "setCachePrimerTimestamp"; +} + /** * Worker error type */ @@ -949,12 +961,10 @@ export type WorkerResponseInner = | SetUsernameResponse | PublicProfile | UserSummary - | undefined | ThreadPreview[] | SearchDirectChatResponse | SearchGroupChatResponse | Rules - | undefined | GroupChatSummary[] | RegisterProposalVoteResponse | ChangeRoleResponse @@ -984,7 +994,6 @@ export type WorkerResponseInner = | ArchiveChatResponse | ToggleMuteNotificationResponse | GroupChatSummary - | undefined | StorageStatus | MigrateUserPrincipalResponse | UserSummary[] @@ -1042,7 +1051,8 @@ export type WorkerResponseInner = | RegistryValue | CreateUserGroupResponse | UpdateUserGroupResponse - | DeleteUserGroupsResponse; + | DeleteUserGroupsResponse + | Record; export type WorkerResponse = Response; @@ -1532,4 +1542,8 @@ export type WorkerResult = T extends PinMessage ? DeleteUserGroupsResponse : T extends SetMemberDisplayName ? SetMemberDisplayNameResponse + : T extends GetCachePrimerTimestamps + ? Record + : T extends SetCachePrimerTimestamp + ? void : never; diff --git a/frontend/openchat-shared/src/utils/chat.ts b/frontend/openchat-shared/src/utils/chat.ts index 0ace0d112c..50913ff2ae 100644 --- a/frontend/openchat-shared/src/utils/chat.ts +++ b/frontend/openchat-shared/src/utils/chat.ts @@ -258,3 +258,12 @@ export function routeForChatIdentifier(scope: ChatListScope["kind"], id: ChatIde return `${prefix}/community/${id.communityId}/channel/${id.channelId}`; } } + +export function chatIdentifierToString(chatId: ChatIdentifier): string { + switch (chatId.kind) { + case "direct_chat": return chatId.userId; + case "group_chat": return chatId.groupId; + case "channel": return `${chatId.communityId}_${chatId.channelId}`; + default: throw new UnsupportedValueError("Unknown chatId kind", chatId); + } +} diff --git a/frontend/openchat-worker/src/worker.ts b/frontend/openchat-worker/src/worker.ts index a439b9a588..e10267d0db 100644 --- a/frontend/openchat-worker/src/worker.ts +++ b/frontend/openchat-worker/src/worker.ts @@ -963,6 +963,20 @@ self.addEventListener("message", (msg: MessageEvent) => )); break; + case "getCachePrimerTimestamps": + executeThenReply(payload, correlationId, agent + .getCachePrimerTimestamps()); + break; + + case "setCachePrimerTimestamp": + executeThenReply(payload, correlationId, agent + .setCachePrimerTimestamp( + payload.chatIdentifierString, + payload.timestamp + ) + .then(() => undefined)); + break; + default: logger?.debug("WORKER: unknown message kind received: ", kind); }