diff --git a/packages/i18n/src/locales/en_US.ts b/packages/i18n/src/locales/en_US.ts index 5272483c2..c78a1da83 100644 --- a/packages/i18n/src/locales/en_US.ts +++ b/packages/i18n/src/locales/en_US.ts @@ -2771,6 +2771,9 @@ const translations: Catalog = { description: 'Return stats captured from a named stream processor.', }, + modify: { + description: 'Modify a stream processor definition.', + }, }, }, }, diff --git a/packages/shell-api/src/stream-processor.ts b/packages/shell-api/src/stream-processor.ts index 26590d542..a70b9c8c0 100644 --- a/packages/shell-api/src/stream-processor.ts +++ b/packages/shell-api/src/stream-processor.ts @@ -1,4 +1,5 @@ import type { Document } from '@mongosh/service-provider-core'; +import { CommonErrors, MongoshInvalidInputError } from '@mongosh/errors'; import type Mongo from './mongo'; import { asPrintable } from './enums'; @@ -57,6 +58,46 @@ export default class StreamProcessor extends ShellApiWithMongoClass { }); } + /** + * modify is used to modify a stream processor definition, like below: + * Change the pipeline: + * sp.name.modify(newPipeline) + * Keep the same pipeline, change other options: + * sp.name.modify({resumeFromCheckpoint: false}) + * Change the pipeline and set additional options: + * sp.name.modify(newPipeline, {resumeFromCheckpoint: false}) + */ + async modify(options: Document): Promise; + async modify(pipeline: Document[], options?: Document): Promise; + + @returnsPromise + async modify( + pipelineOrOptions: Document[] | Document, + options?: Document + ): Promise { + if (Array.isArray(pipelineOrOptions)) { + options = { ...options, pipeline: pipelineOrOptions }; + } else if (typeof pipelineOrOptions === 'object') { + if (options) { + throw new MongoshInvalidInputError( + 'If the first argument to modify is an object, the second argument should not be specified.', + CommonErrors.InvalidArgument + ); + } + options = { ...pipelineOrOptions }; + } else { + throw new MongoshInvalidInputError( + 'The first argument to modify must be an array or object.', + CommonErrors.InvalidArgument + ); + } + + return this._streams._runStreamCommand({ + modifyStreamProcessor: this.name, + ...options, + }); + } + @returnsPromise async sample(options: Document = {}) { const r = await this._streams._runStreamCommand({ diff --git a/packages/shell-api/src/streams.spec.ts b/packages/shell-api/src/streams.spec.ts index 7b649fbee..135db7856 100644 --- a/packages/shell-api/src/streams.spec.ts +++ b/packages/shell-api/src/streams.spec.ts @@ -5,6 +5,7 @@ import type Mongo from './mongo'; import Database from './database'; import { Streams } from './streams'; import { InterruptFlag, MongoshInterruptedError } from './interruptor'; +import type { MongoshInvalidInputError } from '@mongosh/errors'; describe('Streams', function () { let mongo: Mongo; @@ -162,4 +163,112 @@ describe('Streams', function () { ).to.be.true; }); }); + + describe('modify', function () { + it('throws with invalid parameters', async function () { + // Create the stream processor. + const runCmdStub = sinon + .stub(mongo._serviceProvider, 'runCommand') + .resolves({ ok: 1 }); + const name = 'p1'; + const pipeline = [{ $match: { foo: 'bar' } }]; + const processor = await streams.createStreamProcessor(name, pipeline); + expect(processor).to.eql(streams.getProcessor(name)); + const cmd = { createStreamProcessor: name, pipeline }; + expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; + + // No arguments to modify. + const caught = await processor + .modify() + .catch((e: MongoshInvalidInputError) => e); + expect(caught.message).to.contain( + '[COMMON-10001] The first argument to modify must be an array or object.' + ); + + // A single numeric argument to modify. + const caught2 = await processor + .modify(1) + .catch((e: MongoshInvalidInputError) => e); + expect(caught2.message).to.contain( + '[COMMON-10001] The first argument to modify must be an array or object.' + ); + + // Two object arguments to modify. + const caught3 = await processor + .modify( + { resumeFromCheckpoint: false }, + { dlq: { connectionName: 'foo' } } + ) + .catch((e: MongoshInvalidInputError) => e); + expect(caught3.message).to.contain( + '[COMMON-10001] If the first argument to modify is an object, the second argument should not be specified.' + ); + }); + + it('works with pipeline and options arguments', async function () { + const runCmdStub = sinon + .stub(mongo._serviceProvider, 'runCommand') + .resolves({ ok: 1 }); + + // Create the stream processor. + const name = 'p1'; + const pipeline = [{ $match: { foo: 'bar' } }]; + const processor = await streams.createStreamProcessor(name, pipeline); + expect(processor).to.eql(streams.getProcessor(name)); + const cmd = { createStreamProcessor: name, pipeline }; + expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; + + // Start the stream processor. + await processor.start(); + expect( + runCmdStub.calledWithExactly( + 'admin', + { startStreamProcessor: name }, + {} + ) + ).to.be.true; + + // Stop the stream processor. + await processor.stop(); + expect( + runCmdStub.calledWithExactly('admin', { stopStreamProcessor: name }, {}) + ).to.be.true; + + // Modify the stream processor. + const pipeline2 = [{ $match: { foo: 'baz' } }]; + processor.modify(pipeline2); + expect( + runCmdStub.calledWithExactly( + 'admin', + { modifyStreamProcessor: name, pipeline: pipeline2 }, + {} + ) + ).to.be.true; + + // Modify the stream processor with extra options. + const pipeline3 = [{ $match: { foo: 'bat' } }]; + processor.modify(pipeline3, { resumeFromCheckpoint: false }); + expect( + runCmdStub.calledWithExactly( + 'admin', + { + modifyStreamProcessor: name, + pipeline: pipeline3, + resumeFromCheckpoint: false, + }, + {} + ) + ).to.be.true; + + // Modify the stream processor without changing pipeline. + processor.modify({ resumeFromCheckpoint: false }); + expect( + runCmdStub.calledWithExactly( + 'admin', + { modifyStreamProcessor: name, resumeFromCheckpoint: false }, + {} + ) + ).to.be.true; + }); + }); });