From 347cc30bf8df1eedefebf1eda8980638fdd8a067 Mon Sep 17 00:00:00 2001 From: dtfiedler Date: Wed, 24 Apr 2024 13:55:01 -0600 Subject: [PATCH 1/2] feat(s3): download files from s3 concurrently --- src/system.ts | 83 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 57 insertions(+), 26 deletions(-) diff --git a/src/system.ts b/src/system.ts index b69a6c4..19b232e 100644 --- a/src/system.ts +++ b/src/system.ts @@ -146,11 +146,19 @@ export const fetchCacheFromS3 = async () => { try { const data = await s3.send(new ListObjectsV2Command(params)); - for (const obj of data.Contents || []) { - const fileKey = obj.Key; - if (!fileKey) { - continue; - } + const parallelLimit = pLimit(10); + const files = data.Contents || []; + + const fetchFileFromS3 = async ({ + fileKey, + params, + }: { + fileKey: string; + params: { Bucket: string; Key: string }; + }) => { + logger.debug('Fetching file from S3', { + fileKey, + }); const tempFilePath = path.join( process.cwd(), @@ -159,33 +167,56 @@ export const fetchCacheFromS3 = async () => { const tmpFileDir = path.dirname(tempFilePath); - await fs.promises.mkdir(tmpFileDir, { recursive: true }); + try { + if (fs.existsSync(fileKey)) { + await fs.promises.mkdir(tmpFileDir, { recursive: true }); + } - const data = await s3.send( - new GetObjectCommand({ ...params, Key: fileKey }), - ); + const data = await s3.send( + new GetObjectCommand({ ...params, Key: fileKey }), + ); - logger.debug('Saving cache to temp file', { - fileKey, - tempFilePath, - }); + if (data.Body) { + const readableStream = data.Body as Readable; + await fs.promises.writeFile(tempFilePath, readableStream); + logger.debug('Successfully saved file to local filesystem', { + fileKey, + tempFilePath, + }); + const warpCacheDir = path.dirname(fileKey); + await fs.promises.mkdir(warpCacheDir, { recursive: true }); + if (fs.existsSync(fileKey)) { + await fs.promises.unlink(fileKey); + } + // moves the file from the temp location to the final location + await fs.promises.rename(tempFilePath, fileKey); - if (data.Body) { - const readableStream = data.Body as Readable; - await fs.promises.writeFile(tempFilePath, readableStream); - logger.debug('Successfully saved file to local filesystem', { + logger.debug('Successfully fetched file from S3', { + fileKey, + tempFilePath, + }); + } + } catch (error: unknown) { + const message = + error instanceof Error ? error : new Error('Unknown error'); + logger.error('Failed to fetch file from S3', { + error: message, fileKey, - tempFilePath, }); - const warpCacheDir = path.dirname(fileKey); - await fs.promises.mkdir(warpCacheDir, { recursive: true }); - if (fs.existsSync(fileKey)) { - await fs.promises.unlink(fileKey); - } - // moves the file from the temp location to the final location - await fs.promises.rename(tempFilePath, fileKey); } - } + }; + + await Promise.all( + files.map(async (obj) => { + const fileKey = obj.Key; + if (!fileKey) { + return; + } + return parallelLimit(() => { + return fetchFileFromS3({ fileKey, params }); + }); + }), + ); logger.info('Successfully bootstrapped warp cache from S3', { durationMs: Date.now() - startTimeMs, }); From 04f1859c13db4f9d065b331e2603609d8205cc83 Mon Sep 17 00:00:00 2001 From: dtfiedler Date: Wed, 24 Apr 2024 14:06:14 -0600 Subject: [PATCH 2/2] chore: ensure we are using filesystem paths correctly --- src/system.ts | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/src/system.ts b/src/system.ts index 19b232e..3bcbad8 100644 --- a/src/system.ts +++ b/src/system.ts @@ -145,7 +145,6 @@ export const fetchCacheFromS3 = async () => { try { const data = await s3.send(new ListObjectsV2Command(params)); - const parallelLimit = pLimit(10); const files = data.Contents || []; @@ -156,6 +155,7 @@ export const fetchCacheFromS3 = async () => { fileKey: string; params: { Bucket: string; Key: string }; }) => { + const fileStartTime = Date.now(); logger.debug('Fetching file from S3', { fileKey, }); @@ -166,6 +166,7 @@ export const fetchCacheFromS3 = async () => { ); const tmpFileDir = path.dirname(tempFilePath); + const finalFilePath = path.join(process.cwd(), fileKey); try { if (fs.existsSync(fileKey)) { @@ -178,22 +179,28 @@ export const fetchCacheFromS3 = async () => { if (data.Body) { const readableStream = data.Body as Readable; - await fs.promises.writeFile(tempFilePath, readableStream); - logger.debug('Successfully saved file to local filesystem', { + logger.debug('Writing file to temporary location', { fileKey, tempFilePath, }); - const warpCacheDir = path.dirname(fileKey); + await fs.promises.writeFile(tempFilePath, readableStream); + const warpCacheDir = path.dirname(finalFilePath); await fs.promises.mkdir(warpCacheDir, { recursive: true }); - if (fs.existsSync(fileKey)) { - await fs.promises.unlink(fileKey); + if (fs.existsSync(finalFilePath)) { + await fs.promises.unlink(finalFilePath); } + logger.debug('Moving file to final location', { + fileKey, + finalFilePath, + tempFilePath, + }); // moves the file from the temp location to the final location - await fs.promises.rename(tempFilePath, fileKey); + await fs.promises.rename(tempFilePath, finalFilePath); logger.debug('Successfully fetched file from S3', { fileKey, - tempFilePath, + finalFilePath, + durationMs: Date.now() - fileStartTime, }); } } catch (error: unknown) { @@ -206,6 +213,10 @@ export const fetchCacheFromS3 = async () => { } }; + logger.debug('Found files to fetch from S3', { + fileCount: files.length, + }); + await Promise.all( files.map(async (obj) => { const fileKey = obj.Key;