diff --git a/frontend/openchat-client/src/openchat.ts b/frontend/openchat-client/src/openchat.ts index 70abe67768..d48e1870b0 100644 --- a/frontend/openchat-client/src/openchat.ts +++ b/frontend/openchat-client/src/openchat.ts @@ -771,7 +771,7 @@ export class OpenChat extends OpenChatAgentWorker { } markThreadRead(chatId: ChatIdentifier, threadRootMessageIndex: number, readUpTo: number): void { - this.messagesRead.markThreadRead(chatId, threadRootMessageIndex, readUpTo); + this.messagesRead.markReadUpTo({ chatId, threadRootMessageIndex }, readUpTo); } markMessageRead( @@ -779,7 +779,7 @@ export class OpenChat extends OpenChatAgentWorker { messageIndex: number, messageId: bigint | undefined, ): void { - this.messagesRead.markMessageRead(chatId, messageIndex, messageId); + this.messagesRead.markMessageRead({ chatId }, messageIndex, messageId); } markPinnedMessagesRead(chatId: ChatIdentifier, dateLastPinned: bigint): void { @@ -1069,7 +1069,7 @@ export class OpenChat extends OpenChatAgentWorker { this.loadChatDetails(c); } if (c.latestMessage) { - messagesRead.markReadUpTo(c.id, c.latestMessage.event.messageIndex); + messagesRead.markReadUpTo({ chatId: c.id }, c.latestMessage.event.messageIndex); } }), ); @@ -2670,9 +2670,7 @@ export class OpenChat extends OpenChatAgentWorker { } const context = { chatId, threadRootMessageIndex }; unconfirmed.delete(context, messageId); - if (threadRootMessageIndex === undefined) { - messagesRead.removeUnconfirmedMessage(chatId, messageId); - } + messagesRead.removeUnconfirmedMessage(context, messageId); } toggleProposalFilterMessageExpansion = toggleProposalFilterMessageExpansion; groupWhile = groupWhile; @@ -2730,19 +2728,11 @@ export class OpenChat extends OpenChatAgentWorker { if (event.event.kind === "message") { failedMessagesStore.delete(context, event.event.messageId); if (unconfirmed.delete(context, event.event.messageId)) { - if (threadRootMessageIndex === undefined) { - messagesRead.confirmMessage( - chatId, - event.event.messageIndex, - event.event.messageId, - ); - } else { - messagesRead.markThreadRead( - chatId, - threadRootMessageIndex, - event.event.messageIndex, - ); - } + messagesRead.confirmMessage( + context, + event.event.messageIndex, + event.event.messageId, + ); } } } @@ -2761,11 +2751,6 @@ export class OpenChat extends OpenChatAgentWorker { messageEvent: EventWrapper, threadRootMessageIndex: number | undefined, ): Promise { - const context = { chatId: clientChat.id, threadRootMessageIndex }; - - unconfirmed.add(context, messageEvent); - failedMessagesStore.delete(context, messageEvent.event.messageId); - rtcConnectionsManager.sendMessage([...chatStateStore.getProp(clientChat.id, "userIds")], { kind: "remote_user_sent_message", id: clientChat.id, @@ -2773,19 +2758,6 @@ export class OpenChat extends OpenChatAgentWorker { userId: this.user.userId, threadRootMessageIndex, }); - - if (threadRootMessageIndex === undefined) { - // mark our own messages as read manually since we will not be observing them - messagesRead.markMessageRead( - clientChat.id, - messageEvent.event.messageIndex, - messageEvent.event.messageId, - ); - - currentChatDraftMessage.clear(clientChat.id); - } - - return; } deleteFailedMessage( @@ -3121,7 +3093,7 @@ export class OpenChat extends OpenChatAgentWorker { private postSendMessage( chat: ChatSummary, - event: EventWrapper, + messageEvent: EventWrapper, threadRootMessageIndex: number | undefined, ) { if (threadRootMessageIndex !== undefined) { @@ -3133,9 +3105,29 @@ export class OpenChat extends OpenChatAgentWorker { // HACK - we need to defer this very slightly so that we can guarantee that we handle SendingMessage events // *before* the new message is added to the unconfirmed store. Is this nice? No it is not. window.setTimeout(() => { - this.sendMessageWebRtc(chat, event, threadRootMessageIndex).then(() => { + const context = { chatId: chat.id, threadRootMessageIndex }; + + unconfirmed.add(context, messageEvent); + failedMessagesStore.delete(context, messageEvent.event.messageId); + + // mark our own messages as read manually since we will not be observing them + messagesRead.markMessageRead( + context, + messageEvent.event.messageIndex, + messageEvent.event.messageId, + ); + // Mark all existing messages as read + if (messageEvent.event.messageIndex > 0) { + messagesRead.markReadUpTo(context, messageEvent.event.messageIndex - 1); + } + + if (threadRootMessageIndex === undefined) { + currentChatDraftMessage.clear(chat.id); + } + + this.sendMessageWebRtc(chat, messageEvent, threadRootMessageIndex).then(() => { if (threadRootMessageIndex !== undefined) { - this.dispatchEvent(new SentThreadMessage(event)); + this.dispatchEvent(new SentThreadMessage(messageEvent)); } else { this.dispatchEvent(new SentMessage()); } @@ -4503,7 +4495,7 @@ export class OpenChat extends OpenChatAgentWorker { (chat.membership?.readByMeUpTo ?? -1) < latestMessage.messageIndex && !unconfirmed.contains({ chatId: chat.id }, latestMessage.messageId) ) { - messagesRead.markReadUpTo(chat.id, latestMessage.messageIndex); + messagesRead.markReadUpTo({ chatId: chat.id }, latestMessage.messageIndex); } } @@ -5005,7 +4997,7 @@ export class OpenChat extends OpenChatAgentWorker { messagesRead.batchUpdate(() => { resp.community.channels.forEach((c) => { if (c.latestMessage) { - messagesRead.markReadUpTo(c.id, c.latestMessage.event.messageIndex); + messagesRead.markReadUpTo({ chatId: c.id }, c.latestMessage.event.messageIndex); } }); }); diff --git a/frontend/openchat-client/src/stores/markRead.spec.ts b/frontend/openchat-client/src/stores/markRead.spec.ts index e4eeddb3df..e256473ca7 100644 --- a/frontend/openchat-client/src/stores/markRead.spec.ts +++ b/frontend/openchat-client/src/stores/markRead.spec.ts @@ -37,8 +37,8 @@ describe("mark messages read", () => { jest.useFakeTimers(); const mockedUnconfirmed = new MessageContextMap(); unconfirmed.clear(mockedUnconfirmed); - if (markRead.waiting.get(abcId) !== undefined) { - markRead.waiting.get(abcId)?.clear(); + if (markRead.waiting.get({ chatId: abcId }) !== undefined) { + markRead.waiting.get({ chatId: abcId })?.clear(); } markRead.state.set(abcId, new MessagesRead()); markRead.serverState.set(abcId, new MessagesRead()); @@ -47,24 +47,24 @@ describe("mark messages read", () => { test("mark unconfirmed message as read", () => { unconfirmed.add({ chatId: abcId }, createDummyMessage(BigInt(100))); - markRead.markMessageRead(abcId, 200, BigInt(100)); - expect(markRead.waiting.get(abcId)?.has(BigInt(100))).toBe(true); + markRead.markMessageRead({ chatId: abcId }, 200, BigInt(100)); + expect(markRead.waiting.get({ chatId: abcId })?.has(BigInt(100))).toBe(true); }); test("mark confirmed message as read", () => { const mr = new MessagesRead(); mr.readUpTo = 199; markRead.state.set(abcId, mr); - markRead.markMessageRead(abcId, 200, BigInt(500)); - expect(markRead.waiting.get(abcId)?.has(BigInt(500))).toBe(false); + markRead.markMessageRead({ chatId: abcId }, 200, BigInt(500)); + expect(markRead.waiting.get({ chatId: abcId })?.has(BigInt(500))).toBe(false); expect(markRead.state.get(abcId)?.readUpTo).toBe(200); }); test("confirm message", () => { - markRead.waiting.get(abcId)?.set(BigInt(100), 100); - markRead.markMessageRead(abcId, 200, BigInt(100)); - markRead.confirmMessage(abcId, 200, BigInt(100)); - expect(markRead.waiting.get(abcId)?.has(BigInt(100))).toBe(false); + markRead.waiting.get({ chatId: abcId })?.set(BigInt(100), 100); + markRead.markMessageRead({ chatId: abcId }, 200, BigInt(100)); + markRead.confirmMessage({ chatId: abcId }, 200, BigInt(100)); + expect(markRead.waiting.get({ chatId: abcId })?.has(BigInt(100))).toBe(false); expect(markRead.state.get(abcId)?.readUpTo).toBe(200); }); @@ -120,7 +120,7 @@ describe("mark messages read", () => { [{ threadRootMessageIndex: 1, readUpTo: 3 }], undefined ); - markRead.markThreadRead(abcId, 1, 5); + markRead.markReadUpTo({ chatId: abcId, threadRootMessageIndex: 1 }, 5); const unread = markRead.unreadThreadMessageCount(abcId, 1, 5); expect(unread).toEqual(0); }); @@ -131,7 +131,7 @@ describe("mark messages read", () => { [{ threadRootMessageIndex: 1, readUpTo: 3 }], undefined ); - markRead.markThreadRead(abcId, 1, 5); + markRead.markReadUpTo({ chatId: abcId, threadRootMessageIndex: 1 }, 5); const unread = markRead.unreadThreadMessageCount(abcId, 1, 7); expect(unread).toEqual(2); }); @@ -174,7 +174,7 @@ describe("mark messages read", () => { ], undefined ); - markRead.markThreadRead(abcId, 1, 2); + markRead.markReadUpTo({ chatId: abcId, threadRootMessageIndex: 1 }, 2); const count = markRead.staleThreadCountForChat(abcId, threadSyncs); expect(count).toEqual(1); }); @@ -188,7 +188,7 @@ describe("mark messages read", () => { ], undefined ); - markRead.markThreadRead(abcId, 1, 3); + markRead.markReadUpTo({ chatId: abcId, threadRootMessageIndex: 1 }, 3); const count = markRead.staleThreadCountForChat(abcId, threadSyncs); expect(count).toEqual(0); }); @@ -198,10 +198,10 @@ describe("mark messages read", () => { describe("unread message count", () => { describe("when all messages are confirmed", () => { test("with no latest message + waiting local messages", () => { - markRead.waiting.set(abcId, new Map()); - markRead.waiting.get(abcId)?.set(BigInt(0), 0); - markRead.waiting.get(abcId)?.set(BigInt(1), 1); - markRead.waiting.get(abcId)?.set(BigInt(2), 2); + markRead.waiting.set({ chatId: abcId }, new Map()); + markRead.waiting.get({ chatId: abcId })?.set(BigInt(0), 0); + markRead.waiting.get({ chatId: abcId })?.set(BigInt(1), 1); + markRead.waiting.get({ chatId: abcId })?.set(BigInt(2), 2); expect(markRead.unreadMessageCount(abcId, undefined)).toEqual(0); }); test("with no latest message", () => { @@ -243,9 +243,9 @@ describe("mark messages read", () => { }); describe("when some messages are unconfirmed", () => { test("with multiple gaps", () => { - markRead.waiting.get(abcId)?.set(BigInt(1), 11); - markRead.waiting.get(abcId)?.set(BigInt(2), 12); - markRead.waiting.get(abcId)?.set(BigInt(3), 13); + markRead.waiting.get({ chatId: abcId })?.set(BigInt(1), 11); + markRead.waiting.get({ chatId: abcId })?.set(BigInt(2), 12); + markRead.waiting.get({ chatId: abcId })?.set(BigInt(3), 13); const mr = new MessagesRead(); mr.readUpTo = 10; markRead.serverState.set(abcId, mr); @@ -256,7 +256,7 @@ describe("mark messages read", () => { describe("getting first unread message index", () => { test("where we have read everything", () => { - markRead.markReadUpTo(abcId, 100); + markRead.markReadUpTo({ chatId: abcId }, 100); expect(markRead.getFirstUnreadMessageIndex(abcId, 100)).toEqual(undefined); }); test("where we have no messages", () => { @@ -266,7 +266,7 @@ describe("mark messages read", () => { expect(markRead.getFirstUnreadMessageIndex(abcId, 100)).toEqual(0); }); test("where we have read some messages", () => { - markRead.markReadUpTo(abcId, 80); + markRead.markReadUpTo({ chatId: abcId }, 80); expect(markRead.getFirstUnreadMessageIndex(abcId, 100)).toEqual(81); }); }); diff --git a/frontend/openchat-client/src/stores/markRead.ts b/frontend/openchat-client/src/stores/markRead.ts index 599481c4b0..352b28abbf 100644 --- a/frontend/openchat-client/src/stores/markRead.ts +++ b/frontend/openchat-client/src/stores/markRead.ts @@ -1,12 +1,14 @@ /* eslint-disable @typescript-eslint/no-non-null-assertion */ import type { Subscriber, Unsubscriber } from "svelte/store"; import { - type ChatIdentifier, ChatMap, + MessageContextMap, + type ChatIdentifier, type ChatSummary, type MarkReadRequest, type MarkReadResponse, type Mention, + type MessageContext, type ThreadRead, type ThreadSyncDetails, } from "openchat-shared"; @@ -70,7 +72,7 @@ type MessagesReadByChat = ChatMap; export type MessageReadState = { serverState: MessagesReadByChat; - waiting: ChatMap>; + waiting: MessageContextMap>; state: MessagesReadByChat; }; @@ -82,7 +84,7 @@ export class MessageReadTracker { * The waiting structure is either keyed on chatId for normal chat messages or * of chatId_threadRootMessageIndex for thread messages */ - public waiting: ChatMap> = new ChatMap>(); // The map is messageId -> (unconfirmed) messageIndex + public waiting: MessageContextMap> = new MessageContextMap>(); // The map is messageId -> (unconfirmed) messageIndex public state: MessagesReadByChat = new ChatMap(); private subscribers: Subscriber[] = []; private executingBatch = false; @@ -166,34 +168,37 @@ export class MessageReadTracker { return this.state.get(chatId)!; } - markThreadRead(chatId: ChatIdentifier, threadRootMessageIndex: number, readUpTo: number): void { - this.stateForId(chatId).updateThread(threadRootMessageIndex, readUpTo); - this.publish(); - } - markMessageRead( - chatId: ChatIdentifier, + context: MessageContext, messageIndex: number, messageId: bigint | undefined ): void { - if (!this.state.has(chatId)) { - this.state.set(chatId, new MessagesRead()); + const chatState = this.stateForId(context.chatId) + if (!this.state.has(context.chatId)) { + this.state.set(context.chatId, new MessagesRead()); } - if (messageId !== undefined && unconfirmed.contains({ chatId }, messageId)) { + if (messageId !== undefined && unconfirmed.contains(context, messageId)) { // if a message is unconfirmed we will just tuck it away until we are told it has been confirmed - if (!this.waiting.has(chatId)) { - this.waiting.set(chatId, new Map()); + if (!this.waiting.has(context)) { + this.waiting.set(context, new Map()); } - this.waiting.get(chatId)?.set(messageId, messageIndex); - this.publish(); + this.waiting.get(context)?.set(messageId, messageIndex); + } else if (context.threadRootMessageIndex !== undefined) { + chatState.updateThread(context.threadRootMessageIndex, messageIndex); } else { // Mark the chat as read up to the new messageIndex - this.markReadUpTo(chatId, messageIndex); + chatState.markReadUpTo(messageIndex); } + this.publish(); } - markReadUpTo(chatId: ChatIdentifier, to: number): void { - this.stateForId(chatId).markReadUpTo(to); + markReadUpTo(context: MessageContext, to: number): void { + const chatState = this.stateForId(context.chatId); + if (context.threadRootMessageIndex !== undefined) { + chatState.updateThread(context.threadRootMessageIndex, to); + } else { + chatState.markReadUpTo(to); + } this.publish(); } @@ -202,19 +207,19 @@ export class MessageReadTracker { this.publish(); } - confirmMessage(chatId: ChatIdentifier, messageIndex: number, messageId: bigint): boolean { + confirmMessage(context: MessageContext, messageIndex: number, messageId: bigint): boolean { // this is called when a message is confirmed so that we can move it from // the unconfirmed read to the confirmed read. This means that it will get // marked as read on the back end - if (this.removeUnconfirmedMessage(chatId, messageId)) { - this.markMessageRead(chatId, messageIndex, messageId); + if (this.removeUnconfirmedMessage(context, messageId)) { + this.markMessageRead(context, messageIndex, messageId); return true; } return false; } - removeUnconfirmedMessage(chatId: ChatIdentifier, messageId: bigint): boolean { - return this.waiting.get(chatId)?.delete(messageId) ?? false; + removeUnconfirmedMessage(context: MessageContext, messageId: bigint): boolean { + return this.waiting.get(context)?.delete(messageId) ?? false; } staleThreadCountForChat(chatId: ChatIdentifier, threads: ThreadSyncDetails[]): number { @@ -229,15 +234,11 @@ export class MessageReadTracker { } threadReadUpTo(chatId: ChatIdentifier, threadRootMessageIndex: number): number { - const local = this.state.get(chatId)?.threads[threadRootMessageIndex]; - const server = this.serverState.get(chatId)?.threads[threadRootMessageIndex]; - if (server === undefined) { - return local ?? -1; - } - if (local === undefined) { - return server ?? -1; - } - return Math.max(local, server); + const local = this.state.get(chatId)?.threads[threadRootMessageIndex] ?? -1; + const server = this.serverState.get(chatId)?.threads[threadRootMessageIndex] ?? -1; + const unconfirmedReadCount = this.waiting.get({ chatId, threadRootMessageIndex })?.size ?? 0; + + return Math.max(local + unconfirmedReadCount, server); } staleThreadsCount(threads: ChatMap): number { @@ -252,7 +253,7 @@ export class MessageReadTracker { threadRootMessageIndex: number, latestMessageIndex: number ): number { - return latestMessageIndex - this.threadReadUpTo(chatId, threadRootMessageIndex); + return Math.max(latestMessageIndex - this.threadReadUpTo(chatId, threadRootMessageIndex), 0); } unreadMessageCount(chatId: ChatIdentifier, latestMessageIndex: number | undefined): number { @@ -265,7 +266,7 @@ export class MessageReadTracker { const readUpToLocal = this.state.get(chatId)?.readUpTo; const readUpToConfirmed = Math.max(readUpToServer ?? -1, readUpToLocal ?? -1); - const readUnconfirmedCount = this.waiting.get(chatId)?.size ?? 0; + const readUnconfirmedCount = this.waiting.get({ chatId })?.size ?? 0; const total = latestMessageIndex - readUpToConfirmed - readUnconfirmedCount; return Math.max(total, 0); @@ -291,7 +292,7 @@ export class MessageReadTracker { const readUpToConfirmed = Math.max(readUpToServer ?? -1, readUpToLocal ?? -1); if (readUpToConfirmed < latestMessageIndex) { - const readUnconfirmed = this.waiting.get(chatId) ?? new Map(); + const readUnconfirmed = this.waiting.get({ chatId }) ?? new Map(); const unconfirmedMessageIndexes = [...readUnconfirmed.values()]; for (let i = readUpToConfirmed + 1; i <= latestMessageIndex; i++) { @@ -312,7 +313,7 @@ export class MessageReadTracker { markAllRead(chat: ChatSummary) { const latestMessageIndex = chat.latestMessage?.event.messageIndex; if (latestMessageIndex !== undefined) { - this.markReadUpTo(chat.id, latestMessageIndex); + this.markReadUpTo({ chatId: chat.id }, latestMessageIndex); } } @@ -374,7 +375,7 @@ export class MessageReadTracker { isRead(chatId: ChatIdentifier, messageIndex: number, messageId: bigint | undefined): boolean { if (messageId !== undefined && unconfirmed.contains({ chatId }, messageId)) { - return this.waiting.get(chatId)?.has(messageId) ?? false; + return this.waiting.get({ chatId })?.has(messageId) ?? false; } else { const serverState = this.serverState.get(chatId); if (serverState?.readUpTo !== undefined && serverState.readUpTo >= messageIndex)