diff --git a/package.json b/package.json index fea4223..c3c2c20 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "rtsp-video-recorder", - "version": "1.2.0-alpha.4", + "version": "1.2.1-alpha.1", "description": "Provide an API to record rtsp video stream to filesystem.", "main": "dist/recorder.js", "types": "dist/recorder.d.ts", diff --git a/src/example.ts b/src/example.ts index 35ca7d8..41422ea 100644 --- a/src/example.ts +++ b/src/example.ts @@ -1,3 +1,4 @@ +// tslint:disable no-console import readline from 'readline'; import Recorder, { RecorderEvents } from './recorder'; @@ -21,19 +22,22 @@ try { DIRECTORY_PATTERN, FILENAME_PATTERN, AUTO_CLEAR, + DESTINATION, } = process.env; - const ip = IP || '192.168.0.100'; - const title = TITLE || 'Test cam'; + if (!IP || !DESTINATION) { + throw new Error('You have to specify at least IP & DESTINATION.'); + } + + const title = TITLE || 'Example cam'; const segmentTime = SEGMENT_TIME || '10m'; const dirSizeThreshold = THRESHOLD || '500M'; - const autoClear = AUTO_CLEAR === 'false' ? false : true; + const autoClear = AUTO_CLEAR === 'true' ? true : false; const directoryPattern = DIRECTORY_PATTERN || '%Y.%m.%d'; const filenamePattern = FILENAME_PATTERN || `%H.%M.%S-${title}`; const recorder = new Recorder( - `rtsp://${ip}:554/user=admin_password=tlJwpbo6_channel=1_stream=0.sdp?real_stream`, - 'dist/Recorder', + `rtsp://${IP}:554/user=admin_password=tlJwpbo6_channel=1_stream=0.sdp?real_stream`, DESTINATION, { title, segmentTime, @@ -53,28 +57,33 @@ try { .on(RecorderEvents.STOP, log(RecorderEvents.STOP)) // .on(RecorderEvents.PROGRESS, log(RecorderEvents.PROGRESS)) .on(RecorderEvents.SPACE_FULL, log(RecorderEvents.SPACE_FULL)) - .on(RecorderEvents.SPACE_WIPED, log(RecorderEvents.SPACE_WIPED)); + .on(RecorderEvents.SPACE_WIPED, log(RecorderEvents.SPACE_WIPED)) + .start(); process.stdin.on('keypress', (_, key) => { if (key.ctrl && key.name === 'c') { if (recorder.isRecording()) { recorder .on(RecorderEvents.STOPPED, () => { - console.log('Gracefully stopped.'); - process.exit(); + setTimeout(() => { + console.log('Gracefully stopped.'); + process.exit(); + }, 2000); }) .stop(); } else { process.exit(); } } else if (key.name === 'space') { - recorder.isRecording() - ? recorder.stop() - : recorder.start(); + if (recorder.isRecording()) { + recorder.stop() + } else { + recorder.start(); + } } }); console.log('Press "space" to start/stop recording, "ctrl + c" to stop a process.'); console.log(); } catch (err) { - console.error(err.message, { err }); + console.error(err); } diff --git a/src/helpers.ts b/src/helpers.ts index 3884d66..a88eb50 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -45,11 +45,19 @@ export const clearSpace = async (root: string) => { export const getOldestObject = (listing: string[]) => { const result = listing.map((path) => { - return { - path, - created: fs.lstatSync(path).birthtimeMs, - }; - }).reduce( + try { + return { + path, + created: fs.lstatSync(path).birthtimeMs, + }; + } catch (err) { + // tslint:disable no-console + console.info('getOldestObject', { listing, path, err }); + return { path, created: Infinity }; + } + }) + .filter((item) => item.created !== Infinity) + .reduce( (acc, cur) => acc.created > cur.created ? cur : acc, { path: '', created: Infinity }, ); diff --git a/src/recorder.ts b/src/recorder.ts index ebf5c42..298bba5 100644 --- a/src/recorder.ts +++ b/src/recorder.ts @@ -84,6 +84,11 @@ export default class Recorder implements IRecorder { return this; } + public removeListener = (event: Events, callback: EventCallback) => { + this.eventEmitter.removeListener(event, callback); + return this; + } + public isRecording = () => Boolean(this.process); private startRecord = () => { @@ -91,25 +96,12 @@ export default class Recorder implements IRecorder { throw new RecorderError('Process already spawned.'); } + this.eventEmitter.on(Events.PROGRESS, this.onProgress); this.eventEmitter.on(Events.SEGMENT_STARTED, this.onSegmentStarted); this.eventEmitter.on(Events.SPACE_FULL, this.onSpaceFull); - this.eventEmitter.on(Events.PROGRESS, this.onProgress); this.eventEmitter.on(Events.STOP, this.stopRecord); this.process = this.spawnFFMPEG(); - - // TODO: To spawn STARTED event here in case process is really started only. - this.eventEmitter.emit(Events.STARTED, { - path: this.path, - uri: this.uri, - segmentTime: this.segmentTime, - directoryPattern: this.directoryPattern, - filenamePattern: this.filenamePattern, - dirSizeThreshold: this.dirSizeThreshold, - autoClear: this.autoClear, - title: this.title, - ffmpegBinary: this.ffmpegBinary, - }); } private stopRecord = async () => { @@ -120,10 +112,17 @@ export default class Recorder implements IRecorder { this.process.kill(); this.process = null; if (this.previousSegment) { - await this.moveSegment(this.previousSegment); + try { + await this.moveSegment(this.previousSegment); + } catch (err) { + this.eventEmitter.emit(Events.ERROR, err); + } this.previousSegment = undefined; } - this.eventEmitter.emit(Events.STOPPED, 0, 'Programmatically stopped.'); + this.eventEmitter.removeListener(Events.PROGRESS, this.onProgress); + this.eventEmitter.removeListener(Events.SEGMENT_STARTED, this.onSegmentStarted); + this.eventEmitter.removeListener(Events.SPACE_FULL, this.onSpaceFull); + this.eventEmitter.removeListener(Events.STOP, this.stopRecord); } private spawnFFMPEG = () => { @@ -153,21 +152,8 @@ export default class Recorder implements IRecorder { ); process.stderr.on('data', (buffer: Buffer) => { - try { - const message = buffer.toString(); - const current = this.handleProgressBuffer(message); - if (current) { - this.eventEmitter.emit(Events.SEGMENT_STARTED, { - current, - previous: this.previousSegment, - }); - this.previousSegment = current; - } - this.eventEmitter.emit(Events.PROGRESS, message); - } catch (err) { - this.eventEmitter.emit(Events.ERROR, err); - this.eventEmitter.emit(Events.STOP, 'Error', err); - } + const message = buffer.toString(); + this.eventEmitter.emit(Events.PROGRESS, message); }); process.on('error', (error: string) => { @@ -181,9 +167,43 @@ export default class Recorder implements IRecorder { return process; } + private onProgress = (message: string) => { + try { + const current = this.handleProgressBuffer(message); + if (current) { + if (!this.previousSegment) { + this.eventEmitter.emit(Events.STARTED, { + path: this.path, + uri: this.uri, + segmentTime: this.segmentTime, + directoryPattern: this.directoryPattern, + filenamePattern: this.filenamePattern, + dirSizeThreshold: this.dirSizeThreshold, + autoClear: this.autoClear, + title: this.title, + ffmpegBinary: this.ffmpegBinary, + }); + } + this.eventEmitter.emit(Events.SEGMENT_STARTED, { + current, + previous: this.previousSegment, + }); + this.previousSegment = current; + } + } catch (err) { + this.eventEmitter.emit(Events.ERROR, err); + this.eventEmitter.emit(Events.STOP, 'Error', err); + } + } + private onSegmentStarted = async ({ previous }: SegmentStartedArg) => { - if (previous) { - await this.moveSegment(previous); + try { + if (previous) { + await this.moveSegment(previous); + } + await this.ensureSpaceEnough(); + } catch (err) { + this.eventEmitter.emit(Events.ERROR, err); } } @@ -209,18 +229,9 @@ export default class Recorder implements IRecorder { } } - private onProgress = async () => { - try { - await this.ensureSpaceEnough(); - } catch (err) { - this.eventEmitter.emit(Events.ERROR, err); - this.eventEmitter.emit(Events.STOP); - } - } - private ensureSpaceEnough = async () => { if (!this.dirSizeThreshold) { - return; + return true; } const used = await du(this.path, { disk: true }); @@ -231,7 +242,9 @@ export default class Recorder implements IRecorder { threshold: this.dirSizeThreshold, used, }); + return false; } + return true; } private handleProgressBuffer = (message: string) => { @@ -270,20 +283,17 @@ export default class Recorder implements IRecorder { } private moveSegment = async (path: string) => { - try { - const { dirpath, dirname, filename } = this.parseSegmentPath(path); - await this.ensureDirectory(dirpath, dirname); - const target = `${dirpath}/${filename}`; - await fse.move(path, target); - this.eventEmitter.emit(Events.FILE_CREATED, { - filepath: target, - dirpath, - dirname, - filename, - }); - } catch (err) { - this.eventEmitter.emit(Events.ERROR, err); - } + const { dirpath, dirname, filename } = this.parseSegmentPath(path); + await this.ensureDirectory(dirpath, dirname); + const target = `${dirpath}/${filename}`; + await fse.move(path, target); + this.eventEmitter.emit(Events.FILE_CREATED, { + source: path, + filepath: target, + dirpath, + dirname, + filename, + }); } } diff --git a/test/recorder.spec.ts b/test/recorder.spec.ts index e6d8807..35ba9ba 100644 --- a/test/recorder.spec.ts +++ b/test/recorder.spec.ts @@ -75,12 +75,15 @@ describe('Events', () => { describe(RecorderEvents.STARTED, () => { test(`Handler receives an object that contains options applied to the current process. Default values if no options passed.`, (done) => { - const onStarted = jest.fn(() => done()); + const FIRST_SEGMENT = `${PATH}/2020.06.25.10.18.04.731b9d2bc1c4b8376bc7fb87a3565f7b.mp4`; + const onStarted = jest.fn(() => done()).mockName('onStarted'); new Recorder(URI, PATH) .on(RecorderEvents.STARTED, onStarted) .start(); + fakeProcess.stderr.emit('data', Buffer.from(`Opening '${FIRST_SEGMENT}' for writing`, 'utf8')); + expect(onStarted).toBeCalledTimes(1); expect(onStarted).toBeCalledWith({ uri: URI, @@ -95,7 +98,8 @@ describe('Events', () => { test(`Handler receives an object that contains options applied to the current process. Converted values in case of some options if passed.`, (done) => { - const onStarted = jest.fn(() => done()); + const FIRST_SEGMENT = `${PATH}/2020.06.25.10.18.04.731b9d2bc1c4b8376bc7fb87a3565f7b.mp4`; + const onStarted = jest.fn(() => done()).mockName('onStarted'); new Recorder(URI, PATH, { title: 'Test Cam', @@ -109,6 +113,8 @@ describe('Events', () => { .on(RecorderEvents.STARTED, onStarted) .start(); + fakeProcess.stderr.emit('data', Buffer.from(`Opening '${FIRST_SEGMENT}' for writing`, 'utf8')); + expect(onStarted).toBeCalledTimes(1); expect(onStarted).toBeCalledWith({ uri: URI, @@ -125,22 +131,9 @@ describe('Events', () => { }); describe(RecorderEvents.STOPPED, () => { - test(`If stopped programmatically handler receives 0 exit code & reason message - that it stopped programmatically.`, (done) => { - const onStopped = jest.fn(() => done()); - - new Recorder(URI, PATH) - .on(RecorderEvents.STOPPED, onStopped) - .start() - .stop(); - - expect(onStopped).toBeCalledTimes(1); - expect(onStopped).toBeCalledWith(0, 'Programmatically stopped.'); - }); - test(`If stop reason is FFMPEG process exited handler receives exit code of ffmpeg process and a messae that FFMPEG exited.`, (done) => { - const onStopped = jest.fn(() => done()); + const onStopped = jest.fn(() => done()).mockName('onStopped'); new Recorder(URI, PATH) .on(RecorderEvents.STOPPED, onStopped) @@ -157,7 +150,7 @@ describe('Events', () => { test(`Event handler receives a path to current and previous segments.`, (done) => { const FIRST_SEGMENT = `${PATH}/2020.06.25.10.18.04.731b9d2bc1c4b8376bc7fb87a3565f7b.mp4`; const SECOND_SEGMENT = `${PATH}/2020.06.25.10.28.04.731b9d2bc1c4b8376bc7fb87a3565f7b.mp4`; - const onSegmentStarted = jest.fn(() => done()); + const onSegmentStarted = jest.fn(() => done()).mockName('onSegmentStarted'); new Recorder(URI, PATH) .on(RecorderEvents.SEGMENT_STARTED, onSegmentStarted) @@ -182,7 +175,7 @@ describe('Events', () => { but directory does not exist.`, async (done) => { const FIRST_SEGMENT = `${PATH}/2020.06.25.10.18.04.731b9d2bc1c4b8376bc7fb87a3565f7b.mp4`; const SECOND_SEGMENT = `${PATH}/2020.06.25.10.28.04.731b9d2bc1c4b8376bc7fb87a3565f7b.mp4`; - const onDirectoryCreated = jest.fn(() => done()); + const onDirectoryCreated = jest.fn(() => done()).mockName('onDirectoryCreated'); mocked(fs).lstatSync.mockImplementation(() => { throw new Error('Directory does not exist.'); }); @@ -208,7 +201,7 @@ describe('Events', () => { test('New file should be created when new segment started.', async (done) => { const FIRST_SEGMENT = `${PATH}/2020.06.25.10.18.04.731b9d2bc1c4b8376bc7fb87a3565f7b.mp4`; const SECOND_SEGMENT = `${PATH}/2020.06.25.10.28.04.731b9d2bc1c4b8376bc7fb87a3565f7b.mp4`; - const onFileCreated = jest.fn(() => done()); + const onFileCreated = jest.fn(() => done()).mockName('onFileCreated'); new Recorder(URI, PATH) .on(RecorderEvents.FILE_CREATED, onFileCreated) @@ -222,6 +215,7 @@ describe('Events', () => { expect(onFileCreated).toBeCalledTimes(1); expect(onFileCreated).toBeCalledWith({ + source: FIRST_SEGMENT, dirpath: `${PATH}/2020.06.25`, dirname: '2020.06.25', filepath: `${PATH}/2020.06.25/10.18.04.mp4`, @@ -247,6 +241,7 @@ describe('Events', () => { expect(onFileCreated).toBeCalledTimes(1); expect(onFileCreated).toBeCalledWith({ + source: FIRST_SEGMENT, dirpath: `${PATH}/2020.06.25`, dirname: '2020.06.25', filepath: `${PATH}/2020.06.25/10.18.04.mp4`, @@ -259,6 +254,7 @@ describe('Events', () => { describe(RecorderEvents.SPACE_FULL, () => { test('If no space left an event should be emitted and payload raised.', async (done) => { + const FIRST_SEGMENT = `${PATH}/2020.06.25.10.18.04.731b9d2bc1c4b8376bc7fb87a3565f7b.mp4`; mocked(du).mockImplementation(() => 496); mocked(fs).readdirSync.mockImplementation(() => []); @@ -268,7 +264,7 @@ describe('Events', () => { .on(RecorderEvents.SPACE_FULL, onSpaceFull) .start(); - fakeProcess.stderr.emit('data', Buffer.from(`Random progress message`, 'utf8')); + fakeProcess.stderr.emit('data', Buffer.from(`Opening '${FIRST_SEGMENT}' for writing`, 'utf8')); // https://stackoverflow.com/questions/54890916/jest-fn-claims-not-to-have-been-called-but-has?answertab=active#tab-top await Promise.resolve(); @@ -281,7 +277,8 @@ describe('Events', () => { }); }); - test('If no space enough an event won\'t be emitted.', async (done) => { + test('If space not enough an event won\'t be emitted.', async (done) => { + const FIRST_SEGMENT = `${PATH}/2020.06.25.10.18.04.731b9d2bc1c4b8376bc7fb87a3565f7b.mp4`; mocked(du).mockImplementation(() => 400); mocked(fs).readdirSync.mockImplementation(() => []); @@ -291,7 +288,10 @@ describe('Events', () => { .on(RecorderEvents.SPACE_FULL, onSpaceFull) .start(); - fakeProcess.stderr.emit('data', Buffer.from(`Random progress message`, 'utf8')); + fakeProcess.stderr.emit('data', Buffer.from(`Opening '${FIRST_SEGMENT}' for writing`, 'utf8')); + + // https://stackoverflow.com/questions/54890916/jest-fn-claims-not-to-have-been-called-but-has?answertab=active#tab-top + await Promise.resolve(); expect(onSpaceFull).toBeCalledTimes(0); @@ -301,6 +301,7 @@ describe('Events', () => { describe(RecorderEvents.SPACE_WIPED, () => { test('If no space left recorder directory should be wiped. The oldest directory should be removed only.', async (done) => { + const FIRST_SEGMENT = `${PATH}/2020.06.25.10.18.04.731b9d2bc1c4b8376bc7fb87a3565f7b.mp4`; mocked(du) .mockImplementationOnce(() => 500) .mockImplementationOnce(() => 200); @@ -318,7 +319,7 @@ describe('Events', () => { .on(RecorderEvents.SPACE_WIPED, onSpaceWiped) .start(); - fakeProcess.stderr.emit('data', Buffer.from(`Random progress message`, 'utf8')); + fakeProcess.stderr.emit('data', Buffer.from(`Opening '${FIRST_SEGMENT}' for writing`, 'utf8')); // https://stackoverflow.com/questions/54890916/jest-fn-claims-not-to-have-been-called-but-has?answertab=active#tab-top await Promise.resolve(); @@ -402,6 +403,7 @@ describe('Events', () => { fakeProcess.stderr.emit('data', Buffer.from(`Opening '${SECOND_SEGMENT}' for writing`, 'utf8')); // https://stackoverflow.com/questions/54890916/jest-fn-claims-not-to-have-been-called-but-has?answertab=active#tab-top await Promise.resolve(); + await Promise.resolve(); expect(onError).toBeCalledTimes(1); expect(onError).toBeCalledWith(new Error(`${PATH}/2020.06.25 exists but it is not a directory.`)); @@ -423,12 +425,14 @@ describe('Events', () => { fakeProcess.stderr.emit('data', Buffer.from(`Opening '${SECOND_SEGMENT}' for writing`, 'utf8')); // https://stackoverflow.com/questions/54890916/jest-fn-claims-not-to-have-been-called-but-has?answertab=active#tab-top await Promise.resolve(); + await Promise.resolve(); expect(onError).toBeCalledTimes(1); expect(onError).toBeCalledWith(new RecorderError('Moving failed.')); }); test('DU has failed for some reason.', async (done) => { + const FIRST_SEGMENT = `${PATH}/2020.06.25.10.18.04.731b9d2bc1c4b8376bc7fb87a3565f7b.mp4`; const onError = jest.fn(() => done()).mockName('onError'); mocked(du).mockImplementation(() => { throw new Error('DU has failed for some reason.'); @@ -438,7 +442,7 @@ describe('Events', () => { .on(RecorderEvents.ERROR, onError) .start(); - fakeProcess.stderr.emit('data', Buffer.from(`Random progress message`, 'utf8')); + fakeProcess.stderr.emit('data', Buffer.from(`Opening '${FIRST_SEGMENT}' for writing`, 'utf8')); // https://stackoverflow.com/questions/54890916/jest-fn-claims-not-to-have-been-called-but-has?answertab=active#tab-top await Promise.resolve(); @@ -447,23 +451,22 @@ describe('Events', () => { }); test('In case of can\'t remove a directory', async (done) => { + const FIRST_SEGMENT = `${PATH}/2020.06.25.10.18.04.731b9d2bc1c4b8376bc7fb87a3565f7b.mp4`; mocked(du).mockImplementation(() => 500); mocked(fs).readdirSync.mockImplementation(() => []); - const onError = jest.fn().mockName('onError'); + const onError = jest.fn(() => done()).mockName('onError'); new Recorder(URI, PATH, { dirSizeThreshold: 500, autoClear: true }) .on(RecorderEvents.ERROR, onError) .start(); - fakeProcess.stderr.emit('data', Buffer.from(`Random progress message`, 'utf8')); + fakeProcess.stderr.emit('data', Buffer.from(`Opening '${FIRST_SEGMENT}' for writing`, 'utf8')); // https://stackoverflow.com/questions/54890916/jest-fn-claims-not-to-have-been-called-but-has?answertab=active#tab-top await Promise.resolve(); await Promise.resolve(); expect(onError).toBeCalledTimes(1); expect(onError).toBeCalledWith(new Error('Can\'t remove current directory.')); - - done(); }); });