diff --git a/config/image-cache.js b/config/image-cache.js deleted file mode 100755 index d9d6f100..00000000 --- a/config/image-cache.js +++ /dev/null @@ -1,147 +0,0 @@ -#!/usr/bin/env node - -/** - * This script is used to upload token metadata images to a Google Cloud Storage bucket. It also - * provides the option to resize an image to a max width before uploading so file sizes are more - * manageable upon display. - * - * The following arguments are taken in order from `argv`: - * * Remote image URL - * * Smart Contract principal - * * Token number - * - * Functionality can be tweaked with the following ENV vars: - * * `IMAGE_CACHE_MAX_BYTE_SIZE`: Max payload size accepted when downloading remote images. - * * `IMAGE_CACHE_RESIZE_WIDTH`: Width to resize images into while preserving aspect ratio. - * * `IMAGE_CACHE_GCS_BUCKET_NAME`: Google Cloud Storage bucket name. Example: 'assets.dev.hiro.so' - * * `IMAGE_CACHE_GCS_OBJECT_NAME_PREFIX`: Path for object storage inside the bucket. Example: - * 'token-metadata-api/mainnet/' - * * `IMAGE_CACHE_GCS_AUTH_TOKEN`: Google Cloud Storage authorization token. If undefined, the token - * will be fetched dynamically from Google. - * * `IMAGE_CACHE_CDN_BASE_PATH`: Base path for URLs that will be returned to the API for storage. - * Example: 'https://assets.dev.hiro.so/token-metadata-api/mainnet/' - */ - -const sharp = require('sharp'); -const { request, fetch, Agent } = require('undici'); -const { Readable, PassThrough } = require('node:stream'); - -const IMAGE_URL = process.argv[2]; -const CONTRACT_PRINCIPAL = process.argv[3]; -const TOKEN_NUMBER = process.argv[4]; - -const IMAGE_RESIZE_WIDTH = parseInt(process.env['IMAGE_CACHE_RESIZE_WIDTH'] ?? '300'); -const GCS_BUCKET_NAME = process.env['IMAGE_CACHE_GCS_BUCKET_NAME']; -const GCS_OBJECT_NAME_PREFIX = process.env['IMAGE_CACHE_GCS_OBJECT_NAME_PREFIX']; -const CDN_BASE_PATH = process.env['IMAGE_CACHE_CDN_BASE_PATH']; -const TIMEOUT = parseInt(process.env['METADATA_FETCH_TIMEOUT_MS'] ?? '30000'); -const MAX_REDIRECTIONS = parseInt(process.env['METADATA_FETCH_MAX_REDIRECTIONS'] ?? '0'); -const MAX_RESPONSE_SIZE = parseInt(process.env['IMAGE_CACHE_MAX_BYTE_SIZE'] ?? '-1'); - -async function getGcsAuthToken() { - const envToken = process.env['IMAGE_CACHE_GCS_AUTH_TOKEN']; - if (envToken !== undefined) return envToken; - try { - const response = await request( - 'http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token', - { - method: 'GET', - headers: { 'Metadata-Flavor': 'Google' }, - throwOnError: true, - } - ); - const json = await response.body.json(); - // Cache the token so we can reuse it for other images. - process.env['IMAGE_CACHE_GCS_AUTH_TOKEN'] = json.access_token; - return json.access_token; - } catch (error) { - throw new Error(`GCS access token error: ${error}`); - } -} - -async function upload(stream, name, authToken) { - await request( - `https://storage.googleapis.com/upload/storage/v1/b/${GCS_BUCKET_NAME}/o?uploadType=media&name=${GCS_OBJECT_NAME_PREFIX}${name}`, - { - method: 'POST', - body: stream, - headers: { 'Content-Type': 'image/png', Authorization: `Bearer ${authToken}` }, - throwOnError: true, - } - ); - return `${CDN_BASE_PATH}${name}`; -} - -fetch( - IMAGE_URL, - { - dispatcher: new Agent({ - headersTimeout: TIMEOUT, - bodyTimeout: TIMEOUT, - maxRedirections: MAX_REDIRECTIONS, - maxResponseSize: MAX_RESPONSE_SIZE, - throwOnError: true, - connect: { - rejectUnauthorized: false, // Ignore SSL cert errors. 
- }, - }), - }, - ({ body }) => body -) - .then(async response => { - const imageReadStream = Readable.fromWeb(response.body); - const passThrough = new PassThrough(); - const fullSizeTransform = sharp().png(); - const thumbnailTransform = sharp() - .resize({ width: IMAGE_RESIZE_WIDTH, withoutEnlargement: true }) - .png(); - imageReadStream.pipe(passThrough); - passThrough.pipe(fullSizeTransform); - passThrough.pipe(thumbnailTransform); - - let didRetryUnauthorized = false; - while (true) { - const authToken = await getGcsAuthToken(); - try { - const results = await Promise.all([ - upload(fullSizeTransform, `${CONTRACT_PRINCIPAL}/${TOKEN_NUMBER}.png`, authToken), - upload(thumbnailTransform, `${CONTRACT_PRINCIPAL}/${TOKEN_NUMBER}-thumb.png`, authToken), - ]); - for (const r of results) console.log(r); - break; - } catch (error) { - if ( - !didRetryUnauthorized && - error.cause && - error.cause.code == 'UND_ERR_RESPONSE_STATUS_CODE' && - (error.cause.statusCode === 401 || error.cause.statusCode === 403) - ) { - // GCS token is probably expired. Force a token refresh before trying again. - process.env['IMAGE_CACHE_GCS_AUTH_TOKEN'] = undefined; - didRetryUnauthorized = true; - } else throw error; - } - } - }) - .catch(error => { - console.error(error); - // TODO: Handle `Input buffer contains unsupported image format` error from sharp when the image - // is actually a video or another media file. - let exitCode = 1; - if ( - error.cause && - (error.cause.code == 'UND_ERR_HEADERS_TIMEOUT' || - error.cause.code == 'UND_ERR_BODY_TIMEOUT' || - error.cause.code == 'UND_ERR_CONNECT_TIMEOUT' || - error.cause.code == 'ECONNRESET') - ) { - exitCode = 2; - } else if ( - error.cause && - error.cause.code == 'UND_ERR_RESPONSE_STATUS_CODE' && - error.cause.statusCode === 429 - ) { - exitCode = 3; - } - process.exit(exitCode); - }); diff --git a/package.json b/package.json index d9869211..37abc86d 100644 --- a/package.json +++ b/package.json @@ -14,9 +14,9 @@ "test:api": "npm run test -- ./tests/api/", "test:chainhook": "npm run test -- ./tests/chainhook/", "test:token-queue": "npm run test -- ./tests/token-queue/", - "testenv:run": "docker-compose -f docker/docker-compose.dev.postgres.yml up", - "testenv:stop": "docker-compose -f docker/docker-compose.dev.postgres.yml down -v -t 0", - "testenv:logs": "docker-compose -f docker/docker-compose.dev.postgres.yml logs -t -f", + "testenv:run": "docker compose -f docker/docker-compose.dev.postgres.yml up", + "testenv:stop": "docker compose -f docker/docker-compose.dev.postgres.yml down -v -t 0", + "testenv:logs": "docker compose -f docker/docker-compose.dev.postgres.yml logs -t -f", "migrate": "ts-node node_modules/.bin/node-pg-migrate -j ts", "lint:eslint": "eslint . 
--ext .js,.jsx,.ts,.tsx -f unix", "lint:prettier": "prettier --check src/**/*.ts tests/**/*.ts migrations/**/*.ts", diff --git a/src/admin-rpc/init.ts b/src/admin-rpc/init.ts index 71762ed8..cd7fe9f8 100644 --- a/src/admin-rpc/init.ts +++ b/src/admin-rpc/init.ts @@ -5,9 +5,10 @@ import { Server } from 'http'; import { Type } from '@sinclair/typebox'; import { SmartContractRegEx } from '../api/schemas'; import { logger, PINO_LOGGER_CONFIG } from '@hirosystems/api-toolkit'; -import { reprocessTokenImageCache } from '../token-processor/util/image-cache'; +import { reprocessTokenImageCache } from '../token-processor/images/image-cache'; import { StacksNodeRpcClient } from '../token-processor/stacks-node/stacks-node-rpc-client'; import { getSmartContractSip } from '../token-processor/util/sip-validation'; +import { ENV } from '../env'; export const AdminApi: FastifyPluginCallback, Server, TypeBoxTypeProvider> = ( fastify, @@ -83,6 +84,10 @@ export const AdminApi: FastifyPluginCallback, Server, TypeB }, }, async (request, reply) => { + if (!ENV.IMAGE_CACHE_PROCESSOR_ENABLED) { + await reply.code(422).send({ error: 'Image cache processor is not enabled' }); + return; + } logger.info( `AdminRPC reprocessing image cache for ${request.body.contractId}: (${ request.body.tokenIds ?? 'all' diff --git a/src/env.ts b/src/env.ts index 7c056e54..a3047a54 100644 --- a/src/env.ts +++ b/src/env.ts @@ -93,12 +93,6 @@ const schema = Type.Object({ * service in our token queue. Defaults to 50,000. */ METADATA_MAX_NFT_CONTRACT_TOKEN_COUNT: Type.Number({ default: 50_000 }), - /** - * Configure a script to handle image URLs during token metadata processing. Must be an executable - * script that accepts the URL as the first program argument and outputs a result URL to stdout. - * Example: ./config/image-cache.js - */ - METADATA_IMAGE_CACHE_PROCESSOR: Type.Optional(Type.String()), /** * How often will token metadata that is marked `dynamic` will be refreshed if it doesn't specify * an explicit TTL (seconds). See SIP-019 for more information. Defaults to 86400 seconds (24 @@ -126,6 +120,22 @@ const schema = Type.Object({ * `https://arweave.net`. */ PUBLIC_GATEWAY_ARWEAVE: Type.String({ default: 'https://arweave.net' }), + + /** Enables token image uploads to a Google Cloud Storage bucket. */ + IMAGE_CACHE_PROCESSOR_ENABLED: Type.Boolean({ default: false }), + /** Width to resize images into while preserving aspect ratio. */ + IMAGE_CACHE_RESIZE_WIDTH: Type.Integer({ default: 300 }), + /** Google Cloud Storage bucket name. Example: 'assets.dev.hiro.so' */ + IMAGE_CACHE_GCS_BUCKET_NAME: Type.Optional(Type.String()), + /** Path for object storage inside the bucket. Example: 'token-metadata-api/mainnet/' */ + IMAGE_CACHE_GCS_OBJECT_NAME_PREFIX: Type.Optional(Type.String()), + /** + * Base path for URLs that will be returned to the API for storage. Example: + * 'https://assets.dev.hiro.so/token-metadata-api/mainnet/' + */ + IMAGE_CACHE_CDN_BASE_PATH: Type.Optional(Type.String()), + /** Max payload size accepted when downloading remote images. 
*/ + IMAGE_CACHE_MAX_BYTE_SIZE: Type.Optional(Type.Integer()), }); type Env = Static; diff --git a/src/token-processor/images/image-cache.ts b/src/token-processor/images/image-cache.ts new file mode 100644 index 00000000..92df36e8 --- /dev/null +++ b/src/token-processor/images/image-cache.ts @@ -0,0 +1,186 @@ +import { ENV } from '../../env'; +import { parseDataUrl, getFetchableDecentralizedStorageUrl } from '../util/metadata-helpers'; +import { logger } from '@hirosystems/api-toolkit'; +import { PgStore } from '../../pg/pg-store'; +import { PassThrough, Readable } from 'node:stream'; +import * as sharp from 'sharp'; +import { Agent, fetch, request, errors, Response } from 'undici'; +import { + HttpError, + MetadataParseError, + MetadataTimeoutError, + TooManyRequestsHttpError, + UndiciCauseTypeError, +} from '../util/errors'; + +let gcsAuthToken: string | undefined; + +async function getGcsAuthToken(): Promise { + if (gcsAuthToken !== undefined) return gcsAuthToken; + try { + const response = await request( + 'http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token', + { + method: 'GET', + headers: { 'Metadata-Flavor': 'Google' }, + throwOnError: true, + } + ); + const json = (await response.body.json()) as { access_token: string }; + // Cache the token so we can reuse it for other images. + gcsAuthToken = json.access_token; + return json.access_token; + } catch (error) { + throw new Error(`GCS access token error: ${error}`); + } +} + +async function uploadToGcs(stream: Readable, name: string, authToken: string) { + await request( + `https://storage.googleapis.com/upload/storage/v1/b/${ENV.IMAGE_CACHE_GCS_BUCKET_NAME}/o?uploadType=media&name=${ENV.IMAGE_CACHE_GCS_OBJECT_NAME_PREFIX}${name}`, + { + method: 'POST', + body: stream, + headers: { 'Content-Type': 'image/png', Authorization: `Bearer ${authToken}` }, + throwOnError: true, + } + ); + return `${ENV.IMAGE_CACHE_CDN_BASE_PATH}${name}`; +} + +/** + * Uploads processed token metadata images to a Google Cloud Storage bucket. It also provides the + * option to resize the image to a max width before uploading so file sizes are more manageable upon + * display. + * + * For a list of configuration options, see `env.ts`. + */ +export async function processImageCache( + imgUrl: string, + contractPrincipal: string, + tokenNumber: bigint +): Promise { + logger.info(`ImageCache processing token ${contractPrincipal} (${tokenNumber}) at ${imgUrl}`); + if (imgUrl.startsWith('data:')) return [imgUrl]; + + // Fetch original image. + let fetchResponse: Response; + try { + fetchResponse = await fetch(imgUrl, { + dispatcher: new Agent({ + headersTimeout: ENV.METADATA_FETCH_TIMEOUT_MS, + bodyTimeout: ENV.METADATA_FETCH_TIMEOUT_MS, + maxRedirections: ENV.METADATA_FETCH_MAX_REDIRECTIONS, + maxResponseSize: ENV.IMAGE_CACHE_MAX_BYTE_SIZE, + connect: { + rejectUnauthorized: false, // Ignore SSL cert errors. 
+ }, + }), + }); + } catch (error) { + if (error instanceof TypeError) { + const typeError = error as UndiciCauseTypeError; + if ( + typeError.cause instanceof errors.HeadersTimeoutError || + typeError.cause instanceof errors.BodyTimeoutError || + typeError.cause instanceof errors.ConnectTimeoutError + ) { + throw new MetadataTimeoutError(new URL(imgUrl)); + } + } + throw new HttpError(`ImageCache fetch error: ${imgUrl} ${error}`, error); + } + if (fetchResponse.status == 429) { + throw new TooManyRequestsHttpError(new URL(imgUrl), new errors.ResponseStatusCodeError()); + } + const imageBody = fetchResponse.body; + if (!fetchResponse.ok || !imageBody) { + throw new HttpError( + `ImageCache fetch error`, + new errors.ResponseStatusCodeError(fetchResponse.statusText, fetchResponse.status) + ); + } + + // Transform image. + let fullSizeTransform: sharp.Sharp; + let thumbnailTransform: sharp.Sharp; + try { + const imageReadStream = Readable.fromWeb(imageBody); + const passThrough = new PassThrough(); + fullSizeTransform = sharp().png(); + thumbnailTransform = sharp() + .resize({ width: ENV.IMAGE_CACHE_RESIZE_WIDTH, withoutEnlargement: true }) + .png(); + imageReadStream.pipe(passThrough); + passThrough.pipe(fullSizeTransform); + passThrough.pipe(thumbnailTransform); + } catch (error) { + throw new MetadataParseError(`ImageCache error transforming image: ${error}`); + } + + let didRetryUnauthorized = false; + while (true) { + const authToken = await getGcsAuthToken(); + try { + const results = await Promise.all([ + uploadToGcs(fullSizeTransform, `${contractPrincipal}/${tokenNumber}.png`, authToken), + uploadToGcs(thumbnailTransform, `${contractPrincipal}/${tokenNumber}-thumb.png`, authToken), + ]); + return results; + } catch (error) { + if ( + !didRetryUnauthorized && + error instanceof errors.ResponseStatusCodeError && + (error.statusCode === 401 || error.statusCode === 403) + ) { + // GCS token is probably expired. Force a token refresh before trying again. + gcsAuthToken = undefined; + didRetryUnauthorized = true; + } else throw new HttpError(`ImageCache upload error: ${error}`, error); + } + } +} + +/** + * Converts a raw image URI from metadata into a fetchable URL. 
+ * @param uri - Original image URI + * @returns Normalized URL string + */ +export function normalizeImageUri(uri: string): string { + // Support images embedded in a Data URL + if (uri.startsWith('data:')) { + const dataUrl = parseDataUrl(uri); + if (!dataUrl) { + throw new MetadataParseError(`Data URL could not be parsed: ${uri}`); + } + if (!dataUrl.mediaType?.startsWith('image/')) { + throw new MetadataParseError(`Token image is a Data URL with a non-image media type: ${uri}`); + } + return uri; + } + const fetchableUrl = getFetchableDecentralizedStorageUrl(uri); + return fetchableUrl.toString(); +} + +export async function reprocessTokenImageCache( + db: PgStore, + contractPrincipal: string, + tokenIds?: number[] +): Promise { + await db.sqlWriteTransaction(async sql => { + const imageUris = await db.getTokenImageUris(contractPrincipal, tokenIds); + for (const token of imageUris) { + try { + const [cached, thumbnail] = await processImageCache( + getFetchableDecentralizedStorageUrl(token.image).toString(), + contractPrincipal, + BigInt(token.token_number) + ); + if (cached && thumbnail) + await db.updateTokenCachedImages(token.token_id, cached, thumbnail); + } catch (error) { + logger.error(error, `ImageCache unable to reprocess token image cache`); + } + } + }); +} diff --git a/src/token-processor/util/errors.ts b/src/token-processor/util/errors.ts index d87394be..51882944 100644 --- a/src/token-processor/util/errors.ts +++ b/src/token-processor/util/errors.ts @@ -1,6 +1,10 @@ import { errors } from 'undici'; import { parseRetryAfterResponseHeader } from './helpers'; +export interface UndiciCauseTypeError extends TypeError { + cause?: unknown; +} + /** Tags an error as a user error i.e. caused by a bad contract, incorrect SIP-016 metadata, etc. */ export class UserError extends Error {} @@ -15,9 +19,11 @@ export class MetadataSizeExceededError extends UserError { /** Thrown when fetching metadata exceeds the max allowed timeout */ export class MetadataTimeoutError extends UserError { - constructor(message: string) { + public url: URL; + + constructor(url: URL) { super(); - this.message = message; + this.url = url; this.name = this.constructor.name; } } diff --git a/src/token-processor/util/helpers.ts b/src/token-processor/util/helpers.ts index c8f7e34d..d99cd1c3 100644 --- a/src/token-processor/util/helpers.ts +++ b/src/token-processor/util/helpers.ts @@ -12,26 +12,6 @@ export function dbSipNumberToDbTokenType(sip: DbSipNumber): DbTokenType { } } -export type Waiter = Promise & { - finish: (result: T) => void; - isFinished: boolean; -}; - -export function waiter(): Waiter { - let resolveFn: (result: T) => void; - const promise = new Promise(resolve => { - resolveFn = resolve; - }); - const completer = { - finish: (result: T) => { - completer.isFinished = true; - resolveFn(result); - }, - isFinished: false, - }; - return Object.assign(promise, completer); -} - /** * Parses a `Retry-After` HTTP header from an undici 429 `ResponseStatusCodeError` error so we can * determine when we can try calling the same host again looking for metadata. 
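An aside on the stream handling in the new `src/token-processor/images/image-cache.ts` above: the image is downloaded once and fanned out through a `PassThrough` into two independent `sharp` pipelines, one for the full-size PNG and one for the thumbnail. A minimal standalone sketch of that technique follows; the URL handling mirrors the new module, but the 300px width and the file outputs are illustrative stand-ins for the GCS uploads.

```ts
import { PassThrough, Readable } from 'node:stream';
import * as sharp from 'sharp';
import { fetch } from 'undici';

async function fanOutExample(imgUrl: string): Promise<void> {
  const response = await fetch(imgUrl);
  if (!response.ok || !response.body) throw new Error(`fetch failed: ${response.status}`);

  const source = Readable.fromWeb(response.body);
  const passThrough = new PassThrough();
  const fullSize = sharp().png(); // re-encode at original dimensions
  const thumbnail = sharp().resize({ width: 300, withoutEnlargement: true }).png();

  // Both transforms read from the same PassThrough, so the body is fetched only once.
  source.pipe(passThrough);
  passThrough.pipe(fullSize);
  passThrough.pipe(thumbnail);

  // processImageCache streams these into GCS; writing to disk keeps the sketch self-contained.
  await Promise.all([fullSize.toFile('full.png'), thumbnail.toFile('thumb.png')]);
}
```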
diff --git a/src/token-processor/util/image-cache.ts b/src/token-processor/util/image-cache.ts deleted file mode 100644 index 1c11aac9..00000000 --- a/src/token-processor/util/image-cache.ts +++ /dev/null @@ -1,142 +0,0 @@ -import * as child_process from 'child_process'; -import { ENV } from '../../env'; -import { MetadataParseError, MetadataTimeoutError, TooManyRequestsHttpError } from './errors'; -import { parseDataUrl, getFetchableDecentralizedStorageUrl } from './metadata-helpers'; -import { logger } from '@hirosystems/api-toolkit'; -import { PgStore } from '../../pg/pg-store'; -import { errors } from 'undici'; -import { RetryableJobError } from '../queue/errors'; - -/** - * If an external image processor script is configured in the `METADATA_IMAGE_CACHE_PROCESSOR` ENV - * var, this function will process the given image URL for the purpose of caching on a CDN (or - * whatever else it may be created to do). The script is expected to return a new URL for the image - * via `stdout`, with an optional 2nd line with another URL for a thumbnail version of the same - * cached image. If the script is not configured, then the original URL is returned immediately. If - * a data-uri is passed, it is also immediately returned without being passed to the script. - * - * The Image Cache script must return a status code of `0` to mark a successful cache. Other code - * returns available are: - * * `1`: A generic error occurred. Cache should not be retried. - * * `2`: Image fetch timed out before caching was possible. Should be retried. - * * `3`: Image fetch failed due to rate limits from the remote server. Should be retried. - */ -export async function processImageCache( - imgUrl: string, - contractPrincipal: string, - tokenNumber: bigint -): Promise { - const imageCacheProcessor = ENV.METADATA_IMAGE_CACHE_PROCESSOR; - if (!imageCacheProcessor || imgUrl.startsWith('data:')) return [imgUrl]; - logger.info(`ImageCache processing token ${contractPrincipal} (${tokenNumber}) at ${imgUrl}`); - const { code, stdout, stderr } = await callImageCacheScript( - imageCacheProcessor, - imgUrl, - contractPrincipal, - tokenNumber - ); - switch (code) { - case 0: - try { - const urls = stdout - .trim() - .split('\n') - .map(r => new URL(r).toString()); - logger.info(urls, `ImageCache processed token ${contractPrincipal} (${tokenNumber})`); - return urls; - } catch (error) { - // The script returned a code `0` but the results are invalid. This could happen because of - // an unknown script error so we should mark it as retryable. 
- throw new RetryableJobError( - `ImageCache unknown error`, - new Error(`Invalid cached url for ${imgUrl}: ${stdout}, stderr: ${stderr}`) - ); - } - case 2: - throw new RetryableJobError(`ImageCache fetch timed out`, new MetadataTimeoutError(imgUrl)); - case 3: - throw new RetryableJobError( - `ImageCache fetch rate limited`, - new TooManyRequestsHttpError(new URL(imgUrl), new errors.ResponseStatusCodeError()) - ); - default: - throw new Error(`ImageCache script error (code ${code}): ${stderr}`); - } -} - -async function callImageCacheScript( - imageCacheProcessor: string, - imgUrl: string, - contractPrincipal: string, - tokenNumber: bigint -): Promise<{ - code: number; - stdout: string; - stderr: string; -}> { - const repoDir = process.cwd(); - return await new Promise<{ - code: number; - stdout: string; - stderr: string; - }>(resolve => { - const cp = child_process.spawn( - imageCacheProcessor, - [imgUrl, contractPrincipal, tokenNumber.toString()], - { cwd: repoDir } - ); - let code = 0; - let stdout = ''; - let stderr = ''; - cp.stdout.on('data', data => (stdout += data)); - cp.stderr.on('data', data => (stderr += data)); - cp.on('close', _ => resolve({ code, stdout, stderr })); - cp.on('exit', processCode => { - code = processCode ?? 0; - }); - }); -} - -/** - * Converts a raw image URI from metadata into a fetchable URL. - * @param uri - Original image URI - * @returns Normalized URL string - */ -export function normalizeImageUri(uri: string): string { - // Support images embedded in a Data URL - if (uri.startsWith('data:')) { - const dataUrl = parseDataUrl(uri); - if (!dataUrl) { - throw new MetadataParseError(`Data URL could not be parsed: ${uri}`); - } - if (!dataUrl.mediaType?.startsWith('image/')) { - throw new MetadataParseError(`Token image is a Data URL with a non-image media type: ${uri}`); - } - return uri; - } - const fetchableUrl = getFetchableDecentralizedStorageUrl(uri); - return fetchableUrl.toString(); -} - -export async function reprocessTokenImageCache( - db: PgStore, - contractPrincipal: string, - tokenIds?: number[] -): Promise { - await db.sqlWriteTransaction(async sql => { - const imageUris = await db.getTokenImageUris(contractPrincipal, tokenIds); - for (const token of imageUris) { - try { - const [cached, thumbnail] = await processImageCache( - getFetchableDecentralizedStorageUrl(token.image).toString(), - contractPrincipal, - BigInt(token.token_number) - ); - if (cached && thumbnail) - await db.updateTokenCachedImages(token.token_id, cached, thumbnail); - } catch (error) { - logger.error(error, `ImageCache unable to reprocess token image cache`); - } - } - }); -} diff --git a/src/token-processor/util/metadata-helpers.ts b/src/token-processor/util/metadata-helpers.ts index 8bfbd37f..2c538f7e 100644 --- a/src/token-processor/util/metadata-helpers.ts +++ b/src/token-processor/util/metadata-helpers.ts @@ -18,7 +18,7 @@ import { TooManyRequestsHttpError, } from './errors'; import { RetryableJobError } from '../queue/errors'; -import { normalizeImageUri, processImageCache } from './image-cache'; +import { normalizeImageUri, processImageCache } from '../images/image-cache'; import { RawMetadataLocale, RawMetadataLocalizationCType, @@ -53,37 +53,69 @@ export async function fetchAllMetadataLocalesFromBaseUri( token: DbToken ): Promise { const tokenUri = getTokenSpecificUri(uri, token.token_number); - const rawMetadataLocales: RawMetadataLocale[] = []; + let insertBundles: DbMetadataLocaleInsertBundle[] = []; - const defaultMetadata = await 
getMetadataFromUri(tokenUri); - rawMetadataLocales.push({ - metadata: defaultMetadata, - default: true, - uri: tokenUri, - }); + // We'll try to fetch metadata, giving the external service + // `METADATA_MAX_IMMEDIATE_URI_RETRIES` attempts to return a reasonable response; otherwise + // we'll consider the metadata dead. + let fetchImmediateRetryCount = 0; + do { + try { + const rawMetadataLocales: RawMetadataLocale[] = []; - // Does it declare localizations? If so, fetch and parse all of them. - if (RawMetadataLocalizationCType.Check(defaultMetadata.localization)) { - const uri = defaultMetadata.localization.uri; - const locales = defaultMetadata.localization.locales; - rawMetadataLocales[0].locale = defaultMetadata.localization.default; - for (const locale of locales) { - if (locale === rawMetadataLocales[0].locale) { - // Skip the default, we already have it. - continue; - } - const localeUri = getTokenSpecificUri(uri, token.token_number, locale); - const localeMetadata = await getMetadataFromUri(localeUri); + const defaultMetadata = await getMetadataFromUri(tokenUri); rawMetadataLocales.push({ - metadata: localeMetadata, - locale: locale, - default: false, - uri: localeUri, + metadata: defaultMetadata, + default: true, + uri: tokenUri, }); + + // Does it declare localizations? If so, fetch and parse all of them. + if (RawMetadataLocalizationCType.Check(defaultMetadata.localization)) { + const uri = defaultMetadata.localization.uri; + const locales = defaultMetadata.localization.locales; + rawMetadataLocales[0].locale = defaultMetadata.localization.default; + for (const locale of locales) { + if (locale === rawMetadataLocales[0].locale) { + // Skip the default, we already have it. + continue; + } + const localeUri = getTokenSpecificUri(uri, token.token_number, locale); + const localeMetadata = await getMetadataFromUri(localeUri); + rawMetadataLocales.push({ + metadata: localeMetadata, + locale: locale, + default: false, + uri: localeUri, + }); + } + } + + insertBundles = await parseMetadataForInsertion(rawMetadataLocales, contract, token); + break; + } catch (error) { + fetchImmediateRetryCount++; + if ( + error instanceof MetadataTimeoutError && + isUriFromDecentralizedStorage(error.url.toString()) + ) { + // Gateways like IPFS and Arweave commonly time out when a resource can't be found quickly. + // Try again later if this is the case. + throw new RetryableJobError(`Gateway timeout for ${error.url}`, error); + } else if (error instanceof TooManyRequestsHttpError) { + // 429 status codes are common when fetching metadata for thousands of tokens from the same + // server.
+ throw new RetryableJobError(`Too many requests for ${error.url}`, error); + } else if ( + error instanceof MetadataSizeExceededError || + fetchImmediateRetryCount >= ENV.METADATA_MAX_IMMEDIATE_URI_RETRIES + ) { + throw error; + } } - } + } while (fetchImmediateRetryCount < ENV.METADATA_MAX_IMMEDIATE_URI_RETRIES); - return parseMetadataForInsertion(rawMetadataLocales, contract, token); + return insertBundles; } /** @@ -133,7 +165,7 @@ async function parseMetadataForInsertion( null; let cachedImage: string | undefined; let cachedThumbnailImage: string | undefined; - if (image && typeof image === 'string') { + if (image && typeof image === 'string' && ENV.IMAGE_CACHE_PROCESSOR_ENABLED) { const normalizedUrl = normalizeImageUri(image); [cachedImage, cachedThumbnailImage] = await processImageCache( normalizedUrl, @@ -221,7 +253,7 @@ export async function fetchMetadata(httpUrl: URL): Promise { error instanceof errors.BodyTimeoutError || error instanceof errors.ConnectTimeoutError ) { - throw new MetadataTimeoutError(url); + throw new MetadataTimeoutError(new URL(url)); } else if (error instanceof errors.ResponseExceededMaxSizeError) { throw new MetadataSizeExceededError(url); } else if (error instanceof errors.ResponseStatusCodeError && error.statusCode === 429) { @@ -257,35 +289,7 @@ export async function getMetadataFromUri(token_uri: string): Promise= ENV.METADATA_MAX_IMMEDIATE_URI_RETRIES - ) { - throw error; - } - } - } while (fetchImmediateRetryCount < ENV.METADATA_MAX_IMMEDIATE_URI_RETRIES); + const content = await fetchMetadata(httpUrl); return parseJsonMetadata(urlStr, content); } diff --git a/tests/admin/admin-rpc.test.ts b/tests/admin/admin-rpc.test.ts index 39bd1287..0d008e43 100644 --- a/tests/admin/admin-rpc.test.ts +++ b/tests/admin/admin-rpc.test.ts @@ -1,3 +1,4 @@ +import * as imageCache from '../../src/token-processor/images/image-cache'; import { cycleMigrations } from '@hirosystems/api-toolkit'; import { buildAdminRpcServer } from '../../src/admin-rpc/init'; import { ENV } from '../../src/env'; @@ -6,7 +7,6 @@ import { DbJobStatus, DbSipNumber } from '../../src/pg/types'; import { insertAndEnqueueTestContractWithTokens, markAllJobsAsDone, - sleep, TestFastifyServer, } from '../helpers'; @@ -118,7 +118,13 @@ describe('Admin RPC', () => { describe('/cache-images', () => { test('reprocesses token images', async () => { - ENV.METADATA_IMAGE_CACHE_PROCESSOR = './tests/token-queue/test-image-cache.js'; + const spy = jest + .spyOn(imageCache, 'reprocessTokenImageCache') + .mockImplementation((a, b, c) => { + return Promise.resolve(); + }); + + ENV.IMAGE_CACHE_PROCESSOR_ENABLED = true; const principal = 'SP2SYHR84SDJJDK8M09HFS4KBFXPPCX9H7RZ9YVTS.hello-world'; await insertAndEnqueueTestContractWithTokens(db, principal, DbSipNumber.sip009, 1n); await db.updateProcessedTokenWithMetadata({ @@ -158,17 +164,52 @@ describe('Admin RPC', () => { headers: { 'content-type': 'application/json' }, }); expect(response.statusCode).toBe(200); - await sleep(1000); // Wait for changes to complete. 
- const bundle = await db.getTokenMetadataBundle({ - contractPrincipal: principal, - tokenNumber: 1, + + expect(spy).toHaveBeenCalledTimes(1); + spy.mockRestore(); + }); + + test('rejects when image cache is disabled', async () => { + ENV.IMAGE_CACHE_PROCESSOR_ENABLED = false; + const principal = 'SP2SYHR84SDJJDK8M09HFS4KBFXPPCX9H7RZ9YVTS.hello-world'; + await insertAndEnqueueTestContractWithTokens(db, principal, DbSipNumber.sip009, 1n); + await db.updateProcessedTokenWithMetadata({ + id: 1, + values: { + token: { + name: 'hello-world', + symbol: 'HELLO', + decimals: 6, + total_supply: '1', + uri: 'http://test.com/uri.json', + }, + metadataLocales: [ + { + metadata: { + sip: 16, + token_id: 1, + name: 'hello-world', + l10n_locale: 'en', + l10n_uri: null, + l10n_default: true, + description: 'test', + image: 'http://test.com/image.png', + cached_image: null, + cached_thumbnail_image: null, + }, + }, + ], + }, }); - expect(bundle.metadataLocale?.metadata.cached_image).toBe( - 'http://test.com/image.png?processed=true' - ); - expect(bundle.metadataLocale?.metadata.cached_thumbnail_image).toBe( - 'http://test.com/image.png?processed=true&thumb=true' - ); + const response = await fastify.inject({ + url: '/metadata/admin/cache-images', + method: 'POST', + payload: JSON.stringify({ + contractId: principal, + }), + headers: { 'content-type': 'application/json' }, + }); + expect(response.statusCode).toBe(422); }); }); }); diff --git a/tests/helpers.ts b/tests/helpers.ts index 84aef917..27271477 100644 --- a/tests/helpers.ts +++ b/tests/helpers.ts @@ -1,3 +1,4 @@ +import * as http from 'http'; import { PgStore } from '../src/pg/pg-store'; import { buildApiServer } from '../src/api/init'; import { FastifyBaseLogger, FastifyInstance } from 'fastify'; @@ -12,6 +13,7 @@ import { import { BlockCache, CachedEvent } from '../src/pg/chainhook/block-cache'; import { SmartContractDeployment } from '../src/token-processor/util/sip-validation'; import { DbJob, DbSipNumber, DbSmartContract, DbUpdateNotification } from '../src/pg/types'; +import { waiter } from '@hirosystems/api-toolkit'; export type TestFastifyServer = FastifyInstance< Server, @@ -25,9 +27,46 @@ export async function startTestApiServer(db: PgStore): Promise { - return new Promise(resolve => setTimeout(resolve, time)); -}; +export async function startTimeoutServer(delay: number, port: number = 9999) { + const server = http.createServer((req, res) => { + setTimeout(() => { + res.statusCode = 200; + res.end('Delayed response'); + }, delay); + }); + server.on('error', e => console.log(e)); + const serverReady = waiter(); + server.listen(port, '0.0.0.0', () => serverReady.finish()); + await serverReady; + return server; +} + +export async function startTestResponseServer( + response: string, + statusCode: number = 200, + port: number = 9999 +) { + const server = http.createServer((req, res) => { + res.statusCode = statusCode; + res.end(response); + }); + server.on('error', e => console.log(e)); + const serverReady = waiter(); + server.listen(port, '0.0.0.0', () => serverReady.finish()); + await serverReady; + return server; +} + +export async function closeTestServer(server: http.Server) { + const serverDone = waiter(); + server.close(err => { + if (err) { + console.log(err); + } + serverDone.finish(); + }); + await serverDone; +} export const SIP_009_ABI = { maps: [ diff --git a/tests/test-image-cache-error.js b/tests/test-image-cache-error.js deleted file mode 100755 index 878b3073..00000000 --- a/tests/test-image-cache-error.js +++ /dev/null @@ 
-1,3 +0,0 @@ -#!/usr/bin/env node -console.error('Test error'); -process.exit(1); diff --git a/tests/token-queue/image-cache.test.ts b/tests/token-queue/image-cache.test.ts index b021a937..81ac73b8 100644 --- a/tests/token-queue/image-cache.test.ts +++ b/tests/token-queue/image-cache.test.ts @@ -1,39 +1,51 @@ import { ENV } from '../../src/env'; -import { processImageCache } from '../../src/token-processor/util/image-cache'; +import { processImageCache } from '../../src/token-processor/images/image-cache'; +import { closeTestServer, startTestResponseServer, startTimeoutServer } from '../helpers'; +import { + HttpError, + MetadataTimeoutError, + TooManyRequestsHttpError, +} from '../../src/token-processor/util/errors'; describe('Image cache', () => { const contract = 'SP3QSAJQ4EA8WXEDSRRKMZZ29NH91VZ6C5X88FGZQ.crashpunks-v2'; const tokenNumber = 100n; - const url = 'http://cloudflare-ipfs.com/test/image.png'; beforeAll(() => { - ENV.METADATA_IMAGE_CACHE_PROCESSOR = './tests/token-queue/test-image-cache.js'; + ENV.IMAGE_CACHE_PROCESSOR_ENABLED = true; + ENV.IMAGE_CACHE_GCS_BUCKET_NAME = 'test'; + ENV.IMAGE_CACHE_GCS_OBJECT_NAME_PREFIX = 'prefix/'; }); - test('transforms image URL correctly', async () => { - const transformed = await processImageCache(url, contract, tokenNumber); - expect(transformed).toStrictEqual([ - 'http://cloudflare-ipfs.com/test/image.png?processed=true', - 'http://cloudflare-ipfs.com/test/image.png?processed=true&thumb=true', - ]); - }); + test('throws image fetch timeout error', async () => { + ENV.METADATA_FETCH_TIMEOUT_MS = 50; + const server = await startTimeoutServer(100); + await expect( + processImageCache('http://127.0.0.1:9999/', contract, tokenNumber) + ).rejects.toThrow(MetadataTimeoutError); + await closeTestServer(server); + }, 10000); - test('ignores data: URL', async () => { - const url = 'data:123456'; - const transformed = await processImageCache(url, contract, tokenNumber); - expect(transformed).toStrictEqual(['data:123456']); - }); + test('throws rate limit error', async () => { + const server = await startTestResponseServer('rate limit exceeded', 429); + await expect( + processImageCache('http://127.0.0.1:9999/', contract, tokenNumber) + ).rejects.toThrow(TooManyRequestsHttpError); + await closeTestServer(server); + }, 10000); - test('ignores empty script paths', async () => { - ENV.METADATA_IMAGE_CACHE_PROCESSOR = ''; - const transformed = await processImageCache(url, contract, tokenNumber); - expect(transformed).toStrictEqual([url]); - }); + test('throws other server errors', async () => { + const server = await startTestResponseServer('not found', 404); + await expect( + processImageCache('http://127.0.0.1:9999/', contract, tokenNumber) + ).rejects.toThrow(HttpError); + await closeTestServer(server); + }, 10000); - test('handles script errors', async () => { - ENV.METADATA_IMAGE_CACHE_PROCESSOR = './tests/test-image-cache-error.js'; - await expect(processImageCache(url, contract, tokenNumber)).rejects.toThrow( - /ImageCache script error/ - ); + test('ignores data: URL', async () => { + const url = 'data:123456'; + await expect(processImageCache(url, contract, tokenNumber)).resolves.toStrictEqual([ + 'data:123456', + ]); }); }); diff --git a/tests/token-queue/job-queue.test.ts b/tests/token-queue/job-queue.test.ts index 1348fe54..25075236 100644 --- a/tests/token-queue/job-queue.test.ts +++ b/tests/token-queue/job-queue.test.ts @@ -1,9 +1,9 @@ import { ENV } from '../../src/env'; import { MIGRATIONS_DIR, PgStore } from '../../src/pg/pg-store'; 
-import { DbJob, DbJobStatus, DbSipNumber, DbSmartContractInsert } from '../../src/pg/types'; +import { DbJob, DbJobStatus, DbSipNumber } from '../../src/pg/types'; import { JobQueue } from '../../src/token-processor/queue/job-queue'; -import { insertAndEnqueueTestContract, sleep } from '../helpers'; -import { cycleMigrations } from '@hirosystems/api-toolkit'; +import { insertAndEnqueueTestContract } from '../helpers'; +import { cycleMigrations, timeout } from '@hirosystems/api-toolkit'; class TestJobQueue extends JobQueue { constructor(args: { db: PgStore }) { @@ -86,7 +86,7 @@ describe('JobQueue', () => { await db.close(); queue.start(); // Wait 2 seconds and kill the queue. - await sleep(2000); + await timeout(2000); await queue.close(); }); }); diff --git a/tests/token-queue/process-token-job.test.ts b/tests/token-queue/process-token-job.test.ts index d4f61bab..939271be 100644 --- a/tests/token-queue/process-token-job.test.ts +++ b/tests/token-queue/process-token-job.test.ts @@ -353,7 +353,6 @@ describe('ProcessTokenJob', () => { }); test('parses metadata with arbitrary types', async () => { - ENV.METADATA_IMAGE_CACHE_PROCESSOR = './tests/token-queue/test-image-cache.js'; const metadata = { name: 'Mutant Monkeys #1', image: @@ -418,9 +417,6 @@ describe('ProcessTokenJob', () => { expect(bundle?.metadataLocale?.metadata.image).toBe( 'https://byzantion.mypinata.cloud/ipfs/QmWAYP9LJD15mgrnapfpJhBArG6T3J4XKTM77tzqggvP7w' ); - expect(bundle?.metadataLocale?.metadata.cached_image).toBe( - 'https://byzantion.mypinata.cloud/ipfs/QmWAYP9LJD15mgrnapfpJhBArG6T3J4XKTM77tzqggvP7w?processed=true' - ); expect(bundle?.metadataLocale?.metadata.description).toBeNull(); const attr0 = bundle?.metadataLocale?.attributes[0]; diff --git a/tests/token-queue/test-image-cache.js b/tests/token-queue/test-image-cache.js deleted file mode 100755 index 982f0eed..00000000 --- a/tests/token-queue/test-image-cache.js +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/env node -const imgUrl = process.argv[2].toString(); -console.log(`${imgUrl}?processed=true`); -console.log(`${imgUrl}?processed=true&thumb=true`);
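Stepping back, the net effect of this changeset is to replace the external `METADATA_IMAGE_CACHE_PROCESSOR` script (and its exit-code contract) with an in-process, env-gated pipeline. A hedged sketch of the resulting call flow — the function name and import paths here are illustrative; in the real change this guard and call live inside `parseMetadataForInsertion`:

```ts
import { ENV } from './src/env';
import { normalizeImageUri, processImageCache } from './src/token-processor/images/image-cache';

async function cacheTokenImage(image: string, principal: string, tokenNumber: bigint) {
  // The boolean flag replaces the old "is a script path configured?" check.
  if (!ENV.IMAGE_CACHE_PROCESSOR_ENABLED) return {};
  // data: URIs pass through; ipfs:// and ar:// URIs become fetchable gateway URLs.
  const normalized = normalizeImageUri(image);
  // Resolves to CDN URLs under IMAGE_CACHE_CDN_BASE_PATH: [fullSize, thumbnail].
  const [cached, thumbnail] = await processImageCache(normalized, principal, tokenNumber);
  return { cached, thumbnail };
}
```

Failures now surface as typed exceptions (`MetadataTimeoutError`, `TooManyRequestsHttpError`, `HttpError`) instead of process exit codes, which is what lets the retry loop in `fetchAllMetadataLocalesFromBaseUri` decide between retrying and failing a job.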
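One more pattern worth noting from the new `image-cache.ts`: the GCS token is memoized at module scope and refreshed at most once per upload when a 401/403 comes back. Distilled into a reusable helper — a sketch under the same undici assumptions as the module; `withAuthRetry` is a hypothetical name, not part of the changeset:

```ts
import { request, errors } from 'undici';

let cachedToken: string | undefined;

// Fetch and memoize a service-account token from the GCE metadata server.
async function getAuthToken(): Promise<string> {
  if (cachedToken !== undefined) return cachedToken;
  const res = await request(
    'http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token',
    { method: 'GET', headers: { 'Metadata-Flavor': 'Google' }, throwOnError: true }
  );
  const json = (await res.body.json()) as { access_token: string };
  cachedToken = json.access_token;
  return cachedToken;
}

// Run an authorized operation, forcing one token refresh on 401/403.
async function withAuthRetry<T>(op: (token: string) => Promise<T>): Promise<T> {
  try {
    return await op(await getAuthToken());
  } catch (error) {
    if (
      error instanceof errors.ResponseStatusCodeError &&
      (error.statusCode === 401 || error.statusCode === 403)
    ) {
      cachedToken = undefined; // token likely expired; fetch a fresh one
      return await op(await getAuthToken());
    }
    throw error;
  }
}
```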