Skip to content
This repository has been archived by the owner on Sep 16, 2024. It is now read-only.

feat(s3): bootstrap the cache from S3 #94

Merged
merged 1 commit into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ lerna-debug.log*

# warp
cache
tmp
**/contracts
**/wallets

Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
},
"dependencies": {
"@ardrive/ardrive-promise-cache": "^1.1.4",
"@aws-sdk/client-s3": "^3.490.0",
"@koa/cors": "^4.0.0",
"@koa/router": "^12.0.0",
"arweave": "^1.13.7",
Expand All @@ -45,6 +46,7 @@
"yaml": "^2.3.1"
},
"devDependencies": {
"@commitlint/config-conventional": "^17.7.0",
"@types/axios": "^0.14.0",
"@types/chai": "^4.3.5",
"@types/koa": "^2.13.8",
Expand All @@ -56,7 +58,6 @@
"@types/node": "^18.15.11",
"@typescript-eslint/eslint-plugin": "^5.59.11",
"@typescript-eslint/parser": "^5.59.11",
"@commitlint/config-conventional": "^17.7.0",
"axios": "^1.4.0",
"chai": "^4.3.7",
"commitlint": "^17.7.1",
Expand Down
2 changes: 1 addition & 1 deletion src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import * as system from './system';
const app = new Koa();

// load arns contract state on startup
system.prefetchContracts();
system.bootstrapCache();

// attach middlewares
app.use(loggerMiddleware);
Expand Down
2 changes: 2 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
import { EvaluationOptions } from 'warp-contracts';

export const PREFETCH_CONTRACTS = process.env.PREFETCH_CONTRACTS === 'true';
export const BOOTSTRAP_CACHE = process.env.BOOTSTRAP_CACHE === 'true';
export const ARNS_CONTRACT_ID_REGEX = '([a-zA-Z0-9-_s+]{43})';
export const ARNS_NAME_REGEX = '([a-zA-Z0-9-s+]{1,51})';
export const SUB_CONTRACT_EVALUATION_TIMEOUT_MS = 10_000; // 10 sec state timeout - non configurable
Expand Down
2 changes: 1 addition & 1 deletion src/middleware/warp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ export const warp = WarpFactory.forMainnet(
new SqliteContractCache(
{
...defaultCacheOptions,
inMemory: false,
dbLocation: `./cache/warp/sqlite/state`,
},
{
maxEntriesPerContract: 10_000_000,
},
),
);
// TODO: useContractCache when they support custom gateways

export function warpMiddleware(ctx: KoaContext, next: Next) {
ctx.state.warp = warp;
Expand Down
106 changes: 94 additions & 12 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,39 @@ import { getContractState } from './api/warp';
import { prefetchContractTxIds } from './config';
import logger from './logger';
import { warp } from './middleware';
import {
S3Client,
ListObjectsV2Command,
GetObjectCommand,
} from '@aws-sdk/client-s3';
import fs from 'node:fs';
import { Readable } from 'stream';
import path from 'node:path';
import { BOOTSTRAP_CACHE, PREFETCH_CONTRACTS } from './constants';

export const bootstrapCache = async () => {
if (BOOTSTRAP_CACHE) {
await fetchCacheFromS3();
}

if (PREFETCH_CONTRACTS) {
await prefetchContracts();
}
};

let successfullyPrefetchedContracts = false;
const prefetchRequired = process.env.PREFETCH_CONTRACTS === 'true';
export const getPrefetchStatusCode = () => {
if (!prefetchRequired) {
if (!PREFETCH_CONTRACTS) {
return 200;
}
return successfullyPrefetchedContracts ? 200 : 503;
};

export const prefetchContracts = async () => {
if (!prefetchRequired) {
logger.info('Skipping pre-fetching contracts');
return;
}

const startTimeMs = Date.now();
logger.info('Pre-fetching contracts...', {
contractTxIds: prefetchContractTxIds,
});
// don't wait - just fire and forget
const prefetchResults = await Promise.all(
prefetchContractTxIds.map((contractTxId: string) => {
Expand All @@ -52,8 +69,6 @@ export const prefetchContracts = async () => {
const endTimestamp = Date.now();
logger.info('Successfully prefetched contract state', {
contractTxId,
startTimestamp,
endTimestamp,
durationMs: endTimestamp - startTimestamp,
});
return true;
Expand All @@ -64,8 +79,6 @@ export const prefetchContracts = async () => {
logger.error('Failed to prefetch contract state', {
error: message,
contractTxId,
startTimestamp,
endTimestamp,
durationMs: endTimestamp - startTimestamp,
});
// don't fail the entire prefetch operation if one contract fails
Expand All @@ -74,7 +87,7 @@ export const prefetchContracts = async () => {
}),
).catch((error: unknown) => {
const message = error instanceof Error ? error.message : error;
logger.error('Failed to prefetch contract state', {
logger.error('Failed to prefetch all contracts', {
error: message,
contractTxIds: prefetchContractTxIds,
});
Expand All @@ -88,5 +101,74 @@ export const prefetchContracts = async () => {
logger.info('Finished pre-fetching contracts', {
success: successfullyPrefetchedContracts,
contractTxIds: prefetchContractTxIds,
durationMs: Date.now() - startTimeMs,
});
};

export const fetchCacheFromS3 = async () => {
const startTimeMs = Date.now();
const s3 = new S3Client({
region: process.env.AWS_REGION,
});
const params = {
Bucket: process.env.WARP_CACHE_BUCKET || 'arns-warp-cache',
Key: process.env.WARP_CACHE_KEY || 'cache',
};

logger.info('Bootstrapping warp cache from S3', {
params,
});

try {
const data = await s3.send(new ListObjectsV2Command(params));

for (const obj of data.Contents || []) {
const fileKey = obj.Key;
if (!fileKey) {
continue;
}

const tempFilePath = path.join(
process.cwd(),
fileKey.replace(params.Key, 'tmp'),
);

const tmpFileDir = path.dirname(tempFilePath);

await fs.promises.mkdir(tmpFileDir, { recursive: true });

const data = await s3.send(
new GetObjectCommand({ ...params, Key: fileKey }),
);

logger.debug('Saving cache to temp file', {
fileKey,
tempFilePath,
});

if (data.Body) {
const readableStream = data.Body as Readable;
await fs.promises.writeFile(tempFilePath, readableStream);
logger.debug('Successfully saved file to local filesystem', {
fileKey,
tempFilePath,
});
const warpCacheDir = path.dirname(fileKey);
await fs.promises.mkdir(warpCacheDir, { recursive: true });
if (fs.existsSync(fileKey)) {
await fs.promises.unlink(fileKey);
}
// moves the file from the temp location to the final location
await fs.promises.rename(tempFilePath, fileKey);
}
}
logger.info('Successfully bootstrapped warp cache from S3', {
durationMs: Date.now() - startTimeMs,
});
} catch (error: unknown) {
const message = error instanceof Error ? error : new Error('Unknown error');
logger.error('Failed to bootstrap cache from S3', {
error: message,
});
}
};
Loading
Loading