From 11e1aeff2964127a7995e6c9f066b71d64bdad1c Mon Sep 17 00:00:00 2001 From: Tahsin Tunan Date: Thu, 13 Jul 2023 00:21:29 +0600 Subject: [PATCH] feat(das): add metric for task queue depth (#83) * feat(das): add task queue depth metric * count pending tasks only Co-authored-by: Nikhil Acharya <128666399+nikhil-helius@users.noreply.github.com> --------- Co-authored-by: Nikhil Acharya <128666399+nikhil-helius@users.noreply.github.com> --- README.md | 27 ++++++++++---------- digital_asset_types/src/dao/mod.rs | 5 ++-- nft_ingester/src/error/mod.rs | 18 +++++++------- nft_ingester/src/tasks/mod.rs | 40 +++++++++++++++++++++++++++--- 4 files changed, 61 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 4e7bcfd65..fe5bf3d31 100644 --- a/README.md +++ b/README.md @@ -40,10 +40,10 @@ This spec is what providers of this api must implement against. Along with the above rust binaries, this repo also maintains examples and best practice settings for running the entire infrastructure. The example infrastructure is as follows. -- A Solana No-Vote Validator - This validator is configured to only have secure access to the validator ledger and account data under consensus. -- A Geyser Plugin (Plerkle) - The above validator is further configured to load this geyser plugin that sends Plerkle Serialized Messages over a messaging system. -- A Redis Cluster (Stream Optimized) - The example messaging system is a light weight redis deployment that supports the streaming configuration. -- A Kubernetes Cluster - The orchestration system for the API and Ingester processes. Probably overkill for a small installation, but it's a rock solid platform for critical software. +- A Solana No-Vote Validator - This validator is configured to only have secure access to the validator ledger and account data under consensus. +- A Geyser Plugin (Plerkle) - The above validator is further configured to load this geyser plugin that sends Plerkle Serialized Messages over a messaging system. +- A Redis Cluster (Stream Optimized) - The example messaging system is a light weight redis deployment that supports the streaming configuration. +- A Kubernetes Cluster - The orchestration system for the API and Ingester processes. Probably overkill for a small installation, but it's a rock solid platform for critical software. This repo houses Helm Charts, Docker files and Terraform files to assist in the deployment of the example infrastructure. @@ -67,10 +67,10 @@ If you need to install `sea-orm-cli` run `cargo install sea-orm-cli`. _Prerequisites_ -- A Postgres Server running with the database setup according to ./init.sql -- A Redis instance that has streams enabled or a version that supports streams -- A local solana validator with the Plerkle plugin running. -- Environment Variables set to allow your validator, ingester and api to access those prerequisites. +- A Postgres Server running with the database setup according to ./init.sql +- A Redis instance that has streams enabled or a version that supports streams +- A local solana validator with the Plerkle plugin running. +- Environment Variables set to allow your validator, ingester and api to access those prerequisites. See [Plugin Configuration](https://github.com/metaplex-foundation/digital-asset-validator-plugin#building-locally) for how to locally configure the test validator plugin to work. @@ -118,11 +118,11 @@ For production you should split the components up. Developing with Docker is much easier, but has some nuances to it. 
This test docker compose system relies on a programs folder being accessible, this folder needs to have the shared object files for the following programs -- Token Metadata -- Bubblegum -- Gummyroll -- Token 2022 -- Latest version of the Associated token program +- Token Metadata +- Bubblegum +- Gummyroll +- Token 2022 +- Latest version of the Associated token program You need to run the following script (which takes a long time) in order to get all those .so files. @@ -288,6 +288,7 @@ count ingester.bgtask.network_error count ingester.bgtask.unrecoverable_error time ingester.bgtask.bus_time count ingester.bgtask.identical +gauge ingester.bgtask.queue_depth ### BACKFILLER diff --git a/digital_asset_types/src/dao/mod.rs b/digital_asset_types/src/dao/mod.rs index 34968ac28..677a71185 100644 --- a/digital_asset_types/src/dao/mod.rs +++ b/digital_asset_types/src/dao/mod.rs @@ -183,9 +183,8 @@ impl SearchAssetsQuery { conditions = conditions.add(asset_creators::Column::Creator.eq(c)); } - // N.B. Something to consider is that without specifying the creators themselves, - // there is no index being hit. That means in some scenarios this query could be very slow. - // But those should only happen in rare scenarios. + // Without specifying the creators themselves, there is no index being hit. + // So in some rare scenarios, this query could be very slow. if let Some(cv) = self.creator_verified.to_owned() { conditions = conditions.add(asset_creators::Column::Verified.eq(cv)); } diff --git a/nft_ingester/src/error/mod.rs b/nft_ingester/src/error/mod.rs index 918caf1e1..6a990197f 100644 --- a/nft_ingester/src/error/mod.rs +++ b/nft_ingester/src/error/mod.rs @@ -22,31 +22,31 @@ pub enum IngesterError { StorageWriteError(String), #[error("NotImplemented")] NotImplemented, - #[error("Deserialization Error {0}")] + #[error("Deserialization Error: {0}")] DeserializationError(String), - #[error("Task Manager Error {0}")] + #[error("Task Manager Error: {0}")] TaskManagerError(String), #[error("Missing or invalid configuration: ({msg})")] ConfigurationError { msg: String }, #[error("Error getting RPC data {0}")] RpcGetDataError(String), - #[error("RPC returned data in unsupported format {0}")] + #[error("RPC returned data in unsupported format: {0}")] RpcDataUnsupportedFormat(String), - #[error("Data serializaton error {0}")] + #[error("Data serializaton error: {0}")] SerializatonError(String), - #[error("Messenger error {0}")] + #[error("Messenger error: {0}")] MessengerError(String), - #[error("Blockbuster Parsing error {0}")] + #[error("Blockbuster Parsing error: {0}")] ParsingError(String), - #[error("Data Base Error {0}")] + #[error("Database Error: {0}")] DatabaseError(String), - #[error("Unknown Task Type {0}")] + #[error("Unknown Task Type: {0}")] UnknownTaskType(String), #[error("BG Task Manager Not Started")] TaskManagerNotStarted, #[error("Unrecoverable task error: {0}")] UnrecoverableTaskError(String), - #[error("Cache Storage Write Error {0}")] + #[error("Cache Storage Write Error: {0}")] CacheStorageWriteError(String), #[error("HttpError {status_code}")] HttpError { status_code: String }, diff --git a/nft_ingester/src/tasks/mod.rs b/nft_ingester/src/tasks/mod.rs index 9d920e963..de2025950 100644 --- a/nft_ingester/src/tasks/mod.rs +++ b/nft_ingester/src/tasks/mod.rs @@ -1,6 +1,6 @@ use crate::{error::IngesterError, metric}; use async_trait::async_trait; -use cadence_macros::{is_global_default_set, statsd_count, statsd_histogram}; +use cadence_macros::{is_global_default_set, statsd_count, 
statsd_gauge, statsd_histogram};
 use chrono::{Duration, NaiveDateTime, Utc};
 use crypto::{digest::Digest, sha2::Sha256};
 use digital_asset_types::dao::{sea_orm_active_enums::TaskStatus, tasks};
@@ -34,6 +34,7 @@ pub trait BgTask: Send + Sync {
 }
 
 const RETRY_INTERVAL: u64 = 1000;
+const QUEUE_DEPTH_INTERVAL: u64 = 2500;
 const DELETE_INTERVAL: u64 = 30000;
 const MAX_TASK_BATCH_SIZE: u64 = 100;
 
@@ -191,6 +192,14 @@ impl TaskManager {
             .map_err(|e| e.into())
     }
 
+    pub async fn get_task_queue_depth(conn: &DatabaseConnection) -> Result<u64, IngesterError> {
+        tasks::Entity::find()
+            .filter(tasks::Column::Status.eq(TaskStatus::Pending))
+            .count(conn)
+            .await
+            .map_err(|e| e.into())
+    }
+
     pub fn get_sender(&self) -> Result<UnboundedSender<TaskData>, IngesterError> {
         self.producer
             .clone()
@@ -323,11 +332,9 @@ impl TaskManager {
     }
 
     pub fn start_runner(&self) -> JoinHandle<()> {
-        let task_map = self.registered_task_types.clone();
         let pool = self.pool.clone();
-        let instance_name = self.instance_name.clone();
         tokio::spawn(async move {
-            let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool.clone());
+            let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
             let mut interval = time::interval(tokio::time::Duration::from_millis(DELETE_INTERVAL));
             loop {
                 interval.tick().await; // ticks immediately
@@ -342,8 +349,33 @@ impl TaskManager {
                 };
             }
         });
+
+        let pool = self.pool.clone();
+        tokio::spawn(async move {
+            let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
+            let mut interval =
+                time::interval(tokio::time::Duration::from_millis(QUEUE_DEPTH_INTERVAL));
+            loop {
+                interval.tick().await; // ticks immediately
+                let res = TaskManager::get_task_queue_depth(&conn).await;
+                match res {
+                    Ok(depth) => {
+                        debug!("Task queue depth: {}", depth);
+                        metric! {
+                            statsd_gauge!("ingester.bgtask.queue_depth", depth);
+                        }
+                    }
+                    Err(e) => {
+                        error!("error getting queue depth: {}", e);
+                    }
+                };
+            }
+        });
+
         let pool = self.pool.clone();
         let ipfs_gateway = self.ipfs_gateway.clone();
+        let task_map = self.registered_task_types.clone();
+        let instance_name = self.instance_name.clone();
         tokio::spawn(async move {
             let mut interval = time::interval(tokio::time::Duration::from_millis(RETRY_INTERVAL));
             let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool.clone());
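
Illustrative sketch (not part of the diff above): the new hunks poll the tasks table on a fixed interval and publish the number of Pending rows as the ingester.bgtask.queue_depth gauge. The standalone Rust sketch below restates that flow outside the diff context. It assumes the crate paths shown in the diff (digital_asset_types::dao), a sea-orm version whose count() returns u64, and a cadence statsd client already installed as the global default; the helper names pending_task_count and report_queue_depth are invented here for illustration.

// Sketch only: mirrors the queue-depth reporting added by this patch, with the
// helper names invented for illustration.
use cadence_macros::{is_global_default_set, statsd_gauge};
use digital_asset_types::dao::{sea_orm_active_enums::TaskStatus, tasks};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, PaginatorTrait, QueryFilter};
use tokio::time::{interval, Duration};

// Count only the tasks that are still waiting to run, so the gauge reflects
// real backlog rather than total table size.
async fn pending_task_count(conn: &DatabaseConnection) -> Result<u64, sea_orm::DbErr> {
    tasks::Entity::find()
        .filter(tasks::Column::Status.eq(TaskStatus::Pending))
        .count(conn)
        .await
}

// Poll the count every 2.5 s (QUEUE_DEPTH_INTERVAL in the patch) and publish it
// as a statsd gauge; errors are logged and the loop keeps running.
async fn report_queue_depth(conn: DatabaseConnection) {
    let mut tick = interval(Duration::from_millis(2500));
    loop {
        tick.tick().await; // first tick fires immediately
        match pending_task_count(&conn).await {
            Ok(depth) => {
                if is_global_default_set() {
                    statsd_gauge!("ingester.bgtask.queue_depth", depth);
                }
            }
            Err(e) => eprintln!("error getting queue depth: {}", e),
        }
    }
}

Filtering on TaskStatus::Pending before calling count is what makes the gauge a backlog signal rather than a table-size metric, which matches the "count pending tasks only" follow-up in the commit message.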