Skip to content
This repository has been archived by the owner on Sep 16, 2024. It is now read-only.

Commit

Permalink
Merge pull request #94 from ar-io/PE-5330-bootstrap
Browse files Browse the repository at this point in the history
feat(s3): bootstrap the cache from S3
  • Loading branch information
dtfiedler authored Jan 17, 2024
2 parents 9b595bf + ba07a35 commit fbbaba8
Show file tree
Hide file tree
Showing 7 changed files with 1,169 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ lerna-debug.log*

# warp
cache
tmp
**/contracts
**/wallets

Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
},
"dependencies": {
"@ardrive/ardrive-promise-cache": "^1.1.4",
"@aws-sdk/client-s3": "^3.490.0",
"@koa/cors": "^4.0.0",
"@koa/router": "^12.0.0",
"arweave": "^1.13.7",
Expand All @@ -45,6 +46,7 @@
"yaml": "^2.3.1"
},
"devDependencies": {
"@commitlint/config-conventional": "^17.7.0",
"@types/axios": "^0.14.0",
"@types/chai": "^4.3.5",
"@types/koa": "^2.13.8",
Expand All @@ -56,7 +58,6 @@
"@types/node": "^18.15.11",
"@typescript-eslint/eslint-plugin": "^5.59.11",
"@typescript-eslint/parser": "^5.59.11",
"@commitlint/config-conventional": "^17.7.0",
"axios": "^1.4.0",
"chai": "^4.3.7",
"commitlint": "^17.7.1",
Expand Down
2 changes: 1 addition & 1 deletion src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import * as system from './system';
const app = new Koa();

// load arns contract state on startup
system.prefetchContracts();
system.bootstrapCache();

// attach middlewares
app.use(loggerMiddleware);
Expand Down
2 changes: 2 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
import { EvaluationOptions } from 'warp-contracts';

export const PREFETCH_CONTRACTS = process.env.PREFETCH_CONTRACTS === 'true';
export const BOOTSTRAP_CACHE = process.env.BOOTSTRAP_CACHE === 'true';
export const ARNS_CONTRACT_ID_REGEX = '([a-zA-Z0-9-_s+]{43})';
export const ARNS_NAME_REGEX = '([a-zA-Z0-9-s+]{1,51})';
export const SUB_CONTRACT_EVALUATION_TIMEOUT_MS = 10_000; // 10 sec state timeout - non configurable
Expand Down
2 changes: 1 addition & 1 deletion src/middleware/warp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ export const warp = WarpFactory.forMainnet(
new SqliteContractCache(
{
...defaultCacheOptions,
inMemory: false,
dbLocation: `./cache/warp/sqlite/state`,
},
{
maxEntriesPerContract: 10_000_000,
},
),
);
// TODO: useContractCache when they support custom gateways

export function warpMiddleware(ctx: KoaContext, next: Next) {
ctx.state.warp = warp;
Expand Down
106 changes: 94 additions & 12 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,39 @@ import { getContractState } from './api/warp';
import { prefetchContractTxIds } from './config';
import logger from './logger';
import { warp } from './middleware';
import {
S3Client,
ListObjectsV2Command,
GetObjectCommand,
} from '@aws-sdk/client-s3';
import fs from 'node:fs';
import { Readable } from 'stream';
import path from 'node:path';
import { BOOTSTRAP_CACHE, PREFETCH_CONTRACTS } from './constants';

export const bootstrapCache = async () => {
if (BOOTSTRAP_CACHE) {
await fetchCacheFromS3();
}

if (PREFETCH_CONTRACTS) {
await prefetchContracts();
}
};

let successfullyPrefetchedContracts = false;
const prefetchRequired = process.env.PREFETCH_CONTRACTS === 'true';
export const getPrefetchStatusCode = () => {
if (!prefetchRequired) {
if (!PREFETCH_CONTRACTS) {
return 200;
}
return successfullyPrefetchedContracts ? 200 : 503;
};

export const prefetchContracts = async () => {
if (!prefetchRequired) {
logger.info('Skipping pre-fetching contracts');
return;
}

const startTimeMs = Date.now();
logger.info('Pre-fetching contracts...', {
contractTxIds: prefetchContractTxIds,
});
// don't wait - just fire and forget
const prefetchResults = await Promise.all(
prefetchContractTxIds.map((contractTxId: string) => {
Expand All @@ -52,8 +69,6 @@ export const prefetchContracts = async () => {
const endTimestamp = Date.now();
logger.info('Successfully prefetched contract state', {
contractTxId,
startTimestamp,
endTimestamp,
durationMs: endTimestamp - startTimestamp,
});
return true;
Expand All @@ -64,8 +79,6 @@ export const prefetchContracts = async () => {
logger.error('Failed to prefetch contract state', {
error: message,
contractTxId,
startTimestamp,
endTimestamp,
durationMs: endTimestamp - startTimestamp,
});
// don't fail the entire prefetch operation if one contract fails
Expand All @@ -74,7 +87,7 @@ export const prefetchContracts = async () => {
}),
).catch((error: unknown) => {
const message = error instanceof Error ? error.message : error;
logger.error('Failed to prefetch contract state', {
logger.error('Failed to prefetch all contracts', {
error: message,
contractTxIds: prefetchContractTxIds,
});
Expand All @@ -88,5 +101,74 @@ export const prefetchContracts = async () => {
logger.info('Finished pre-fetching contracts', {
success: successfullyPrefetchedContracts,
contractTxIds: prefetchContractTxIds,
durationMs: Date.now() - startTimeMs,
});
};

export const fetchCacheFromS3 = async () => {
const startTimeMs = Date.now();
const s3 = new S3Client({
region: process.env.AWS_REGION,
});
const params = {
Bucket: process.env.WARP_CACHE_BUCKET || 'arns-warp-cache',
Key: process.env.WARP_CACHE_KEY || 'cache',
};

logger.info('Bootstrapping warp cache from S3', {
params,
});

try {
const data = await s3.send(new ListObjectsV2Command(params));

for (const obj of data.Contents || []) {
const fileKey = obj.Key;
if (!fileKey) {
continue;
}

const tempFilePath = path.join(
process.cwd(),
fileKey.replace(params.Key, 'tmp'),
);

const tmpFileDir = path.dirname(tempFilePath);

await fs.promises.mkdir(tmpFileDir, { recursive: true });

const data = await s3.send(
new GetObjectCommand({ ...params, Key: fileKey }),
);

logger.debug('Saving cache to temp file', {
fileKey,
tempFilePath,
});

if (data.Body) {
const readableStream = data.Body as Readable;
await fs.promises.writeFile(tempFilePath, readableStream);
logger.debug('Successfully saved file to local filesystem', {
fileKey,
tempFilePath,
});
const warpCacheDir = path.dirname(fileKey);
await fs.promises.mkdir(warpCacheDir, { recursive: true });
if (fs.existsSync(fileKey)) {
await fs.promises.unlink(fileKey);
}
// moves the file from the temp location to the final location
await fs.promises.rename(tempFilePath, fileKey);
}
}
logger.info('Successfully bootstrapped warp cache from S3', {
durationMs: Date.now() - startTimeMs,
});
} catch (error: unknown) {
const message = error instanceof Error ? error : new Error('Unknown error');
logger.error('Failed to bootstrap cache from S3', {
error: message,
});
}
};
Loading

0 comments on commit fbbaba8

Please sign in to comment.