diff --git a/docker/Dockerfile b/docker/Dockerfile index 13cbb14..1114987 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,4 +1,4 @@ -FROM node:16-alpine +FROM node:18-alpine # Create app directory WORKDIR /usr/src/app diff --git a/src/client.js b/src/client.js index 24e1c4c..7a06860 100644 --- a/src/client.js +++ b/src/client.js @@ -12,7 +12,45 @@ class Wire { }, ]; this._left = false; + + this.registeredRPC = {}; + + // Receive server RPC calls + this._socket.on(`${this.room}._call`, async ({ callId, name, params }) => { + try { + const result = await this.registeredRPC[name](params); + socket.emit(`${this.room}._result.${callId}`, { + ok: result ? result : null, + }); + } catch (err) { + socket.emit(`${this.room}._result.${callId}`, { + err: `${err.message}`, + }); + } + }); + } + + /** + * Call a server procedure. + * + * @param {string} action name of the operation to call on the server. + * @param {*} params the params of the action. + * @returns the result of the call. + */ + async _callServerRPC(name, params) { + const callId = nanoid(); + return new Promise((resolve, reject) => { + this._socket.once(`${this.room}._result.${callId}`, (result) => { + if (result.hasOwnProperty('ok')) { + resolve(result.ok); + } else { + reject(result.err); + } + }); + this._socket.emit(`${this.room}._call`, { callId, name, params }); + }); } + /** * Leave current room * @param {string} room name. @@ -43,10 +81,13 @@ class Wire { */ subscribe(event, callback) { this._socket.on(`${this.room}.${event}`, callback); + const unregisterCallback = () => { this._socket.off(`${this.room}.${event}`, callback); }; + this.toUnregister.push(unregisterCallback); + return unregisterCallback; } @@ -54,69 +95,40 @@ class Wire { * Register a new RPC function. * @param {string} name of function * @param {function} callback the function that handle the function result + * @param {object} params the configuration of the RPC. For now only `invoke` + * parameter is allowed with the following values: + * - 'single' for a RPC that can be registered only once. + * - 'first' The first registered client is called. + * - 'last' The last registered client is called. + * - 'random' A random RPC is called. */ - register(name, callback, { invoke = 'single' } = {}) { - const registerId = nanoid(); - // Executed when another client call the function - const toBeCalled = async ({ callId, params }) => { - try { - const result = await callback(params); - this._socket.emit(`${this.room}.result.${callId}`, { - ok: result, - }); - } catch (err) { - // Error handling - this._socket.emit(`${this.room}.result.${callId}`, { err: '' + err }); - } - }; - return new Promise((resolve, reject) => { - this._socket.once( - `${this.room}.register.${name}.${registerId}`, - (result) => { - if (result.hasOwnProperty('ok')) { - // Remove previously registered callback - this._socket.off(`${this.room}.call.${name}`); - this._socket.on(`${this.room}.call.${name}`, toBeCalled); - - // Return unregister callback - const unregisterCallback = () => - new Promise((resolve) => { - this._socket.once(`${this.room}.unregister.${name}`, () => { - this._socket.off(`${this.room}.call.${name}`); - resolve(); - }); - this._socket.emit(`${this.room}.unregister`, { name }); - }); - - this.toUnregister.push(unregisterCallback); - - resolve(unregisterCallback); - } else { - reject(result.err); - } - } - ); - this._socket.emit(`${this.room}.register`, { registerId, name, invoke }); + async register(name, callback, { invoke = 'single' } = {}) { + // Add to locally registered callback + this.registeredRPC[name] = callback; + + await this._callServerRPC('register', { + name, + invoke, }); + + // Return unregister callback + const unregisterCallback = () => { + delete this.registeredRPC[name]; + return this._callServerRPC('unregister', { name }); + }; + + this.toUnregister.push(unregisterCallback); + + return unregisterCallback; } /** * Call a previously registered function with `params` arguments. * @param {string} name of function - * @param {*} params arguments of the called function. + * @param {*} params parameters of the called function. */ - call(name, params) { - return new Promise((resolve, reject) => { - const callId = nanoid(); - this._socket.once(`${this.room}.result.${callId}`, (result) => { - if (result.hasOwnProperty('ok')) { - resolve(result.ok); - } else { - reject(result.err); - } - }); - this._socket.emit(`${this.room}.call`, { name, callId, params }); - }); + async call(name, params) { + return await this._callServerRPC('call', { name, params }); } } @@ -128,7 +140,7 @@ class Wire { * the room, i.e. the first client or the next one if the first quit. * @param {function} onJoined is called on each connection, reconnection after * wire.io is initialized. - * @param {string} userId (optionnal) to force userId. + * @param {string} userId (optional) to force userId. */ export const joinWire = ({ socket, diff --git a/src/client.test.js b/src/client.test.js index 08af9db..9ba2141 100644 --- a/src/client.test.js +++ b/src/client.test.js @@ -192,7 +192,7 @@ describe('Client', () => { try { await room2.call('testrpc', { test: 'testerror' }); } catch (err) { - expect(err).toBe('Error: test error'); + expect(err).toBe('test error'); done(); } }); diff --git a/src/server.js b/src/server.js index 31cda65..8727e1f 100644 --- a/src/server.js +++ b/src/server.js @@ -30,106 +30,106 @@ export const handleWire = ( isMaster, }); - // Publish event to others and self if `self` - socket.on(`${roomName}.publish`, ({ name, params, self }) => { - if (self) { - socket.emit(`${roomName}.${name}`, params); - } - socket.broadcast.to(roomName).emit(`${roomName}.${name}`, params); - }); + /** + * Call a remote function on the client. + * @param {string} name the name of the function + * @param {*} params the params of the call + * @returns the call result (async). + */ + const _callClientRPC = async (name, params) => { + const callId = nanoid(); + return new Promise((resolve, reject) => { + socket.once(`${roomName}._result.${callId}`, (result) => { + if (result.hasOwnProperty('ok')) { + resolve(result.ok); + } else { + reject(new Error(result.err)); + } + }); + socket.emit(`${roomName}._call`, { callId, name, params }); + }); + }; - // Register new remote function - socket.on( - `${roomName}.register`, - ({ registerId, name, invoke = 'single' }) => { - const existingInvoke = rooms[roomName].rpc[name]?.invoke; - - if (existingInvoke && invoke !== existingInvoke) { - socket.emit(`${roomName}.register.${name}.${registerId}`, { - err: `Can't register a new function under the ${name} with this invoke value.`, - }); - return; - } - if ( - existingInvoke == 'single' && - rooms[roomName].rpc[name].callbacks.length >= 1 - ) { - socket.emit(`${roomName}.register.${name}.${registerId}`, { - err: `Function ${name} al`, - }); - return; - } + /** + * Register a new RPC from the client. + * @param {*} param0 + */ + const register = ({ name, invoke = 'single' }) => { + const existingInvoke = rooms[roomName].rpc[name]?.invoke; + + if (existingInvoke && invoke !== existingInvoke) { + throw new Error( + `Can't register a new function under the ${name} with this invoke value.` + ); + } + if ( + existingInvoke == 'single' && + rooms[roomName].rpc[name].callbacks.length >= 1 + ) { + throw new Error(`Function ${name} already exists`); + } - // Define function for the RPC - const rpcCallback = ({ callId, params }) => { - return new Promise((resolve, reject) => { - if (!socket.connected) { - // Handle case of disconnected socket - delete rooms[roomName].rpc[name]; - resolve({ err: `Function ${name} is not registered` }); - } else { - // Schedule result - socket.once(`${roomName}.result.${callId}`, (result) => { - resolve(result); - }); - // Call function from client - socket.emit(`${roomName}.call.${name}`, { callId, params }); - } - }); + if (!rooms[roomName].rpc[name]) { + rooms[roomName].rpc[name] = { + invoke, + callbacks: [], }; + } - if (!rooms[roomName].rpc[name]) { - rooms[roomName].rpc[name] = { - invoke, - callbacks: [], - }; - } + // Remove previously registered callback from the same client + if (registeredRPCs[name]) { + rooms[roomName].rpc[name].callbacks = rooms[roomName].rpc[ + name + ].callbacks.filter((callback) => callback !== registeredRPCs[name]); + } - // Remove previously registered callback from the same client - if (registeredRPCs[name]) { - rooms[roomName].rpc[name].callbacks = rooms[roomName].rpc[ - name - ].callbacks.filter((callback) => callback !== registeredRPCs[name]); + const rpcCallback = async (params) => { + if (!socket.connected) { + throw new Error(`Function ${name} is not registered`); } + return await _callClientRPC(name, params); + }; - registeredRPCs[name] = rpcCallback; - rooms[roomName].rpc[name].callbacks.push(rpcCallback); - - socket.emit(`${roomName}.register.${name}.${registerId}`, { ok: true }); - } - ); + registeredRPCs[name] = rpcCallback; + rooms[roomName].rpc[name].callbacks.push(rpcCallback); + }; - socket.on(`${roomName}.unregister`, ({ name }) => { - if (rooms[roomName].rpc[name] === undefined) { - socket.emit(`${roomName}.unregister.${name}`); - } else { + /** + * Unregister a RPC from the client. + * @param {*} param0 + */ + const unregister = ({ name }) => { + if (rooms[roomName].rpc[name] !== undefined) { const { callbacks } = rooms[roomName].rpc[name]; rooms[roomName].rpc[name].callbacks = callbacks.filter( (rpc) => rpc !== registeredRPCs[name] ); - // Remove everything if it was the last + // Remove everything if it was the last function if (rooms[roomName].rpc[name].callbacks.length === 0) { delete rooms[roomName].rpc[name]; } delete registeredRPCs[name]; - - socket.emit(`${roomName}.unregister.${name}`); } - }); + }; - // Call a RPC from another client - socket.on(`${roomName}.call`, async ({ name, callId, params }) => { + /** + * Call a RPC on another client. + * @param {*} param0 + * @returns + */ + const call = async ({ name, params }) => { if ( rooms[roomName].rpc[name] === undefined || rooms[roomName].rpc[name].callbacks.length === 0 ) { - socket.emit(`${roomName}.result.${callId}`, { - err: `Function ${name} is not registered`, - }); + throw new Error(`Function ${name} is not registered`); } else { const { invoke, callbacks } = rooms[roomName].rpc[name]; + let callback; + + // Select the callback to execute switch (invoke) { case 'random': callback = callbacks[Math.floor(Math.random() * callbacks.length)]; @@ -140,18 +140,43 @@ export const handleWire = ( case 'first': case 'single': default: - // Select the callback to execute callback = callbacks[0]; } - const result = await callback({ - callId, - params, + + return await callback(params); + } + }; + + const actions = { register, unregister, call }; + + /** + * Handle all calls from the client. + */ + socket.on(`${roomName}._call`, async ({ callId, name, params }) => { + try { + if (!actions[name]) { + throw new Error(`Method ${name} does not exist`); + } + const result = await actions[name](params); + socket.emit(`${roomName}._result.${callId}`, { + ok: result ? result : null, }); - // Return result to caller - socket.emit(`${roomName}.result.${callId}`, result); + } catch (err) { + socket.emit(`${roomName}._result.${callId}`, { err: `${err.message}` }); } }); + // Publish event to others and self if `self` + socket.on(`${roomName}.publish`, ({ name, params, self }) => { + if (self) { + socket.emit(`${roomName}.${name}`, params); + } + socket.broadcast.to(roomName).emit(`${roomName}.${name}`, params); + }); + + /** + * Called when the user leave the room. + */ const onLeave = () => { // Remove registered RPCs from this client rooms[roomName].rpc = Object.fromEntries( @@ -196,9 +221,8 @@ export const handleWire = ( socket.on('disconnect', onLeave); socket.once(`${roomName}.leave`, () => { - socket.removeAllListeners(`${roomName}.register`); - socket.removeAllListeners(`${roomName}.unregister`); - socket.removeAllListeners(`${roomName}.call`); + // Remove all listeners + socket.removeAllListeners(`${roomName}._call`); socket.removeAllListeners(`${roomName}.publish`); socket.off('disconnect', onLeave); onLeave(); @@ -208,8 +232,10 @@ export const handleWire = ( if (isMaster) { promoteMaster(); } + socket.emit(`${roomName}.roomJoined`, userId); socket.broadcast.to(roomName).emit(`${roomName}.userEnter`, userId); + log( `${logPrefix}User ${userId} joined room ${roomName}.${ isMaster ? ' Is room master.' : ''