Skip to content
This repository has been archived by the owner on Sep 16, 2024. It is now read-only.

Commit

Permalink
feat(s3): download files from s3 concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
dtfiedler committed Apr 24, 2024
1 parent 1aec46e commit 347cc30
Showing 1 changed file with 57 additions and 26 deletions.
83 changes: 57 additions & 26 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,19 @@ export const fetchCacheFromS3 = async () => {
try {
const data = await s3.send(new ListObjectsV2Command(params));

for (const obj of data.Contents || []) {
const fileKey = obj.Key;
if (!fileKey) {
continue;
}
const parallelLimit = pLimit(10);
const files = data.Contents || [];

const fetchFileFromS3 = async ({
fileKey,
params,
}: {
fileKey: string;
params: { Bucket: string; Key: string };
}) => {
logger.debug('Fetching file from S3', {
fileKey,
});

const tempFilePath = path.join(
process.cwd(),
Expand All @@ -159,33 +167,56 @@ export const fetchCacheFromS3 = async () => {

const tmpFileDir = path.dirname(tempFilePath);

await fs.promises.mkdir(tmpFileDir, { recursive: true });
try {
if (fs.existsSync(fileKey)) {
await fs.promises.mkdir(tmpFileDir, { recursive: true });
}

const data = await s3.send(
new GetObjectCommand({ ...params, Key: fileKey }),
);
const data = await s3.send(
new GetObjectCommand({ ...params, Key: fileKey }),
);

logger.debug('Saving cache to temp file', {
fileKey,
tempFilePath,
});
if (data.Body) {
const readableStream = data.Body as Readable;
await fs.promises.writeFile(tempFilePath, readableStream);
logger.debug('Successfully saved file to local filesystem', {
fileKey,
tempFilePath,
});
const warpCacheDir = path.dirname(fileKey);
await fs.promises.mkdir(warpCacheDir, { recursive: true });
if (fs.existsSync(fileKey)) {
await fs.promises.unlink(fileKey);
}
// moves the file from the temp location to the final location
await fs.promises.rename(tempFilePath, fileKey);

if (data.Body) {
const readableStream = data.Body as Readable;
await fs.promises.writeFile(tempFilePath, readableStream);
logger.debug('Successfully saved file to local filesystem', {
logger.debug('Successfully fetched file from S3', {
fileKey,
tempFilePath,
});
}
} catch (error: unknown) {
const message =
error instanceof Error ? error : new Error('Unknown error');
logger.error('Failed to fetch file from S3', {
error: message,
fileKey,
tempFilePath,
});
const warpCacheDir = path.dirname(fileKey);
await fs.promises.mkdir(warpCacheDir, { recursive: true });
if (fs.existsSync(fileKey)) {
await fs.promises.unlink(fileKey);
}
// moves the file from the temp location to the final location
await fs.promises.rename(tempFilePath, fileKey);
}
}
};

await Promise.all(
files.map(async (obj) => {
const fileKey = obj.Key;
if (!fileKey) {
return;
}
return parallelLimit(() => {
return fetchFileFromS3({ fileKey, params });
});
}),
);
logger.info('Successfully bootstrapped warp cache from S3', {
durationMs: Date.now() - startTimeMs,
});
Expand Down

0 comments on commit 347cc30

Please sign in to comment.