Skip to content

Commit

Permalink
Server sent events for message inbox #14
Browse files Browse the repository at this point in the history
  • Loading branch information
caolan committed Jul 29, 2024
1 parent 11425ba commit 09891de
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 131 deletions.
2 changes: 1 addition & 1 deletion examples/chat/components/send-message-form.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export default class ChatSendMessageForm extends HTMLElement {
ev.preventDefault();
if (selected_announcement.value && local_peer_id.value && local_app_uuid.value) {
const message = this.input.value;
await fetch("/_api/v1/message_send", {
await fetch("/_api/v1/messages/outbox", {
method: "POST",
body: JSON.stringify({
peer: selected_announcement.value.peer,
Expand Down
64 changes: 38 additions & 26 deletions examples/chat/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ async function announce(peers) {
nick: state.nick.value,
};
for (const peer of peers) {
await fetch("/_api/v1/announcements", {
await fetch("/_api/v1/announcements/outbox", {
method: 'POST',
body: JSON.stringify({peer, data}),
});
}
}

async function fetchAnnouncements() {
const res = await fetch("/_api/v1/announcements");
const res = await fetch("/_api/v1/announcements/inbox");
const data = /** @type {import("../../lib/client.ts").AppAnnouncement[]} */(await res.json());
// Only list announcements for current app from peers in current discovered list
return data.filter(x => {
Expand All @@ -54,26 +54,33 @@ async function fetchAnnouncements() {
});
}

async function getMessages() {
while (true) {
const res = await fetch("/_api/v1/message_read");
const data = await res.json();
if (data && state.local_peer_id.value && state.local_app_uuid.value) {
const from = {
peer: data.peer,
app_uuid: data.uuid,
};
const to = {
peer: state.local_peer_id.value,
app_uuid: state.local_app_uuid.value,
};
state.appendMessage(from, to, data.message);
await fetch("/_api/v1/message_next", {method: "POST"});
} else {
// Check again in 1 second
setTimeout(getMessages, 1000);
return;
}
/** @param {import("../../lib/client.ts").MessageJson} message */
async function receiveMessage(message) {
if (state.local_peer_id.value && state.local_app_uuid.value) {
const from = {
peer: message.peer,
app_uuid: message.uuid,
};
const to = {
peer: state.local_peer_id.value,
app_uuid: state.local_app_uuid.value,
};
state.appendMessage(from, to, message.message);
// Delete seen messages
await fetch("/_api/v1/messages/inbox", {
method: "DELETE",
body: JSON.stringify({
message_id: message.id
}),
});
}
}

async function fetchMessages() {
const res = await fetch("/_api/v1/messages/inbox");
const data = await res.json();
for (const message of data) {
await receiveMessage(message);
}
}

Expand All @@ -82,16 +89,14 @@ await updateLocalPeerId();
await updateLocalAppInstance();
state.announcements.value = await fetchAnnouncements();
state.peers.value = await fetchPeers();
await fetchMessages();

// Announce app to newly discovered peers
announce(state.peers.value);

// Announce to all peers when nick changes
watch([state.nick], () => announce(state.peers.value));

// Start polling for messages
getMessages();

// Ask user for nickname
askNick();

Expand All @@ -108,7 +113,7 @@ peer_events.addEventListener("PeerExpired", peer_id => {
state.peers.signal();
});

const announcement_events = new EventSource("/_api/v1/announcements/events");
const announcement_events = new EventSource("/_api/v1/announcements/inbox/events");
announcement_events.addEventListener("AppAnnouncement", event => {
const announcement = JSON.parse(event.data);
console.log('App announced', announcement);
Expand All @@ -119,3 +124,10 @@ announcement_events.addEventListener("AppAnnouncement", event => {
return x;
});
});

const inbox_events = new EventSource("/_api/v1/messages/inbox/events");
inbox_events.addEventListener("Message", event => {
const message = JSON.parse(event.data);
console.log('Message received', message);
receiveMessage(/** @type {import("../../lib/client.ts").MessageJson} */(message));
});
53 changes: 31 additions & 22 deletions lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,23 @@ interface JsonArray extends Array<JsonValue> { }
type JsonValue = (null | boolean | number | string | JsonObject | JsonArray);

export type Message = {
type: "Message",
id: number,
peer: string,
uuid: string,
message: Uint8Array,
};

export type MessageJson = {
type: "Message",
id: number,
peer: string,
uuid: string,
message: string,
};

export type AppAnnouncement = {
type: "AppAnnouncement",
peer: string,
app_uuid: string,
data: JsonValue,
Expand All @@ -76,16 +86,17 @@ export type MutinyRequestBody = {type: "LocalPeerId"}
| {type: "Announce", peer: string, app_uuid: string, data: JsonValue}
| {type: "AppAnnouncements"}
| {
type: "MessageSend",
type: "SendMessage",
peer: string,
app_uuid: string,
from_app_uuid: string,
message: Uint8Array,
}
| {type: "MessageRead", app_uuid: string}
| {type: "MessageNext", app_uuid: string}
| {type: "InboxMessages", app_uuid: string}
| {type: "DeleteInboxMessage", app_uuid: string, message_id: number}
| {type: "SubscribePeerEvents"}
| {type: "SubscribeAnnounceEvents"}
| {type: "SubscribeInboxEvents", app_uuid: string}
;

export type MutinyResponse = {
Expand All @@ -96,20 +107,14 @@ export type MutinyResponse = {
export type PeerEvent = {type: "PeerDiscovered", peer_id: string}
| {type: "PeerExpired", peer_id: string};

export type AnnounceEvent = {
type: "AppAnnouncement",
peer: string,
app_uuid: string,
data: JsonValue,
};

export type MutinyResponseBody = {type: "Success"}
| {type: "Error", message: string}
| {type: "LocalPeerId", peer_id: string}
| {type: "Peers", peers: string[]}
| {type: "AppInstanceUuid", uuid: string | null}
| {type: "CreateAppInstance", uuid: string}
| {type: "Message", message: null | Message}
| {type: "Message", message: Message}
| {type: "InboxMessages", messages: Message[]}
| {type: "AppAnnouncements", announcements: AppAnnouncement[]}
| PeerEvent
;
Expand Down Expand Up @@ -235,13 +240,11 @@ export class MutinyClient {
return this;
},
return(value?: PeerEvent) {
console.log('peerEvents iterator return()');
// Remove waiting promise
waiting.delete(request.id);
return Promise.resolve({value, done: true});
},
next: async () => {
console.log('peerEvents iterator next()');
const value = await promise;
// Register next promise
promise = new Promise((resolve, reject) => {
Expand All @@ -260,12 +263,18 @@ export class MutinyClient {
return this._subscribe(request);
}

announceEvents(): AsyncIterableIterator<AnnounceEvent> {
announceEvents(): AsyncIterableIterator<AppAnnouncement> {
const body: MutinyRequestBody = {type: "SubscribeAnnounceEvents"};
const request = {id: this.next_request_id++, body};
return this._subscribe(request);
}

inboxEvents(app_uuid: string): AsyncIterableIterator<Message> {
const body: MutinyRequestBody = {type: "SubscribeInboxEvents", app_uuid};
const request = {id: this.next_request_id++, body};
return this._subscribe(request);
}

async appInstanceUuid(label: string): Promise<string | null> {
const response = await this.requestOne({type: "AppInstanceUuid", label});
assert(response.type === 'AppInstanceUuid');
Expand All @@ -290,14 +299,14 @@ export class MutinyClient {
return response.announcements;
}

async messageSend(
async sendMessage(
peer: string,
app_uuid: string,
from_app_uuid: string,
message: Uint8Array
): Promise<void> {
const response = await this.requestOne({
type: "MessageSend",
type: "SendMessage",
peer,
app_uuid,
from_app_uuid,
Expand All @@ -307,14 +316,14 @@ export class MutinyClient {
return;
}

async messageRead(app_uuid: string): Promise<Message | null> {
const response = await this.requestOne({type: "MessageRead", app_uuid});
assert(response.type === 'Message');
return response.message;
async inboxMessages(app_uuid: string): Promise<Message[]> {
const response = await this.requestOne({type: "InboxMessages", app_uuid});
assert(response.type === 'InboxMessages');
return response.messages;
}

async messageNext(app_uuid: string): Promise<void> {
const response = await this.requestOne({type: "MessageNext", app_uuid});
async deleteInboxMessage(app_uuid: string, message_id: number): Promise<void> {
const response = await this.requestOne({type: "DeleteInboxMessage", app_uuid, message_id});
assert(response.type === 'Success');
return;
}
Expand Down
59 changes: 34 additions & 25 deletions mutiny-app/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,45 +58,54 @@ export class Server {
return eventStream(this.client.peerEvents(), event => {
return [event.type, event.peer_id];
});
} else if (request.method === 'POST' && pathname === '/_api/v1/message_send') {
} else if (request.method === 'POST' && pathname === '/_api/v1/announcements/outbox') {
const body = await request.json();
const message = new TextEncoder().encode(body.message);
return new Response(JSON.stringify(await this.client.messageSend(
await this.client.announce(
body.peer,
body.app_uuid,
this.app.uuid,
message,
)));
} else if (pathname === '/_api/v1/announcements') {
if (request.method === 'POST') {
const body = await request.json();
await this.client.announce(
body.peer,
this.app.uuid,
body.data,
);
return new Response(JSON.stringify({success: true}));
} else {
return new Response(JSON.stringify(await this.client.announcements()));
}
} else if (pathname === '/_api/v1/announcements/events') {
body.data,
);
return new Response(JSON.stringify({success: true}));
} else if (pathname === '/_api/v1/announcements/inbox') {
return new Response(JSON.stringify(await this.client.announcements()));
} else if (pathname === '/_api/v1/announcements/inbox/events') {
return eventStream(this.client.announceEvents(), event => {
return [event.type, JSON.stringify({
peer: event.peer,
app_uuid: event.app_uuid,
data: event.data,
})];
});
} else if (pathname === '/_api/v1/message_read') {
const m = await this.client.messageRead(this.app.uuid) as Message;
return new Response(JSON.stringify(m && {
} else if (request.method === 'POST' && pathname === '/_api/v1/messages/outbox') {
const body = await request.json();
const message = new TextEncoder().encode(body.message);
return new Response(JSON.stringify(await this.client.sendMessage(
body.peer,
body.app_uuid,
this.app.uuid,
message,
)));
} else if (request.method === 'GET' && pathname === '/_api/v1/messages/inbox') {
const messages = await this.client.inboxMessages(this.app.uuid);
return new Response(JSON.stringify(messages.map(m => ({
id: m.id,
peer: m.peer,
uuid: m.uuid,
message: new TextDecoder().decode(m.message),
}));
} else if (request.method === 'POST' && pathname === '/_api/v1/message_next') {
await this.client.messageNext(this.app.uuid);
}))));
} else if (request.method === 'DELETE' && pathname === '/_api/v1/messages/inbox') {
const body = await request.json();
await this.client.deleteInboxMessage(this.app.uuid, body.message_id);
return new Response(JSON.stringify({success: true}));
} else if (pathname === '/_api/v1/messages/inbox/events') {
return eventStream(this.client.inboxEvents(this.app.uuid), event => {
return [event.type, JSON.stringify({
id: event.id,
peer: event.peer,
uuid: event.uuid,
message: new TextDecoder().decode(event.message),
})];
});
} else {
return new Response('Not found', {status: 404});
}
Expand Down
17 changes: 12 additions & 5 deletions mutinyd/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,26 @@ pub enum RequestBody {
app_uuid: String,
data: serde_json::Value,
},
MessageSend {
SendMessage {
peer: String,
app_uuid: String,
from_app_uuid: String,
#[serde(with = "serde_bytes")]
message: Vec<u8>,
},
MessageRead {
InboxMessages {
app_uuid: String,
},
MessageNext {
DeleteInboxMessage {
app_uuid: String,
message_id: usize,
},
AppAnnouncements,
SubscribePeerEvents,
SubscribeAnnounceEvents,
SubscribeInboxEvents {
app_uuid: String,
},
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
Expand Down Expand Up @@ -71,8 +75,9 @@ pub enum ResponseBody {
Peers {
peers: Vec<String>,
},
Message {
message: Option<Message>,
Message (Message),
InboxMessages {
messages: Vec<Message>
},
AppAnnouncements {
announcements: Vec<AppAnnouncement>
Expand All @@ -85,6 +90,7 @@ pub enum ResponseBody {
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(tag="type")]
pub struct Message {
pub id: usize,
pub peer: String,
Expand All @@ -94,6 +100,7 @@ pub struct Message {
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(tag="type")]
pub struct AppAnnouncement {
pub peer: String,
pub app_uuid: String,
Expand Down
Loading

0 comments on commit 09891de

Please sign in to comment.