Skip to content

Commit

Permalink
Unexpectedly Stopped issue fix (#11) (#22) (1.2.1-alpha.1)
Browse files Browse the repository at this point in the history
* Example file with mandatory ENV vars
* Unexpectedly stopped issue (#11) fixed (hope so)
* Expose removeListener method from EventEmitter enhancement (#17)
* Version bumped 1.2.0-alpha.5 -> 1.2.1-alpha.1
  • Loading branch information
boonya authored Aug 27, 2020
1 parent 7816496 commit d18e656
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 103 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "rtsp-video-recorder",
"version": "1.2.0-alpha.4",
"version": "1.2.1-alpha.1",
"description": "Provide an API to record rtsp video stream to filesystem.",
"main": "dist/recorder.js",
"types": "dist/recorder.d.ts",
Expand Down
33 changes: 21 additions & 12 deletions src/example.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// tslint:disable no-console
import readline from 'readline';
import Recorder, { RecorderEvents } from './recorder';

Expand All @@ -21,19 +22,22 @@ try {
DIRECTORY_PATTERN,
FILENAME_PATTERN,
AUTO_CLEAR,
DESTINATION,
} = process.env;

const ip = IP || '192.168.0.100';
const title = TITLE || 'Test cam';
if (!IP || !DESTINATION) {
throw new Error('You have to specify at least IP & DESTINATION.');
}

const title = TITLE || 'Example cam';
const segmentTime = SEGMENT_TIME || '10m';
const dirSizeThreshold = THRESHOLD || '500M';
const autoClear = AUTO_CLEAR === 'false' ? false : true;
const autoClear = AUTO_CLEAR === 'true' ? true : false;
const directoryPattern = DIRECTORY_PATTERN || '%Y.%m.%d';
const filenamePattern = FILENAME_PATTERN || `%H.%M.%S-${title}`;

const recorder = new Recorder(
`rtsp://${ip}:554/user=admin_password=tlJwpbo6_channel=1_stream=0.sdp?real_stream`,
'dist/Recorder',
`rtsp://${IP}:554/user=admin_password=tlJwpbo6_channel=1_stream=0.sdp?real_stream`, DESTINATION,
{
title,
segmentTime,
Expand All @@ -53,28 +57,33 @@ try {
.on(RecorderEvents.STOP, log(RecorderEvents.STOP))
// .on(RecorderEvents.PROGRESS, log(RecorderEvents.PROGRESS))
.on(RecorderEvents.SPACE_FULL, log(RecorderEvents.SPACE_FULL))
.on(RecorderEvents.SPACE_WIPED, log(RecorderEvents.SPACE_WIPED));
.on(RecorderEvents.SPACE_WIPED, log(RecorderEvents.SPACE_WIPED))
.start();

process.stdin.on('keypress', (_, key) => {
if (key.ctrl && key.name === 'c') {
if (recorder.isRecording()) {
recorder
.on(RecorderEvents.STOPPED, () => {
console.log('Gracefully stopped.');
process.exit();
setTimeout(() => {
console.log('Gracefully stopped.');
process.exit();
}, 2000);
})
.stop();
} else {
process.exit();
}
} else if (key.name === 'space') {
recorder.isRecording()
? recorder.stop()
: recorder.start();
if (recorder.isRecording()) {
recorder.stop()
} else {
recorder.start();
}
}
});
console.log('Press "space" to start/stop recording, "ctrl + c" to stop a process.');
console.log();
} catch (err) {
console.error(err.message, { err });
console.error(err);
}
18 changes: 13 additions & 5 deletions src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,19 @@ export const clearSpace = async (root: string) => {

export const getOldestObject = (listing: string[]) => {
const result = listing.map((path) => {
return {
path,
created: fs.lstatSync(path).birthtimeMs,
};
}).reduce(
try {
return {
path,
created: fs.lstatSync(path).birthtimeMs,
};
} catch (err) {
// tslint:disable no-console
console.info('getOldestObject', { listing, path, err });
return { path, created: Infinity };
}
})
.filter((item) => item.created !== Infinity)
.reduce(
(acc, cur) => acc.created > cur.created ? cur : acc,
{ path: '', created: Infinity },
);
Expand Down
124 changes: 67 additions & 57 deletions src/recorder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,32 +84,24 @@ export default class Recorder implements IRecorder {
return this;
}

public removeListener = (event: Events, callback: EventCallback) => {
this.eventEmitter.removeListener(event, callback);
return this;
}

public isRecording = () => Boolean(this.process);

private startRecord = () => {
if (this.process) {
throw new RecorderError('Process already spawned.');
}

this.eventEmitter.on(Events.PROGRESS, this.onProgress);
this.eventEmitter.on(Events.SEGMENT_STARTED, this.onSegmentStarted);
this.eventEmitter.on(Events.SPACE_FULL, this.onSpaceFull);
this.eventEmitter.on(Events.PROGRESS, this.onProgress);
this.eventEmitter.on(Events.STOP, this.stopRecord);

this.process = this.spawnFFMPEG();

// TODO: To spawn STARTED event here in case process is really started only.
this.eventEmitter.emit(Events.STARTED, {
path: this.path,
uri: this.uri,
segmentTime: this.segmentTime,
directoryPattern: this.directoryPattern,
filenamePattern: this.filenamePattern,
dirSizeThreshold: this.dirSizeThreshold,
autoClear: this.autoClear,
title: this.title,
ffmpegBinary: this.ffmpegBinary,
});
}

private stopRecord = async () => {
Expand All @@ -120,10 +112,17 @@ export default class Recorder implements IRecorder {
this.process.kill();
this.process = null;
if (this.previousSegment) {
await this.moveSegment(this.previousSegment);
try {
await this.moveSegment(this.previousSegment);
} catch (err) {
this.eventEmitter.emit(Events.ERROR, err);
}
this.previousSegment = undefined;
}
this.eventEmitter.emit(Events.STOPPED, 0, 'Programmatically stopped.');
this.eventEmitter.removeListener(Events.PROGRESS, this.onProgress);
this.eventEmitter.removeListener(Events.SEGMENT_STARTED, this.onSegmentStarted);
this.eventEmitter.removeListener(Events.SPACE_FULL, this.onSpaceFull);
this.eventEmitter.removeListener(Events.STOP, this.stopRecord);
}

private spawnFFMPEG = () => {
Expand Down Expand Up @@ -153,21 +152,8 @@ export default class Recorder implements IRecorder {
);

process.stderr.on('data', (buffer: Buffer) => {
try {
const message = buffer.toString();
const current = this.handleProgressBuffer(message);
if (current) {
this.eventEmitter.emit(Events.SEGMENT_STARTED, {
current,
previous: this.previousSegment,
});
this.previousSegment = current;
}
this.eventEmitter.emit(Events.PROGRESS, message);
} catch (err) {
this.eventEmitter.emit(Events.ERROR, err);
this.eventEmitter.emit(Events.STOP, 'Error', err);
}
const message = buffer.toString();
this.eventEmitter.emit(Events.PROGRESS, message);
});

process.on('error', (error: string) => {
Expand All @@ -181,9 +167,43 @@ export default class Recorder implements IRecorder {
return process;
}

private onProgress = (message: string) => {
try {
const current = this.handleProgressBuffer(message);
if (current) {
if (!this.previousSegment) {
this.eventEmitter.emit(Events.STARTED, {
path: this.path,
uri: this.uri,
segmentTime: this.segmentTime,
directoryPattern: this.directoryPattern,
filenamePattern: this.filenamePattern,
dirSizeThreshold: this.dirSizeThreshold,
autoClear: this.autoClear,
title: this.title,
ffmpegBinary: this.ffmpegBinary,
});
}
this.eventEmitter.emit(Events.SEGMENT_STARTED, {
current,
previous: this.previousSegment,
});
this.previousSegment = current;
}
} catch (err) {
this.eventEmitter.emit(Events.ERROR, err);
this.eventEmitter.emit(Events.STOP, 'Error', err);
}
}

private onSegmentStarted = async ({ previous }: SegmentStartedArg) => {
if (previous) {
await this.moveSegment(previous);
try {
if (previous) {
await this.moveSegment(previous);
}
await this.ensureSpaceEnough();
} catch (err) {
this.eventEmitter.emit(Events.ERROR, err);
}
}

Expand All @@ -209,18 +229,9 @@ export default class Recorder implements IRecorder {
}
}

private onProgress = async () => {
try {
await this.ensureSpaceEnough();
} catch (err) {
this.eventEmitter.emit(Events.ERROR, err);
this.eventEmitter.emit(Events.STOP);
}
}

private ensureSpaceEnough = async () => {
if (!this.dirSizeThreshold) {
return;
return true;
}

const used = await du(this.path, { disk: true });
Expand All @@ -231,7 +242,9 @@ export default class Recorder implements IRecorder {
threshold: this.dirSizeThreshold,
used,
});
return false;
}
return true;
}

private handleProgressBuffer = (message: string) => {
Expand Down Expand Up @@ -270,20 +283,17 @@ export default class Recorder implements IRecorder {
}

private moveSegment = async (path: string) => {
try {
const { dirpath, dirname, filename } = this.parseSegmentPath(path);
await this.ensureDirectory(dirpath, dirname);
const target = `${dirpath}/${filename}`;
await fse.move(path, target);
this.eventEmitter.emit(Events.FILE_CREATED, {
filepath: target,
dirpath,
dirname,
filename,
});
} catch (err) {
this.eventEmitter.emit(Events.ERROR, err);
}
const { dirpath, dirname, filename } = this.parseSegmentPath(path);
await this.ensureDirectory(dirpath, dirname);
const target = `${dirpath}/${filename}`;
await fse.move(path, target);
this.eventEmitter.emit(Events.FILE_CREATED, {
source: path,
filepath: target,
dirpath,
dirname,
filename,
});
}
}

Expand Down
Loading

0 comments on commit d18e656

Please sign in to comment.