diff --git a/examples/chat/components/send-message-form.js b/examples/chat/components/send-message-form.js index 8f3e9d5..8afcc49 100644 --- a/examples/chat/components/send-message-form.js +++ b/examples/chat/components/send-message-form.js @@ -41,7 +41,7 @@ export default class ChatSendMessageForm extends HTMLElement { ev.preventDefault(); if (selected_announcement.value && local_peer_id.value && local_app_uuid.value) { const message = this.input.value; - await fetch("/_api/v1/message_send", { + await fetch("/_api/v1/messages/outbox", { method: "POST", body: JSON.stringify({ peer: selected_announcement.value.peer, diff --git a/examples/chat/main.js b/examples/chat/main.js index 05fa98a..c182a16 100644 --- a/examples/chat/main.js +++ b/examples/chat/main.js @@ -34,7 +34,7 @@ async function announce(peers) { nick: state.nick.value, }; for (const peer of peers) { - await fetch("/_api/v1/announcements", { + await fetch("/_api/v1/announcements/outbox", { method: 'POST', body: JSON.stringify({peer, data}), }); @@ -42,7 +42,7 @@ async function announce(peers) { } async function fetchAnnouncements() { - const res = await fetch("/_api/v1/announcements"); + const res = await fetch("/_api/v1/announcements/inbox"); const data = /** @type {import("../../lib/client.ts").AppAnnouncement[]} */(await res.json()); // Only list announcements for current app from peers in current discovered list return data.filter(x => { @@ -54,26 +54,33 @@ async function fetchAnnouncements() { }); } -async function getMessages() { - while (true) { - const res = await fetch("/_api/v1/message_read"); - const data = await res.json(); - if (data && state.local_peer_id.value && state.local_app_uuid.value) { - const from = { - peer: data.peer, - app_uuid: data.uuid, - }; - const to = { - peer: state.local_peer_id.value, - app_uuid: state.local_app_uuid.value, - }; - state.appendMessage(from, to, data.message); - await fetch("/_api/v1/message_next", {method: "POST"}); - } else { - // Check again in 1 second - setTimeout(getMessages, 1000); - return; - } +/** @param {import("../../lib/client.ts").MessageJson} message */ +async function receiveMessage(message) { + if (state.local_peer_id.value && state.local_app_uuid.value) { + const from = { + peer: message.peer, + app_uuid: message.uuid, + }; + const to = { + peer: state.local_peer_id.value, + app_uuid: state.local_app_uuid.value, + }; + state.appendMessage(from, to, message.message); + // Delete seen messages + await fetch("/_api/v1/messages/inbox", { + method: "DELETE", + body: JSON.stringify({ + message_id: message.id + }), + }); + } +} + +async function fetchMessages() { + const res = await fetch("/_api/v1/messages/inbox"); + const data = await res.json(); + for (const message of data) { + await receiveMessage(message); } } @@ -82,6 +89,7 @@ await updateLocalPeerId(); await updateLocalAppInstance(); state.announcements.value = await fetchAnnouncements(); state.peers.value = await fetchPeers(); +await fetchMessages(); // Announce app to newly discovered peers announce(state.peers.value); @@ -89,9 +97,6 @@ announce(state.peers.value); // Announce to all peers when nick changes watch([state.nick], () => announce(state.peers.value)); -// Start polling for messages -getMessages(); - // Ask user for nickname askNick(); @@ -108,7 +113,7 @@ peer_events.addEventListener("PeerExpired", peer_id => { state.peers.signal(); }); -const announcement_events = new EventSource("/_api/v1/announcements/events"); +const announcement_events = new EventSource("/_api/v1/announcements/inbox/events"); announcement_events.addEventListener("AppAnnouncement", event => { const announcement = JSON.parse(event.data); console.log('App announced', announcement); @@ -119,3 +124,10 @@ announcement_events.addEventListener("AppAnnouncement", event => { return x; }); }); + +const inbox_events = new EventSource("/_api/v1/messages/inbox/events"); +inbox_events.addEventListener("Message", event => { + const message = JSON.parse(event.data); + console.log('Message received', message); + receiveMessage(/** @type {import("../../lib/client.ts").MessageJson} */(message)); +}); diff --git a/lib/client.ts b/lib/client.ts index 22159b3..302270b 100644 --- a/lib/client.ts +++ b/lib/client.ts @@ -52,13 +52,23 @@ interface JsonArray extends Array { } type JsonValue = (null | boolean | number | string | JsonObject | JsonArray); export type Message = { + type: "Message", id: number, peer: string, uuid: string, message: Uint8Array, }; +export type MessageJson = { + type: "Message", + id: number, + peer: string, + uuid: string, + message: string, +}; + export type AppAnnouncement = { + type: "AppAnnouncement", peer: string, app_uuid: string, data: JsonValue, @@ -76,16 +86,17 @@ export type MutinyRequestBody = {type: "LocalPeerId"} | {type: "Announce", peer: string, app_uuid: string, data: JsonValue} | {type: "AppAnnouncements"} | { - type: "MessageSend", + type: "SendMessage", peer: string, app_uuid: string, from_app_uuid: string, message: Uint8Array, } - | {type: "MessageRead", app_uuid: string} - | {type: "MessageNext", app_uuid: string} + | {type: "InboxMessages", app_uuid: string} + | {type: "DeleteInboxMessage", app_uuid: string, message_id: number} | {type: "SubscribePeerEvents"} | {type: "SubscribeAnnounceEvents"} + | {type: "SubscribeInboxEvents", app_uuid: string} ; export type MutinyResponse = { @@ -96,20 +107,14 @@ export type MutinyResponse = { export type PeerEvent = {type: "PeerDiscovered", peer_id: string} | {type: "PeerExpired", peer_id: string}; -export type AnnounceEvent = { - type: "AppAnnouncement", - peer: string, - app_uuid: string, - data: JsonValue, -}; - export type MutinyResponseBody = {type: "Success"} | {type: "Error", message: string} | {type: "LocalPeerId", peer_id: string} | {type: "Peers", peers: string[]} | {type: "AppInstanceUuid", uuid: string | null} | {type: "CreateAppInstance", uuid: string} - | {type: "Message", message: null | Message} + | {type: "Message", message: Message} + | {type: "InboxMessages", messages: Message[]} | {type: "AppAnnouncements", announcements: AppAnnouncement[]} | PeerEvent ; @@ -235,13 +240,11 @@ export class MutinyClient { return this; }, return(value?: PeerEvent) { - console.log('peerEvents iterator return()'); // Remove waiting promise waiting.delete(request.id); return Promise.resolve({value, done: true}); }, next: async () => { - console.log('peerEvents iterator next()'); const value = await promise; // Register next promise promise = new Promise((resolve, reject) => { @@ -260,12 +263,18 @@ export class MutinyClient { return this._subscribe(request); } - announceEvents(): AsyncIterableIterator { + announceEvents(): AsyncIterableIterator { const body: MutinyRequestBody = {type: "SubscribeAnnounceEvents"}; const request = {id: this.next_request_id++, body}; return this._subscribe(request); } + inboxEvents(app_uuid: string): AsyncIterableIterator { + const body: MutinyRequestBody = {type: "SubscribeInboxEvents", app_uuid}; + const request = {id: this.next_request_id++, body}; + return this._subscribe(request); + } + async appInstanceUuid(label: string): Promise { const response = await this.requestOne({type: "AppInstanceUuid", label}); assert(response.type === 'AppInstanceUuid'); @@ -290,14 +299,14 @@ export class MutinyClient { return response.announcements; } - async messageSend( + async sendMessage( peer: string, app_uuid: string, from_app_uuid: string, message: Uint8Array ): Promise { const response = await this.requestOne({ - type: "MessageSend", + type: "SendMessage", peer, app_uuid, from_app_uuid, @@ -307,14 +316,14 @@ export class MutinyClient { return; } - async messageRead(app_uuid: string): Promise { - const response = await this.requestOne({type: "MessageRead", app_uuid}); - assert(response.type === 'Message'); - return response.message; + async inboxMessages(app_uuid: string): Promise { + const response = await this.requestOne({type: "InboxMessages", app_uuid}); + assert(response.type === 'InboxMessages'); + return response.messages; } - async messageNext(app_uuid: string): Promise { - const response = await this.requestOne({type: "MessageNext", app_uuid}); + async deleteInboxMessage(app_uuid: string, message_id: number): Promise { + const response = await this.requestOne({type: "DeleteInboxMessage", app_uuid, message_id}); assert(response.type === 'Success'); return; } diff --git a/mutiny-app/src/main.ts b/mutiny-app/src/main.ts index f13fb51..e9cd41a 100644 --- a/mutiny-app/src/main.ts +++ b/mutiny-app/src/main.ts @@ -58,28 +58,17 @@ export class Server { return eventStream(this.client.peerEvents(), event => { return [event.type, event.peer_id]; }); - } else if (request.method === 'POST' && pathname === '/_api/v1/message_send') { + } else if (request.method === 'POST' && pathname === '/_api/v1/announcements/outbox') { const body = await request.json(); - const message = new TextEncoder().encode(body.message); - return new Response(JSON.stringify(await this.client.messageSend( + await this.client.announce( body.peer, - body.app_uuid, this.app.uuid, - message, - ))); - } else if (pathname === '/_api/v1/announcements') { - if (request.method === 'POST') { - const body = await request.json(); - await this.client.announce( - body.peer, - this.app.uuid, - body.data, - ); - return new Response(JSON.stringify({success: true})); - } else { - return new Response(JSON.stringify(await this.client.announcements())); - } - } else if (pathname === '/_api/v1/announcements/events') { + body.data, + ); + return new Response(JSON.stringify({success: true})); + } else if (pathname === '/_api/v1/announcements/inbox') { + return new Response(JSON.stringify(await this.client.announcements())); + } else if (pathname === '/_api/v1/announcements/inbox/events') { return eventStream(this.client.announceEvents(), event => { return [event.type, JSON.stringify({ peer: event.peer, @@ -87,16 +76,36 @@ export class Server { data: event.data, })]; }); - } else if (pathname === '/_api/v1/message_read') { - const m = await this.client.messageRead(this.app.uuid) as Message; - return new Response(JSON.stringify(m && { + } else if (request.method === 'POST' && pathname === '/_api/v1/messages/outbox') { + const body = await request.json(); + const message = new TextEncoder().encode(body.message); + return new Response(JSON.stringify(await this.client.sendMessage( + body.peer, + body.app_uuid, + this.app.uuid, + message, + ))); + } else if (request.method === 'GET' && pathname === '/_api/v1/messages/inbox') { + const messages = await this.client.inboxMessages(this.app.uuid); + return new Response(JSON.stringify(messages.map(m => ({ + id: m.id, peer: m.peer, uuid: m.uuid, message: new TextDecoder().decode(m.message), - })); - } else if (request.method === 'POST' && pathname === '/_api/v1/message_next') { - await this.client.messageNext(this.app.uuid); + })))); + } else if (request.method === 'DELETE' && pathname === '/_api/v1/messages/inbox') { + const body = await request.json(); + await this.client.deleteInboxMessage(this.app.uuid, body.message_id); return new Response(JSON.stringify({success: true})); + } else if (pathname === '/_api/v1/messages/inbox/events') { + return eventStream(this.client.inboxEvents(this.app.uuid), event => { + return [event.type, JSON.stringify({ + id: event.id, + peer: event.peer, + uuid: event.uuid, + message: new TextDecoder().decode(event.message), + })]; + }); } else { return new Response('Not found', {status: 404}); } diff --git a/mutinyd/src/protocol.rs b/mutinyd/src/protocol.rs index e577a7e..fb053f9 100644 --- a/mutinyd/src/protocol.rs +++ b/mutinyd/src/protocol.rs @@ -22,22 +22,26 @@ pub enum RequestBody { app_uuid: String, data: serde_json::Value, }, - MessageSend { + SendMessage { peer: String, app_uuid: String, from_app_uuid: String, #[serde(with = "serde_bytes")] message: Vec, }, - MessageRead { + InboxMessages { app_uuid: String, }, - MessageNext { + DeleteInboxMessage { app_uuid: String, + message_id: usize, }, AppAnnouncements, SubscribePeerEvents, SubscribeAnnounceEvents, + SubscribeInboxEvents { + app_uuid: String, + }, } #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] @@ -71,8 +75,9 @@ pub enum ResponseBody { Peers { peers: Vec, }, - Message { - message: Option, + Message (Message), + InboxMessages { + messages: Vec }, AppAnnouncements { announcements: Vec @@ -85,6 +90,7 @@ pub enum ResponseBody { } #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +#[serde(tag="type")] pub struct Message { pub id: usize, pub peer: String, @@ -94,6 +100,7 @@ pub struct Message { } #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +#[serde(tag="type")] pub struct AppAnnouncement { pub peer: String, pub app_uuid: String, diff --git a/mutinyd/src/server.rs b/mutinyd/src/server.rs index fcc9b07..307ef1b 100644 --- a/mutinyd/src/server.rs +++ b/mutinyd/src/server.rs @@ -17,6 +17,7 @@ pub struct Server { listener: UnixListener, peer_subscribers: HashMap>, announce_subscribers: HashMap>, + inbox_subscribers: HashMap>>, client_request_receiver: mpsc::Receiver, client_request_sender: mpsc::Sender, peers: HashSet<(PeerId, Multiaddr)>, @@ -33,6 +34,7 @@ impl Server { listener: UnixListener::bind(config.socket_path.as_path())?, peer_subscribers: HashMap::new(), announce_subscribers: HashMap::new(), + inbox_subscribers: HashMap::new(), swarm: swarm::start(config.keypair).await?, client_request_receiver: rx, client_request_sender: tx, @@ -118,8 +120,14 @@ impl Server { let from = tx.get_app(peer_id, &from_app_uuid)?.ok_or("Cannot find 'from' app in database")?; let to = tx.get_app(local_peer_id, &to_app_uuid)?.ok_or("Cannot find 'to' app in database")?; let message_id = tx.get_or_put_message_data(&message)?; - tx.put_message_inbox(received, from, to, message_id)?; + let id = tx.put_message_inbox(received, from, to, message_id)?; tx.commit()?; + self.inbox_subscribers_send(to, ResponseBody::Message ( Message { + id: id.try_into()?, + peer: peer.to_base58(), + uuid: from_app_uuid, + message, + })).await; }, } let _ = self.swarm.behaviour_mut().request_response.send_response( @@ -175,6 +183,22 @@ impl Server { } } + async fn inbox_subscribers_send(&mut self, app_id: i64, message: ResponseBody) -> () { + let mut to_remove: Vec = vec![] ; + if let Some(subscribers) = self.inbox_subscribers.get(&app_id) { + for (request_id, sender) in subscribers { + if let Err(err) = sender.send(message.clone()).await { + eprintln!("Error sending message to announce event subscriber: {}", err); + // Remove this subscriber + to_remove.push(*request_id); + } + } + for request_id in to_remove { + self.announce_subscribers.remove(&request_id); + } + } + } + async fn swarm_event(&mut self, event: SwarmEvent) -> Result<(), Box> { match event { SwarmEvent::Behaviour(swarm::MutinyBehaviourEvent::Mdns(ev)) => match ev { @@ -286,20 +310,20 @@ impl Server { Ok(tx.commit()?) } - fn read_message(&mut self, uuid: String) -> Result, Box> { - let tx = self.store.transaction()?; - let local_peer_id = tx.get_peer(&self.peer_id.to_base58())?.ok_or("Cannot find local peer ID in database")?; - let app_id = tx.get_app(local_peer_id, &uuid)?.ok_or("Cannot find 'to' app instance in database")?; - Ok(tx.read_message(app_id)?) - } + // fn read_message(&mut self, uuid: String) -> Result, Box> { + // let tx = self.store.transaction()?; + // let local_peer_id = tx.get_peer(&self.peer_id.to_base58())?.ok_or("Cannot find local peer ID in database")?; + // let app_id = tx.get_app(local_peer_id, &uuid)?.ok_or("Cannot find 'to' app instance in database")?; + // Ok(tx.read_message(app_id)?) + // } - fn next_message(&mut self, uuid: String) -> Result<(), Box> { - let tx = self.store.transaction()?; - let local_peer_id = tx.get_peer(&self.peer_id.to_base58())?.ok_or("Cannot find local peer ID in database")?; - let app_id = tx.get_app(local_peer_id, &uuid)?.ok_or("Cannot find 'to' app instance in database")?; - tx.next_message(app_id)?; - Ok(tx.commit()?) - } + // fn next_message(&mut self, uuid: String) -> Result<(), Box> { + // let tx = self.store.transaction()?; + // let local_peer_id = tx.get_peer(&self.peer_id.to_base58())?.ok_or("Cannot find local peer ID in database")?; + // let app_id = tx.get_app(local_peer_id, &uuid)?.ok_or("Cannot find 'to' app instance in database")?; + // tx.next_message(app_id)?; + // Ok(tx.commit()?) + // } async fn handle_request(&mut self, request: ClientRequest) -> Result<(), Box> { match request.request.body { @@ -350,16 +374,23 @@ impl Server { } let _ = request.response.send(ResponseBody::Success).await; }, - RequestBody::MessageSend {peer, app_uuid, from_app_uuid, message} => { + RequestBody::SendMessage {peer, app_uuid, from_app_uuid, message} => { self.send_message(peer, app_uuid, from_app_uuid, message)?; let _ = request.response.send(ResponseBody::Success).await; }, - RequestBody::MessageRead {app_uuid} => { - let message = self.read_message(app_uuid)?; - let _ = request.response.send(ResponseBody::Message {message}).await; + RequestBody::InboxMessages {app_uuid} => { + let tx = self.store.transaction()?; + let peer_id = tx.get_or_put_peer(&self.peer_id.to_base58())?; + let app_id = tx.get_or_put_app(peer_id, &app_uuid)?; + let _ = request.response.send(ResponseBody::InboxMessages { + messages: tx.list_app_inbox_messages(app_id)?, + }).await; }, - RequestBody::MessageNext {app_uuid} => { - self.next_message(app_uuid)?; + RequestBody::DeleteInboxMessage {app_uuid, message_id} => { + let tx = self.store.transaction()?; + let peer_id = tx.get_or_put_peer(&self.peer_id.to_base58())?; + let app_id = tx.get_or_put_app(peer_id, &app_uuid)?; + tx.delete_inbox_message(app_id, message_id.try_into()?)?; let _ = request.response.send(ResponseBody::Success).await; }, RequestBody::SubscribePeerEvents => { @@ -367,6 +398,13 @@ impl Server { }, RequestBody::SubscribeAnnounceEvents => { self.announce_subscribers.insert(request.request.id, request.response); + }, + RequestBody::SubscribeInboxEvents {app_uuid} => { + let tx = self.store.transaction()?; + let peer_id = tx.get_or_put_peer(&self.peer_id.to_base58())?; + let app_id = tx.get_or_put_app(peer_id, &app_uuid)?; + let subscribers = self.inbox_subscribers.entry(app_id).or_insert(HashMap::new()); + subscribers.insert(request.request.id, request.response); } } Ok(()) diff --git a/mutinyd/src/store.rs b/mutinyd/src/store.rs index 6b1da9a..790b0a8 100644 --- a/mutinyd/src/store.rs +++ b/mutinyd/src/store.rs @@ -258,24 +258,6 @@ impl<'a> StoreTransaction<'a> { Ok(()) } - pub fn put_message_inbox(&self, received: i64, from: i64, to: i64, message_id: i64) -> Result { - let mut stmt = self.tx.prepare_cached( - "INSERT INTO message_inbox (received, from_app_id, to_app_id, message_id) - VALUES (?1, ?2, ?3, ?4) - RETURNING id", - )?; - stmt.query_row([received, from, to, message_id], |row| row.get::<_, i64>(0)) - } - - // pub fn delete_message_inbox(&self, inbox_id: i64) -> Result<()> { - // let mut stmt = self.tx.prepare_cached( - // "DELETE FROM message_inbox - // WHERE id = ?1", - // )?; - // stmt.execute([inbox_id])?; - // self.prune_message_data() - // } - pub fn put_message_outbox(&self, queued: i64, from: i64, to: i64, message_id: i64) -> Result { let mut stmt = self.tx.prepare_cached( "INSERT INTO message_outbox (queued, from_app_id, to_app_id, message_id) @@ -326,7 +308,16 @@ impl<'a> StoreTransaction<'a> { Ok(()) } - pub fn read_message(&self, app_id: i64) -> Result> { + pub fn put_message_inbox(&self, received: i64, from: i64, to: i64, message_id: i64) -> Result { + let mut stmt = self.tx.prepare_cached( + "INSERT INTO message_inbox (received, from_app_id, to_app_id, message_id) + VALUES (?1, ?2, ?3, ?4) + RETURNING id", + )?; + stmt.query_row([received, from, to, message_id], |row| row.get::<_, i64>(0)) + } + + pub fn list_app_inbox_messages(&self, app_id: i64) -> Result> { let mut stmt = self.tx.prepare_cached( "SELECT message_inbox.id, peer.peer_id, app.uuid, data FROM message_inbox @@ -334,29 +325,27 @@ impl<'a> StoreTransaction<'a> { JOIN app ON app.id = from_app_id JOIN peer ON peer.id = app.peer_id WHERE to_app_id = ?1 - ORDER BY message_inbox.id ASC - LIMIT 1", + ORDER BY message_inbox.id ASC", )?; - stmt.query_row([app_id], |row| { - Ok(Message { + let mut rows = stmt.query([app_id])?; + let mut results = Vec::new(); + while let Some(row) = rows.next()? { + results.push(Message { id: row.get::<_, usize>(0)?, peer: row.get::<_, String>(1)?, uuid: row.get::<_, String>(2)?, message: row.get::<_, Vec>(3)?, - }) - }).optional() + }); + } + return Ok(results); } - pub fn next_message(&self, app_id: i64) -> Result<()> { + pub fn delete_inbox_message(&self, to: i64, message_id: i64) -> Result<()> { let mut stmt = self.tx.prepare_cached( "DELETE FROM message_inbox - WHERE id IN ( - SELECT min(id) - FROM message_inbox - WHERE to_app_id = ?1 - )", + WHERE to_app_id = ?1 AND id = ?2", )?; - stmt.execute([app_id])?; + stmt.execute([to, message_id])?; Ok(()) }