Skip to content

Commit

Permalink
Optimise CachePrimer to skip chats where the cache is already up to…
Browse files Browse the repository at this point in the history
… date
  • Loading branch information
hpeebles committed Sep 19, 2023
1 parent 4837ad5 commit 1373155
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 68 deletions.
12 changes: 11 additions & 1 deletion frontend/openchat-agent/src/services/openchatAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ import type { Identity } from "@dfinity/agent";
import {
type Database,
getCachedChats,
getCachePrimerTimestamps,
initDb,
loadFailedMessages,
removeFailedMessage,
setCachedChats,
setCachedMessageIfNotExists,
setCachePrimerTimestamp,
} from "../utils/caching";
import { getAllUsers } from "../utils/userCache";
import { getCachedRegistry, setCachedRegistry } from "../utils/registryCache";
Expand Down Expand Up @@ -2465,7 +2467,7 @@ export class OpenChatAgent extends EventTarget {
): Promise<UpdateUserGroupResponse> {
return this.communityClient(communityId).updateUserGroup(userGroupId, name, usersToAdd, usersToRemove);
}

setMemberDisplayName(communityId: string, display_name: string | undefined): Promise<SetMemberDisplayNameResponse> {
return this.communityClient(communityId).setMemberDisplayName(display_name);
}
Expand All @@ -2476,4 +2478,12 @@ export class OpenChatAgent extends EventTarget {
): Promise<DeleteUserGroupsResponse> {
return this.communityClient(communityId).deleteUserGroups(userGroupIds);
}

getCachePrimerTimestamps(): Promise<Record<string, bigint>> {
return getCachePrimerTimestamps(this.db);
}

setCachePrimerTimestamp(chatIdentifierString: string, timestamp: bigint): Promise<void> {
return setCachePrimerTimestamp(this.db, chatIdentifierString, timestamp);
}
}
2 changes: 2 additions & 0 deletions frontend/openchat-agent/src/services/user/mappers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,7 @@ function directChatSummaryUpdates(candid: ApiDirectChatSummaryUpdates): DirectCh
id: { kind: "direct_chat", userId: candid.chat_id.toString() },
readByMeUpTo: optional(candid.read_by_me_up_to, identity),
readByThemUpTo: optional(candid.read_by_them_up_to, identity),
lastUpdated: candid.last_updated,
latestMessage: optional(candid.latest_message, (ev) => ({
index: ev.index,
timestamp: ev.timestamp,
Expand Down Expand Up @@ -819,6 +820,7 @@ function directChatSummary(candid: ApiDirectChatSummary): DirectChatSummary {
},
them: { kind: "direct_chat", userId: candid.them.toString() },
latestEventIndex: candid.latest_event_index,
lastUpdated: candid.last_updated,
readByThemUpTo: optional(candid.read_by_them_up_to, identity),
dateCreated: candid.date_created,
metrics: chatMetrics(candid.metrics),
Expand Down
75 changes: 54 additions & 21 deletions frontend/openchat-agent/src/utils/caching.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { MAX_EVENTS, MAX_MESSAGES } from "../constants";
import { openDB, type DBSchema, type IDBPDatabase } from "idb";
import {
openDB,
type DBSchema,
type IDBPDatabase,
type StoreNames,
type StoreValue
} from "idb";
import type {
ChatEvent,
ChatIdentifier,
Expand All @@ -21,14 +27,14 @@ import type {
} from "openchat-shared";
import {
chatIdentifiersEqual,
chatIdentifierToString,
ChatMap,
UnsupportedValueError,
MessageContextMap,
} from "openchat-shared";
import type { Principal } from "@dfinity/principal";
import { toRecord } from "./list";

const CACHE_VERSION = 82;
const CACHE_VERSION = 83;

export type Database = Promise<IDBPDatabase<ChatSchema>>;

Expand Down Expand Up @@ -78,6 +84,11 @@ export interface ChatSchema extends DBSchema {
key: string;
value: EnhancedWrapper<Message>;
};

cachePrimer: {
key: string;
value: bigint;
}
}

function padMessageIndex(i: number): string {
Expand All @@ -95,27 +106,14 @@ export function createFailedCacheKey(context: MessageContext, messageId: bigint)

function messageContextToString({ chatId, threadRootMessageIndex }: MessageContext): string {
return threadRootMessageIndex === undefined
? chatIdentiferToString(chatId)
: `${chatIdentiferToString(chatId)}_${threadRootMessageIndex}`;
? chatIdentifierToString(chatId)
: `${chatIdentifierToString(chatId)}_${threadRootMessageIndex}`;
}

export function createCacheKey(context: MessageContext, index: number): string {
return `${messageContextToString(context)}_${padMessageIndex(index)}`;
}

function chatIdentiferToString(chatId: ChatIdentifier): string {
if (chatId.kind === "channel") {
return `${chatId.communityId}_${chatId.channelId}`;
}
if (chatId.kind === "direct_chat") {
return chatId.userId;
}
if (chatId.kind === "group_chat") {
return chatId.groupId;
}
throw new UnsupportedValueError("Unknown chatId kind", chatId);
}

export function openCache(principal: Principal): Database {
return openDB<ChatSchema>(`openchat_db_${principal}`, CACHE_VERSION, {
upgrade(db, _oldVersion, _newVersion) {
Expand All @@ -140,6 +138,9 @@ export function openCache(principal: Principal): Database {
if (db.objectStoreNames.contains("failed_thread_messages")) {
db.deleteObjectStore("failed_thread_messages");
}
if (db.objectStoreNames.contains("cachePrimer")) {
db.deleteObjectStore("cachePrimer");
}
const chatEvents = db.createObjectStore("chat_events");
chatEvents.createIndex("messageIdx", "messageKey");
const threadEvents = db.createObjectStore("thread_events");
Expand All @@ -149,6 +150,7 @@ export function openCache(principal: Principal): Database {
db.createObjectStore("community_details");
db.createObjectStore("failed_chat_messages");
db.createObjectStore("failed_thread_messages");
db.createObjectStore("cachePrimer");
},
});
}
Expand Down Expand Up @@ -593,7 +595,7 @@ export async function setCachedEvents<T extends ChatEvent>(
db: Database,
chatId: ChatIdentifier,
resp: EventsResponse<T>,
threadRootMessageIndex?: number,
threadRootMessageIndex: number | undefined,
): Promise<void> {
if (resp === "events_failed") return;
const store = threadRootMessageIndex !== undefined ? "thread_events" : "chat_events";
Expand All @@ -603,8 +605,8 @@ export async function setCachedEvents<T extends ChatEvent>(
});
const eventStore = tx.objectStore(store);
await Promise.all(
resp.events.map(async (event) => {
await eventStore.put(
resp.events.map((event) => {
eventStore.put(
makeSerialisable<T>(event, chatId, true, threadRootMessageIndex),
createCacheKey({ chatId, threadRootMessageIndex }, event.index),
);
Expand Down Expand Up @@ -654,6 +656,18 @@ export async function setCachedMessageIfNotExists(
await tx.done;
}

export function getCachePrimerTimestamps(db: Database): Promise<Record<string, bigint>> {
return readAll(db, "cachePrimer");
}

export async function setCachePrimerTimestamp(
db: Database,
chatIdentifierString: string,
timestamp: bigint,
): Promise<void> {
await (await db).put("cachePrimer", timestamp, chatIdentifierString);
}

function messageToEvent(message: Message, resp: SendMessageSuccess): EventWrapper<Message> {
return {
event: {
Expand Down Expand Up @@ -752,3 +766,22 @@ function makeChatSummarySerializable<T extends ChatSummary>(chat: T): T {
latestMessage: makeSerialisable(chat.latestMessage, chat.id, true),
};
}

async function readAll<Name extends StoreNames<ChatSchema>>(
db: Database,
storeName: Name
): Promise<Record<string, StoreValue<ChatSchema, Name>>> {
const transaction = (await db).transaction([storeName]);
const store = transaction.objectStore(storeName);
const cursor = await store.openCursor();
const values: Record<string, StoreValue<ChatSchema, Name>> = {};
while (cursor?.key !== undefined) {
values[cursor.key as string] = cursor.value;
try {
await cursor.continue();
} catch {
break;
}
}
return values;
}
1 change: 1 addition & 0 deletions frontend/openchat-agent/src/utils/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ export function mergeDirectChatUpdates(
them: c.them,
readByThemUpTo: u.readByThemUpTo ?? c.readByThemUpTo,
dateCreated: c.dateCreated,
lastUpdated: u.lastUpdated,
latestEventIndex: u.latestEventIndex ?? c.latestEventIndex,
latestMessage: u.latestMessage ?? c.latestMessage,
metrics: u.metrics ?? c.metrics,
Expand Down
24 changes: 15 additions & 9 deletions frontend/openchat-client/src/openchat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,6 @@ import {
unreadFavouriteChats,
unreadCommunityChannels,
globalUnreadCount,
getAllServerChats,
} from "./stores/global";
import { localCommunitySummaryUpdates } from "./stores/localCommunitySummaryUpdates";
import { hasFlag, moderationFlags } from "./stores/flagStore";
Expand Down Expand Up @@ -4404,16 +4403,15 @@ export class OpenChat extends OpenChatAgentWorker {
await updateRegistryTask;
}

const updatedChats = (chatsResponse.state.directChats as ChatSummary[])
const chats = (chatsResponse.state.directChats as ChatSummary[])
.concat(chatsResponse.state.groupChats)
.concat(chatsResponse.state.communities.flatMap((c) => c.channels));

this.updateReadUpToStore(updatedChats);
const chats = getAllServerChats(this._liveState.globalState).values();
this.updateReadUpToStore(chats);

this._cachePrimer?.processChatUpdates(chats, updatedChats);
this._cachePrimer?.processChats(chats);

const userIds = this.userIdsFromChatSummaries(updatedChats);
const userIds = this.userIdsFromChatSummaries(chats);
if (!init) {
for (const userId of this.user.referrals) {
userIds.add(userId);
Expand Down Expand Up @@ -4451,7 +4449,7 @@ export class OpenChat extends OpenChatAgentWorker {
}

if (this._liveState.uninitializedDirectChats.size > 0) {
for (const chat of updatedChats) {
for (const chat of chats) {
if (this._liveState.uninitializedDirectChats.has(chat.id)) {
removeUninitializedDirectChat(chat.id);
}
Expand All @@ -4460,7 +4458,7 @@ export class OpenChat extends OpenChatAgentWorker {

setGlobalState(
chatsResponse.state.communities,
updatedChats,
chats,
chatsResponse.state.favouriteChats,
{
group_chat: chatsResponse.state.pinnedGroupChats,
Expand Down Expand Up @@ -4507,7 +4505,7 @@ export class OpenChat extends OpenChatAgentWorker {
// If the latest message in a chat is sent by the current user, then we know they must have read up to
// that message, so we mark the chat as read up to that message if it isn't already. This happens when a
// user sends a message on one device then looks at OpenChat on another.
for (const chat of updatedChats) {
for (const chat of chats) {
const latestMessage = chat.latestMessage?.event;
if (
latestMessage !== undefined &&
Expand Down Expand Up @@ -4914,6 +4912,14 @@ export class OpenChat extends OpenChatAgentWorker {
}
}

getCachePrimerTimestamps(): Promise<Record<string, bigint>> {
return this.sendRequest({ kind: "getCachePrimerTimestamps" });
}

setCachePrimerTimestamp(chatIdentifierString: string, timestamp: bigint): Promise<void> {
return this.sendRequest({ kind: "setCachePrimerTimestamp", chatIdentifierString, timestamp });
}

// **** Communities Stuff

// takes a list of communities that may contain communities that we are a member of and/or preview communities
Expand Down
1 change: 1 addition & 0 deletions frontend/openchat-client/src/stores/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ export function createDirectChat(chatId: DirectChatIdentifier): void {
readByThemUpTo: undefined,
latestMessage: undefined,
latestEventIndex: 0,
lastUpdated: BigInt(Date.now()),
dateCreated: BigInt(Date.now()),
metrics: emptyChatMetrics(),
membership: {
Expand Down
29 changes: 13 additions & 16 deletions frontend/openchat-client/src/utils/cachePrimer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { ChatEvent, ChatSummary, EventsResponse, IndexRange } from "openchat-shared";
import { ChatMap, compareChats, missingUserIds, userIdsFromEvents } from "openchat-shared";
import { ChatMap, compareChats, missingUserIds, userIdsFromEvents, chatIdentifierToString } from "openchat-shared";
import { Poller } from "./poller";
import { boolFromLS } from "../stores/localStorageSetting";
import { messagesRead } from "../stores/markRead";
Expand All @@ -16,19 +16,19 @@ export class CachePrimer {
debug("initialized");
}

processChatUpdates(previous: ChatSummary[], next: ChatSummary[]): void {
const record = ChatMap.fromList(previous);
const updated = next.filter(
(c) => !c.membership.archived && hasBeenUpdated(record.get(c.id), c)
);

if (updated.length > 0) {
for (const chat of updated) {
this.pending.set(chat.id, chat);
debug("enqueued " + chat.id);
async processChats(chats: ChatSummary[]): Promise<void> {
if (chats.length > 0) {
const lastUpdatedTimestamps = await this.api.getCachePrimerTimestamps();
for (const chat of chats) {
if (chat.membership.archived) continue;
const lastUpdated = lastUpdatedTimestamps[chatIdentifierToString(chat.id)];
if (lastUpdated === undefined || lastUpdated < chat.lastUpdated) {
this.pending.set(chat.id, chat);
debug("enqueued " + chat.id);
}
}

if (this.runner === undefined) {
if (this.pending.size > 0 && this.runner === undefined) {
this.runner = new Poller(() => runOnceIdle(() => this.processNext()), 0);
debug("runner started");
}
Expand Down Expand Up @@ -76,6 +76,7 @@ export class CachePrimer {
);
}
}
await this.api.setCachePrimerTimestamp(chatIdentifierToString(chat.id), chat.lastUpdated);
debug(chat.id + " completed");
} finally {
if (this.pending.size === 0) {
Expand Down Expand Up @@ -120,10 +121,6 @@ export class CachePrimer {
}
}

function hasBeenUpdated(previous: ChatSummary | undefined, next: ChatSummary): boolean {
return previous === undefined || next.latestEventIndex > previous.latestEventIndex;
}

function debug(message: string) {
if (boolFromLS("openchat_cache_primer_debug_enabled", false)) {
console.debug("CachePrimer - " + message);
Expand Down
Loading

0 comments on commit 1373155

Please sign in to comment.