Skip to content

Commit

Permalink
fix: improve response parsing to handle tcp fragmentation. if parsing…
Browse files Browse the repository at this point in the history
… a response fails, produce an error response, instead of a connection error
  • Loading branch information
Julusian committed Sep 29, 2023
1 parent 2703db0 commit 332ce67
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 30 deletions.
132 changes: 119 additions & 13 deletions src/__tests__/connection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { deserializers } from '../deserializers'
import { Socket as OrgSocket } from 'net'
import { Socket as MockSocket } from '../__mocks__/net'
import { Commands } from '../commands'
import { BasicCasparCGAPI } from '../api'

jest.mock('net')

Expand Down Expand Up @@ -56,6 +57,14 @@ describe('connection', () => {
socket.onClose = onSocketClose
})
}

function extractReqId(index: number) {
const str = onSocketWrite.mock.calls[index - 1][0]
const match = str.match(/REQ (\w+) /)
if (!match) throw new Error(`Failed to find REQ id in "${str}"`)
return match[1]
}

beforeEach(() => {
setupSocketMock()
})
Expand Down Expand Up @@ -110,7 +119,7 @@ describe('connection', () => {
)

// Wait for deserializer to run
await new Promise(process.nextTick.bind(process))
await new Promise(setImmediate)

expect(onConnError).toHaveBeenCalledTimes(0)
expect(onConnData).toHaveBeenCalledTimes(1)
Expand Down Expand Up @@ -167,7 +176,7 @@ describe('connection', () => {
sockets[0].mockData(Buffer.from(`<test/></channel>\r\n\r\n`))

// Wait for deserializer to run
await new Promise(process.nextTick.bind(process))
await new Promise(setImmediate)

expect(onConnError).toHaveBeenCalledTimes(0)
expect(onConnData).toHaveBeenCalledTimes(1)
Expand Down Expand Up @@ -243,9 +252,7 @@ describe('connection', () => {
sockets[0].mockData(Buffer.from(`RES cmd2 202 PLAY OK\r\n`))

// Wait for deserializer to run
await new Promise(process.nextTick.bind(process))
await new Promise(process.nextTick.bind(process))
await new Promise(process.nextTick.bind(process))
await new Promise(setImmediate)

expect(onConnError).toHaveBeenCalledTimes(0)
expect(onConnData).toHaveBeenCalledTimes(2)
Expand Down Expand Up @@ -321,19 +328,26 @@ describe('connection', () => {
expect(onSocketWrite).toHaveBeenNthCalledWith(2, 'REQ cmd2 PLAY 1-10\r\n', 'utf-8')

// Reply with a blob designed to crash the xml parser
sockets[0].mockData(Buffer.from(`201 INFO OK\r\n<?xml\r\n\r\n`))
await new Promise(process.nextTick.bind(process))
sockets[0].mockData(Buffer.from(`RES cmd1 201 INFO OK\r\n<?xml\r\n\r\n`))
await new Promise(setImmediate)

// TODO - should the invalid xml cause an error here, or propogate as response?
expect(onConnError).toHaveBeenCalledTimes(1)
expect(onConnData).toHaveBeenCalledTimes(0)
onConnError.mockClear()
expect(onConnError).toHaveBeenCalledTimes(0)
expect(onConnData).toHaveBeenCalledTimes(1)

// TODO - verify the response of the INFO matches what we expect
// Check result looks good
expect(onConnData).toHaveBeenNthCalledWith(1, {
command: 'INFO',
data: ['<?xml'],
message: 'Invalid response received.',
reqId: 'cmd1',
responseCode: 500,
type: 'FAILED',
})
onConnData.mockClear()

// Reply with successful PLAY
sockets[0].mockData(Buffer.from(`RES cmd2 202 PLAY OK\r\n`))
await new Promise(process.nextTick.bind(process))
await new Promise(setImmediate)

expect(onConnError).toHaveBeenCalledTimes(0)
expect(onConnData).toHaveBeenCalledTimes(1)
Expand All @@ -352,5 +366,97 @@ describe('connection', () => {
conn.disconnect()
}
})

it('test with full connection', async () => {
const client = new BasicCasparCGAPI({
host: '127.0.0.1',
port: 5250,
autoConnect: true,
})
try {
expect(client).toBeTruthy()

const onConnError = jest.fn()
// const onConnData = jest.fn()
client.on('error', onConnError)
// client.on('data', onConnData)

const onCommandOk = jest.fn()
const onCommandError = jest.fn()

const sockets = SocketMock.openSockets()
expect(sockets).toHaveLength(1)

// Dispatch a command
const sendError = await client.executeCommand({
command: Commands.Info,
params: {},
})
sendError.request?.then(onCommandOk, onCommandError)
const sendError2 = await client.executeCommand({
command: Commands.Play,
params: {
channel: 1,
layer: 10,
},
})
sendError2.request?.then(onCommandOk, onCommandError)
expect(onConnError).toHaveBeenCalledTimes(0)
expect(onCommandOk).toHaveBeenCalledTimes(0)
expect(onCommandError).toHaveBeenCalledTimes(0)

// Info was sent
expect(onSocketWrite).toHaveBeenCalledTimes(2)
expect(onSocketWrite).toHaveBeenNthCalledWith(1, expect.stringMatching(/REQ (\w+) INFO\r\n/), 'utf-8')
expect(onSocketWrite).toHaveBeenNthCalledWith(
2,
expect.stringMatching(/REQ (\w+) PLAY 1-10\r\n/),
'utf-8'
)

// Reply with a blob designed to crash the xml parser
const infoReqId = extractReqId(1)
sockets[0].mockData(Buffer.from(`RES ${infoReqId} 201 INFO OK\r\n<?xml\r\n\r\n`))
await new Promise(setImmediate)

expect(onConnError).toHaveBeenCalledTimes(0)
// expect(onConnData).toHaveBeenCalledTimes(1)
expect(onCommandOk).toHaveBeenCalledTimes(1)
expect(onCommandError).toHaveBeenCalledTimes(0)

// Check result looks good
expect(onCommandOk).toHaveBeenNthCalledWith(1, {
command: 'INFO',
data: ['<?xml'],
message: 'Invalid response received.',
reqId: infoReqId,
responseCode: 500,
type: 'FAILED',
})
onCommandOk.mockClear()

// Reply with successful PLAY
const playReqId = extractReqId(2)
sockets[0].mockData(Buffer.from(`RES ${playReqId} 202 PLAY OK\r\n`))
await new Promise(setImmediate)

expect(onConnError).toHaveBeenCalledTimes(0)
expect(onCommandOk).toHaveBeenCalledTimes(1)
expect(onCommandError).toHaveBeenCalledTimes(0)

// Check result looks good
expect(onCommandOk).toHaveBeenNthCalledWith(1, {
command: 'PLAY',
data: [],
message: 'The command has been executed.',
reqId: playReqId,
responseCode: 202,
type: 'OK',
})
} finally {
// Ensure cleaned up
client.disconnect()
}
})
})
})
2 changes: 1 addition & 1 deletion src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ interface InternalRequest {
}

export interface Response {
reqId?: string
reqId: string | undefined
command: Commands
responseCode: number
data: any[]
Expand Down
61 changes: 45 additions & 16 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export type ConnectionEvents = {

export class Connection extends EventEmitter<ConnectionEvents> {
private _socket?: Socket
private _unprocessedData = ''
private _unprocessedLines: string[] = []
private _reconnectTimeout?: NodeJS.Timeout
private _connected = false
Expand Down Expand Up @@ -125,20 +126,25 @@ export class Connection extends EventEmitter<ConnectionEvents> {
})
}

private async _processIncomingData(data: Buffer) {
const string = data.toString('utf-8')
const newLines = string.split('\r\n')

private _processIncomingData(data: Buffer) {
/**
* This is a simple strategy to handle receiving newline separated data, factoring in arbitrary TCP fragmentation.
* It is common for a long response to be split across multiple packets, most likely with the split happening in the middle of a line.
*/
this._unprocessedData += data.toString('utf-8')
const newLines = this._unprocessedData.split('\r\n')
// Pop and preserve the last fragment as unprocessed. In most cases this will be an empty string, but it could be the first portion of a line
this._unprocessedData = newLines.pop() ?? ''
this._unprocessedLines.push(...newLines)

while (this._unprocessedLines.length > 0) {
const result = RESPONSE_REGEX.exec(this._unprocessedLines[0])
let processedLines = 0

if (result && result.groups?.['ResponseCode']) {
if (result?.groups?.['ResponseCode']) {
// create a response object
const responseCode = parseInt(result?.groups?.['ResponseCode'])
const response = {
const response: Response = {
reqId: result?.groups?.['ReqId'],
command: result?.groups?.['Action'] as Commands,
responseCode,
Expand All @@ -149,22 +155,41 @@ export class Connection extends EventEmitter<ConnectionEvents> {

// parse additional lines if needed
if (response.responseCode === 200) {
const indexOfBlankLine = this._unprocessedLines.indexOf('')
if (indexOfBlankLine === -1) break // No termination yet, try again later

// multiple lines of data
response.data = this._unprocessedLines.slice(1, this._unprocessedLines.indexOf(''))
response.data = this._unprocessedLines.slice(1, indexOfBlankLine)
processedLines += response.data.length + 1 // data lines + 1 empty line
} else if (response.responseCode === 201 || response.responseCode === 400) {
if (this._unprocessedLines.length < 2) break // No data line, try again later

response.data = [this._unprocessedLines[1]]
processedLines++
}

const deserializers = this._getVersionedDeserializers()
// attempt to deserialize the response if we can
if (deserializers[response.command] && response.data.length) {
response.data = await deserializers[response.command](response.data)
}

// now do something with response
this.emit('data', response)
// Parse the command after `this._unprocessedLines` has been updated
setImmediate(() => {
Promise.resolve()
.then(async () => {
const deserializers = this._getVersionedDeserializers()
// attempt to deserialize the response if we can
if (deserializers[response.command] && response.data.length) {
response.data = await deserializers[response.command](response.data)
}

// now do something with response
this.emit('data', response)
})
.catch(() => {
this.emit('data', {
...response,
responseCode: 500, // TODO better value?
type: ResponseTypes.ServerError,
message: 'Invalid response received.',
})
})
})
} else {
// well this is not happy, do we do something?
// perhaps this is the infamous 100 or 101 response code, although that doesn't appear in casparcg source code
Expand Down Expand Up @@ -198,7 +223,11 @@ export class Connection extends EventEmitter<ConnectionEvents> {
this._socket.setEncoding('utf-8')

this._socket.on('data', (data) => {
this._processIncomingData(data).catch((e) => this.emit('error', e))
try {
this._processIncomingData(data)
} catch (e: any) {
this.emit('error', e)
}
})
this._socket.on('connect', () => {
this._setConnected(true)
Expand Down

0 comments on commit 332ce67

Please sign in to comment.