Skip to content

Commit

Permalink
Consolidate markRead logic between main chat and threads (#4440)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored Sep 26, 2023
1 parent c1e196d commit fef1ca8
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 103 deletions.
76 changes: 34 additions & 42 deletions frontend/openchat-client/src/openchat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -771,15 +771,15 @@ export class OpenChat extends OpenChatAgentWorker {
}

markThreadRead(chatId: ChatIdentifier, threadRootMessageIndex: number, readUpTo: number): void {
this.messagesRead.markThreadRead(chatId, threadRootMessageIndex, readUpTo);
this.messagesRead.markReadUpTo({ chatId, threadRootMessageIndex }, readUpTo);
}

markMessageRead(
chatId: ChatIdentifier,
messageIndex: number,
messageId: bigint | undefined,
): void {
this.messagesRead.markMessageRead(chatId, messageIndex, messageId);
this.messagesRead.markMessageRead({ chatId }, messageIndex, messageId);
}

markPinnedMessagesRead(chatId: ChatIdentifier, dateLastPinned: bigint): void {
Expand Down Expand Up @@ -1069,7 +1069,7 @@ export class OpenChat extends OpenChatAgentWorker {
this.loadChatDetails(c);
}
if (c.latestMessage) {
messagesRead.markReadUpTo(c.id, c.latestMessage.event.messageIndex);
messagesRead.markReadUpTo({ chatId: c.id }, c.latestMessage.event.messageIndex);
}
}),
);
Expand Down Expand Up @@ -2670,9 +2670,7 @@ export class OpenChat extends OpenChatAgentWorker {
}
const context = { chatId, threadRootMessageIndex };
unconfirmed.delete(context, messageId);
if (threadRootMessageIndex === undefined) {
messagesRead.removeUnconfirmedMessage(chatId, messageId);
}
messagesRead.removeUnconfirmedMessage(context, messageId);
}
toggleProposalFilterMessageExpansion = toggleProposalFilterMessageExpansion;
groupWhile = groupWhile;
Expand Down Expand Up @@ -2730,19 +2728,11 @@ export class OpenChat extends OpenChatAgentWorker {
if (event.event.kind === "message") {
failedMessagesStore.delete(context, event.event.messageId);
if (unconfirmed.delete(context, event.event.messageId)) {
if (threadRootMessageIndex === undefined) {
messagesRead.confirmMessage(
chatId,
event.event.messageIndex,
event.event.messageId,
);
} else {
messagesRead.markThreadRead(
chatId,
threadRootMessageIndex,
event.event.messageIndex,
);
}
messagesRead.confirmMessage(
context,
event.event.messageIndex,
event.event.messageId,
);
}
}
}
Expand All @@ -2761,31 +2751,13 @@ export class OpenChat extends OpenChatAgentWorker {
messageEvent: EventWrapper<Message>,
threadRootMessageIndex: number | undefined,
): Promise<void> {
const context = { chatId: clientChat.id, threadRootMessageIndex };

unconfirmed.add(context, messageEvent);
failedMessagesStore.delete(context, messageEvent.event.messageId);

rtcConnectionsManager.sendMessage([...chatStateStore.getProp(clientChat.id, "userIds")], {
kind: "remote_user_sent_message",
id: clientChat.id,
messageEvent: serialiseMessageForRtc(messageEvent),
userId: this.user.userId,
threadRootMessageIndex,
});

if (threadRootMessageIndex === undefined) {
// mark our own messages as read manually since we will not be observing them
messagesRead.markMessageRead(
clientChat.id,
messageEvent.event.messageIndex,
messageEvent.event.messageId,
);

currentChatDraftMessage.clear(clientChat.id);
}

return;
}

deleteFailedMessage(
Expand Down Expand Up @@ -3121,7 +3093,7 @@ export class OpenChat extends OpenChatAgentWorker {

private postSendMessage(
chat: ChatSummary,
event: EventWrapper<Message>,
messageEvent: EventWrapper<Message>,
threadRootMessageIndex: number | undefined,
) {
if (threadRootMessageIndex !== undefined) {
Expand All @@ -3133,9 +3105,29 @@ export class OpenChat extends OpenChatAgentWorker {
// HACK - we need to defer this very slightly so that we can guarantee that we handle SendingMessage events
// *before* the new message is added to the unconfirmed store. Is this nice? No it is not.
window.setTimeout(() => {
this.sendMessageWebRtc(chat, event, threadRootMessageIndex).then(() => {
const context = { chatId: chat.id, threadRootMessageIndex };

unconfirmed.add(context, messageEvent);
failedMessagesStore.delete(context, messageEvent.event.messageId);

// mark our own messages as read manually since we will not be observing them
messagesRead.markMessageRead(
context,
messageEvent.event.messageIndex,
messageEvent.event.messageId,
);
// Mark all existing messages as read
if (messageEvent.event.messageIndex > 0) {
messagesRead.markReadUpTo(context, messageEvent.event.messageIndex - 1);
}

if (threadRootMessageIndex === undefined) {
currentChatDraftMessage.clear(chat.id);
}

this.sendMessageWebRtc(chat, messageEvent, threadRootMessageIndex).then(() => {
if (threadRootMessageIndex !== undefined) {
this.dispatchEvent(new SentThreadMessage(event));
this.dispatchEvent(new SentThreadMessage(messageEvent));
} else {
this.dispatchEvent(new SentMessage());
}
Expand Down Expand Up @@ -4503,7 +4495,7 @@ export class OpenChat extends OpenChatAgentWorker {
(chat.membership?.readByMeUpTo ?? -1) < latestMessage.messageIndex &&
!unconfirmed.contains({ chatId: chat.id }, latestMessage.messageId)
) {
messagesRead.markReadUpTo(chat.id, latestMessage.messageIndex);
messagesRead.markReadUpTo({ chatId: chat.id }, latestMessage.messageIndex);
}
}

Expand Down Expand Up @@ -5005,7 +4997,7 @@ export class OpenChat extends OpenChatAgentWorker {
messagesRead.batchUpdate(() => {
resp.community.channels.forEach((c) => {
if (c.latestMessage) {
messagesRead.markReadUpTo(c.id, c.latestMessage.event.messageIndex);
messagesRead.markReadUpTo({ chatId: c.id }, c.latestMessage.event.messageIndex);
}
});
});
Expand Down
46 changes: 23 additions & 23 deletions frontend/openchat-client/src/stores/markRead.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ describe("mark messages read", () => {
jest.useFakeTimers();
const mockedUnconfirmed = new MessageContextMap<UnconfirmedState>();
unconfirmed.clear(mockedUnconfirmed);
if (markRead.waiting.get(abcId) !== undefined) {
markRead.waiting.get(abcId)?.clear();
if (markRead.waiting.get({ chatId: abcId }) !== undefined) {
markRead.waiting.get({ chatId: abcId })?.clear();
}
markRead.state.set(abcId, new MessagesRead());
markRead.serverState.set(abcId, new MessagesRead());
Expand All @@ -47,24 +47,24 @@ describe("mark messages read", () => {

test("mark unconfirmed message as read", () => {
unconfirmed.add({ chatId: abcId }, createDummyMessage(BigInt(100)));
markRead.markMessageRead(abcId, 200, BigInt(100));
expect(markRead.waiting.get(abcId)?.has(BigInt(100))).toBe(true);
markRead.markMessageRead({ chatId: abcId }, 200, BigInt(100));
expect(markRead.waiting.get({ chatId: abcId })?.has(BigInt(100))).toBe(true);
});

test("mark confirmed message as read", () => {
const mr = new MessagesRead();
mr.readUpTo = 199;
markRead.state.set(abcId, mr);
markRead.markMessageRead(abcId, 200, BigInt(500));
expect(markRead.waiting.get(abcId)?.has(BigInt(500))).toBe(false);
markRead.markMessageRead({ chatId: abcId }, 200, BigInt(500));
expect(markRead.waiting.get({ chatId: abcId })?.has(BigInt(500))).toBe(false);
expect(markRead.state.get(abcId)?.readUpTo).toBe(200);
});

test("confirm message", () => {
markRead.waiting.get(abcId)?.set(BigInt(100), 100);
markRead.markMessageRead(abcId, 200, BigInt(100));
markRead.confirmMessage(abcId, 200, BigInt(100));
expect(markRead.waiting.get(abcId)?.has(BigInt(100))).toBe(false);
markRead.waiting.get({ chatId: abcId })?.set(BigInt(100), 100);
markRead.markMessageRead({ chatId: abcId }, 200, BigInt(100));
markRead.confirmMessage({ chatId: abcId }, 200, BigInt(100));
expect(markRead.waiting.get({ chatId: abcId })?.has(BigInt(100))).toBe(false);
expect(markRead.state.get(abcId)?.readUpTo).toBe(200);
});

Expand Down Expand Up @@ -120,7 +120,7 @@ describe("mark messages read", () => {
[{ threadRootMessageIndex: 1, readUpTo: 3 }],
undefined
);
markRead.markThreadRead(abcId, 1, 5);
markRead.markReadUpTo({ chatId: abcId, threadRootMessageIndex: 1 }, 5);
const unread = markRead.unreadThreadMessageCount(abcId, 1, 5);
expect(unread).toEqual(0);
});
Expand All @@ -131,7 +131,7 @@ describe("mark messages read", () => {
[{ threadRootMessageIndex: 1, readUpTo: 3 }],
undefined
);
markRead.markThreadRead(abcId, 1, 5);
markRead.markReadUpTo({ chatId: abcId, threadRootMessageIndex: 1 }, 5);
const unread = markRead.unreadThreadMessageCount(abcId, 1, 7);
expect(unread).toEqual(2);
});
Expand Down Expand Up @@ -174,7 +174,7 @@ describe("mark messages read", () => {
],
undefined
);
markRead.markThreadRead(abcId, 1, 2);
markRead.markReadUpTo({ chatId: abcId, threadRootMessageIndex: 1 }, 2);
const count = markRead.staleThreadCountForChat(abcId, threadSyncs);
expect(count).toEqual(1);
});
Expand All @@ -188,7 +188,7 @@ describe("mark messages read", () => {
],
undefined
);
markRead.markThreadRead(abcId, 1, 3);
markRead.markReadUpTo({ chatId: abcId, threadRootMessageIndex: 1 }, 3);
const count = markRead.staleThreadCountForChat(abcId, threadSyncs);
expect(count).toEqual(0);
});
Expand All @@ -198,10 +198,10 @@ describe("mark messages read", () => {
describe("unread message count", () => {
describe("when all messages are confirmed", () => {
test("with no latest message + waiting local messages", () => {
markRead.waiting.set(abcId, new Map<bigint, number>());
markRead.waiting.get(abcId)?.set(BigInt(0), 0);
markRead.waiting.get(abcId)?.set(BigInt(1), 1);
markRead.waiting.get(abcId)?.set(BigInt(2), 2);
markRead.waiting.set({ chatId: abcId }, new Map<bigint, number>());
markRead.waiting.get({ chatId: abcId })?.set(BigInt(0), 0);
markRead.waiting.get({ chatId: abcId })?.set(BigInt(1), 1);
markRead.waiting.get({ chatId: abcId })?.set(BigInt(2), 2);
expect(markRead.unreadMessageCount(abcId, undefined)).toEqual(0);
});
test("with no latest message", () => {
Expand Down Expand Up @@ -243,9 +243,9 @@ describe("mark messages read", () => {
});
describe("when some messages are unconfirmed", () => {
test("with multiple gaps", () => {
markRead.waiting.get(abcId)?.set(BigInt(1), 11);
markRead.waiting.get(abcId)?.set(BigInt(2), 12);
markRead.waiting.get(abcId)?.set(BigInt(3), 13);
markRead.waiting.get({ chatId: abcId })?.set(BigInt(1), 11);
markRead.waiting.get({ chatId: abcId })?.set(BigInt(2), 12);
markRead.waiting.get({ chatId: abcId })?.set(BigInt(3), 13);
const mr = new MessagesRead();
mr.readUpTo = 10;
markRead.serverState.set(abcId, mr);
Expand All @@ -256,7 +256,7 @@ describe("mark messages read", () => {

describe("getting first unread message index", () => {
test("where we have read everything", () => {
markRead.markReadUpTo(abcId, 100);
markRead.markReadUpTo({ chatId: abcId }, 100);
expect(markRead.getFirstUnreadMessageIndex(abcId, 100)).toEqual(undefined);
});
test("where we have no messages", () => {
Expand All @@ -266,7 +266,7 @@ describe("mark messages read", () => {
expect(markRead.getFirstUnreadMessageIndex(abcId, 100)).toEqual(0);
});
test("where we have read some messages", () => {
markRead.markReadUpTo(abcId, 80);
markRead.markReadUpTo({ chatId: abcId }, 80);
expect(markRead.getFirstUnreadMessageIndex(abcId, 100)).toEqual(81);
});
});
Expand Down
Loading

0 comments on commit fef1ca8

Please sign in to comment.