diff --git a/src/y-webrtc.js b/src/y-webrtc.js index 91a77c3..76e50ae 100644 --- a/src/y-webrtc.js +++ b/src/y-webrtc.js @@ -269,8 +269,8 @@ const broadcastRoomMessage = (room, m) => { const announceSignalingInfo = room => { signalingConns.forEach(conn => { // only subscribe if connection is established, otherwise the conn automatically subscribes to all rooms - if (conn.connected) { - conn.send({ type: 'subscribe', topics: [room.name] }) + if (conn.connected()) { + conn.subscribe(room.name) if (room.webrtcConns.size < room.provider.maxConns) { publishSignalingMessage(conn, room, { type: 'announce', from: room.peerId }) } @@ -412,8 +412,8 @@ export class Room { disconnect () { // signal through all available signaling connections signalingConns.forEach(conn => { - if (conn.connected) { - conn.send({ type: 'unsubscribe', topics: [this.name] }) + if (conn.connected()) { + conn.unsubscribe(this.name) } }) awarenessProtocol.removeAwarenessStates(this.awareness, [this.doc.clientID], 'disconnect') @@ -466,96 +466,130 @@ const openRoom = (doc, provider, name, key) => { const publishSignalingMessage = (conn, room, data) => { if (room.key) { cryptoutils.encryptJson(data, room.key).then(data => { - conn.send({ type: 'publish', topic: room.name, data: buffer.toBase64(data) }) + conn.publish(room.name, buffer.toBase64(data)) }) } else { - conn.send({ type: 'publish', topic: room.name, data }) + conn.publish(room.name, data) } } -export class SignalingConn extends ws.WebsocketClient { +export class SignalingConn { constructor (url) { - super(url) + this.url = url /** * @type {Set} */ this.providers = new Set() - this.on('connect', () => { - log(`connected (${url})`) - const topics = Array.from(rooms.keys()) - this.send({ type: 'subscribe', topics }) - rooms.forEach(room => - publishSignalingMessage(this, room, { type: 'announce', from: room.peerId }) - ) + this.setupClient() + } + + setupClient() { + this.client = new ws.WebsocketClient(this.url) + this.client.on('connect', () => { + log(`connected (${this.url})`) + this.handleConnect() }) - this.on('message', m => { + this.client.on('message', m => { switch (m.type) { case 'publish': { - const roomName = m.topic - const room = rooms.get(roomName) - if (room == null || typeof roomName !== 'string') { - return + this.handleMessage(m.topic, m.data) + } + } + }) + this.client.on('disconnect', () => log(`disconnect (${this.url})`)) + } + + handleConnect() { + const topics = Array.from(rooms.keys()) + topics.forEach(topic => + this.subscribe(topic) + ) + rooms.forEach(room => + publishSignalingMessage(this, room, { type: 'announce', from: room.peerId }) + ) + } + + handleMessage (roomName, data) { + const room = rooms.get(roomName) + if (room == null || typeof roomName !== 'string') { + return + } + const execMessage = data => { + const webrtcConns = room.webrtcConns + const peerId = room.peerId + if (data == null || data.from === peerId || (data.to !== undefined && data.to !== peerId) || room.bcConns.has(data.from)) { + // ignore messages that are not addressed to this conn, or from clients that are connected via broadcastchannel + return + } + const emitPeerChange = webrtcConns.has(data.from) + ? () => {} + : () => + room.provider.emit('peers', [{ + removed: [], + added: [data.from], + webrtcPeers: Array.from(room.webrtcConns.keys()), + bcPeers: Array.from(room.bcConns) + }]) + switch (data.type) { + case 'announce': + if (webrtcConns.size < room.provider.maxConns) { + map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, true, data.from, room)) + emitPeerChange() } - const execMessage = data => { - const webrtcConns = room.webrtcConns - const peerId = room.peerId - if (data == null || data.from === peerId || (data.to !== undefined && data.to !== peerId) || room.bcConns.has(data.from)) { - // ignore messages that are not addressed to this conn, or from clients that are connected via broadcastchannel - return - } - const emitPeerChange = webrtcConns.has(data.from) - ? () => {} - : () => - room.provider.emit('peers', [{ - removed: [], - added: [data.from], - webrtcPeers: Array.from(room.webrtcConns.keys()), - bcPeers: Array.from(room.bcConns) - }]) - switch (data.type) { - case 'announce': - if (webrtcConns.size < room.provider.maxConns) { - map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, true, data.from, room)) - emitPeerChange() - } - break - case 'signal': - if (data.signal.type === 'offer') { - const existingConn = webrtcConns.get(data.from) - if (existingConn) { - const remoteToken = data.token - const localToken = existingConn.glareToken - if (localToken && localToken > remoteToken) { - log('offer rejected: ', data.from) - return - } - // if we don't reject the offer, we will be accepting it and answering it - existingConn.glareToken = undefined - } - } - if (data.signal.type === 'answer') { - log('offer answered by: ', data.from) - const existingConn = webrtcConns.get(data.from) - existingConn.glareToken = undefined - } - if (data.to === peerId) { - map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, false, data.from, room)).peer.signal(data.signal) - emitPeerChange() - } - break + break + case 'signal': + if (data.signal.type === 'offer') { + const existingConn = webrtcConns.get(data.from) + if (existingConn) { + const remoteToken = data.token + const localToken = existingConn.glareToken + if (localToken && localToken > remoteToken) { + log('offer rejected: ', data.from) + return + } + // if we don't reject the offer, we will be accepting it and answering it + existingConn.glareToken = undefined } } - if (room.key) { - if (typeof m.data === 'string') { - cryptoutils.decryptJson(buffer.fromBase64(m.data), room.key).then(execMessage) - } - } else { - execMessage(m.data) + if (data.signal.type === 'answer') { + log('offer answered by: ', data.from) + const existingConn = webrtcConns.get(data.from) + existingConn.glareToken = undefined } - } + if (data.to === peerId) { + map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, false, data.from, room)).peer.signal(data.signal) + emitPeerChange() + } + break } - }) - this.on('disconnect', () => log(`disconnect (${url})`)) + } + if (room.key) { + if (typeof data === 'string') { + cryptoutils.decryptJson(buffer.fromBase64(data), room.key).then(execMessage) + } + } else { + execMessage(data) + } + } + + connected () { + return this.client.connected + } + + subscribe (roomName) { + this.client.send({ type: 'subscribe', topics: [roomName] }) + } + + unsubscribe (roomName) { + this.client.send({ type: 'unsubscribe', topics: [roomName] }) + } + + publish (roomName, message) { + this.client.send({ type: 'publish', topic: roomName, data: message }) + } + + destroy () { + this.client.destroy() } } @@ -633,11 +667,7 @@ export class WebrtcProvider extends Observable { connect () { this.shouldConnect = true - this.signalingUrls.forEach(url => { - const signalingConn = map.setIfUndefined(signalingConns, url, () => new SignalingConn(url)) - this.signalingConns.push(signalingConn) - signalingConn.providers.add(this) - }) + this.signalingUrls.forEach(url => this.addSignalingConn(url, () => new SignalingConn(url))) if (this.room) { this.room.connect() } @@ -666,4 +696,10 @@ export class WebrtcProvider extends Observable { }) super.destroy() } + + addSignalingConn(url, conn) { + const signalingConn = map.setIfUndefined(signalingConns, url, conn) + this.signalingConns.push(signalingConn) + signalingConn.providers.add(this) + } }