feat(das): add metric for task queue depth (metaplex-foundation#83)
* feat(das): add task queue depth metric

* count pending tasks only

Co-authored-by: Nikhil Acharya <[email protected]>

tahsintunan and Nikhil Acharya authored Jul 12, 2023
1 parent 7451b67 commit 11e1aef
Showing 4 changed files with 61 additions and 29 deletions.
27 changes: 14 additions & 13 deletions README.md
@@ -40,10 +40,10 @@ This spec is what providers of this api must implement against.
Along with the above Rust binaries, this repo also maintains examples and best-practice settings for running the entire infrastructure.
The example infrastructure is as follows.

- A Solana No-Vote Validator - This validator is configured to only have secure access to the validator ledger and account data under consensus.
- A Geyser Plugin (Plerkle) - The above validator is further configured to load this geyser plugin that sends Plerkle Serialized Messages over a messaging system.
- A Redis Cluster (Stream Optimized) - The example messaging system is a lightweight Redis deployment that supports the streaming configuration.
- A Kubernetes Cluster - The orchestration system for the API and Ingester processes. Probably overkill for a small installation, but it's a rock-solid platform for critical software.

This repo houses Helm Charts, Docker files and Terraform files to assist in the deployment of the example infrastructure.

@@ -67,10 +67,10 @@ If you need to install `sea-orm-cli` run `cargo install sea-orm-cli`.

_Prerequisites_

- A Postgres Server running with the database setup according to ./init.sql
- A Redis instance that has streams enabled or a version that supports streams
- A local Solana validator with the Plerkle plugin running.
- Environment variables set to allow your validator, ingester and api to access those prerequisites.

See [Plugin Configuration](https://github.com/metaplex-foundation/digital-asset-validator-plugin#building-locally) for how to locally configure the test validator plugin to work.

@@ -118,11 +118,11 @@ For production you should split the components up.

Developing with Docker is much easier, but it has some nuances. This test docker compose system relies on a programs folder being accessible; this folder needs to have the shared object files for the following programs:

- Token Metadata
- Bubblegum
- Gummyroll
- Token 2022
- Latest version of the Associated token program

You need to run the following script (which takes a long time) in order to get all those .so files.

@@ -288,6 +288,7 @@ count ingester.bgtask.network_error
count ingester.bgtask.unrecoverable_error
time ingester.bgtask.bus_time
count ingester.bgtask.identical
gauge ingester.bgtask.queue_depth
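As a rough illustration (not part of this commit), a statsd gauge like the one above travels over the wire as a plain-text datagram. The std-only Rust sketch below builds that payload; `statsd_gauge_line` is a hypothetical helper, as the ingester actually emits through the cadence crate:

```rust
// Hypothetical helper showing the plain-text statsd wire format for a gauge.
// The real ingester uses the cadence crate's statsd_gauge! macro instead.
fn statsd_gauge_line(name: &str, value: u64) -> String {
    // statsd gauges are encoded as "<metric name>:<value>|g"
    format!("{}:{}|g", name, value)
}

fn main() {
    let line = statsd_gauge_line("ingester.bgtask.queue_depth", 42);
    println!("{}", line); // ingester.bgtask.queue_depth:42|g
}
```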

### BACKFILLER

5 changes: 2 additions & 3 deletions digital_asset_types/src/dao/mod.rs
@@ -183,9 +183,8 @@ impl SearchAssetsQuery {
conditions = conditions.add(asset_creators::Column::Creator.eq(c));
}

-// N.B. Something to consider is that without specifying the creators themselves,
-// there is no index being hit. That means in some scenarios this query could be very slow.
-// But those should only happen in rare scenarios.
+// Without specifying the creators themselves, there is no index being hit.
+// So in some rare scenarios, this query could be very slow.
if let Some(cv) = self.creator_verified.to_owned() {
conditions = conditions.add(asset_creators::Column::Verified.eq(cv));
}
18 changes: 9 additions & 9 deletions nft_ingester/src/error/mod.rs
@@ -22,31 +22,31 @@ pub enum IngesterError {
StorageWriteError(String),
#[error("NotImplemented")]
NotImplemented,
-#[error("Deserialization Error {0}")]
+#[error("Deserialization Error: {0}")]
DeserializationError(String),
-#[error("Task Manager Error {0}")]
+#[error("Task Manager Error: {0}")]
TaskManagerError(String),
#[error("Missing or invalid configuration: ({msg})")]
ConfigurationError { msg: String },
#[error("Error getting RPC data {0}")]
RpcGetDataError(String),
-#[error("RPC returned data in unsupported format {0}")]
+#[error("RPC returned data in unsupported format: {0}")]
RpcDataUnsupportedFormat(String),
-#[error("Data serializaton error {0}")]
+#[error("Data serializaton error: {0}")]
SerializatonError(String),
-#[error("Messenger error {0}")]
+#[error("Messenger error: {0}")]
MessengerError(String),
-#[error("Blockbuster Parsing error {0}")]
+#[error("Blockbuster Parsing error: {0}")]
ParsingError(String),
-#[error("Data Base Error {0}")]
+#[error("Database Error: {0}")]
DatabaseError(String),
-#[error("Unknown Task Type {0}")]
+#[error("Unknown Task Type: {0}")]
UnknownTaskType(String),
#[error("BG Task Manager Not Started")]
TaskManagerNotStarted,
#[error("Unrecoverable task error: {0}")]
UnrecoverableTaskError(String),
-#[error("Cache Storage Write Error {0}")]
+#[error("Cache Storage Write Error: {0}")]
CacheStorageWriteError(String),
CacheStorageWriteError(String),
#[error("HttpError {status_code}")]
HttpError { status_code: String },
40 changes: 36 additions & 4 deletions nft_ingester/src/tasks/mod.rs
@@ -1,6 +1,6 @@
use crate::{error::IngesterError, metric};
use async_trait::async_trait;
-use cadence_macros::{is_global_default_set, statsd_count, statsd_histogram};
+use cadence_macros::{is_global_default_set, statsd_count, statsd_gauge, statsd_histogram};
use chrono::{Duration, NaiveDateTime, Utc};
use crypto::{digest::Digest, sha2::Sha256};
use digital_asset_types::dao::{sea_orm_active_enums::TaskStatus, tasks};
@@ -34,6 +34,7 @@ pub trait BgTask: Send + Sync {
}

const RETRY_INTERVAL: u64 = 1000;
const QUEUE_DEPTH_INTERVAL: u64 = 2500;
const DELETE_INTERVAL: u64 = 30000;
const MAX_TASK_BATCH_SIZE: u64 = 100;

@@ -191,6 +192,14 @@ impl TaskManager {
.map_err(|e| e.into())
}

pub async fn get_task_queue_depth(conn: &DatabaseConnection) -> Result<u64, IngesterError> {
tasks::Entity::find()
.filter(tasks::Column::Status.eq(TaskStatus::Pending))
.count(conn)
.await
.map_err(|e| e.into())
}
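The query's intent ("count pending tasks only") can be sketched std-only, with the tasks table mocked as an in-memory slice; `task_queue_depth` and this pared-down `TaskStatus` are illustrative stand-ins, not the crate's actual API:

```rust
// Mock of the sea_orm_active_enums::TaskStatus variants referenced by the query.
#[derive(PartialEq)]
enum TaskStatus { Pending, Running, Success, Failed }

// In-memory equivalent of filtering on tasks::Column::Status == Pending
// and counting the matching rows.
fn task_queue_depth(statuses: &[TaskStatus]) -> u64 {
    statuses.iter().filter(|s| **s == TaskStatus::Pending).count() as u64
}

fn main() {
    let statuses = [TaskStatus::Pending, TaskStatus::Running, TaskStatus::Pending];
    println!("{}", task_queue_depth(&statuses)); // 2
}
```

Counting only Pending rows keeps the gauge from inflating with tasks that are already running or finished.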

pub fn get_sender(&self) -> Result<UnboundedSender<TaskData>, IngesterError> {
self.producer
.clone()
@@ -323,11 +332,9 @@ impl TaskManager {
}

pub fn start_runner(&self) -> JoinHandle<()> {
-let task_map = self.registered_task_types.clone();
let pool = self.pool.clone();
-let instance_name = self.instance_name.clone();
tokio::spawn(async move {
-let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool.clone());
+let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
let mut interval = time::interval(tokio::time::Duration::from_millis(DELETE_INTERVAL));
loop {
interval.tick().await; // ticks immediately
@@ -342,8 +349,33 @@ };
};
}
});

let pool = self.pool.clone();
tokio::spawn(async move {
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
let mut interval =
time::interval(tokio::time::Duration::from_millis(QUEUE_DEPTH_INTERVAL));
loop {
interval.tick().await; // ticks immediately
let res = TaskManager::get_task_queue_depth(&conn).await;
match res {
Ok(depth) => {
debug!("Task queue depth: {}", depth);
metric! {
statsd_gauge!("ingester.bgtask.queue_depth", depth);
}
}
Err(e) => {
error!("error getting queue depth: {}", e);
}
};
}
});
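Stripped of tokio, sea-orm, and cadence, the reporting loop above reduces to "sample on a fixed interval, then emit". This std-only sketch shows that shape; the canned depths, the shortened interval, and the `report` helper are assumptions for illustration:

```rust
use std::thread;
use std::time::Duration;

// Format one sample, mirroring the debug! line in the loop above.
fn report(depth: u64) -> String {
    format!("Task queue depth: {}", depth)
}

fn main() {
    // Stand-in for successive get_task_queue_depth(&conn) results.
    for depth in [3u64, 1, 0] {
        // The real loop calls statsd_gauge! here; this sketch just logs.
        println!("{}", report(depth));
        // QUEUE_DEPTH_INTERVAL is 2500 ms in the ingester; shortened here.
        thread::sleep(Duration::from_millis(5));
    }
}
```

Running the gauge on its own spawned task keeps slow depth queries from delaying the delete or retry loops.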

let pool = self.pool.clone();
let ipfs_gateway = self.ipfs_gateway.clone();
let task_map = self.registered_task_types.clone();
let instance_name = self.instance_name.clone();
tokio::spawn(async move {
let mut interval = time::interval(tokio::time::Duration::from_millis(RETRY_INTERVAL));
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool.clone());
