From 581491c00d6b0799dd7d209f9331676cc740e1f2 Mon Sep 17 00:00:00 2001 From: Matthew Normyle Date: Tue, 29 Oct 2024 16:29:06 +0000 Subject: [PATCH 01/13] Add modify in stream-processor.ts --- packages/shell-api/src/stream-processor.ts | 26 ++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/packages/shell-api/src/stream-processor.ts b/packages/shell-api/src/stream-processor.ts index 26590d542..89ac6ee5d 100644 --- a/packages/shell-api/src/stream-processor.ts +++ b/packages/shell-api/src/stream-processor.ts @@ -1,4 +1,5 @@ import type { Document } from '@mongosh/service-provider-core'; +import { CommonErrors, MongoshInvalidInputError } from '@mongosh/errors'; import type Mongo from './mongo'; import { asPrintable } from './enums'; @@ -57,6 +58,31 @@ export default class StreamProcessor extends ShellApiWithMongoClass { }); } + /** + * modify can be used like so: + * sp.name.modify(pipeline) + * sp.name.modify(pipeline, {resumeFromCheckpoint: false}) + * sp.name.modify({resumeFromCheckpoint: false}) + */ + @returnsPromise + async modify(pipelineOrOptions: Document[] | Document, options: Document = {}) { + if (Array.isArray(pipelineOrOptions)) { + options["pipeline"] = pipelineOrOptions; + } else if (typeof pipelineOrOptions == "object") { + options = {...options, ...pipelineOrOptions}; + } else { + throw new MongoshInvalidInputError( + "The first argument to modify must be an array or object.", + CommonErrors.InvalidArgument + ); + } + + return this._streams._runStreamCommand({ + modifyStreamProcessor: this.name, + ...options + }); + } + @returnsPromise async sample(options: Document = {}) { const r = await this._streams._runStreamCommand({ From d25a7c06f0a4a8ef03498cbf4735727dc3f6c745 Mon Sep 17 00:00:00 2001 From: Matthew Normyle Date: Tue, 29 Oct 2024 17:25:32 +0000 Subject: [PATCH 02/13] Add tests --- packages/shell-api/src/streams.spec.ts | 93 ++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/packages/shell-api/src/streams.spec.ts b/packages/shell-api/src/streams.spec.ts index 7b649fbee..3f8b26447 100644 --- a/packages/shell-api/src/streams.spec.ts +++ b/packages/shell-api/src/streams.spec.ts @@ -162,4 +162,97 @@ describe('Streams', function () { ).to.be.true; }); }); + + describe('modify', function () { + it('throws with invalid parameters', async function () { + // Create the stream processor. + const runCmdStub = sinon + .stub(mongo._serviceProvider, 'runCommand') + .resolves({ ok: 1 }); + const name = 'p1'; + const pipeline = [{ $match: { foo: 'bar' } }]; + const processor = await streams.createStreamProcessor(name, pipeline); + expect(processor).to.eql(streams.getProcessor(name)); + const cmd = { createStreamProcessor: name, pipeline }; + expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; + + // No arguments to modify. + const caught = await processor.modify().catch((e: any) => e); + expect(caught.message).to.contain( + '[COMMON-10001] The first argument to modify must be an array or object.' + ); + + // A single numeric argument to modify. + const caught2 = await processor.modify(1).catch((e: any) => e); + expect(caught2.message).to.contain( + '[COMMON-10001] The first argument to modify must be an array or object.' + ); + }); + + it('works with pipeline and options arguments', async function () { + const runCmdStub = sinon + .stub(mongo._serviceProvider, 'runCommand') + .resolves({ ok: 1 }); + + // Create the stream processor. + const name = 'p1'; + const pipeline = [{ $match: { foo: 'bar' } }]; + const processor = await streams.createStreamProcessor(name, pipeline); + expect(processor).to.eql(streams.getProcessor(name)); + const cmd = { createStreamProcessor: name, pipeline }; + expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; + + // Start the stream processor. + await processor.start(); + expect( + runCmdStub.calledWithExactly( + 'admin', + { startStreamProcessor: name }, + {} + ) + ).to.be.true; + + // Stop the stream processor. + await processor.stop(); + expect( + runCmdStub.calledWithExactly('admin', { stopStreamProcessor: name }, {}) + ).to.be.true; + + // Modify the stream processor. + const pipeline2 = [{ $match: { foo: 'baz' } }]; + processor.modify(pipeline2); + expect( + runCmdStub.calledWithExactly( + 'admin', + { modifyStreamProcessor: name, pipeline: pipeline2 }, + {} + ) + ).to.be.true; + + // Modify the stream processor with extra options. + const pipeline3 = [{ $match: { foo: 'bat' } }]; + processor.modify(pipeline3, { resumeFromCheckpoint: false }); + expect( + runCmdStub.calledWithExactly( + 'admin', + { + modifyStreamProcessor: name, + pipeline: pipeline3, + resumeFromCheckpoint: false, + }, + {} + ) + ).to.be.true; + + // Modify the stream processor without changing pipeline. + processor.modify({ resumeFromCheckpoint: false }); + expect( + runCmdStub.calledWithExactly( + 'admin', + { modifyStreamProcessor: name, resumeFromCheckpoint: false }, + {} + ) + ).to.be.true; + }); + }); }); From 743b05d48b223b6c7cfb7e10d0e8cf65a50b4c0d Mon Sep 17 00:00:00 2001 From: Matthew Normyle Date: Tue, 29 Oct 2024 17:27:52 +0000 Subject: [PATCH 03/13] cleanup comments --- packages/shell-api/src/stream-processor.ts | 29 +++++++++++++--------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/packages/shell-api/src/stream-processor.ts b/packages/shell-api/src/stream-processor.ts index 89ac6ee5d..98568f7e8 100644 --- a/packages/shell-api/src/stream-processor.ts +++ b/packages/shell-api/src/stream-processor.ts @@ -60,26 +60,31 @@ export default class StreamProcessor extends ShellApiWithMongoClass { /** * modify can be used like so: - * sp.name.modify(pipeline) - * sp.name.modify(pipeline, {resumeFromCheckpoint: false}) - * sp.name.modify({resumeFromCheckpoint: false}) + * Change the pipeline: + * sp.name.modify(pipeline) + * sp.name.modify(pipeline, {resumeFromCheckpoint: false}) + * For modify requests that don't change the pipeline: + * sp.name.modify({resumeFromCheckpoint: false}) */ @returnsPromise - async modify(pipelineOrOptions: Document[] | Document, options: Document = {}) { + async modify( + pipelineOrOptions: Document[] | Document, + options: Document = {} + ) { if (Array.isArray(pipelineOrOptions)) { - options["pipeline"] = pipelineOrOptions; - } else if (typeof pipelineOrOptions == "object") { - options = {...options, ...pipelineOrOptions}; + options['pipeline'] = pipelineOrOptions; + } else if (typeof pipelineOrOptions == 'object') { + options = { ...options, ...pipelineOrOptions }; } else { - throw new MongoshInvalidInputError( - "The first argument to modify must be an array or object.", - CommonErrors.InvalidArgument - ); + throw new MongoshInvalidInputError( + 'The first argument to modify must be an array or object.', + CommonErrors.InvalidArgument + ); } return this._streams._runStreamCommand({ modifyStreamProcessor: this.name, - ...options + ...options, }); } From 3e691c58d81461e72096b0562f3013553f182c06 Mon Sep 17 00:00:00 2001 From: Matthew Normyle Date: Tue, 29 Oct 2024 18:53:32 +0000 Subject: [PATCH 04/13] Fix explicit any and use === instead of == --- packages/shell-api/src/stream-processor.ts | 2 +- packages/shell-api/src/streams.spec.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/shell-api/src/stream-processor.ts b/packages/shell-api/src/stream-processor.ts index 98568f7e8..c8adf47e0 100644 --- a/packages/shell-api/src/stream-processor.ts +++ b/packages/shell-api/src/stream-processor.ts @@ -73,7 +73,7 @@ export default class StreamProcessor extends ShellApiWithMongoClass { ) { if (Array.isArray(pipelineOrOptions)) { options['pipeline'] = pipelineOrOptions; - } else if (typeof pipelineOrOptions == 'object') { + } else if (typeof pipelineOrOptions === 'object') { options = { ...options, ...pipelineOrOptions }; } else { throw new MongoshInvalidInputError( diff --git a/packages/shell-api/src/streams.spec.ts b/packages/shell-api/src/streams.spec.ts index 3f8b26447..55e07fd2e 100644 --- a/packages/shell-api/src/streams.spec.ts +++ b/packages/shell-api/src/streams.spec.ts @@ -177,13 +177,13 @@ describe('Streams', function () { expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; // No arguments to modify. - const caught = await processor.modify().catch((e: any) => e); + const caught = await processor.modify().catch((e) => e); expect(caught.message).to.contain( '[COMMON-10001] The first argument to modify must be an array or object.' ); // A single numeric argument to modify. - const caught2 = await processor.modify(1).catch((e: any) => e); + const caught2 = await processor.modify(1).catch((e) => e); expect(caught2.message).to.contain( '[COMMON-10001] The first argument to modify must be an array or object.' ); From e33e3d7e9acb6d20886880105376a0bd4da663bd Mon Sep 17 00:00:00 2001 From: Matthew Normyle Date: Tue, 29 Oct 2024 19:13:14 +0000 Subject: [PATCH 05/13] add MongoshInvalidInputError type to catch block --- packages/shell-api/src/streams.spec.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/shell-api/src/streams.spec.ts b/packages/shell-api/src/streams.spec.ts index 55e07fd2e..201cfae12 100644 --- a/packages/shell-api/src/streams.spec.ts +++ b/packages/shell-api/src/streams.spec.ts @@ -5,6 +5,7 @@ import type Mongo from './mongo'; import Database from './database'; import { Streams } from './streams'; import { InterruptFlag, MongoshInterruptedError } from './interruptor'; +import { MongoshInvalidInputError } from '@mongosh/errors'; describe('Streams', function () { let mongo: Mongo; @@ -177,13 +178,17 @@ describe('Streams', function () { expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; // No arguments to modify. - const caught = await processor.modify().catch((e) => e); + const caught = await processor + .modify() + .catch((e: MongoshInvalidInputError) => e); expect(caught.message).to.contain( '[COMMON-10001] The first argument to modify must be an array or object.' ); // A single numeric argument to modify. - const caught2 = await processor.modify(1).catch((e) => e); + const caught2 = await processor + .modify(1) + .catch((e: MongoshInvalidInputError) => e); expect(caught2.message).to.contain( '[COMMON-10001] The first argument to modify must be an array or object.' ); From 9197593302c267b2e97bdb4a0f708aa5fa9005fe Mon Sep 17 00:00:00 2001 From: Matthew Normyle Date: Tue, 29 Oct 2024 19:14:15 +0000 Subject: [PATCH 06/13] comment clarification --- packages/shell-api/src/stream-processor.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/shell-api/src/stream-processor.ts b/packages/shell-api/src/stream-processor.ts index c8adf47e0..1502f1776 100644 --- a/packages/shell-api/src/stream-processor.ts +++ b/packages/shell-api/src/stream-processor.ts @@ -62,6 +62,7 @@ export default class StreamProcessor extends ShellApiWithMongoClass { * modify can be used like so: * Change the pipeline: * sp.name.modify(pipeline) + * Change the pipeline with additional options: * sp.name.modify(pipeline, {resumeFromCheckpoint: false}) * For modify requests that don't change the pipeline: * sp.name.modify({resumeFromCheckpoint: false}) From dac25bfa586cff0e9718801d3828e132f640c271 Mon Sep 17 00:00:00 2001 From: Matthew Normyle Date: Tue, 29 Oct 2024 19:37:10 +0000 Subject: [PATCH 07/13] Use import type --- packages/shell-api/src/streams.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/shell-api/src/streams.spec.ts b/packages/shell-api/src/streams.spec.ts index 201cfae12..00983ccf8 100644 --- a/packages/shell-api/src/streams.spec.ts +++ b/packages/shell-api/src/streams.spec.ts @@ -5,7 +5,7 @@ import type Mongo from './mongo'; import Database from './database'; import { Streams } from './streams'; import { InterruptFlag, MongoshInterruptedError } from './interruptor'; -import { MongoshInvalidInputError } from '@mongosh/errors'; +import type { MongoshInvalidInputError } from '@mongosh/errors'; describe('Streams', function () { let mongo: Mongo; From 434f6c355cd889b63bad94204448e9efe016d757 Mon Sep 17 00:00:00 2001 From: Matthew Normyle Date: Tue, 29 Oct 2024 19:42:47 +0000 Subject: [PATCH 08/13] add one more parameter validation --- packages/shell-api/src/stream-processor.ts | 8 +++++++- packages/shell-api/src/streams.spec.ts | 11 +++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/packages/shell-api/src/stream-processor.ts b/packages/shell-api/src/stream-processor.ts index 1502f1776..8b259de5a 100644 --- a/packages/shell-api/src/stream-processor.ts +++ b/packages/shell-api/src/stream-processor.ts @@ -75,7 +75,13 @@ export default class StreamProcessor extends ShellApiWithMongoClass { if (Array.isArray(pipelineOrOptions)) { options['pipeline'] = pipelineOrOptions; } else if (typeof pipelineOrOptions === 'object') { - options = { ...options, ...pipelineOrOptions }; + if (Object.keys(options).length != 0) { + throw new MongoshInvalidInputError( + 'If the first argument to modify is an object, the second argument should not be specified.', + CommonErrors.InvalidArgument + ); + } + options = { ...pipelineOrOptions }; } else { throw new MongoshInvalidInputError( 'The first argument to modify must be an array or object.', diff --git a/packages/shell-api/src/streams.spec.ts b/packages/shell-api/src/streams.spec.ts index 00983ccf8..135db7856 100644 --- a/packages/shell-api/src/streams.spec.ts +++ b/packages/shell-api/src/streams.spec.ts @@ -192,6 +192,17 @@ describe('Streams', function () { expect(caught2.message).to.contain( '[COMMON-10001] The first argument to modify must be an array or object.' ); + + // Two object arguments to modify. + const caught3 = await processor + .modify( + { resumeFromCheckpoint: false }, + { dlq: { connectionName: 'foo' } } + ) + .catch((e: MongoshInvalidInputError) => e); + expect(caught3.message).to.contain( + '[COMMON-10001] If the first argument to modify is an object, the second argument should not be specified.' + ); }); it('works with pipeline and options arguments', async function () { From 3652b83cd18548c046c48507fba3da7bdcac7065 Mon Sep 17 00:00:00 2001 From: Matthew Normyle Date: Tue, 29 Oct 2024 20:27:35 +0000 Subject: [PATCH 09/13] use !== instead of != --- packages/shell-api/src/stream-processor.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/shell-api/src/stream-processor.ts b/packages/shell-api/src/stream-processor.ts index 8b259de5a..0bd0c3fc6 100644 --- a/packages/shell-api/src/stream-processor.ts +++ b/packages/shell-api/src/stream-processor.ts @@ -75,7 +75,7 @@ export default class StreamProcessor extends ShellApiWithMongoClass { if (Array.isArray(pipelineOrOptions)) { options['pipeline'] = pipelineOrOptions; } else if (typeof pipelineOrOptions === 'object') { - if (Object.keys(options).length != 0) { + if (Object.keys(options).length !== 0) { throw new MongoshInvalidInputError( 'If the first argument to modify is an object, the second argument should not be specified.', CommonErrors.InvalidArgument From 3a637257091bd2d6b2e4ef06da75d1195a1b3a13 Mon Sep 17 00:00:00 2001 From: Matthew Normyle Date: Tue, 29 Oct 2024 21:06:37 +0000 Subject: [PATCH 10/13] Add modify description in en_US --- packages/i18n/src/locales/en_US.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/i18n/src/locales/en_US.ts b/packages/i18n/src/locales/en_US.ts index 5272483c2..c78a1da83 100644 --- a/packages/i18n/src/locales/en_US.ts +++ b/packages/i18n/src/locales/en_US.ts @@ -2771,6 +2771,9 @@ const translations: Catalog = { description: 'Return stats captured from a named stream processor.', }, + modify: { + description: 'Modify a stream processor definition.', + }, }, }, }, From ec0984d7f5a8d97b180bedb8f2ee87185ec03d2d Mon Sep 17 00:00:00 2001 From: Matthew Normyle Date: Wed, 30 Oct 2024 18:41:55 +0000 Subject: [PATCH 11/13] Add TS overloads and return type definitions. Dont modofy the options argument --- packages/shell-api/src/stream-processor.ts | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/packages/shell-api/src/stream-processor.ts b/packages/shell-api/src/stream-processor.ts index 0bd0c3fc6..e06839af4 100644 --- a/packages/shell-api/src/stream-processor.ts +++ b/packages/shell-api/src/stream-processor.ts @@ -59,21 +59,26 @@ export default class StreamProcessor extends ShellApiWithMongoClass { } /** - * modify can be used like so: + * modify is used to modify a stream processor definition, like below: * Change the pipeline: - * sp.name.modify(pipeline) - * Change the pipeline with additional options: - * sp.name.modify(pipeline, {resumeFromCheckpoint: false}) - * For modify requests that don't change the pipeline: - * sp.name.modify({resumeFromCheckpoint: false}) + * sp.name.modify(newPipeline) + * Keep the same pipeline, change other options: + * sp.name.modify({resumeFromCheckpoint: false}) + */ + async modify(pipelineOrOptions: Document[] | Document): Promise; + + /** + * modify is used to modify a stream processor definition, like below: + * Change the pipeline and set additional options: + * sp.name.modify(newPipeline, {resumeFromCheckpoint: false}) */ @returnsPromise async modify( pipelineOrOptions: Document[] | Document, options: Document = {} - ) { + ): Promise { if (Array.isArray(pipelineOrOptions)) { - options['pipeline'] = pipelineOrOptions; + options = { ...options, pipeline: pipelineOrOptions }; } else if (typeof pipelineOrOptions === 'object') { if (Object.keys(options).length !== 0) { throw new MongoshInvalidInputError( From e136f6a0b22629da9f7be9cdb597d0195564fc4a Mon Sep 17 00:00:00 2001 From: mongodb-matthew-normyle <120420718+mongodb-matthew-normyle@users.noreply.github.com> Date: Thu, 31 Oct 2024 08:58:58 -0500 Subject: [PATCH 12/13] Update packages/shell-api/src/stream-processor.ts Co-authored-by: Anna Henningsen --- packages/shell-api/src/stream-processor.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/shell-api/src/stream-processor.ts b/packages/shell-api/src/stream-processor.ts index e06839af4..bb86697da 100644 --- a/packages/shell-api/src/stream-processor.ts +++ b/packages/shell-api/src/stream-processor.ts @@ -65,7 +65,8 @@ export default class StreamProcessor extends ShellApiWithMongoClass { * Keep the same pipeline, change other options: * sp.name.modify({resumeFromCheckpoint: false}) */ - async modify(pipelineOrOptions: Document[] | Document): Promise; + async modify(options: Document): Promise; + async modify(pipeline: Document[], options?: Document): Promise; /** * modify is used to modify a stream processor definition, like below: From 37cd6b26f056a848454d93ed22a7d13303786cfc Mon Sep 17 00:00:00 2001 From: Matthew Normyle Date: Thu, 31 Oct 2024 14:01:42 +0000 Subject: [PATCH 13/13] Use optons?: Document --- packages/shell-api/src/stream-processor.ts | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/packages/shell-api/src/stream-processor.ts b/packages/shell-api/src/stream-processor.ts index bb86697da..a70b9c8c0 100644 --- a/packages/shell-api/src/stream-processor.ts +++ b/packages/shell-api/src/stream-processor.ts @@ -64,24 +64,21 @@ export default class StreamProcessor extends ShellApiWithMongoClass { * sp.name.modify(newPipeline) * Keep the same pipeline, change other options: * sp.name.modify({resumeFromCheckpoint: false}) + * Change the pipeline and set additional options: + * sp.name.modify(newPipeline, {resumeFromCheckpoint: false}) */ async modify(options: Document): Promise; async modify(pipeline: Document[], options?: Document): Promise; - /** - * modify is used to modify a stream processor definition, like below: - * Change the pipeline and set additional options: - * sp.name.modify(newPipeline, {resumeFromCheckpoint: false}) - */ @returnsPromise async modify( pipelineOrOptions: Document[] | Document, - options: Document = {} + options?: Document ): Promise { if (Array.isArray(pipelineOrOptions)) { options = { ...options, pipeline: pipelineOrOptions }; } else if (typeof pipelineOrOptions === 'object') { - if (Object.keys(options).length !== 0) { + if (options) { throw new MongoshInvalidInputError( 'If the first argument to modify is an object, the second argument should not be specified.', CommonErrors.InvalidArgument