Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise CachePrimer to skip chats where cache is already up to date #4398

Merged
merged 2 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion frontend/openchat-agent/src/services/openchatAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ import type { Identity } from "@dfinity/agent";
import {
type Database,
getCachedChats,
getCachePrimerTimestamps,
initDb,
loadFailedMessages,
removeFailedMessage,
setCachedChats,
setCachedMessageIfNotExists,
setCachePrimerTimestamp,
} from "../utils/caching";
import { getAllUsers } from "../utils/userCache";
import { getCachedRegistry, setCachedRegistry } from "../utils/registryCache";
Expand Down Expand Up @@ -2465,7 +2467,7 @@ export class OpenChatAgent extends EventTarget {
): Promise<UpdateUserGroupResponse> {
return this.communityClient(communityId).updateUserGroup(userGroupId, name, usersToAdd, usersToRemove);
}

setMemberDisplayName(communityId: string, display_name: string | undefined): Promise<SetMemberDisplayNameResponse> {
return this.communityClient(communityId).setMemberDisplayName(display_name);
}
Expand All @@ -2476,4 +2478,12 @@ export class OpenChatAgent extends EventTarget {
): Promise<DeleteUserGroupsResponse> {
return this.communityClient(communityId).deleteUserGroups(userGroupIds);
}

getCachePrimerTimestamps(): Promise<Record<string, bigint>> {
return getCachePrimerTimestamps(this.db);
}

setCachePrimerTimestamp(chatIdentifierString: string, timestamp: bigint): Promise<void> {
return setCachePrimerTimestamp(this.db, chatIdentifierString, timestamp);
}
}
2 changes: 2 additions & 0 deletions frontend/openchat-agent/src/services/user/mappers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,7 @@ function directChatSummaryUpdates(candid: ApiDirectChatSummaryUpdates): DirectCh
id: { kind: "direct_chat", userId: candid.chat_id.toString() },
readByMeUpTo: optional(candid.read_by_me_up_to, identity),
readByThemUpTo: optional(candid.read_by_them_up_to, identity),
lastUpdated: candid.last_updated,
latestMessage: optional(candid.latest_message, (ev) => ({
index: ev.index,
timestamp: ev.timestamp,
Expand Down Expand Up @@ -819,6 +820,7 @@ function directChatSummary(candid: ApiDirectChatSummary): DirectChatSummary {
},
them: { kind: "direct_chat", userId: candid.them.toString() },
latestEventIndex: candid.latest_event_index,
lastUpdated: candid.last_updated,
readByThemUpTo: optional(candid.read_by_them_up_to, identity),
dateCreated: candid.date_created,
metrics: chatMetrics(candid.metrics),
Expand Down
75 changes: 54 additions & 21 deletions frontend/openchat-agent/src/utils/caching.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { MAX_EVENTS, MAX_MESSAGES } from "../constants";
import { openDB, type DBSchema, type IDBPDatabase } from "idb";
import {
openDB,
type DBSchema,
type IDBPDatabase,
type StoreNames,
type StoreValue
} from "idb";
import type {
ChatEvent,
ChatIdentifier,
Expand All @@ -21,14 +27,14 @@ import type {
} from "openchat-shared";
import {
chatIdentifiersEqual,
chatIdentifierToString,
ChatMap,
UnsupportedValueError,
MessageContextMap,
} from "openchat-shared";
import type { Principal } from "@dfinity/principal";
import { toRecord } from "./list";

const CACHE_VERSION = 82;
const CACHE_VERSION = 83;

export type Database = Promise<IDBPDatabase<ChatSchema>>;

Expand Down Expand Up @@ -78,6 +84,11 @@ export interface ChatSchema extends DBSchema {
key: string;
value: EnhancedWrapper<Message>;
};

cachePrimer: {
key: string;
value: bigint;
}
}

function padMessageIndex(i: number): string {
Expand All @@ -95,27 +106,14 @@ export function createFailedCacheKey(context: MessageContext, messageId: bigint)

function messageContextToString({ chatId, threadRootMessageIndex }: MessageContext): string {
return threadRootMessageIndex === undefined
? chatIdentiferToString(chatId)
: `${chatIdentiferToString(chatId)}_${threadRootMessageIndex}`;
? chatIdentifierToString(chatId)
: `${chatIdentifierToString(chatId)}_${threadRootMessageIndex}`;
}

export function createCacheKey(context: MessageContext, index: number): string {
return `${messageContextToString(context)}_${padMessageIndex(index)}`;
}

function chatIdentiferToString(chatId: ChatIdentifier): string {
if (chatId.kind === "channel") {
return `${chatId.communityId}_${chatId.channelId}`;
}
if (chatId.kind === "direct_chat") {
return chatId.userId;
}
if (chatId.kind === "group_chat") {
return chatId.groupId;
}
throw new UnsupportedValueError("Unknown chatId kind", chatId);
}

export function openCache(principal: Principal): Database {
return openDB<ChatSchema>(`openchat_db_${principal}`, CACHE_VERSION, {
upgrade(db, _oldVersion, _newVersion) {
Expand All @@ -140,6 +138,9 @@ export function openCache(principal: Principal): Database {
if (db.objectStoreNames.contains("failed_thread_messages")) {
db.deleteObjectStore("failed_thread_messages");
}
if (db.objectStoreNames.contains("cachePrimer")) {
db.deleteObjectStore("cachePrimer");
}
const chatEvents = db.createObjectStore("chat_events");
chatEvents.createIndex("messageIdx", "messageKey");
const threadEvents = db.createObjectStore("thread_events");
Expand All @@ -149,6 +150,7 @@ export function openCache(principal: Principal): Database {
db.createObjectStore("community_details");
db.createObjectStore("failed_chat_messages");
db.createObjectStore("failed_thread_messages");
db.createObjectStore("cachePrimer");
},
});
}
Expand Down Expand Up @@ -593,7 +595,7 @@ export async function setCachedEvents<T extends ChatEvent>(
db: Database,
chatId: ChatIdentifier,
resp: EventsResponse<T>,
threadRootMessageIndex?: number,
threadRootMessageIndex: number | undefined,
): Promise<void> {
if (resp === "events_failed") return;
const store = threadRootMessageIndex !== undefined ? "thread_events" : "chat_events";
Expand All @@ -603,8 +605,8 @@ export async function setCachedEvents<T extends ChatEvent>(
});
const eventStore = tx.objectStore(store);
await Promise.all(
resp.events.map(async (event) => {
await eventStore.put(
resp.events.map((event) => {
eventStore.put(
makeSerialisable<T>(event, chatId, true, threadRootMessageIndex),
createCacheKey({ chatId, threadRootMessageIndex }, event.index),
);
Expand Down Expand Up @@ -654,6 +656,18 @@ export async function setCachedMessageIfNotExists(
await tx.done;
}

export function getCachePrimerTimestamps(db: Database): Promise<Record<string, bigint>> {
return readAll(db, "cachePrimer");
}

export async function setCachePrimerTimestamp(
db: Database,
chatIdentifierString: string,
timestamp: bigint,
): Promise<void> {
await (await db).put("cachePrimer", timestamp, chatIdentifierString);
}

function messageToEvent(message: Message, resp: SendMessageSuccess): EventWrapper<Message> {
return {
event: {
Expand Down Expand Up @@ -752,3 +766,22 @@ function makeChatSummarySerializable<T extends ChatSummary>(chat: T): T {
latestMessage: makeSerialisable(chat.latestMessage, chat.id, true),
};
}

async function readAll<Name extends StoreNames<ChatSchema>>(
julianjelfs marked this conversation as resolved.
Show resolved Hide resolved
db: Database,
storeName: Name
): Promise<Record<string, StoreValue<ChatSchema, Name>>> {
const transaction = (await db).transaction([storeName]);
const store = transaction.objectStore(storeName);
const cursor = await store.openCursor();
const values: Record<string, StoreValue<ChatSchema, Name>> = {};
while (cursor?.key !== undefined) {
values[cursor.key as string] = cursor.value;
try {
await cursor.continue();
} catch {
break;
}
}
return values;
}
1 change: 1 addition & 0 deletions frontend/openchat-agent/src/utils/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ export function mergeDirectChatUpdates(
them: c.them,
readByThemUpTo: u.readByThemUpTo ?? c.readByThemUpTo,
dateCreated: c.dateCreated,
lastUpdated: u.lastUpdated,
latestEventIndex: u.latestEventIndex ?? c.latestEventIndex,
latestMessage: u.latestMessage ?? c.latestMessage,
metrics: u.metrics ?? c.metrics,
Expand Down
24 changes: 15 additions & 9 deletions frontend/openchat-client/src/openchat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,6 @@ import {
unreadFavouriteChats,
unreadCommunityChannels,
globalUnreadCount,
getAllServerChats,
} from "./stores/global";
import { localCommunitySummaryUpdates } from "./stores/localCommunitySummaryUpdates";
import { hasFlag, moderationFlags } from "./stores/flagStore";
Expand Down Expand Up @@ -4404,16 +4403,15 @@ export class OpenChat extends OpenChatAgentWorker {
await updateRegistryTask;
}

const updatedChats = (chatsResponse.state.directChats as ChatSummary[])
const chats = (chatsResponse.state.directChats as ChatSummary[])
.concat(chatsResponse.state.groupChats)
.concat(chatsResponse.state.communities.flatMap((c) => c.channels));

this.updateReadUpToStore(updatedChats);
const chats = getAllServerChats(this._liveState.globalState).values();
this.updateReadUpToStore(chats);

this._cachePrimer?.processChatUpdates(chats, updatedChats);
this._cachePrimer?.processChats(chats);
julianjelfs marked this conversation as resolved.
Show resolved Hide resolved

const userIds = this.userIdsFromChatSummaries(updatedChats);
const userIds = this.userIdsFromChatSummaries(chats);
if (!init) {
for (const userId of this.user.referrals) {
userIds.add(userId);
Expand Down Expand Up @@ -4451,7 +4449,7 @@ export class OpenChat extends OpenChatAgentWorker {
}

if (this._liveState.uninitializedDirectChats.size > 0) {
for (const chat of updatedChats) {
for (const chat of chats) {
if (this._liveState.uninitializedDirectChats.has(chat.id)) {
removeUninitializedDirectChat(chat.id);
}
Expand All @@ -4460,7 +4458,7 @@ export class OpenChat extends OpenChatAgentWorker {

setGlobalState(
chatsResponse.state.communities,
updatedChats,
chats,
chatsResponse.state.favouriteChats,
{
group_chat: chatsResponse.state.pinnedGroupChats,
Expand Down Expand Up @@ -4507,7 +4505,7 @@ export class OpenChat extends OpenChatAgentWorker {
// If the latest message in a chat is sent by the current user, then we know they must have read up to
// that message, so we mark the chat as read up to that message if it isn't already. This happens when a
// user sends a message on one device then looks at OpenChat on another.
for (const chat of updatedChats) {
for (const chat of chats) {
const latestMessage = chat.latestMessage?.event;
if (
latestMessage !== undefined &&
Expand Down Expand Up @@ -4914,6 +4912,14 @@ export class OpenChat extends OpenChatAgentWorker {
}
}

getCachePrimerTimestamps(): Promise<Record<string, bigint>> {
return this.sendRequest({ kind: "getCachePrimerTimestamps" });
}

setCachePrimerTimestamp(chatIdentifierString: string, timestamp: bigint): Promise<void> {
return this.sendRequest({ kind: "setCachePrimerTimestamp", chatIdentifierString, timestamp });
}

// **** Communities Stuff

// takes a list of communities that may contain communities that we are a member of and/or preview communities
Expand Down
1 change: 1 addition & 0 deletions frontend/openchat-client/src/stores/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ export function createDirectChat(chatId: DirectChatIdentifier): void {
readByThemUpTo: undefined,
latestMessage: undefined,
latestEventIndex: 0,
lastUpdated: BigInt(Date.now()),
dateCreated: BigInt(Date.now()),
metrics: emptyChatMetrics(),
membership: {
Expand Down
29 changes: 13 additions & 16 deletions frontend/openchat-client/src/utils/cachePrimer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { ChatEvent, ChatSummary, EventsResponse, IndexRange } from "openchat-shared";
import { ChatMap, compareChats, missingUserIds, userIdsFromEvents } from "openchat-shared";
import { ChatMap, compareChats, missingUserIds, userIdsFromEvents, chatIdentifierToString } from "openchat-shared";
import { Poller } from "./poller";
import { boolFromLS } from "../stores/localStorageSetting";
import { messagesRead } from "../stores/markRead";
Expand All @@ -16,19 +16,19 @@ export class CachePrimer {
debug("initialized");
}

processChatUpdates(previous: ChatSummary[], next: ChatSummary[]): void {
const record = ChatMap.fromList(previous);
const updated = next.filter(
(c) => !c.membership.archived && hasBeenUpdated(record.get(c.id), c)
);

if (updated.length > 0) {
for (const chat of updated) {
this.pending.set(chat.id, chat);
debug("enqueued " + chat.id);
async processChats(chats: ChatSummary[]): Promise<void> {
if (chats.length > 0) {
const lastUpdatedTimestamps = await this.api.getCachePrimerTimestamps();
for (const chat of chats) {
if (chat.membership.archived) continue;
const lastUpdated = lastUpdatedTimestamps[chatIdentifierToString(chat.id)];
if (lastUpdated === undefined || lastUpdated < chat.lastUpdated) {
this.pending.set(chat.id, chat);
debug("enqueued " + chat.id);
}
}

if (this.runner === undefined) {
if (this.pending.size > 0 && this.runner === undefined) {
this.runner = new Poller(() => runOnceIdle(() => this.processNext()), 0);
debug("runner started");
}
Expand Down Expand Up @@ -76,6 +76,7 @@ export class CachePrimer {
);
}
}
await this.api.setCachePrimerTimestamp(chatIdentifierToString(chat.id), chat.lastUpdated);
debug(chat.id + " completed");
} finally {
if (this.pending.size === 0) {
Expand Down Expand Up @@ -120,10 +121,6 @@ export class CachePrimer {
}
}

function hasBeenUpdated(previous: ChatSummary | undefined, next: ChatSummary): boolean {
return previous === undefined || next.latestEventIndex > previous.latestEventIndex;
}

function debug(message: string) {
if (boolFromLS("openchat_cache_primer_debug_enabled", false)) {
console.debug("CachePrimer - " + message);
Expand Down
Loading
Loading