Skip to content

Commit

Permalink
chore: bound the parallelism (backport #162) (#166)
Browse files Browse the repository at this point in the history
# Backport

This will backport the following commits from `main` to `v2-main`:
- [chore: bound the parallelism
(#162)](#162)

<!--- Backport version: 9.5.1 -->

### Questions?
Please refer to the [Backport tool
documentation](https://github.com/sorenlouv/backport)

---------

Signed-off-by: github-actions <[email protected]>
Co-authored-by: Rico Hermans <[email protected]>
Co-authored-by: github-actions <[email protected]>
  • Loading branch information
3 people authored Nov 6, 2024
1 parent 61e345c commit 6c5b4b1
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 4 deletions.
6 changes: 5 additions & 1 deletion .eslintrc.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions .projen/deps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions .projen/tasks.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .projenrc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,10 @@ project.addTask('shrinkwrap', {
],
});

// Enable the cdklabs ESLint plugin and forbid unbounded Promise.all()
// parallelism project-wide (the rule this commit's code changes satisfy).
project.addDevDeps('@cdklabs/eslint-plugin');
project.eslint?.addPlugins('@cdklabs');
project.eslint?.addRules({
  '@cdklabs/promiseall-no-unbounded-parallelism': ['error'],
});

// projen: write out the configured project files.
project.synth();
2 changes: 2 additions & 0 deletions lib/private/archive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ function writeZipFile(directory: string, outputFile: string): Promise<void> {
// Append files serially to ensure file order
for (const file of files) {
const fullPath = path.resolve(directory, file);
// There are exactly 2 promises
// eslint-disable-next-line @cdklabs/promiseall-no-unbounded-parallelism
const [data, stat] = await Promise.all([fs.readFile(fullPath), fs.stat(fullPath)]);
archive.append(data, {
name: file,
Expand Down
62 changes: 62 additions & 0 deletions lib/private/p-limit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
 * A minimal version of p-limit that does not bring in new dependencies, and is not ESM.
*/

type PromiseFactory<A> = () => Promise<A>;

/**
 * Create a concurrency limiter: the returned function accepts promise
 * factories and ensures that at most `concurrency` of them are running at
 * any one time; the rest wait in a FIFO queue.
 *
 * Calling `dispose()` stops new jobs from starting: every queued job — and
 * every job submitted afterwards — is rejected with a cancellation error.
 * Jobs that are already running are not interrupted.
 *
 * @param concurrency maximum number of factories running at once (positive integer)
 * @returns a limiter function with a `dispose()` method
 */
export function pLimit(concurrency: number): PLimit {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new Error(`concurrency must be a positive integer, got: ${concurrency}`);
  }

  const queue: Array<[PromiseFactory<any>, (x: any) => void, (reason?: any) => void]> = [];
  let activeCount = 0;
  let stopped = false;

  // Start a queued job if there is capacity. Once `dispose()` has been
  // called, drain the queue by rejecting every pending job instead: without
  // this check, a free concurrency slot would still start queued (or newly
  // submitted) jobs after dispose(), and a post-dispose submission could
  // otherwise sit in the queue forever without ever being rejected.
  function dispatch() {
    if (stopped) {
      for (const [, , reject] of queue.splice(0, queue.length)) {
        reject(new Error('Task has been cancelled'));
      }
      return;
    }
    if (activeCount < concurrency && queue.length > 0) {
      const [fac, resolve, reject] = queue.shift()!;
      activeCount++;
      fac().then(
        (r) => {
          // Start a new job before reporting back on the previous one
          resumeNext();
          resolve(r);
        },
        (e) => {
          // Start a new job before reporting back on the previous one
          resumeNext();
          reject(e);
        }
      );
    }
  }

  // A running job finished: free its slot and try to start the next one.
  function resumeNext() {
    activeCount--;
    dispatch();
  }

  const ret = <A>(promiseFactory: PromiseFactory<A>) => {
    return new Promise<A>((resolve, reject) => {
      queue.push([promiseFactory, resolve, reject]);
      dispatch();
    });
  };
  Object.defineProperties(ret, {
    dispose: {
      value: () => {
        stopped = true;
        // Immediately reject everything that has not started yet, rather
        // than waiting for a running job to finish (one might never).
        dispatch();
      },
    },
  });

  return ret as PLimit;
}

interface PLimit {
  /** Stop starting new jobs; every not-yet-started job is rejected. */
  dispose(): void;
  /** Queue a promise factory; resolves/rejects with the factory's result. */
  <A>(promiseFactory: PromiseFactory<A>): Promise<A>;
}
7 changes: 6 additions & 1 deletion lib/publishing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { IAws } from './aws';
import { IAssetHandler, IHandlerHost, type PublishOptions } from './private/asset-handler';
import { DockerFactory } from './private/docker';
import { makeAssetHandler } from './private/handlers';
import { pLimit } from './private/p-limit';
import { EventType, IPublishProgress, IPublishProgressListener } from './progress';

export interface AssetPublishingOptions {
Expand Down Expand Up @@ -119,7 +120,11 @@ export class AssetPublishing implements IPublishProgress {
*/
public async publish(options: PublishOptions = {}): Promise<void> {
if (this.publishInParallel) {
await Promise.all(this.assets.map(async (asset) => this.publishAsset(asset, options)));
const limit = pLimit(20);
// eslint-disable-next-line @cdklabs/promiseall-no-unbounded-parallelism
await Promise.all(
this.assets.map((asset) => limit(async () => this.publishAsset(asset, options)))
);
} else {
for (const asset of this.assets) {
if (!(await this.publishAsset(asset, options))) {
Expand Down
1 change: 1 addition & 0 deletions package.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 48 additions & 0 deletions test/private/p-limit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/* eslint-disable @cdklabs/promiseall-no-unbounded-parallelism */
import { pLimit } from '../../lib/private/p-limit';

test('never running more than N jobs at once', async () => {
  const limit = pLimit(5);
  let inFlight = 0;
  let peak = 0;

  // Submit 20 jobs at once and record the highest number running together.
  const jobs = Array.from({ length: 20 }).map(() =>
    limit(async () => {
      inFlight += 1;
      peak = Math.max(peak, inFlight);
      await sleep(1);
      inFlight -= 1;
    })
  );
  await Promise.all(jobs);

  expect(peak).toBeLessThanOrEqual(5);
});

test('new jobs arent started after dispose is called', async () => {
  const limit = pLimit(2);
  let started = 0;

  // Every job fails, so the Promise.all rejects as soon as the first
  // failure propagates.
  const failingJob = async () => {
    started += 1;
    await sleep(0);
    throw new Error('oops');
  };

  await expect(() =>
    Promise.all(Array.from({ length: 20 }).map(() => limit(failingJob)))
  ).rejects.toThrow(/oops/);

  limit.dispose();

  await sleep(20);

  // It may be that we started 1 more job here, but definitely not all 20
  expect(started).toBeLessThanOrEqual(3);
});

/** Return a promise that resolves after roughly `ms` milliseconds. */
function sleep(ms: number) {
  return new Promise<void>((resolve) => {
    setTimeout(() => resolve(), ms);
  });
}
16 changes: 16 additions & 0 deletions yarn.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 6c5b4b1

Please sign in to comment.