Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mongodb-runner): add programmatic api for mongodb-runner #186

Merged
merged 5 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 11 additions & 56 deletions packages/mongodb-runner/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,10 @@ import yargs from 'yargs';
import { MongoCluster } from './mongocluster';
import os from 'os';
import path from 'path';
import { promises as fs } from 'fs';
import { BSON } from 'mongodb';
import { spawn } from 'child_process';
import createDebug from 'debug';
import { once } from 'events';

interface StoredInstance {
id: string;
filepath: string;
serialized: string;
connectionString: string;
}
import * as utilities from './index';
Anemy marked this conversation as resolved.
Show resolved Hide resolved

(async function () {
const defaultRunnerDir = path.join(os.homedir(), '.mongodb', 'runner2');
Expand Down Expand Up @@ -77,6 +69,7 @@ interface StoredInstance {
.option('debug', { type: 'boolean', describe: 'Enable debug output' })
.command('start', 'Start a MongoDB instance')
.command('stop', 'Stop a MongoDB instance')
.command('prune', 'Clean up metadata for any dead MongoDB instances')
.command('ls', 'List currently running MongoDB instances')
.command(
'exec',
Expand All @@ -90,24 +83,8 @@ interface StoredInstance {
}

async function start() {
const id = argv.id || new BSON.UUID().toHexString();
if (!/^[a-zA-Z0-9_-]+$/.test(id)) {
throw new Error(`ID '${id}' contains non-alphanumeric characters`);
}
await fs.mkdir(argv.runnerDir, { recursive: true });

const cluster = await MongoCluster.start({
...argv,
args,
});
const serialized = await cluster.serialize();
const { connectionString } = cluster;

await fs.writeFile(
path.join(argv.runnerDir, `m-${id}.json`),
JSON.stringify({ id, serialized, connectionString })
);
console.log(`Server started and running at ${connectionString}`);
const { cluster, id } = await utilities.start(argv);
console.log(`Server started and running at ${cluster.connectionString}`);
console.log('Run the following command to stop the instance:');
console.log(
`${argv.$0} stop --id=${id}` +
Expand All @@ -118,45 +95,23 @@ interface StoredInstance {
cluster.unref();
}

async function* instances(): AsyncIterable<StoredInstance> {
for await (const { name } of await fs.opendir(argv.runnerDir)) {
if (name.startsWith('m-') && name.endsWith('.json')) {
try {
const filepath = path.join(argv.runnerDir, name);
const stored = JSON.parse(await fs.readFile(filepath, 'utf8'));
yield { ...stored, filepath };
} catch {
/* ignore */
}
}
}
}

async function stop() {
if (!argv.id && !argv.all) {
throw new Error('Need --id or --all to know which server to stop');
}
const toStop: Array<StoredInstance> = [];
for await (const instance of instances()) {
if (instance.id === argv.id || argv.all) toStop.push(instance);
}
await Promise.all(
toStop.map(async ({ id, filepath, serialized, connectionString }) => {
await (await MongoCluster.deserialize(serialized)).close();
await fs.rm(filepath);
console.log(
`Stopped cluster '${id}' (was running at '${connectionString}')`
);
})
);
await utilities.stop(argv);
}

async function ls() {
for await (const { id, connectionString } of instances()) {
for await (const { id, connectionString } of utilities.instances(argv)) {
console.log(`${id}: ${connectionString}`);
}
}

async function prune() {
await utilities.prune(argv);
}

async function exec() {
let mongodArgs: string[];
let execArgs: string[];
Expand Down Expand Up @@ -198,7 +153,7 @@ interface StoredInstance {
);
}

await ({ start, stop, exec, ls }[command] ?? unknown)();
await ({ start, stop, exec, ls, prune }[command] ?? unknown)();
})().catch((err) => {
process.nextTick(() => {
throw err;
Expand Down
1 change: 1 addition & 0 deletions packages/mongodb-runner/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export { MongoServer, MongoServerOptions } from './mongoserver';

export { MongoCluster, MongoClusterOptions } from './mongocluster';
export type { ConnectionString } from 'mongodb-connection-string-url';
export { prune, start, stop, instances } from './runner-helpers';
9 changes: 7 additions & 2 deletions packages/mongodb-runner/src/mongocluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { MongoServer } from './mongoserver';
import { ConnectionString } from 'mongodb-connection-string-url';
import type { DownloadOptions } from '@mongodb-js/mongodb-downloader';
import { downloadMongoDb } from '@mongodb-js/mongodb-downloader';
import type { MongoClientOptions } from 'mongodb';
import { MongoClient } from 'mongodb';
import { sleep, range, uuid, debug } from './util';

Expand Down Expand Up @@ -225,9 +226,13 @@ export class MongoCluster {
}

async withClient<Fn extends (client: MongoClient) => any>(
fn: Fn
fn: Fn,
clientOptions: MongoClientOptions = {}
): Promise<ReturnType<Fn>> {
const client = await MongoClient.connect(this.connectionString);
const client = await MongoClient.connect(
this.connectionString,
clientOptions
);
try {
return await fn(client);
} finally {
Expand Down
97 changes: 97 additions & 0 deletions packages/mongodb-runner/src/runner-helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { BSON } from 'mongodb';
import path from 'path';
import type { MongoClusterOptions } from './mongocluster';
import { MongoCluster } from './mongocluster';
import { parallelForEach } from './util';
import * as fs from 'fs/promises';

interface StoredInstance {
id: string;
filepath: string;
serialized: string;
connectionString: string;
}

export async function start(
argv: {
id?: string;
runnerDir: string;
} & MongoClusterOptions,
args?: string[]
) {
const id = argv.id || new BSON.UUID().toHexString();
if (!/^[a-zA-Z0-9_-]+$/.test(id)) {
throw new Error(`ID '${id}' contains non-alphanumeric characters`);
}
await fs.mkdir(argv.runnerDir, { recursive: true });

const cluster = await MongoCluster.start({
...argv,
args,
});
const serialized = await cluster.serialize();
const { connectionString } = cluster;

await fs.writeFile(
path.join(argv.runnerDir, `m-${id}.json`),
JSON.stringify({ id, serialized, connectionString })
);

cluster.unref();

return { cluster, id };
}

export async function* instances(argv: {
runnerDir: string;
}): AsyncIterable<StoredInstance> {
for await (const { name } of await fs.opendir(argv.runnerDir)) {
if (name.startsWith('m-') && name.endsWith('.json')) {
try {
const filepath = path.join(argv.runnerDir, name);
const stored = JSON.parse(await fs.readFile(filepath, 'utf8'));
yield { ...stored, filepath };
} catch {
/* ignore */
}
}
}
}

/**
* Attempts to connect to every mongodb instance defined in `runnerDir`.
* If it cannot connect to an instance, it cleans up the entry from `runnerDir`.
*/
export async function prune(argv: { runnerDir: string }): Promise<void> {
async function handler(instance: StoredInstance) {
try {
const cluster = await MongoCluster.deserialize(instance.serialized);
await cluster.withClient(
() => {
// connect and close
},
{ serverSelectionTimeoutMS: 2000 }
);
} catch (e) {
await fs.rm(instance.filepath);
}
}
await parallelForEach(instances(argv), handler);
}

export async function stop(argv: {
runnerDir: string;
id?: string;
all?: boolean;
}) {
const toStop: Array<StoredInstance> = [];
for await (const instance of instances(argv)) {
if (instance.id === argv.id || argv.all) toStop.push(instance);
}
await Promise.all(
toStop.map(async ({ filepath, serialized }) => {
await (await MongoCluster.deserialize(serialized)).close();
await fs.rm(filepath);
})
);
}
13 changes: 13 additions & 0 deletions packages/mongodb-runner/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,16 @@ export const uuid = () => new BSON.UUID().toHexString(true);
export const sleep = (ms: number): Promise<void> =>
new Promise((r) => setTimeout(r, ms));
export const range = (n: number): number[] => [...Array(n).keys()];
export async function parallelForEach<T>(
iterable: AsyncIterable<T>,
fn: (arg0: T) => Promise<void> | void
) {
const items = iterable[Symbol.asyncIterator]();

const result = [];
for (let item = await items.next(); !item.done; item = await items.next()) {
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
result.push(fn(item.value));
}

return await Promise.allSettled(result);
addaleax marked this conversation as resolved.
Show resolved Hide resolved
}
Loading