diff --git a/src/__tests__/connection.spec.ts b/src/__tests__/connection.spec.ts
index 5dd0ee08..47307e68 100644
--- a/src/__tests__/connection.spec.ts
+++ b/src/__tests__/connection.spec.ts
@@ -5,6 +5,7 @@ import { deserializers } from '../deserializers'
import { Socket as OrgSocket } from 'net'
import { Socket as MockSocket } from '../__mocks__/net'
import { Commands } from '../commands'
+import { BasicCasparCGAPI } from '../api'
jest.mock('net')
@@ -56,6 +57,14 @@ describe('connection', () => {
socket.onClose = onSocketClose
})
}
+
+ function extractReqId(index: number) {
+ const str = onSocketWrite.mock.calls[index - 1][0]
+ const match = str.match(/REQ (\w+) /)
+ if (!match) throw new Error(`Failed to find REQ id in "${str}"`)
+ return match[1]
+ }
+
beforeEach(() => {
setupSocketMock()
})
@@ -110,7 +119,7 @@ describe('connection', () => {
)
// Wait for deserializer to run
- await new Promise(process.nextTick.bind(process))
+ await new Promise(setImmediate)
expect(onConnError).toHaveBeenCalledTimes(0)
expect(onConnData).toHaveBeenCalledTimes(1)
@@ -167,7 +176,7 @@ describe('connection', () => {
sockets[0].mockData(Buffer.from(`\r\n\r\n`))
// Wait for deserializer to run
- await new Promise(process.nextTick.bind(process))
+ await new Promise(setImmediate)
expect(onConnError).toHaveBeenCalledTimes(0)
expect(onConnData).toHaveBeenCalledTimes(1)
@@ -243,9 +252,7 @@ describe('connection', () => {
sockets[0].mockData(Buffer.from(`RES cmd2 202 PLAY OK\r\n`))
// Wait for deserializer to run
- await new Promise(process.nextTick.bind(process))
- await new Promise(process.nextTick.bind(process))
- await new Promise(process.nextTick.bind(process))
+ await new Promise(setImmediate)
expect(onConnError).toHaveBeenCalledTimes(0)
expect(onConnData).toHaveBeenCalledTimes(2)
@@ -321,19 +328,26 @@ describe('connection', () => {
expect(onSocketWrite).toHaveBeenNthCalledWith(2, 'REQ cmd2 PLAY 1-10\r\n', 'utf-8')
// Reply with a blob designed to crash the xml parser
- sockets[0].mockData(Buffer.from(`201 INFO OK\r\n {
conn.disconnect()
}
})
+
+ it('test with full connection', async () => {
+ const client = new BasicCasparCGAPI({
+ host: '127.0.0.1',
+ port: 5250,
+ autoConnect: true,
+ })
+ try {
+ expect(client).toBeTruthy()
+
+ const onConnError = jest.fn()
+ // const onConnData = jest.fn()
+ client.on('error', onConnError)
+ // client.on('data', onConnData)
+
+ const onCommandOk = jest.fn()
+ const onCommandError = jest.fn()
+
+ const sockets = SocketMock.openSockets()
+ expect(sockets).toHaveLength(1)
+
+ // Dispatch a command
+ const sendError = await client.executeCommand({
+ command: Commands.Info,
+ params: {},
+ })
+ sendError.request?.then(onCommandOk, onCommandError)
+ const sendError2 = await client.executeCommand({
+ command: Commands.Play,
+ params: {
+ channel: 1,
+ layer: 10,
+ },
+ })
+ sendError2.request?.then(onCommandOk, onCommandError)
+ expect(onConnError).toHaveBeenCalledTimes(0)
+ expect(onCommandOk).toHaveBeenCalledTimes(0)
+ expect(onCommandError).toHaveBeenCalledTimes(0)
+
+ // Info was sent
+ expect(onSocketWrite).toHaveBeenCalledTimes(2)
+ expect(onSocketWrite).toHaveBeenNthCalledWith(1, expect.stringMatching(/REQ (\w+) INFO\r\n/), 'utf-8')
+ expect(onSocketWrite).toHaveBeenNthCalledWith(
+ 2,
+ expect.stringMatching(/REQ (\w+) PLAY 1-10\r\n/),
+ 'utf-8'
+ )
+
+ // Reply with a blob designed to crash the xml parser
+ const infoReqId = extractReqId(1)
+ sockets[0].mockData(Buffer.from(`RES ${infoReqId} 201 INFO OK\r\n {
private _socket?: Socket
+ private _unprocessedData = ''
private _unprocessedLines: string[] = []
private _reconnectTimeout?: NodeJS.Timeout
private _connected = false
@@ -125,20 +126,25 @@ export class Connection extends EventEmitter {
})
}
- private async _processIncomingData(data: Buffer) {
- const string = data.toString('utf-8')
- const newLines = string.split('\r\n')
-
+ private _processIncomingData(data: Buffer) {
+ /**
+ * This is a simple strategy to handle receiving newline separated data, factoring in arbitrary TCP fragmentation.
+ * It is common for a long response to be split across multiple packets, most likely with the split happening in the middle of a line.
+ */
+ this._unprocessedData += data.toString('utf-8')
+ const newLines = this._unprocessedData.split('\r\n')
+ // Pop and preserve the last fragment as unprocessed. In most cases this will be an empty string, but it could be the first portion of a line
+ this._unprocessedData = newLines.pop() ?? ''
this._unprocessedLines.push(...newLines)
while (this._unprocessedLines.length > 0) {
const result = RESPONSE_REGEX.exec(this._unprocessedLines[0])
let processedLines = 0
- if (result && result.groups?.['ResponseCode']) {
+ if (result?.groups?.['ResponseCode']) {
// create a response object
const responseCode = parseInt(result?.groups?.['ResponseCode'])
- const response = {
+ const response: Response = {
reqId: result?.groups?.['ReqId'],
command: result?.groups?.['Action'] as Commands,
responseCode,
@@ -149,22 +155,41 @@ export class Connection extends EventEmitter {
// parse additional lines if needed
if (response.responseCode === 200) {
+ const indexOfBlankLine = this._unprocessedLines.indexOf('')
+ if (indexOfBlankLine === -1) break // No termination yet, try again later
+
// multiple lines of data
- response.data = this._unprocessedLines.slice(1, this._unprocessedLines.indexOf(''))
+ response.data = this._unprocessedLines.slice(1, indexOfBlankLine)
processedLines += response.data.length + 1 // data lines + 1 empty line
} else if (response.responseCode === 201 || response.responseCode === 400) {
+ if (this._unprocessedLines.length < 2) break // No data line, try again later
+
response.data = [this._unprocessedLines[1]]
processedLines++
}
- const deserializers = this._getVersionedDeserializers()
- // attempt to deserialize the response if we can
- if (deserializers[response.command] && response.data.length) {
- response.data = await deserializers[response.command](response.data)
- }
-
- // now do something with response
- this.emit('data', response)
+ // Parse the command after `this._unprocessedLines` has been updated
+ setImmediate(() => {
+ Promise.resolve()
+ .then(async () => {
+ const deserializers = this._getVersionedDeserializers()
+ // attempt to deserialize the response if we can
+ if (deserializers[response.command] && response.data.length) {
+ response.data = await deserializers[response.command](response.data)
+ }
+
+ // now do something with response
+ this.emit('data', response)
+ })
+ .catch(() => {
+ this.emit('data', {
+ ...response,
+ responseCode: 500, // TODO better value?
+ type: ResponseTypes.ServerError,
+ message: 'Invalid response received.',
+ })
+ })
+ })
} else {
// well this is not happy, do we do something?
// perhaps this is the infamous 100 or 101 response code, although that doesn't appear in casparcg source code
@@ -198,7 +223,11 @@ export class Connection extends EventEmitter {
this._socket.setEncoding('utf-8')
this._socket.on('data', (data) => {
- this._processIncomingData(data).catch((e) => this.emit('error', e))
+ try {
+ this._processIncomingData(data)
+ } catch (e: any) {
+ this.emit('error', e)
+ }
})
this._socket.on('connect', () => {
this._setConnected(true)