Skip to content

Commit

Permalink
fix: socket-io client should not disconnect with no event reply (#1800)
Browse files Browse the repository at this point in the history
* fix: socket-io client should not disconnect with no event reply

* ci: disable socket-io per-message timeout test
  • Loading branch information
zone117x authored Jan 5, 2024
1 parent 242cb04 commit d596fd5
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 80 deletions.
24 changes: 10 additions & 14 deletions docs/socket-io/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,19 @@ export type Topic =
| NftAssetEventTopic
| NftCollectionEventTopic;

// Allows timeout callbacks for messages. See
// https://socket.io/docs/v4/typescript/#emitting-with-a-timeout
type WithTimeoutAck<isSender extends boolean, args extends any[]> = isSender extends true ? [Error, ...args] : args;

export interface ClientToServerMessages {
subscribe: (topic: Topic | Topic[], callback: (error: string | null) => void) => void;
unsubscribe: (...topic: Topic[]) => void;
}

export interface ServerToClientMessages<isSender extends boolean = false> {
block: (block: Block, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
microblock: (microblock: Microblock, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
mempool: (transaction: MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'nft-event': (event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
[key: TransactionTopic]: (transaction: Transaction | MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
[key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
[key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
[key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
[key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
export interface ServerToClientMessages {
block: (block: Block) => void;
microblock: (microblock: Microblock) => void;
mempool: (transaction: MempoolTransaction) => void;
'nft-event': (event: NftEvent) => void;
[key: TransactionTopic]: (transaction: Transaction | MempoolTransaction) => void;
[key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent) => void;
[key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent) => void;
[key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers) => void;
[key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse) => void;
}
80 changes: 15 additions & 65 deletions src/api/routes/ws/channels/socket-io-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const component = { component: 'socket-io' };
* SocketIO channel for sending real time API updates.
*/
export class SocketIOChannel extends WebSocketChannel {
private io?: SocketIOServer<ClientToServerMessages, ServerToClientMessages<true>>;
private io?: SocketIOServer<ClientToServerMessages, ServerToClientMessages>;
private adapter?: Adapter;

constructor(server: http.Server) {
Expand All @@ -44,14 +44,11 @@ export class SocketIOChannel extends WebSocketChannel {
}

connect(): void {
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages<true>>(
this.server,
{
cors: { origin: '*' },
pingInterval: getWsPingIntervalMs(),
pingTimeout: getWsPingTimeoutMs(),
}
);
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages>(this.server, {
cors: { origin: '*' },
pingInterval: getWsPingIntervalMs(),
pingTimeout: getWsPingTimeoutMs(),
});
this.io = io;

io.on('connection', async socket => {
Expand Down Expand Up @@ -169,13 +166,6 @@ export class SocketIOChannel extends WebSocketChannel {
return false;
}

private async getTopicSockets(room: Topic) {
if (!this.io) {
return;
}
return await this.io.to(room).fetchSockets();
}

send<P extends keyof WebSocketPayload>(
payload: P,
...args: ListenerType<WebSocketPayload[P]>
Expand All @@ -190,52 +180,32 @@ export class SocketIOChannel extends WebSocketChannel {
case 'block': {
const [block] = args as ListenerType<WebSocketPayload['block']>;
this.prometheus?.sendEvent('block');
void this.getTopicSockets('block').then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('block', block, _ => socket.disconnect(true))
)
);
this.io?.to('block').emit('block', block);
break;
}
case 'microblock': {
const [microblock] = args as ListenerType<WebSocketPayload['microblock']>;
this.prometheus?.sendEvent('microblock');
void this.getTopicSockets('microblock').then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('microblock', microblock, _ => socket.disconnect(true))
)
);
this.io?.to('microblock').emit('microblock', microblock);
break;
}
case 'mempoolTransaction': {
const [tx] = args as ListenerType<WebSocketPayload['mempoolTransaction']>;
this.prometheus?.sendEvent('mempool');
void this.getTopicSockets('mempool').then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('mempool', tx, _ => socket.disconnect(true))
)
);
this.io?.to('mempool').emit('mempool', tx);
break;
}
case 'transaction': {
const [tx] = args as ListenerType<WebSocketPayload['transaction']>;
this.prometheus?.sendEvent('transaction');
const topic: TransactionTopic = `transaction:${tx.tx_id}`;
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit(topic, tx, _ => socket.disconnect(true))
)
);
this.io?.to(topic).emit(topic, tx);
break;
}
case 'nftEvent': {
const [event] = args as ListenerType<WebSocketPayload['nftEvent']>;
this.prometheus?.sendEvent('nft-event');
void this.getTopicSockets(`nft-event`).then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('nft-event', event, _ => socket.disconnect(true))
)
);
this.io?.to('nft-event').emit('nft-event', event);
break;
}
case 'nftAssetEvent': {
Expand All @@ -244,13 +214,7 @@ export class SocketIOChannel extends WebSocketChannel {
>;
this.prometheus?.sendEvent('nft-asset-event');
const topic: NftAssetEventTopic = `nft-asset-event:${assetIdentifier}+${value}`;
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket =>
socket
.timeout(timeout)
.emit(topic, assetIdentifier, value, event, _ => socket.disconnect(true))
)
);
this.io?.to(topic).emit(topic, assetIdentifier, value, event);
break;
}
case 'nftCollectionEvent': {
Expand All @@ -259,35 +223,21 @@ export class SocketIOChannel extends WebSocketChannel {
>;
this.prometheus?.sendEvent('nft-collection-event');
const topic: NftCollectionEventTopic = `nft-collection-event:${assetIdentifier}`;
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket =>
socket
.timeout(timeout)
.emit(topic, assetIdentifier, event, _ => socket.disconnect(true))
)
);
this.io?.to(topic).emit(topic, assetIdentifier, event);
break;
}
case 'principalTransaction': {
const [principal, tx] = args as ListenerType<WebSocketPayload['principalTransaction']>;
const topic: AddressTransactionTopic = `address-transaction:${principal}`;
this.prometheus?.sendEvent('address-transaction');
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket => {
socket.timeout(timeout).emit(topic, principal, tx, _ => socket.disconnect(true));
})
);
this.io?.to(topic).emit(topic, principal, tx);
break;
}
case 'principalStxBalance': {
const [principal, balance] = args as ListenerType<WebSocketPayload['principalStxBalance']>;
const topic: AddressStxBalanceTopic = `address-stx-balance:${principal}`;
this.prometheus?.sendEvent('address-stx-balance');
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket => {
socket.timeout(timeout).emit(topic, principal, balance, _ => socket.disconnect(true));
})
);
this.io?.to(topic).emit(topic, principal, balance);
break;
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/tests/socket-io-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,8 @@ describe('socket-io', () => {
}
});

test('message timeout disconnects client', async () => {
// Per message timeout is not enabled (we don't want to require clients to explicitly reply to events)
test.skip('message timeout disconnects client', async () => {
const address = apiServer.address;
const socket = io(`http://${address}`, {
reconnection: false,
Expand Down

0 comments on commit d596fd5

Please sign in to comment.