From 3d751a26d939fa5525648c38e2b294cc6c7fdeba Mon Sep 17 00:00:00 2001 From: mongodb-matthew-normyle <120420718+mongodb-matthew-normyle@users.noreply.github.com> Date: Fri, 22 Nov 2024 06:25:32 -0600 Subject: [PATCH] feat(shell-api): add options in stream processor start, stop, and drop MONGOSH-1920 (#2274) --- packages/shell-api/src/stream-processor.ts | 13 ++-- packages/shell-api/src/streams.spec.ts | 82 ++++++++++++++++------ 2 files changed, 69 insertions(+), 26 deletions(-) diff --git a/packages/shell-api/src/stream-processor.ts b/packages/shell-api/src/stream-processor.ts index a70b9c8c0..789fc9dd7 100644 --- a/packages/shell-api/src/stream-processor.ts +++ b/packages/shell-api/src/stream-processor.ts @@ -26,27 +26,30 @@ export default class StreamProcessor extends ShellApiWithMongoClass { } @returnsPromise - async start() { + async start(options: Document = {}) { return await this._streams._runStreamCommand({ startStreamProcessor: this.name, + ...options, }); } @returnsPromise - async stop() { + async stop(options: Document = {}) { return await this._streams._runStreamCommand({ stopStreamProcessor: this.name, + ...options, }); } @returnsPromise - async drop() { - return this._drop(); + async drop(options: Document = {}) { + return this._drop(options); } - async _drop() { + async _drop(options: Document = {}) { return await this._streams._runStreamCommand({ dropStreamProcessor: this.name, + ...options, }); } diff --git a/packages/shell-api/src/streams.spec.ts b/packages/shell-api/src/streams.spec.ts index 135db7856..8a26b6d71 100644 --- a/packages/shell-api/src/streams.spec.ts +++ b/packages/shell-api/src/streams.spec.ts @@ -164,18 +164,67 @@ describe('Streams', function () { }); }); + // Create a stream processor. + const createProcessor = async (name: string) => { + const runCmdStub = sinon + .stub(mongo._serviceProvider, 'runCommand') + .resolves({ ok: 1 }); + const pipeline = [{ $match: { foo: 'bar' } }]; + const processor = await streams.createStreamProcessor(name, pipeline); + expect(processor).to.eql(streams.getProcessor(name)); + const cmd = { createStreamProcessor: name, pipeline }; + expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; + return { runCmdStub, processor }; + }; + + // Validate supplying options in start,stop, and drop commands. + describe('options', function () { + it('supplies options in start, stop, and drop', async function () { + const name = 'testOptions'; + const { runCmdStub, processor } = await createProcessor(name); + + // Start the stream processor with an extra option. + await processor.start({ resumeFromCheckpoint: false }); + expect( + runCmdStub.calledWithExactly( + 'admin', + { startStreamProcessor: name, resumeFromCheckpoint: false }, + {} + ) + ).to.be.true; + + // Stop the stream processor with an extra option. + await processor.stop({ force: true }); + expect( + runCmdStub.calledWithExactly( + 'admin', + { stopStreamProcessor: name, force: true }, + {} + ) + ).to.be.true; + + // Drop the stream processor with a few extra options. + const opts = { + force: true, + ttl: { unit: 'day', size: 30 }, + }; + await processor.drop(opts); + expect( + runCmdStub.calledWithExactly( + 'admin', + { + dropStreamProcessor: name, + ...opts, + }, + {} + ) + ).to.be.true; + }); + }); + describe('modify', function () { it('throws with invalid parameters', async function () { - // Create the stream processor. - const runCmdStub = sinon - .stub(mongo._serviceProvider, 'runCommand') - .resolves({ ok: 1 }); - const name = 'p1'; - const pipeline = [{ $match: { foo: 'bar' } }]; - const processor = await streams.createStreamProcessor(name, pipeline); - expect(processor).to.eql(streams.getProcessor(name)); - const cmd = { createStreamProcessor: name, pipeline }; - expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; + const { processor } = await createProcessor('testModify'); // No arguments to modify. const caught = await processor @@ -206,17 +255,8 @@ describe('Streams', function () { }); it('works with pipeline and options arguments', async function () { - const runCmdStub = sinon - .stub(mongo._serviceProvider, 'runCommand') - .resolves({ ok: 1 }); - - // Create the stream processor. - const name = 'p1'; - const pipeline = [{ $match: { foo: 'bar' } }]; - const processor = await streams.createStreamProcessor(name, pipeline); - expect(processor).to.eql(streams.getProcessor(name)); - const cmd = { createStreamProcessor: name, pipeline }; - expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; + const name = 'testModify'; + const { runCmdStub, processor } = await createProcessor(name); // Start the stream processor. await processor.start();