Skip to content

Commit

Permalink
feat(shell-api): add options in stream processor start, stop, and drop
Browse files Browse the repository at this point in the history
  • Loading branch information
mongodb-matthew-normyle committed Nov 22, 2024
1 parent dce0b68 commit f74918b
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 6 deletions.
13 changes: 8 additions & 5 deletions packages/shell-api/src/stream-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,30 @@ export default class StreamProcessor extends ShellApiWithMongoClass {
}

@returnsPromise
async start() {
async start(options: Document = {}) {
return await this._streams._runStreamCommand({
startStreamProcessor: this.name,
...options,
});
}

@returnsPromise
async stop() {
async stop(options: Document = {}) {
return await this._streams._runStreamCommand({
stopStreamProcessor: this.name,
...options,
});
}

@returnsPromise
async drop() {
return this._drop();
async drop(options: Document = {}) {
return this._drop(options);
}

async _drop() {
async _drop(options: Document = {}) {
return await this._streams._runStreamCommand({
dropStreamProcessor: this.name,
...options,
});
}

Expand Down
55 changes: 54 additions & 1 deletion packages/shell-api/src/streams.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Streams } from './streams';
import { InterruptFlag, MongoshInterruptedError } from './interruptor';
import type { MongoshInvalidInputError } from '@mongosh/errors';

describe('Streams', function () {
describe.only('Streams', function () {
let mongo: Mongo;
let streams: Streams;
const identity = (a: unknown) => a;
Expand Down Expand Up @@ -164,6 +164,59 @@ describe('Streams', function () {
});
});

// Validate supplying options in start,stop, and drop commands.
describe('options', function () {
it('supplies options in start, stop, and drop', async function () {
// Create the stream processor.
const runCmdStub = sinon
.stub(mongo._serviceProvider, 'runCommand')
.resolves({ ok: 1 });
const name = 'optionsTest';
const pipeline = [{ $match: { foo: 'bar' } }];
const processor = await streams.createStreamProcessor(name, pipeline);
expect(processor).to.eql(streams.getProcessor(name));
const cmd = { createStreamProcessor: name, pipeline };
expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true;

// Start the stream processor with an extra option.
await processor.start({ resumeFromCheckpoint: false });
expect(
runCmdStub.calledWithExactly(
'admin',
{ startStreamProcessor: name, resumeFromCheckpoint: false },
{}
)
).to.be.true;

// Stop the stream processor with an extra option.
await processor.stop({ force: true });
expect(
runCmdStub.calledWithExactly(
'admin',
{ stopStreamProcessor: name, force: true },
{}
)
).to.be.true;

// Drop the stream processor with a few extra options.
const opts = {
force: true,
ttl: { unit: 'day', size: 30 },
};
await processor.drop(opts);
expect(
runCmdStub.calledWithExactly(
'admin',
{
dropStreamProcessor: name,
...opts,
},
{}
)
).to.be.true;
});
});

describe('modify', function () {
it('throws with invalid parameters', async function () {
// Create the stream processor.
Expand Down

0 comments on commit f74918b

Please sign in to comment.