Skip to content

Commit

Permalink
Simplify RPC call logic (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
jrmi authored Jan 25, 2023
1 parent 37df71a commit 3e25e37
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 141 deletions.
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM node:16-alpine
FROM node:18-alpine

# Create app directory
WORKDIR /usr/src/app
Expand Down
124 changes: 68 additions & 56 deletions src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,45 @@ class Wire {
},
];
this._left = false;

this.registeredRPC = {};

// Receive server RPC calls
this._socket.on(`${this.room}._call`, async ({ callId, name, params }) => {
try {
const result = await this.registeredRPC[name](params);
socket.emit(`${this.room}._result.${callId}`, {
ok: result ? result : null,
});
} catch (err) {
socket.emit(`${this.room}._result.${callId}`, {
err: `${err.message}`,
});
}
});
}

/**
* Call a server procedure.
*
* @param {string} action name of the operation to call on the server.
* @param {*} params the params of the action.
* @returns the result of the call.
*/
async _callServerRPC(name, params) {
const callId = nanoid();
return new Promise((resolve, reject) => {
this._socket.once(`${this.room}._result.${callId}`, (result) => {
if (result.hasOwnProperty('ok')) {
resolve(result.ok);
} else {
reject(result.err);
}
});
this._socket.emit(`${this.room}._call`, { callId, name, params });
});
}

/**
* Leave current room
* @param {string} room name.
Expand Down Expand Up @@ -43,80 +81,54 @@ class Wire {
*/
subscribe(event, callback) {
this._socket.on(`${this.room}.${event}`, callback);

const unregisterCallback = () => {
this._socket.off(`${this.room}.${event}`, callback);
};

this.toUnregister.push(unregisterCallback);

return unregisterCallback;
}

/**
* Register a new RPC function.
* @param {string} name of function
* @param {function} callback the function that handle the function result
* @param {object} params the configuration of the RPC. For now only `invoke`
* parameter is allowed with the following values:
* - 'single' for a RPC that can be registered only once.
* - 'first' The first registered client is called.
* - 'last' The last registered client is called.
* - 'random' A random RPC is called.
*/
register(name, callback, { invoke = 'single' } = {}) {
const registerId = nanoid();
// Executed when another client call the function
const toBeCalled = async ({ callId, params }) => {
try {
const result = await callback(params);
this._socket.emit(`${this.room}.result.${callId}`, {
ok: result,
});
} catch (err) {
// Error handling
this._socket.emit(`${this.room}.result.${callId}`, { err: '' + err });
}
};
return new Promise((resolve, reject) => {
this._socket.once(
`${this.room}.register.${name}.${registerId}`,
(result) => {
if (result.hasOwnProperty('ok')) {
// Remove previously registered callback
this._socket.off(`${this.room}.call.${name}`);
this._socket.on(`${this.room}.call.${name}`, toBeCalled);

// Return unregister callback
const unregisterCallback = () =>
new Promise((resolve) => {
this._socket.once(`${this.room}.unregister.${name}`, () => {
this._socket.off(`${this.room}.call.${name}`);
resolve();
});
this._socket.emit(`${this.room}.unregister`, { name });
});

this.toUnregister.push(unregisterCallback);

resolve(unregisterCallback);
} else {
reject(result.err);
}
}
);
this._socket.emit(`${this.room}.register`, { registerId, name, invoke });
async register(name, callback, { invoke = 'single' } = {}) {
// Add to locally registered callback
this.registeredRPC[name] = callback;

await this._callServerRPC('register', {
name,
invoke,
});

// Return unregister callback
const unregisterCallback = () => {
delete this.registeredRPC[name];
return this._callServerRPC('unregister', { name });
};

this.toUnregister.push(unregisterCallback);

return unregisterCallback;
}

/**
* Call a previously registered function with `params` arguments.
* @param {string} name of function
* @param {*} params arguments of the called function.
* @param {*} params parameters of the called function.
*/
call(name, params) {
return new Promise((resolve, reject) => {
const callId = nanoid();
this._socket.once(`${this.room}.result.${callId}`, (result) => {
if (result.hasOwnProperty('ok')) {
resolve(result.ok);
} else {
reject(result.err);
}
});
this._socket.emit(`${this.room}.call`, { name, callId, params });
});
async call(name, params) {
return await this._callServerRPC('call', { name, params });
}
}

Expand All @@ -128,7 +140,7 @@ class Wire {
* the room, i.e. the first client or the next one if the first quit.
* @param {function} onJoined is called on each connection, reconnection after
* wire.io is initialized.
* @param {string} userId (optionnal) to force userId.
* @param {string} userId (optional) to force userId.
*/
export const joinWire = ({
socket,
Expand Down
2 changes: 1 addition & 1 deletion src/client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ describe('Client', () => {
try {
await room2.call('testrpc', { test: 'testerror' });
} catch (err) {
expect(err).toBe('Error: test error');
expect(err).toBe('test error');
done();
}
});
Expand Down
Loading

0 comments on commit 3e25e37

Please sign in to comment.