Skip to content
This repository has been archived by the owner on Sep 16, 2024. It is now read-only.

feat(s3): download files from s3 concurrently #111

Merged
merged 2 commits into from
Apr 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 68 additions & 26 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,47 +145,89 @@ export const fetchCacheFromS3 = async () => {

try {
const data = await s3.send(new ListObjectsV2Command(params));
const parallelLimit = pLimit(10);
const files = data.Contents || [];

for (const obj of data.Contents || []) {
const fileKey = obj.Key;
if (!fileKey) {
continue;
}
const fetchFileFromS3 = async ({
fileKey,
params,
}: {
fileKey: string;
params: { Bucket: string; Key: string };
}) => {
const fileStartTime = Date.now();
logger.debug('Fetching file from S3', {
fileKey,
});

const tempFilePath = path.join(
process.cwd(),
fileKey.replace(params.Key, 'tmp'),
);

const tmpFileDir = path.dirname(tempFilePath);
const finalFilePath = path.join(process.cwd(), fileKey);

await fs.promises.mkdir(tmpFileDir, { recursive: true });
try {
if (fs.existsSync(fileKey)) {
await fs.promises.mkdir(tmpFileDir, { recursive: true });
}

const data = await s3.send(
new GetObjectCommand({ ...params, Key: fileKey }),
);
const data = await s3.send(
new GetObjectCommand({ ...params, Key: fileKey }),
);

logger.debug('Saving cache to temp file', {
fileKey,
tempFilePath,
});
if (data.Body) {
const readableStream = data.Body as Readable;
logger.debug('Writing file to temporary location', {
fileKey,
tempFilePath,
});
await fs.promises.writeFile(tempFilePath, readableStream);
const warpCacheDir = path.dirname(finalFilePath);
await fs.promises.mkdir(warpCacheDir, { recursive: true });
if (fs.existsSync(finalFilePath)) {
await fs.promises.unlink(finalFilePath);
}
logger.debug('Moving file to final location', {
fileKey,
finalFilePath,
tempFilePath,
});
// moves the file from the temp location to the final location
await fs.promises.rename(tempFilePath, finalFilePath);

if (data.Body) {
const readableStream = data.Body as Readable;
await fs.promises.writeFile(tempFilePath, readableStream);
logger.debug('Successfully saved file to local filesystem', {
logger.debug('Successfully fetched file from S3', {
fileKey,
finalFilePath,
durationMs: Date.now() - fileStartTime,
});
}
} catch (error: unknown) {
const message =
error instanceof Error ? error : new Error('Unknown error');
logger.error('Failed to fetch file from S3', {
error: message,
fileKey,
tempFilePath,
});
const warpCacheDir = path.dirname(fileKey);
await fs.promises.mkdir(warpCacheDir, { recursive: true });
if (fs.existsSync(fileKey)) {
await fs.promises.unlink(fileKey);
}
// moves the file from the temp location to the final location
await fs.promises.rename(tempFilePath, fileKey);
}
}
};

logger.debug('Found files to fetch from S3', {
fileCount: files.length,
});

await Promise.all(
files.map(async (obj) => {
const fileKey = obj.Key;
if (!fileKey) {
return;
}
return parallelLimit(() => {
return fetchFileFromS3({ fileKey, params });
});
}),
);
logger.info('Successfully bootstrapped warp cache from S3', {
durationMs: Date.now() - startTimeMs,
});
Expand Down
Loading