diff --git a/src/system.ts b/src/system.ts index b69a6c4..3bcbad8 100644 --- a/src/system.ts +++ b/src/system.ts @@ -145,12 +145,20 @@ export const fetchCacheFromS3 = async () => { try { const data = await s3.send(new ListObjectsV2Command(params)); + const parallelLimit = pLimit(10); + const files = data.Contents || []; - for (const obj of data.Contents || []) { - const fileKey = obj.Key; - if (!fileKey) { - continue; - } + const fetchFileFromS3 = async ({ + fileKey, + params, + }: { + fileKey: string; + params: { Bucket: string; Key: string }; + }) => { + const fileStartTime = Date.now(); + logger.debug('Fetching file from S3', { + fileKey, + }); const tempFilePath = path.join( process.cwd(), @@ -158,34 +166,68 @@ export const fetchCacheFromS3 = async () => { ); const tmpFileDir = path.dirname(tempFilePath); + const finalFilePath = path.join(process.cwd(), fileKey); - await fs.promises.mkdir(tmpFileDir, { recursive: true }); + try { + if (fs.existsSync(fileKey)) { + await fs.promises.mkdir(tmpFileDir, { recursive: true }); + } - const data = await s3.send( - new GetObjectCommand({ ...params, Key: fileKey }), - ); + const data = await s3.send( + new GetObjectCommand({ ...params, Key: fileKey }), + ); - logger.debug('Saving cache to temp file', { - fileKey, - tempFilePath, - }); + if (data.Body) { + const readableStream = data.Body as Readable; + logger.debug('Writing file to temporary location', { + fileKey, + tempFilePath, + }); + await fs.promises.writeFile(tempFilePath, readableStream); + const warpCacheDir = path.dirname(finalFilePath); + await fs.promises.mkdir(warpCacheDir, { recursive: true }); + if (fs.existsSync(finalFilePath)) { + await fs.promises.unlink(finalFilePath); + } + logger.debug('Moving file to final location', { + fileKey, + finalFilePath, + tempFilePath, + }); + // moves the file from the temp location to the final location + await fs.promises.rename(tempFilePath, finalFilePath); - if (data.Body) { - const readableStream = data.Body as Readable; - await fs.promises.writeFile(tempFilePath, readableStream); - logger.debug('Successfully saved file to local filesystem', { + logger.debug('Successfully fetched file from S3', { + fileKey, + finalFilePath, + durationMs: Date.now() - fileStartTime, + }); + } + } catch (error: unknown) { + const message = + error instanceof Error ? error : new Error('Unknown error'); + logger.error('Failed to fetch file from S3', { + error: message, fileKey, - tempFilePath, }); - const warpCacheDir = path.dirname(fileKey); - await fs.promises.mkdir(warpCacheDir, { recursive: true }); - if (fs.existsSync(fileKey)) { - await fs.promises.unlink(fileKey); - } - // moves the file from the temp location to the final location - await fs.promises.rename(tempFilePath, fileKey); } - } + }; + + logger.debug('Found files to fetch from S3', { + fileCount: files.length, + }); + + await Promise.all( + files.map(async (obj) => { + const fileKey = obj.Key; + if (!fileKey) { + return; + } + return parallelLimit(() => { + return fetchFileFromS3({ fileKey, params }); + }); + }), + ); logger.info('Successfully bootstrapped warp cache from S3', { durationMs: Date.now() - startTimeMs, });