Skip to content
This repository has been archived by the owner on Sep 16, 2024. It is now read-only.

Commit

Permalink
Merge pull request #111 from ar-io/parallel-download
Browse files Browse the repository at this point in the history
feat(s3): download files from s3 concurrently
  • Loading branch information
dtfiedler authored Apr 24, 2024
2 parents 1aec46e + 04f1859 commit 4e1dabe
Showing 1 changed file with 68 additions and 26 deletions.
94 changes: 68 additions & 26 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,47 +145,89 @@ export const fetchCacheFromS3 = async () => {

try {
const data = await s3.send(new ListObjectsV2Command(params));
const parallelLimit = pLimit(10);
const files = data.Contents || [];

for (const obj of data.Contents || []) {
const fileKey = obj.Key;
if (!fileKey) {
continue;
}
const fetchFileFromS3 = async ({
fileKey,
params,
}: {
fileKey: string;
params: { Bucket: string; Key: string };
}) => {
const fileStartTime = Date.now();
logger.debug('Fetching file from S3', {
fileKey,
});

const tempFilePath = path.join(
process.cwd(),
fileKey.replace(params.Key, 'tmp'),
);

const tmpFileDir = path.dirname(tempFilePath);
const finalFilePath = path.join(process.cwd(), fileKey);

await fs.promises.mkdir(tmpFileDir, { recursive: true });
try {
if (fs.existsSync(fileKey)) {
await fs.promises.mkdir(tmpFileDir, { recursive: true });
}

const data = await s3.send(
new GetObjectCommand({ ...params, Key: fileKey }),
);
const data = await s3.send(
new GetObjectCommand({ ...params, Key: fileKey }),
);

logger.debug('Saving cache to temp file', {
fileKey,
tempFilePath,
});
if (data.Body) {
const readableStream = data.Body as Readable;
logger.debug('Writing file to temporary location', {
fileKey,
tempFilePath,
});
await fs.promises.writeFile(tempFilePath, readableStream);
const warpCacheDir = path.dirname(finalFilePath);
await fs.promises.mkdir(warpCacheDir, { recursive: true });
if (fs.existsSync(finalFilePath)) {
await fs.promises.unlink(finalFilePath);
}
logger.debug('Moving file to final location', {
fileKey,
finalFilePath,
tempFilePath,
});
// moves the file from the temp location to the final location
await fs.promises.rename(tempFilePath, finalFilePath);

if (data.Body) {
const readableStream = data.Body as Readable;
await fs.promises.writeFile(tempFilePath, readableStream);
logger.debug('Successfully saved file to local filesystem', {
logger.debug('Successfully fetched file from S3', {
fileKey,
finalFilePath,
durationMs: Date.now() - fileStartTime,
});
}
} catch (error: unknown) {
const message =
error instanceof Error ? error : new Error('Unknown error');
logger.error('Failed to fetch file from S3', {
error: message,
fileKey,
tempFilePath,
});
const warpCacheDir = path.dirname(fileKey);
await fs.promises.mkdir(warpCacheDir, { recursive: true });
if (fs.existsSync(fileKey)) {
await fs.promises.unlink(fileKey);
}
// moves the file from the temp location to the final location
await fs.promises.rename(tempFilePath, fileKey);
}
}
};

logger.debug('Found files to fetch from S3', {
fileCount: files.length,
});

await Promise.all(
files.map(async (obj) => {
const fileKey = obj.Key;
if (!fileKey) {
return;
}
return parallelLimit(() => {
return fetchFileFromS3({ fileKey, params });
});
}),
);
logger.info('Successfully bootstrapped warp cache from S3', {
durationMs: Date.now() - startTimeMs,
});
Expand Down

0 comments on commit 4e1dabe

Please sign in to comment.