From 9986234a6d6d7fa327ec724aa8c31b83dbb72ef3 Mon Sep 17 00:00:00 2001 From: Paul Damnhorns Date: Fri, 15 Nov 2024 17:11:43 +0100 Subject: [PATCH] Fix EMFILE too many open files, added maxConcurrency option (#8) --- CHANGELOG.md | 1 + README.md | 1 + src/index.ts | 36 ++++++++++++++++------------- src/loose-object-index.ts | 24 ++++++++++--------- src/packed-object-index.ts | 8 +++---- src/resolve-ref.ts | 14 ++++++++---- src/stat.ts | 21 +++++++++-------- src/types.ts | 13 ++++++++++- src/utils/threads.ts | 47 ++++++++++++++++++++++++++++++++++++++ test/utils.ts | 29 +++++++++++++++++++++++ 10 files changed, 146 insertions(+), 48 deletions(-) create mode 100644 src/utils/threads.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index ab66fcd..d9e44ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## next +- Added `maxConcurrency` option to limit concurrent FS operations, preventing "too many open files" errors (#8) - Fixed Node.js warnings such as "Warning: Closing file descriptor # on garbage collection", which is deprecated in Node.js 22 and will result in an error being thrown in the future ## 0.1.4 (2024-10-30) diff --git a/README.md b/README.md index e0af469..bc4c440 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,7 @@ await repo.dispose(); - `gitdir`: string - path to the git repo - `options` – optional settings: + - `maxConcurrency` – limit the number of file system operations (default: 50) - `cruftPacks` – defines how [cruft packs](https://git-scm.com/docs/cruft-packs) are processed: - `'include'` or `true` (default) - process all packs - `'exclude'` or `false` - exclude cruft packs from processing diff --git a/src/index.ts b/src/index.ts index 0738984..57c0fc2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -6,19 +6,20 @@ import { createPackedObjectIndex } from './packed-object-index.js'; import { createFilesMethods } from './files-methods.js'; import { createCommitMethods } from './commits.js'; import { createStatMethod } from './stat.js'; 
+import { promiseAllThreaded } from './utils/threads.js'; import { GitReaderOptions, NormalizedGitReaderOptions, CruftPackMode } from './types'; export * from './types.js'; export * from './parse-object.js'; export { isGitDir, resolveGitDir }; -export async function createGitReader(gitdir: string, options?: GitReaderOptions) { +export async function createGitReader(gitdir: string, options?: Partial<GitReaderOptions>) { const startInitTime = Date.now(); const normalizedOptions = normalizeOptions(options); const resolvedGitDir = await resolveGitDir(gitdir); const [refIndex, looseObjectIndex, packedObjectIndex] = await Promise.all([ - createRefIndex(resolvedGitDir), - createLooseObjectIndex(resolvedGitDir), + createRefIndex(resolvedGitDir, normalizedOptions), + createLooseObjectIndex(resolvedGitDir, normalizedOptions), createPackedObjectIndex(resolvedGitDir, normalizedOptions) ]); const { readObjectHeaderByHash, readObjectByHash, readObjectHeaderByOid, readObjectByOid } = @@ -38,27 +39,30 @@ export async function createGitReader(gitdir: string, options?: GitReaderOptions async dispose() { await Promise.all([looseObjectIndex.dispose(), packedObjectIndex.dispose()]); }, - stat: createStatMethod({ - gitdir: resolvedGitDir, - refIndex, - looseObjectIndex, - packedObjectIndex - }), + stat: createStatMethod( + resolvedGitDir, + { refIndex, looseObjectIndex, packedObjectIndex }, + normalizedOptions + ), initTime: Date.now() - startInitTime }; } -function normalizeOptions(options?: GitReaderOptions): NormalizedGitReaderOptions { - if (!options || options.cruftPacks === undefined) { - return { cruftPacks: 'include' }; - } +function normalizeOptions(options?: Partial<GitReaderOptions>): NormalizedGitReaderOptions { + const { cruftPacks = true, maxConcurrency } = options || {}; + const maxConcurrencyNormalized = Number.isFinite(maxConcurrency) + ? 
(maxConcurrency as number) + : 50; return { + maxConcurrency: maxConcurrencyNormalized, + performConcurrent: (queue, action) => + promiseAllThreaded(maxConcurrencyNormalized, queue, action), cruftPacks: - typeof options.cruftPacks === 'string' - ? validateCruftPackMode(options.cruftPacks) - : options.cruftPacks // expands true/false aliases + typeof cruftPacks === 'string' + ? validateCruftPackMode(cruftPacks) + : cruftPacks // expands true/false aliases ? 'include' : 'exclude' }; diff --git a/src/loose-object-index.ts b/src/loose-object-index.ts index a992b98..2a1107a 100644 --- a/src/loose-object-index.ts +++ b/src/loose-object-index.ts @@ -5,6 +5,7 @@ import { GitObject, InternalGitObjectContent, InternalGitObjectHeader, + NormalizedGitReaderOptions, ObjectsTypeStat, PackedObjectType } from './types.js'; @@ -14,20 +15,21 @@ import { createObjectsTypeStat, objectsStatFromTypes } from './utils/stat.js'; type LooseObjectMap = Map<string, string>; type LooseObjectMapEntry = [oid: string, relpath: string]; -async function createLooseObjectMap(gitdir: string): Promise<LooseObjectMap> { +async function createLooseObjectMap( + gitdir: string, + { performConcurrent }: NormalizedGitReaderOptions +): Promise<LooseObjectMap> { const objectsPath = pathJoin(gitdir, 'objects'); const looseDirs = (await fsPromises.readdir(objectsPath)).filter((p) => /^[0-9a-f]{2}$/.test(p) ); - const objectDirs = await Promise.all( - looseDirs.map((dir) => - fsPromises - .readdir(pathJoin(objectsPath, dir)) - .then((files) => - files.map((file): LooseObjectMapEntry => [dir + file, `objects/${dir}/${file}`]) - ) - ) + const objectDirs = await performConcurrent(looseDirs, (dir) => + fsPromises + .readdir(pathJoin(objectsPath, dir)) + .then((files) => + files.map((file): LooseObjectMapEntry => [dir + file, `objects/${dir}/${file}`]) + ) ); return new Map(objectDirs.flat().sort(([a], [b]) => (a < b ? 
-1 : 1))); @@ -77,8 +79,8 @@ function parseLooseObject(buffer: Buffer): InternalGitObjectContent { }; } -export async function createLooseObjectIndex(gitdir: string) { - const looseObjectMap = await createLooseObjectMap(gitdir); +export async function createLooseObjectIndex(gitdir: string, options: NormalizedGitReaderOptions) { + const looseObjectMap = await createLooseObjectMap(gitdir, options); const { fanoutTable, binaryNames, names } = indexObjectNames([...looseObjectMap.keys()]); const getOidFromHash = (hash: Buffer) => { diff --git a/src/packed-object-index.ts b/src/packed-object-index.ts index 4d6b2ba..e3d8a8c 100644 --- a/src/packed-object-index.ts +++ b/src/packed-object-index.ts @@ -19,7 +19,7 @@ const PACKDIR = 'objects/pack'; */ export async function createPackedObjectIndex( gitdir: string, - { cruftPacks }: NormalizedGitReaderOptions + { cruftPacks, performConcurrent }: NormalizedGitReaderOptions ) { function readObjectHeaderByHash( hash: Buffer, @@ -75,10 +75,8 @@ export async function createPackedObjectIndex( : !cruftPackFilenames.includes(filename); }); - const packFiles = await Promise.all( - packFilenames.map((filename) => - readPackFile(gitdir, `${PACKDIR}/${filename}`, readObjectHeaderByHash, readObjectByHash) - ) + const packFiles = await performConcurrent(packFilenames, async (filename) => readPackFile(gitdir, `${PACKDIR}/${filename}`, readObjectHeaderByHash, readObjectByHash) ); return { diff --git a/src/resolve-ref.ts b/src/resolve-ref.ts index 66943ad..3013bff 100644 --- a/src/resolve-ref.ts +++ b/src/resolve-ref.ts @@ -1,6 +1,7 @@ import { promises as fsPromises, existsSync } from 'fs'; import { join as pathJoin, basename, sep as pathSep } from 'path'; import { scanFs } from '@discoveryjs/scan-fs'; +import { NormalizedGitReaderOptions } from './types.js'; type Ref = { name: string; @@ -49,7 +50,10 @@ function isOid(value: unknown) { return typeof value === 'string' && value.length === 40 && /^[0-9a-f]{40}$/.test(value); } -export async 
function createRefIndex(gitdir: string) { +export async function createRefIndex( + gitdir: string, + { performConcurrent }: NormalizedGitReaderOptions +) { const refResolver = await createRefResolver(gitdir); // expand a ref into a full form @@ -136,8 +140,8 @@ export async function createRefIndex(gitdir: string) { let cachedRefsWithOid = listRefsWithOidCache.get(prefix); if (cachedRefsWithOid === undefined) { - const oids = await Promise.all( - cachedRefs.map((name) => refResolver.resolveOid(prefix + name)) + const oids = await performConcurrent(cachedRefs, (name) => + refResolver.resolveOid(prefix + name) ); cachedRefsWithOid = cachedRefs.map((name, index) => ({ @@ -210,8 +214,8 @@ export async function createRefIndex(gitdir: string) { async stat() { const remotes = listRemotes(); - const branchesByRemote = await Promise.all( - remotes.map((remote) => listRemoteBranches(remote)) + const branchesByRemote = await performConcurrent(remotes, (remote) => + listRemoteBranches(remote) ); return { diff --git a/src/stat.ts b/src/stat.ts index 983e33e..4818e38 100644 --- a/src/stat.ts +++ b/src/stat.ts @@ -5,18 +5,19 @@ import { sumObjectsStat } from './utils/stat.js'; import { createRefIndex } from './resolve-ref.js'; import { createLooseObjectIndex } from './loose-object-index.js'; import { createPackedObjectIndex } from './packed-object-index.js'; +import { NormalizedGitReaderOptions } from './types.js'; -export function createStatMethod({ - gitdir, - refIndex, - looseObjectIndex, - packedObjectIndex -}: { - gitdir: string; +type CreateStatMethodInput = { refIndex: Awaited<ReturnType<typeof createRefIndex>>; looseObjectIndex: Awaited<ReturnType<typeof createLooseObjectIndex>>; packedObjectIndex: Awaited<ReturnType<typeof createPackedObjectIndex>>; -}) { +}; + +export function createStatMethod( + gitdir: string, + { refIndex, looseObjectIndex, packedObjectIndex }: CreateStatMethodInput, + { performConcurrent }: NormalizedGitReaderOptions +) { return async function () { const [refs, looseObjects, packedObjects, { files }] = await Promise.all([ refIndex.stat(), @@ -25,8 +26,8 @@ export 
function createStatMethod({ scanFs(gitdir) ]); - const fileStats = await Promise.all( - files.map((file) => fsPromises.stat(path.join(gitdir, file.path))) + const fileStats = await performConcurrent(files, (file) => + fsPromises.stat(path.join(gitdir, file.path)) ); const objectsTypes = looseObjects.objects.types.map((entry) => ({ ...entry })); diff --git a/src/types.ts b/src/types.ts index 73337d6..b8ba9f7 100644 --- a/src/types.ts +++ b/src/types.ts @@ -131,9 +131,20 @@ export interface GitReaderOptions { * * @default 'include' */ - cruftPacks?: CruftPackMode | boolean; + cruftPacks: CruftPackMode | boolean; + + /** + * Maximum number of concurrent file system operations. + * @default 50 + */ + maxConcurrency: number; } export interface NormalizedGitReaderOptions { cruftPacks: CruftPackMode; + maxConcurrency: number; + performConcurrent: <T, R>( + queue: T[], + action: (item: T, itemIdx: number) => Promise<R> + ) => Promise<R[]>; } diff --git a/src/utils/threads.ts b/src/utils/threads.ts new file mode 100644 index 0000000..340dca5 --- /dev/null +++ b/src/utils/threads.ts @@ -0,0 +1,47 @@ +/** + * Run async tasks in queue with a maximum number of threads. + * Works like Promise.all, but with a maximum number of threads. + * - The order of the results is guaranteed to be the same as the order of the input queue. + * - If any task fails, the whole queue is rejected. + * - If the queue is empty, the result is an empty array. + * - If the queue has only one task, the result is an array with one element. 
+ * + * @example + * // Before + * const packFiles = await Promise.all( + * packFilenames.map((filename) => + * readPackFile(gitdir, `${PACKDIR}/${filename}`, readObjectHeaderByHash, readObjectByHash) + * ) + * ); + * + * // After + * const packFiles = await promiseAllThreaded(50, packFilenames, async (filename) => + * readPackFile(gitdir, `${PACKDIR}/${filename}`, readObjectHeaderByHash, readObjectByHash) + * ); + */ +export async function promiseAllThreaded<T, R>( + maxThreadCount: number, + queue: T[], + asyncFn: (task: T, taskIdx: number) => Promise<R> +): Promise<R[]> { + const result = Array(queue.length); + let taskProcessed = 0; + let queueSnapshot = [...queue]; + const thread = async () => { + while (taskProcessed < queueSnapshot.length) { + const taskIdx = taskProcessed++; + const task = queueSnapshot[taskIdx]; + result[taskIdx] = await asyncFn(task, taskIdx); + } + }; + + await Promise.all( + Array.from({ length: Math.min(maxThreadCount, queueSnapshot.length) }, () => thread()) + ).catch((err) => { + // remove all pending tasks + queueSnapshot = []; + throw err; + }); + + return result; +} diff --git a/test/utils.ts b/test/utils.ts index 7258dcb..89a9e07 100644 --- a/test/utils.ts +++ b/test/utils.ts @@ -1,5 +1,6 @@ import assert from 'assert'; import { readEncodedOffset, BufferCursor } from '../src/utils/buffer.js'; +import { promiseAllThreaded } from '../src/utils/threads.js'; it('readEncodedOffset', () => { const buffer = Buffer.from([142, 254, 254, 254, 254, 254, 254, 127]); @@ -7,3 +8,31 @@ it('readEncodedOffset', () => { assert.strictEqual(readEncodedOffset(cursor), Number.MAX_SAFE_INTEGER); }); + +it('promiseAllThreaded', async () => { + const maxThreadCount = 2; + const queue = [1, 2, 3, 4, 5]; + const asyncFn = async (task: number) => task * 2; + + const result = await promiseAllThreaded(maxThreadCount, queue, asyncFn); + + assert.deepStrictEqual(result, [2, 4, 6, 8, 10]); +}); + +it('promiseAllThreaded with error', async () => { + const maxThreadCount = 2; + 
const queue = [1, 2, 3, 4, 5]; + const asyncFn = async (task: number) => { + if (task === 3) { + throw new Error('Task failed'); + } + return task * 2; + }; + + try { + await promiseAllThreaded(maxThreadCount, queue, asyncFn); + assert.fail('Expected an error'); + } catch (err) { + assert.strictEqual(err.message, 'Task failed'); + } +});