From 6c5b4b1d92b12b822e96c2c3f8c4137d86ad6116 Mon Sep 17 00:00:00 2001 From: AWS CDK Automation <43080478+aws-cdk-automation@users.noreply.github.com> Date: Wed, 6 Nov 2024 03:40:02 -0800 Subject: [PATCH] chore: bound the parallelism (backport #162) (#166) # Backport This will backport the following commits from `main` to `v2-main`: - [chore: bound the parallelism (#162)](https://github.com/cdklabs/cdk-assets/pull/162) ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sorenlouv/backport) --------- Signed-off-by: github-actions Co-authored-by: Rico Hermans Co-authored-by: github-actions --- .eslintrc.json | 6 +++- .projen/deps.json | 4 +++ .projen/tasks.json | 4 +-- .projenrc.ts | 6 ++++ lib/private/archive.ts | 2 ++ lib/private/p-limit.ts | 62 ++++++++++++++++++++++++++++++++++++ lib/publishing.ts | 7 +++- package.json | 1 + test/private/p-limit.test.ts | 48 ++++++++++++++++++++++++++++ yarn.lock | 16 ++++++++++ 10 files changed, 152 insertions(+), 4 deletions(-) create mode 100644 lib/private/p-limit.ts create mode 100644 test/private/p-limit.test.ts diff --git a/.eslintrc.json b/.eslintrc.json index da6692c..70e53f4 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -7,7 +7,8 @@ "root": true, "plugins": [ "@typescript-eslint", - "import" + "import", + "@cdklabs" ], "parser": "@typescript-eslint/parser", "parserOptions": { @@ -133,6 +134,9 @@ "trailingComma": "es5", "printWidth": 100 } + ], + "@cdklabs/promiseall-no-unbounded-parallelism": [ + "error" ] }, "overrides": [ diff --git a/.projen/deps.json b/.projen/deps.json index fde902c..ae44971 100644 --- a/.projen/deps.json +++ b/.projen/deps.json @@ -1,5 +1,9 @@ { "dependencies": [ + { + "name": "@cdklabs/eslint-plugin", + "type": "build" + }, { "name": "@types/archiver", "type": "build" diff --git a/.projen/tasks.json b/.projen/tasks.json index fe4e9fb..55708a6 100644 --- a/.projen/tasks.json +++ b/.projen/tasks.json @@ -245,13 +245,13 @@ }, "steps": [ { - "exec": "npx npm-check-updates@16 --upgrade --target=minor --peer --no-deprecated --dep=dev,peer,prod,optional --filter=@types/archiver,@types/glob,@types/jest,@types/mime,@types/mock-fs,@types/node,@types/yargs,eslint-config-prettier,eslint-import-resolver-typescript,eslint-plugin-import,eslint-plugin-prettier,fs-extra,graceful-fs,jest,jszip,mock-fs,prettier,projen,ts-jest,ts-node,typescript,@aws-cdk/cloud-assembly-schema,@aws-cdk/cx-api,archiver,aws-sdk,glob,mime,yargs" + "exec": "npx npm-check-updates@16 --upgrade --target=minor --peer --no-deprecated --dep=dev,peer,prod,optional --filter=@cdklabs/eslint-plugin,@types/archiver,@types/glob,@types/jest,@types/mime,@types/mock-fs,@types/node,@types/yargs,eslint-config-prettier,eslint-import-resolver-typescript,eslint-plugin-import,eslint-plugin-prettier,fs-extra,graceful-fs,jest,jszip,mock-fs,prettier,projen,ts-jest,ts-node,typescript,@aws-cdk/cloud-assembly-schema,@aws-cdk/cx-api,archiver,aws-sdk,glob,mime,yargs" }, { "exec": "yarn install --check-files" }, { - "exec": "yarn upgrade @types/archiver @types/glob @types/jest @types/mime @types/mock-fs @types/node @types/yargs @typescript-eslint/eslint-plugin @typescript-eslint/parser commit-and-tag-version constructs eslint-config-prettier eslint-import-resolver-typescript eslint-plugin-import eslint-plugin-prettier eslint fs-extra graceful-fs jest jest-junit jszip mock-fs prettier projen ts-jest ts-node typescript @aws-cdk/cloud-assembly-schema @aws-cdk/cx-api archiver aws-sdk glob mime yargs" + "exec": "yarn upgrade @cdklabs/eslint-plugin @types/archiver @types/glob @types/jest @types/mime @types/mock-fs @types/node @types/yargs @typescript-eslint/eslint-plugin @typescript-eslint/parser commit-and-tag-version constructs eslint-config-prettier eslint-import-resolver-typescript eslint-plugin-import eslint-plugin-prettier eslint fs-extra graceful-fs jest jest-junit jszip mock-fs prettier projen ts-jest ts-node typescript @aws-cdk/cloud-assembly-schema @aws-cdk/cx-api archiver aws-sdk glob mime yargs" }, { "exec": "npx projen" diff --git a/.projenrc.ts b/.projenrc.ts index 0d6a072..1298302 100644 --- a/.projenrc.ts +++ b/.projenrc.ts @@ -118,4 +118,10 @@ project.addTask('shrinkwrap', { ], }); +project.addDevDeps('@cdklabs/eslint-plugin'); +project.eslint?.addPlugins('@cdklabs'); +project.eslint?.addRules({ + '@cdklabs/promiseall-no-unbounded-parallelism': ['error'], +}); + project.synth(); diff --git a/lib/private/archive.ts b/lib/private/archive.ts index 8ac533c..baa8589 100644 --- a/lib/private/archive.ts +++ b/lib/private/archive.ts @@ -49,6 +49,8 @@ function writeZipFile(directory: string, outputFile: string): Promise { // Append files serially to ensure file order for (const file of files) { const fullPath = path.resolve(directory, file); + // There are exactly 2 promises + // eslint-disable-next-line @cdklabs/promiseall-no-unbounded-parallelism const [data, stat] = await Promise.all([fs.readFile(fullPath), fs.stat(fullPath)]); archive.append(data, { name: file, diff --git a/lib/private/p-limit.ts b/lib/private/p-limit.ts new file mode 100644 index 0000000..b987a0b --- /dev/null +++ b/lib/private/p-limit.ts @@ -0,0 +1,62 @@ +/** + * A minimal of p-limit that does not bring in new dependencies, and is not ESM. + */ + +type PromiseFactory = () => Promise; + +export function pLimit(concurrency: number): PLimit { + const queue: Array<[PromiseFactory, (x: any) => void, (reason?: any) => void]> = []; + let activeCount = 0; + let stopped = false; + + function dispatch() { + if (activeCount < concurrency && queue.length > 0) { + const [fac, resolve, reject] = queue.shift()!; + activeCount++; + fac().then( + (r) => { + // Start a new job before reporting back on the previous one + resumeNext(); + resolve(r); + }, + (e) => { + // Start a new job before reporting back on the previous one + resumeNext(); + reject(e); + } + ); + } + } + + function resumeNext() { + activeCount--; + if (stopped) { + for (const [_, __, reject] of queue) { + reject(new Error('Task has been cancelled')); + } + queue.splice(0, queue.length); + } + dispatch(); + } + + const ret = (promiseFactory: PromiseFactory) => { + return new Promise((resolve, reject) => { + queue.push([promiseFactory, resolve, reject]); + dispatch(); + }); + }; + Object.defineProperties(ret, { + dispose: { + value: () => { + stopped = true; + }, + }, + }); + + return ret as PLimit; +} + +interface PLimit { + dispose(): void; + (promiseFactory: PromiseFactory): Promise; +} diff --git a/lib/publishing.ts b/lib/publishing.ts index 6f6ead4..cf0293c 100644 --- a/lib/publishing.ts +++ b/lib/publishing.ts @@ -3,6 +3,7 @@ import { IAws } from './aws'; import { IAssetHandler, IHandlerHost, type PublishOptions } from './private/asset-handler'; import { DockerFactory } from './private/docker'; import { makeAssetHandler } from './private/handlers'; +import { pLimit } from './private/p-limit'; import { EventType, IPublishProgress, IPublishProgressListener } from './progress'; export interface AssetPublishingOptions { @@ -119,7 +120,11 @@ export class AssetPublishing implements IPublishProgress { */ public async publish(options: PublishOptions = {}): Promise { if (this.publishInParallel) { - await Promise.all(this.assets.map(async (asset) => this.publishAsset(asset, options))); + const limit = pLimit(20); + // eslint-disable-next-line @cdklabs/promiseall-no-unbounded-parallelism + await Promise.all( + this.assets.map((asset) => limit(async () => this.publishAsset(asset, options))) + ); } else { for (const asset of this.assets) { if (!(await this.publishAsset(asset, options))) { diff --git a/package.json b/package.json index b08e8b2..9973381 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,7 @@ "projen": "npx projen" }, "devDependencies": { + "@cdklabs/eslint-plugin": "^1.0.0", "@types/archiver": "^5.3.4", "@types/glob": "^7.2.0", "@types/jest": "^29.5.14", diff --git a/test/private/p-limit.test.ts b/test/private/p-limit.test.ts new file mode 100644 index 0000000..53bb95d --- /dev/null +++ b/test/private/p-limit.test.ts @@ -0,0 +1,48 @@ +/* eslint-disable @cdklabs/promiseall-no-unbounded-parallelism */ +import { pLimit } from '../../lib/private/p-limit'; + +test('never running more than N jobs at once', async () => { + const limit = pLimit(5); + let current = 0; + let max = 0; + + await Promise.all( + Array.from({ length: 20 }).map(() => + limit(async () => { + max = Math.max(max, ++current); + await sleep(1); + --current; + }) + ) + ); + + expect(max).toBeLessThanOrEqual(5); +}); + +test('new jobs arent started after dispose is called', async () => { + const limit = pLimit(2); + let started = 0; + + await expect(() => + Promise.all( + Array.from({ length: 20 }).map(() => + limit(async () => { + started += 1; + await sleep(0); + throw new Error('oops'); + }) + ) + ) + ).rejects.toThrow(/oops/); + + limit.dispose(); + + await sleep(20); + + // It may be that we started 1 more job here, but definitely not all 20 + expect(started).toBeLessThanOrEqual(3); +}); + +function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/yarn.lock b/yarn.lock index 7673442..a5cc29e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -288,6 +288,13 @@ resolved "https://registry.yarnpkg.com/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz#75a2e8b51cb758a7553d6804a5932d7aace75c39" integrity sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw== +"@cdklabs/eslint-plugin@^1.0.0": + version "1.0.0" + resolved "https://registry.yarnpkg.com/@cdklabs/eslint-plugin/-/eslint-plugin-1.0.0.tgz#3b0c79ccac1c13a35aa3fb81fd3cacc90cc1d8f0" + integrity sha512-R3HyeQqDMeYULyDPX5NqOf57m8gyJRHUEWDEQUjAqgKGrFUzcCND8Sus4o8uzn1b57qWYcQdNLWl+Q3lioT8Bg== + dependencies: + fs-extra "^11.2.0" + "@cspotcode/source-map-support@^0.8.0": version "0.8.1" resolved "https://registry.yarnpkg.com/@cspotcode/source-map-support/-/source-map-support-0.8.1.tgz#00629c35a688e05a88b1cda684fb9d5e73f000a1" @@ -2461,6 +2468,15 @@ fs-constants@^1.0.0: resolved "https://registry.yarnpkg.com/fs-constants/-/fs-constants-1.0.0.tgz#6be0de9be998ce16af8afc24497b9ee9b7ccd9ad" integrity sha512-y6OAwoSIf7FyjMIv94u+b5rdheZEjzR63GTyZJm5qh4Bi+2YgwLCcI/fPFZkL5PSixOt6ZNKm+w+Hfp/Bciwow== +fs-extra@^11.2.0: + version "11.2.0" + resolved "https://registry.yarnpkg.com/fs-extra/-/fs-extra-11.2.0.tgz#e70e17dfad64232287d01929399e0ea7c86b0e5b" + integrity sha512-PmDi3uwK5nFuXh7XDTlVnS17xJS7vW36is2+w3xcv8SVxiB4NyATf4ctkVY5bkSjX0Y4nbvZCq1/EjtEyr9ktw== + dependencies: + graceful-fs "^4.2.0" + jsonfile "^6.0.1" + universalify "^2.0.0" + fs-extra@^9.1.0: version "9.1.0" resolved "https://registry.yarnpkg.com/fs-extra/-/fs-extra-9.1.0.tgz#5954460c764a8da2094ba3554bf839e6b9a7c86d"