Skip to content

Commit

Permalink
fix: discard unprocessed lines/data when connection connects/disconnects
Browse files Browse the repository at this point in the history
If there is any data there, it will be incomplete, with the remainder lost due to the broken connection
  • Loading branch information
Julusian committed Sep 29, 2023
1 parent 332ce67 commit 08bb870
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 1 deletion.
100 changes: 99 additions & 1 deletion src/__tests__/connection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ describe('connection', () => {
}
})

it('test with full connection', async () => {
it('test with full client', async () => {
const client = new BasicCasparCGAPI({
host: '127.0.0.1',
port: 5250,
Expand Down Expand Up @@ -458,5 +458,103 @@ describe('connection', () => {
client.disconnect()
}
})

it('connection loss midway through response', async () => {
const conn = new Connection('127.0.0.1', 5250, true)
try {
expect(conn).toBeTruthy()

const onConnError = jest.fn()
const onConnData = jest.fn()
conn.on('error', onConnError)
conn.on('data', onConnData)

const sockets = SocketMock.openSockets()
expect(sockets).toHaveLength(1)

// Dispatch a command
const sendError = await conn.sendCommand(
{
command: Commands.Info,
params: {},
},
'cmd1'
)
expect(sendError).toBeFalsy()
expect(onConnError).toHaveBeenCalledTimes(0)
expect(onConnData).toHaveBeenCalledTimes(0)

// Info was sent
expect(onSocketWrite).toHaveBeenCalledTimes(1)
expect(onSocketWrite).toHaveBeenNthCalledWith(1, 'REQ cmd1 INFO\r\n', 'utf-8')
// expect(onSocketWrite).toHaveBeenNthCalledWith(2, 'REQ cmd2 PLAY 1-10\r\n', 'utf-8')
onSocketWrite.mockClear()

// Reply with a part of a fragmented message
sockets[0].mockData(Buffer.from(`RES cmd1 201 INFO OK\r\n<?xml`))
await new Promise(setImmediate)

expect(conn.connected).toBeTruthy()
expect(onConnError).toHaveBeenCalledTimes(0)
expect(onConnData).toHaveBeenCalledTimes(0)

// Simulate connection failure
sockets[0].emit('close', new Error('Connection lost'))
await new Promise(setImmediate)

expect(conn.connected).toBeFalsy()
expect(onConnError).toHaveBeenCalledTimes(0)
expect(onConnData).toHaveBeenCalledTimes(0)

// Reconnect
sockets[0].emit('connect')
await new Promise(setImmediate)

expect(conn.connected).toBeTruthy()
expect(onConnError).toHaveBeenCalledTimes(0)
expect(onConnData).toHaveBeenCalledTimes(0)

// Send a command in the new connection
const sendError2 = await conn.sendCommand(
{
command: Commands.Play,
params: {
channel: 1,
layer: 10,
},
},
'cmd2'
)
expect(sendError2).toBeFalsy()

// Check was sent
expect(onSocketWrite).toHaveBeenCalledTimes(1)
expect(onSocketWrite).toHaveBeenNthCalledWith(1, 'REQ cmd2 PLAY 1-10\r\n', 'utf-8')

await new Promise(setImmediate)
expect(onConnError).toHaveBeenCalledTimes(0)
expect(onConnData).toHaveBeenCalledTimes(0)

// Reply with successful PLAY
sockets[0].mockData(Buffer.from(`RES cmd2 202 PLAY OK\r\n`))
await new Promise(setImmediate)

expect(onConnError).toHaveBeenCalledTimes(0)
expect(onConnData).toHaveBeenCalledTimes(1)

// Check result looks good
expect(onConnData).toHaveBeenNthCalledWith(1, {
command: 'PLAY',
data: [],
message: 'The command has been executed.',
reqId: 'cmd2',
responseCode: 202,
type: 'OK',
})
} finally {
// Ensure cleaned up
conn.disconnect()
}
})
})
})
12 changes: 12 additions & 0 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,19 @@ export class Connection extends EventEmitter<ConnectionEvents> {
})
this._socket.on('connect', () => {
this._setConnected(true)

// Any data which hasn't been parsed yet is now incomplete, and can be discarded
this._discardUnprocessed()
})
this._socket.on('close', () => {
this._discardUnprocessed()

this._setConnected(false)
this._triggerReconnect()
})
this._socket.on('error', (e) => {
this._discardUnprocessed()

if (`${e}`.match(/ECONNREFUSED/)) {
// Unable to connect, no need to handle this error
this._setConnected(false)
Expand All @@ -248,6 +255,11 @@ export class Connection extends EventEmitter<ConnectionEvents> {
this._socket.connect(this.port, this.host)
}

private _discardUnprocessed() {
this._unprocessedData = ''
this._unprocessedLines = []
}

private _setConnected(connected: boolean) {
if (connected) {
if (!this._connected) {
Expand Down

0 comments on commit 08bb870

Please sign in to comment.