From 2ffa458924f29c8391cc870ebebd029d2f0584d8 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sun, 29 May 2022 20:37:03 +0300 Subject: [PATCH] Add prometheus, compression, s3 for large payloads (#6) --- Cargo.lock | 190 ++++++++++++---- Cargo.toml | 17 +- README.md | 22 +- config.json | 25 ++- src/aws.rs | 217 ++++++++++++++++++ src/bin/consumer.rs | 218 +++++++++++++----- src/config.rs | 191 +++++++++++++--- src/lib.rs | 2 + src/plugin.rs | 60 +++-- src/prom.rs | 148 ++++++++++++ src/sqs.rs | 537 +++++++++++++++++++++----------------------- 11 files changed, 1189 insertions(+), 438 deletions(-) create mode 100644 src/aws.rs create mode 100644 src/prom.rs diff --git a/Cargo.lock b/Cargo.lock index 220aafb..328ff47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -343,6 +343,7 @@ checksum = "d2dbdf4bdacb33466e854ce889eee8dfd5729abf7ccd7664d0a2d60cd384440b" dependencies = [ "atty", "bitflags", + "clap_derive", "clap_lex", "indexmap", "lazy_static", @@ -351,6 +352,19 @@ dependencies = [ "textwrap", ] +[[package]] +name = "clap_derive" +version = "3.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25320346e922cffe59c0bbc5410c8d8784509efb321488971081313cb1e1a33c" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "clap_lex" version = "0.2.0" @@ -607,9 +621,9 @@ dependencies = [ [[package]] name = "ed25519" -version = "1.5.0" +version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d916019f70ae3a1faa1195685e290287f39207d38e6dfee727197cffcc002214" +checksum = "1e9c280362032ea4203659fc489832d0204ef09f247a0506f170dafcac08c369" dependencies = [ "signature", ] @@ -935,6 +949,12 @@ dependencies = [ "ahash", ] +[[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1004,9 +1024,9 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", "http", @@ -1134,9 +1154,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" +checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d" [[package]] name = "jobserver" @@ -1158,9 +1178,9 @@ dependencies = [ [[package]] name = "keccak" -version = "0.1.0" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67c21572b4949434e4fc1e1978b99c5f77064153c59d998bf13ecd96fb5ecba7" +checksum = "f9b7d56ba4a8344d6be9729995e6b06f928af29998cdf79fe390cbf6b1fee838" [[package]] name = "lazy_static" @@ -1170,9 +1190,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.125" +version = "0.2.126" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5916d2ae698f6de9bfb891ad7a8d65c09d232dc58cc4ac433c7da3b2fd84bc2b" +checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836" [[package]] name = "libloading" @@ -1251,6 +1271,12 @@ dependencies = [ 
"cfg-if", ] +[[package]] +name = "maplit" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" + [[package]] name = "matches" version = "0.1.9" @@ -1409,9 +1435,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.10.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" +checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225" [[package]] name = "opaque-debug" @@ -1466,9 +1492,9 @@ dependencies = [ [[package]] name = "os_str_bytes" -version = "6.0.0" +version = "6.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64" +checksum = "029d8d0b2f198229de29dca79676f2738ff952edf3fde542eb8bf94d8c21b435" [[package]] name = "ouroboros" @@ -1502,7 +1528,17 @@ checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" dependencies = [ "instant", "lock_api", - "parking_lot_core", + "parking_lot_core 0.8.5", +] + +[[package]] +name = "parking_lot" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" +dependencies = [ + "lock_api", + "parking_lot_core 0.9.3", ] [[package]] @@ -1519,6 +1555,19 @@ dependencies = [ "winapi", ] +[[package]] +name = "parking_lot_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-sys", +] + [[package]] name = "pbkdf2" version = "0.9.0" @@ -1603,13 +1652,34 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.38" +version = "1.0.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9027b48e9d4c9175fa2218adf3557f91c1137021739951d4932f5f8268ac48aa" +checksum = "c54b25569025b7fc9651de43004ae593a75ad88543b17178aa5e1b9c4f15f56f" dependencies = [ - "unicode-xid", + "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cface98dfa6d645ea4c789839f176e4b072265d085bfcc48eaa8d137f58d3c39" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot 0.12.0", + "protobuf", + "thiserror", +] + +[[package]] +name = "protobuf" +version = "2.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf7e6d18738ecd0902d30d1ad232c9125985a3422929b16c65517b38adc14f96" + [[package]] name = "qstring" version = "0.7.2" @@ -1671,9 +1741,9 @@ dependencies = [ [[package]] name = "rayon" -version = "1.5.2" +version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd249e82c21598a9a426a4e00dd7adc1d640b22445ec8545feef801d1a74c221" +checksum = "bd99e5772ead8baa5215278c9b15bf92087709e9c1b2d1f97cdb5a183c933a7d" dependencies = [ "autocfg", "crossbeam-deque", @@ -1683,9 +1753,9 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.9.2" +version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f51245e1e62e1f1629cbfec37b5793bbabcaeb90f30e94d2ba03564687353e4" +checksum = "258bcdb5ac6dad48491bb2992db6b7cf74878b0384908af124823d118c99683f" dependencies = [ 
"crossbeam-channel", "crossbeam-deque", @@ -1715,9 +1785,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.5.5" +version = "1.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286" +checksum = "d83f127d94bdbcda4c8cc2e50f6f84f4b611f69c902699ca385a39c3a75f9ff1" dependencies = [ "aho-corasick", "memchr", @@ -1726,9 +1796,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.6.25" +version = "0.6.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" +checksum = "49b3de9ec5dc0a3417da371aab17d729997c15010e7fd24ff707773a33bddb64" [[package]] name = "remove_dir_all" @@ -1794,9 +1864,9 @@ dependencies = [ [[package]] name = "rusoto_core" -version = "0.47.0" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b4f000e8934c1b4f70adde180056812e7ea6b1a247952db8ee98c94cd3116cc" +checksum = "1db30db44ea73551326269adcf7a2169428a054f14faf9e1768f2163494f2fa2" dependencies = [ "async-trait", "base64 0.13.0", @@ -1819,9 +1889,9 @@ dependencies = [ [[package]] name = "rusoto_credential" -version = "0.47.0" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a46b67db7bb66f5541e44db22b0a02fed59c9603e146db3a9e633272d3bac2f" +checksum = "ee0a6c13db5aad6047b6a44ef023dbbc21a056b6dab5be3b79ce4283d5c02d05" dependencies = [ "async-trait", "chrono", @@ -1835,11 +1905,24 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rusoto_s3" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7aae4677183411f6b0b412d66194ef5403293917d66e70ab118f07cc24c5b14d" +dependencies = [ + "async-trait", + "bytes", + "futures", + "rusoto_core", + "xml-rs", +] + [[package]] name = "rusoto_signature" -version = "0.47.0" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6264e93384b90a747758bcc82079711eacf2e755c3a8b5091687b5349d870bcc" +checksum = "a5ae95491c8b4847931e291b151127eccd6ff8ca13f33603eb3d0035ecb05272" dependencies = [ "base64 0.13.0", "bytes", @@ -1863,9 +1946,9 @@ dependencies = [ [[package]] name = "rusoto_sqs" -version = "0.47.0" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ae091bb560b2aa3b6ec2ab8224516b63f6b6f7c495ae4e41f0566089b156e5f" +checksum = "5218423da8976dfc3f14c72d602681c9cedb0cfa29eddb5c36a440eca6444131" dependencies = [ "async-trait", "bytes", @@ -1892,9 +1975,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.20.4" +version = "0.20.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fbfeb8d0ddb84706bc597a5574ab8912817c52a397f819e5b614e2265206921" +checksum = "5aab8ee6c7097ed6057f43c187a62418d0c05a4bd5f18b3571db50ee0f9ce033" dependencies = [ "log", "ring", @@ -1919,9 +2002,9 @@ checksum = "f2cc38e8fa666e2de3c4aba7edeb5ffc5246c1c2ed0e3d17e560aeeba736b23f" [[package]] name = "ryu" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" +checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695" [[package]] name = "same-file" @@ -1934,12 +2017,12 @@ dependencies = [ [[package]] name = "schannel" -version = "0.1.19" +version = "0.1.20" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" +checksum = "88d6731146462ea25d9244b2ed5fd1d716d25c52e4d54aa4fb0f3c4e9854dbe2" dependencies = [ "lazy_static", - "winapi", + "windows-sys", ] [[package]] @@ -2265,11 +2348,17 @@ dependencies = [ "bincode", "clap", "derivative", + "flate2", "futures", "humantime", + "hyper", + "lazy_static", "log", + "maplit", + "prometheus", "rusoto_core", "rusoto_credential", + "rusoto_s3", "rusoto_sqs", "serde", "serde_json", @@ -2280,6 +2369,7 @@ dependencies = [ "spl-token", "thiserror", "tokio", + "zstd", ] [[package]] @@ -2343,7 +2433,7 @@ dependencies = [ "log", "num-derive", "num-traits", - "parking_lot", + "parking_lot 0.11.2", "rand", "rustc_version", "rustversion", @@ -2650,13 +2740,13 @@ checksum = "a7973cce6668464ea31f176d85b13c7ab3bba2cb3b77a2ed26abd7801688010a" [[package]] name = "syn" -version = "1.0.94" +version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a07e33e919ebcd69113d5be0e4d70c5707004ff45188910106854f38b960df4a" +checksum = "fbaf6116ab8924f39d52792136fb74fd60a80194cf1b1c6ffa6453eef1c3f942" dependencies = [ "proc-macro2", "quote", - "unicode-xid", + "unicode-ident", ] [[package]] @@ -2809,9 +2899,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0edfdeb067411dba2044da6d1cb2df793dd35add7888d73c16e3381ded401764" +checksum = "f988a1a1adc2fb21f9c12aa96441da33a1728193ae0b95d2be22dbd17fcb4e5c" dependencies = [ "bytes", "futures-core", @@ -2886,6 +2976,12 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" +[[package]] +name = "unicode-ident" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d22af068fba1eb5edcb4aea19d382b2a3deb4c8f9d475c589b6ada9e0fd493ee" + [[package]] name = "unicode-normalization" version = "0.1.19" diff --git a/Cargo.toml b/Cargo.toml index 315c927..686f212 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,14 +13,20 @@ arrayref = "0.3.6" async-trait = "0.1" base64 = "0.13.0" bincode = "1.3" -clap = { version = "3.1.8", features = ["cargo"] } +clap = { version = "3.1.8", features = ["cargo", "derive"] } derivative = "2" +flate2 = "1" futures = "0.3" humantime = "2.1.0" +hyper = { version = "0.14", features = ["server"] } +lazy_static = "1" log = "0.4.14" -rusoto_core = "0.47" -rusoto_credential = "0.47" -rusoto_sqs = "0.47" +maplit = "1" +prometheus = "0.13" +rusoto_core = "0.48" +rusoto_credential = "0.48" +rusoto_s3 = "0.48" +rusoto_sqs = "0.48" serde = { version = "1.0.132", features = ["derive"] } serde_json = "1.0.73" solana-geyser-plugin-interface = "=1.9.22" @@ -29,7 +35,8 @@ solana-sdk = "=1.9.22" solana-transaction-status = "=1.9.22" spl-token = "3.2.0" thiserror = "1.0.30" -tokio = { version = "1.15.0", features = ["rt-multi-thread", "time", "macros"] } +tokio = { version = "1.15.0", features = ["rt-multi-thread", "time", "macros", "io-util"] } +zstd = "0.9" [profile.release] codegen-units = 1 diff --git a/README.md b/README.md index a5ef613..5173cc2 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,27 @@ Now you can run validator with plugin: $ solana-validator --geyser-plugin-config ./config.json ``` +### Accounts data compression + +Since SQS payload can be only 256KB long and Solana accounts can be up to 
10MB, not every account fits into an SQS message. Messages with such accounts are currently dropped, but the number of dropped messages can be reduced by compressing the data. Right now `zstd` and `gzip` are supported.
+
+```json
+"messages": {
+    "commitment_level": "confirmed",
+    "accounts_data_compression": {"algo": "none"}
+}
+"messages": {
+    "commitment_level": "confirmed",
+    "accounts_data_compression": {"algo": "zstd", "level": 3}
+}
+"messages": {
+    "commitment_level": "confirmed",
+    "accounts_data_compression": {"algo": "gzip", "level": 6}
+}
+```
+
+The `level` field is optional for both compression algorithms.
+
 ### AWS Credentials
 
 **Required permissions:**
@@ -71,7 +92,6 @@ All fields in filter are optional but at least 1 is required. Fields works as lo
 
 ##### Examples
 
-
 Filter accounts:
 
 ```json
diff --git a/config.json b/config.json
index 6cfe806..0424e80 100644
--- a/config.json
+++ b/config.json
@@ -4,15 +4,30 @@
         "level": "info",
         "filters": true
     },
+    "prometheus": {
+        "address": "0.0.0.0:8999"
+    },
     "sqs": {
-        "url": "https://sqs.us-east-2.amazonaws.com//",
-        "region": "us-east-2",
-        "commitment_level": "finalized",
-        "max_requests": 10000,
         "auth": {
             "access_key_id": "access_key_id",
             "secret_access_key": "secret_access_key"
-        }
+        },
+        "region": "us-east-2",
+        "url": "https://sqs.us-east-2.amazonaws.com//",
+        "max_requests": "10_000"
+    },
+    "s3": {
+        "auth": {
+            "access_key_id": "access_key_id",
+            "secret_access_key": "secret_access_key"
+        },
+        "region": "us-east-2",
+        "bucket": "solana-geyser",
+        "max_requests": "10_000"
+    },
+    "messages": {
+        "commitment_level": "confirmed",
+        "accounts_data_compression": {"algo": "none"}
     },
     "slots": {
         "enabled": false
diff --git a/src/aws.rs b/src/aws.rs
new file mode 100644
index 0000000..ee7a2ac
--- /dev/null
+++ b/src/aws.rs
@@ -0,0 +1,217 @@
+use {
+    crate::{
+        config::{ConfigAwsAuth, ConfigAwsS3, ConfigAwsSqs},
+        prom::{
+            UploadAwsStatus, UPLOAD_S3_REQUESTS, UPLOAD_S3_TOTAL, UPLOAD_SQS_REQUESTS,
+            UPLOAD_SQS_TOTAL,
+        },
+    },
+    rusoto_core::{request::TlsError, ByteStream, Client as RusotoClient, HttpClient, RusotoError},
+    rusoto_credential::{
+        AutoRefreshingProvider, AwsCredentials, ChainProvider, CredentialsError, ProfileProvider,
+        ProvideAwsCredentials, StaticProvider,
+    },
+    rusoto_s3::{PutObjectError, PutObjectRequest, S3Client as RusotoS3Client, S3},
+    rusoto_sqs::{
+        BatchResultErrorEntry, GetQueueAttributesError, GetQueueAttributesRequest,
+        SendMessageBatchError, SendMessageBatchRequest, SendMessageBatchRequestEntry, Sqs,
+        SqsClient as RusotoSqsClient,
+    },
+    std::sync::Arc,
+    thiserror::Error,
+    tokio::sync::Semaphore,
+};
+
+#[derive(Debug, Error)]
+pub enum AwsError {
+    #[error("credential error: {0}")]
+    Credentials(#[from] CredentialsError),
+    #[error("http client error: {0}")]
+    HttpClientTls(#[from] TlsError),
+    #[error("failed to get sqs queue attributes: {0}")]
+    SqsGetAttributes(#[from] RusotoError<GetQueueAttributesError>),
+    #[error("failed to send messages to sqs: {0}")]
+    SqsSendMessageBatch(#[from] RusotoError<SendMessageBatchError>),
+    #[error("failed to upload payload to s3: {0}")]
+    S3PutObject(#[from] RusotoError<PutObjectError>),
+}
+
+pub type AwsResult<T = ()> = Result<T, AwsError>;
+
+#[derive(derivative::Derivative)]
+#[derivative(Debug, Clone)]
+pub struct SqsClient {
+    #[derivative(Debug = "ignore")]
+    pub client: RusotoSqsClient,
+    pub queue_url: String,
+}
+
+impl SqsClient {
+    // The maximum allowed individual message size and the maximum total payload size (the sum of the
+    // individual lengths of all of the batched messages) are both 256 KB (262,144 bytes).
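+    // Staying at 250_000 rather than the full 262_144 presumably leaves headroom
+    // for message attributes and batch entry metadata, which also count toward the limit.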
+    pub const REQUEST_LIMIT: usize = 250_000;
+
+    pub fn new(config: ConfigAwsSqs) -> AwsResult<Self> {
+        let client = aws_create_client(config.auth)?;
+        Ok(Self {
+            client: RusotoSqsClient::new_with_client(client, config.region),
+            queue_url: config.url,
+        })
+    }
+
+    pub async fn check(self) -> AwsResult {
+        UPLOAD_SQS_REQUESTS.inc();
+        let result = self
+            .client
+            .get_queue_attributes(GetQueueAttributesRequest {
+                attribute_names: None,
+                queue_url: self.queue_url,
+            })
+            .await;
+        UPLOAD_SQS_REQUESTS.dec();
+
+        result.map_err(Into::into).map(|_| ())
+    }
+
+    pub async fn send_batch(
+        self,
+        entries: Vec<SendMessageBatchRequestEntry>,
+    ) -> Vec<BatchResultErrorEntry> {
+        let entries_count = entries.len();
+
+        UPLOAD_SQS_REQUESTS.inc();
+        let result = self
+            .client
+            .send_message_batch(SendMessageBatchRequest {
+                entries,
+                queue_url: self.queue_url,
+            })
+            .await;
+        UPLOAD_SQS_REQUESTS.dec();
+
+        let failed = match result {
+            Ok(rusoto_sqs::SendMessageBatchResult { successful, failed }) => {
+                if !successful.is_empty() {
+                    UPLOAD_SQS_TOTAL
+                        .with_label_values(&[UploadAwsStatus::Success.as_str()])
+                        .inc_by(successful.len() as u64);
+                }
+                failed
+            }
+            Err(_error) => (0..entries_count)
+                .map(|id| BatchResultErrorEntry {
+                    id: id.to_string(),
+                    ..Default::default()
+                })
+                .collect(),
+        };
+        if !failed.is_empty() {
+            UPLOAD_SQS_TOTAL
+                .with_label_values(&[UploadAwsStatus::Failed.as_str()])
+                .inc_by(failed.len() as u64);
+        }
+        failed
+    }
+}
+
+#[derive(derivative::Derivative)]
+#[derivative(Debug, Clone)]
+pub struct S3Client {
+    #[derivative(Debug = "ignore")]
+    pub client: RusotoS3Client,
+    pub bucket: String,
+    pub permits: Arc<Semaphore>,
+}
+
+impl S3Client {
+    pub fn new(config: ConfigAwsS3) -> AwsResult<Self> {
+        let client = aws_create_client(config.auth)?;
+        Ok(Self {
+            client: RusotoS3Client::new_with_client(client, config.region),
+            bucket: config.bucket,
+            permits: Arc::new(Semaphore::new(config.max_requests)),
+        })
+    }
+
+    pub async fn put_object<B: Into<ByteStream>>(self, key: String, body: B) -> AwsResult {
+        let permit = self.permits.acquire().await.expect("alive");
+        UPLOAD_S3_REQUESTS.inc();
+        let result = self
+            .client
+            .put_object(PutObjectRequest {
+                body: Some(body.into()),
+                bucket: self.bucket,
+                key,
+                ..Default::default()
+            })
+            .await;
+        UPLOAD_S3_REQUESTS.dec();
+        drop(permit);
+
+        let status = match result {
+            Ok(_) => UploadAwsStatus::Success,
+            Err(_) => UploadAwsStatus::Failed,
+        };
+        UPLOAD_S3_TOTAL.with_label_values(&[status.as_str()]).inc();
+
+        result.map_err(Into::into).map(|_| ())
+    }
+}
+
+fn aws_create_client(config: ConfigAwsAuth) -> AwsResult<RusotoClient> {
+    let request_dispatcher = HttpClient::new()?;
+    let credentials_provider = AwsCredentialsProvider::new(config)?;
+    Ok(RusotoClient::new_with(
+        credentials_provider,
+        request_dispatcher,
+    ))
+}
+
+#[allow(clippy::large_enum_variant)]
+enum AwsCredentialsProvider {
+    Static(StaticProvider),
+    Chain(AutoRefreshingProvider<ChainProvider>),
+}
+
+impl AwsCredentialsProvider {
+    pub fn new(config: ConfigAwsAuth) -> AwsResult<Self> {
+        match config {
+            ConfigAwsAuth::Static {
+                access_key_id,
+                secret_access_key,
+            } => Ok(Self::Static(StaticProvider::new_minimal(
+                access_key_id,
+                secret_access_key,
+            ))),
+            ConfigAwsAuth::Chain {
+                credentials_file,
+                profile,
+            } => {
+                let profile_provider = match (credentials_file, profile) {
+                    (Some(file_path), Some(profile)) => {
+                        ProfileProvider::with_configuration(file_path, profile)
+                    }
+                    (Some(file_path), None) => {
+                        ProfileProvider::with_default_configuration(file_path)
+                    }
+                    (None, Some(profile)) => ProfileProvider::with_default_credentials(profile)?,
+                    (None, None) => ProfileProvider::new()?,
+                };
+                Ok(Self::Chain(AutoRefreshingProvider::new(
+                    ChainProvider::with_profile_provider(profile_provider),
+                )?))
+            }
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl ProvideAwsCredentials for AwsCredentialsProvider {
+    async fn credentials(&self) -> Result<AwsCredentials, CredentialsError> {
+        match self {
+            Self::Static(p) => p.credentials(),
+            Self::Chain(p) => p.credentials(),
+        }
+        .await
+    }
+}
diff --git a/src/bin/consumer.rs b/src/bin/consumer.rs
index b6d107b..a1f3b79 100644
--- a/src/bin/consumer.rs
+++ b/src/bin/consumer.rs
@@ -1,17 +1,50 @@
 use {
     anyhow::Result,
-    clap::{crate_name, Arg, Command},
+    clap::Parser,
+    futures::stream::{self, StreamExt},
     humantime::format_rfc3339_millis,
+    rusoto_s3::{DeleteObjectRequest, GetObjectOutput, GetObjectRequest, PutObjectRequest, S3},
     rusoto_sqs::{
-        DeleteMessageBatchRequest, DeleteMessageBatchRequestEntry, ReceiveMessageRequest,
-        ReceiveMessageResult, Sqs,
+        DeleteMessageBatchRequest, DeleteMessageBatchRequestEntry, MessageAttributeValue,
+        ReceiveMessageRequest, ReceiveMessageResult, SendMessageBatchRequest,
+        SendMessageBatchRequestEntry, Sqs,
     },
     serde::Deserialize,
-    solana_geyser_sqs::{config::Config, sqs::AwsSqsClient},
-    std::time::SystemTime,
-    tokio::time::{sleep, Duration},
+    solana_geyser_sqs::{
+        aws::{S3Client, SqsClient},
+        config::Config,
+    },
+    std::{
+        collections::hash_map::DefaultHasher,
+        hash::{Hash, Hasher},
+        time::SystemTime,
+    },
+    tokio::{
+        io::AsyncReadExt,
+        time::{sleep, Duration},
+    },
 };
 
+#[derive(Debug, Parser)]
+#[clap(author, version, about)]
+struct Args {
+    /// Path to geyser plugin config
+    #[clap(short, long)]
+    config: String,
+
+    /// Upload payloads and send messages
+    #[clap(short, long)]
+    generate: bool,
+
+    /// Print full messages
+    #[clap(short, long)]
+    details: bool,
+
+    /// Max number of messages per one request
+    #[clap(short, long, default_value_t = 10)]
+    max_messages: i64,
+}
+
 #[derive(Debug, Deserialize)]
 struct Message<'a> {
     pub pubkey: &'a str,
@@ -21,52 +54,77 @@ struct Message<'a> {
 
 #[tokio::main]
 async fn main() -> Result<()> {
-    let args = Command::new(crate_name!())
-        .arg(
-            Arg::new("config")
-                .help("Path to geyser plugin config")
-                .long("config")
-                .short('c')
-                .required(true)
-                .takes_value(true),
-        )
-        .arg(
-            Arg::new("details")
-                .help("Print full messages")
-                .long("details")
-                .short('d')
-                .takes_value(false),
-        )
-        .get_matches();
-
-    let config_path = args.value_of("config").unwrap();
-    let config = Config::load_from_file(config_path)?;
-
-    // let (client, queue_url) = AwsSqsClient::create_sqs(config.sqs.clone())?;
-    // tokio::spawn(async move {
-    //     use rusoto_sqs::{SendMessageBatchRequest, SendMessageBatchRequestEntry};
-    //     loop {
-    //         let result = client
-    //             .send_message_batch(SendMessageBatchRequest {
-    //                 entries: vec![SendMessageBatchRequestEntry {
-    //                     id: "0".to_owned(),
-    //                     message_body: "hello world".to_owned(),
-    //                     ..Default::default()
-    //                 }],
-    //                 queue_url: queue_url.clone(),
-    //             })
-    //             .await;
-    //         println!("{:?}", result);
-    //         sleep(Duration::from_secs(1)).await;
-    //     }
-    // });
-
-    let (client, queue_url) = AwsSqsClient::create_sqs(config.sqs)?;
+    let args = Args::parse();
+    let config = Config::load_from_file(&args.config)?;
+
+    if args.generate {
+        let config = config.clone();
+        tokio::spawn(async move {
+            send_loop(config).await.unwrap();
+        });
+    }
+
+    receive_loop(args, config).await
+}
+
+async fn send_loop(config: Config) -> anyhow::Result<()> {
+    let sqs = SqsClient::new(config.sqs.clone())?;
+    let s3 = S3Client::new(config.s3.clone())?;
+
+    loop {
+        let body = "hello world".as_bytes().to_vec();
+        let mut hasher = DefaultHasher::new();
+        SystemTime::now().hash(&mut hasher);
+        let key = format!("consumer-test-{:x}", hasher.finish());
+
+        let result = s3
+            .client
+            .put_object(PutObjectRequest {
+                body: Some(body.into()),
+                bucket: s3.bucket.clone(),
+                key: key.clone(),
+                ..Default::default()
+            })
+            .await;
+        println!("Put s3 object ({}): {:?}", key, result);
+
+        let result = sqs
+            .client
+            .send_message_batch(SendMessageBatchRequest {
+                entries: vec![SendMessageBatchRequestEntry {
+                    id: "0".to_owned(),
+                    message_body: "s3".to_owned(),
+                    message_attributes: Some(maplit::hashmap! {
+                        "s3".to_owned() => MessageAttributeValue {
+                            data_type: "String".to_owned(),
+                            string_value: Some(key),
+                            ..Default::default()
+                        },
+                    }),
+                    ..Default::default()
+                }],
+                queue_url: sqs.queue_url.clone(),
+            })
+            .await;
+        println!("Send sqs message: {:?}", result);
+
+        sleep(Duration::from_secs(1)).await;
+        // break Ok(());
+    }
+}
+
+async fn receive_loop(args: Args, config: Config) -> anyhow::Result<()> {
+    let sqs = SqsClient::new(config.sqs)?;
+    let s3 = S3Client::new(config.s3)?;
+
     loop {
-        let result = client
+        let result = sqs
+            .client
             .receive_message(ReceiveMessageRequest {
-                max_number_of_messages: Some(10),
-                queue_url: queue_url.clone(),
+                max_number_of_messages: Some(args.max_messages),
+                message_attribute_names: Some(vec!["All".to_owned()]),
+                queue_url: sqs.queue_url.clone(),
                 ..Default::default()
             })
            .await;
@@ -75,7 +133,60 @@
             Ok(ReceiveMessageResult {
                 messages: Some(messages),
             }) => {
-                if args.is_present("details") {
+                let messages = stream::iter(messages)
+                    .filter_map(|mut message| {
+                        let s3 = message
+                            .message_attributes
+                            .as_mut()
+                            .and_then(|map| map.remove("s3"))
+                            .and_then(|attr| attr.string_value)
+                            .map(|key| (s3.clone(), key));
+
+                        async move {
+                            if let Some((s3, key)) = s3 {
+                                let request = GetObjectRequest {
+                                    bucket: s3.bucket.clone(),
+                                    key: key.clone(),
+                                    ..Default::default()
+                                };
+                                match s3.client.get_object(request).await {
+                                    Ok(GetObjectOutput {
+                                        body: Some(body), ..
+                                    }) => {
+                                        let mut body = body.into_async_read();
+                                        let mut payload = String::new();
+                                        if body.read_to_string(&mut payload).await.is_ok() {
+                                            message.body = Some(payload);
+                                        }
+                                    }
+                                    Ok(_) => {}
+                                    Err(error) => {
+                                        println!(
+                                            "failed to get payload from s3 ({}): {:?}",
+                                            key, error
+                                        )
+                                    }
+                                };
+
+                                let request = DeleteObjectRequest {
+                                    bucket: s3.bucket.clone(),
+                                    key: key.clone(),
+                                    ..Default::default()
+                                };
+                                if let Err(error) = s3.client.delete_object(request).await {
+                                    println!(
+                                        "failed to delete payload from s3 ({}): {:?}",
+                                        key, error
+                                    );
+                                }
+                            }
+                            Some(message)
+                        }
+                    })
+                    .collect::<Vec<_>>()
+                    .await;
+
+                if args.details {
                     println!("{} | messages: {:?}", now, messages);
                 } else {
                     let messages = messages
@@ -105,10 +216,11 @@
                         })
                         .collect::<Vec<_>>();
                     if !entries.is_empty() {
-                        let _ = client
+                        let _ = sqs
+                            .client
                             .delete_message_batch(DeleteMessageBatchRequest {
                                 entries,
-                                queue_url: queue_url.clone(),
+                                queue_url: sqs.queue_url.clone(),
                             })
                             .await;
                    }
diff --git a/src/config.rs b/src/config.rs
index b6891c0..e2cc822 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -1,5 +1,6 @@
 use {
     super::sqs::SlotStatus,
+    flate2::{write::GzEncoder, Compression as GzCompression},
     rusoto_core::Region,
     serde::{de, Deserialize, Deserializer},
     solana_geyser_plugin_interface::geyser_plugin_interface::{
@@ -7,18 +8,26 @@ use {
     },
     solana_sdk::pubkey::Pubkey,
     std::{
+        borrow::Cow,
         collections::{HashMap, HashSet},
+        fmt,
         fs::read_to_string,
+        io::{Result as IoResult, Write},
+        net::SocketAddr,
         path::Path,
     },
 };
 
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Clone, Deserialize)]
 #[serde(deny_unknown_fields)]
 pub struct Config {
     pub libpath: String,
+    #[serde(default)]
     pub log: ConfigLog,
+    pub prometheus: Option<ConfigPrometheus>,
     pub sqs: ConfigAwsSqs,
+    pub s3: ConfigAwsS3,
+    pub messages: ConfigMessages,
     pub slots: ConfigSlots,
     #[serde(rename = "accounts")]
     pub accounts_filters: HashMap<String, ConfigAccountsFilter>,
@@ -39,38 +48,60 @@ impl Config {
     }
 }
 
-#[derive(Debug, Deserialize)]
-#[serde(deny_unknown_fields)]
+#[derive(Debug, Clone, Deserialize)]
+#[serde(deny_unknown_fields, default)]
 pub struct ConfigLog {
-    pub level: Option<String>,
+    /// Log level.
+    #[serde(default = "ConfigLog::default_level")]
+    pub level: String,
     /// Log filters on startup.
-    #[serde(default)]
     pub filters: bool,
 }
 
+impl Default for ConfigLog {
+    fn default() -> Self {
+        Self {
+            level: Self::default_level(),
+            filters: false,
+        }
+    }
+}
+
+impl ConfigLog {
+    fn default_level() -> String {
+        "info".to_owned()
+    }
+}
+
+#[derive(Debug, Clone, Copy, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct ConfigPrometheus {
+    /// Address of Prometheus service.
+    pub address: SocketAddr,
+}
+
 #[derive(Debug, Clone, Deserialize)]
 #[serde(deny_unknown_fields)]
 pub struct ConfigAwsSqs {
-    #[serde(default, deserialize_with = "deserialize_commitment_level")]
-    pub commitment_level: SlotStatus,
-    pub url: String,
+    pub auth: ConfigAwsAuth,
     #[serde(deserialize_with = "deserialize_region")]
     pub region: Region,
-    pub auth: ConfigAwsAuth,
+    pub url: String,
     #[serde(deserialize_with = "deserialize_max_requests")]
     pub max_requests: usize,
 }
 
-fn deserialize_commitment_level<'de, D>(deserializer: D) -> Result<SlotStatus, D::Error>
-where
-    D: Deserializer<'de>,
-{
-    match Deserialize::deserialize(deserializer)? {
-        SlotStatus::Processed => Err(de::Error::custom(
-            "`commitment_level` as `processed` is not supported",
-        )),
-        value => Ok(value),
-    }
+#[derive(Debug, Clone, Deserialize)]
+#[serde(deny_unknown_fields, untagged)]
+pub enum ConfigAwsAuth {
+    Static {
+        access_key_id: String,
+        secret_access_key: String,
+    },
+    Chain {
+        credentials_file: Option<String>,
+        profile: Option<String>,
+    },
 }
 
 fn deserialize_region<'de, D>(deserializer: D) -> Result<Region, D::Error>
@@ -85,25 +116,125 @@ fn deserialize_max_requests<'de, D>(deserializer: D) -> Result<usize, D::Error>
 where
     D: Deserializer<'de>,
 {
-    Ok(match usize::deserialize(deserializer)? {
+    Ok(match UsizeStr::deserialize(deserializer)?.value {
         0 => usize::MAX,
         value => value,
     })
 }
 
+#[derive(Debug, Default, PartialEq, Eq, Hash)]
+struct UsizeStr {
+    value: usize,
+}
+
+impl<'de> Deserialize<'de> for UsizeStr {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        struct AnyVisitor;
+
+        impl<'de> de::Visitor<'de> for AnyVisitor {
+            type Value = UsizeStr;
+
+            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+                write!(formatter, "string or number")
+            }
+
+            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
+            where
+                E: de::Error,
+            {
+                value
+                    .replace('_', "")
+                    .parse::<usize>()
+                    .map_err(de::Error::custom)
+                    .map(|value| UsizeStr { value })
+            }
+
+            fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
+            where
+                E: de::Error,
+            {
+                Ok(UsizeStr {
+                    value: value as usize,
+                })
+            }
+        }
+
+        deserializer.deserialize_any(AnyVisitor)
+    }
+}
+
 #[derive(Debug, Clone, Deserialize)]
-#[serde(deny_unknown_fields, untagged)]
-pub enum ConfigAwsAuth {
-    Static {
-        access_key_id: String,
-        secret_access_key: String,
+#[serde(deny_unknown_fields)]
+pub struct ConfigAwsS3 {
+    pub auth: ConfigAwsAuth,
+    #[serde(deserialize_with = "deserialize_region")]
+    pub region: Region,
+    pub bucket: String,
+    #[serde(deserialize_with = "deserialize_max_requests")]
+    pub max_requests: usize,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct ConfigMessages {
+    #[serde(default, deserialize_with = "deserialize_commitment_level")]
+    pub commitment_level: SlotStatus,
+    #[serde(default)]
+    pub accounts_data_compression: AccountsDataCompression,
+}
+
+fn deserialize_commitment_level<'de, D>(deserializer: D) -> Result<SlotStatus, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    match Deserialize::deserialize(deserializer)? {
+        SlotStatus::Processed => Err(de::Error::custom(
+            "`commitment_level` as `processed` is not supported",
+        )),
+        value => Ok(value),
+    }
+}
+
+#[derive(Debug, Clone, Copy, Deserialize, derivative::Derivative)]
+#[derivative(Default)]
+#[serde(deny_unknown_fields, rename_all = "lowercase", tag = "algo")]
+pub enum AccountsDataCompression {
+    #[derivative(Default)]
+    None,
+    Zstd {
+        #[serde(default)]
+        level: i32,
     },
-    Chain {
-        credentials_file: Option<String>,
-        profile: Option<String>,
+    Gzip {
+        #[serde(default = "AccountsDataCompression::gzip_default_level")]
+        level: u32,
     },
 }
 
+impl AccountsDataCompression {
+    #[allow(clippy::ptr_arg)]
+    pub fn compress<'a>(&self, data: &'a Vec<u8>) -> IoResult<Cow<'a, Vec<u8>>> {
+        Ok(match self {
+            AccountsDataCompression::None => Cow::Borrowed(data),
+            AccountsDataCompression::Zstd { level } => {
+                Cow::Owned(zstd::stream::encode_all::<&[u8]>(data.as_ref(), *level)?)
+            }
+            AccountsDataCompression::Gzip { level } => {
+                let mut encoder = GzEncoder::new(Vec::new(), GzCompression::new(*level));
+                encoder.write_all(data)?;
+                Cow::Owned(encoder.finish()?)
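+                // finish() flushes the gzip stream and returns the compressed buffer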
+            }
+        })
+    }
+
+    fn gzip_default_level() -> u32 {
+        GzCompression::default().level()
+    }
+}
+
 #[derive(Debug, Clone, Copy, Deserialize)]
 pub struct ConfigSlots {
     pub enabled: bool,
@@ -128,7 +259,7 @@ impl<'de> Deserialize<'de> for ConfigAccountsFilter {
         struct ConfigAccountsFilterRaw {
             account: HashSet<String>,
             owner: HashSet<String>,
-            data_size: HashSet<usize>,
+            data_size: HashSet<UsizeStr>,
             tokenkeg_owner: HashSet<String>,
             tokenkeg_delegate: HashSet<String>,
         }
@@ -141,7 +272,7 @@ impl<'de> Deserialize<'de> for ConfigAccountsFilter {
         }
 
         let mut filter = ConfigAccountsFilter {
-            data_size: raw.data_size,
+            data_size: raw.data_size.into_iter().map(|v| v.value).collect(),
             ..Default::default()
         };
 
diff --git a/src/lib.rs b/src/lib.rs
index bcb9b1a..cc6bdc5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,6 @@
+pub mod aws;
 pub mod config;
 pub mod filter;
 pub mod plugin;
+pub mod prom;
 pub mod sqs;
diff --git a/src/plugin.rs b/src/plugin.rs
index b1dd330..4825587 100644
--- a/src/plugin.rs
+++ b/src/plugin.rs
@@ -1,25 +1,35 @@
-use crate::sqs::SqsClientResult;
-
 use {
-    crate::{config::Config, sqs::AwsSqsClient},
+    crate::{
+        config::Config,
+        prom::PrometheusService,
+        sqs::{AwsSqsClient, SqsClientResult},
+    },
     solana_geyser_plugin_interface::geyser_plugin_interface::{
         GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
         ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
     },
+    tokio::{runtime::Runtime, time::Duration},
 };
 
+#[derive(Debug)]
+pub struct PluginInner {
+    runtime: Runtime,
+    client: AwsSqsClient,
+    prometheus: PrometheusService,
+}
+
 #[derive(Debug, Default)]
 pub struct Plugin {
-    sqs: Option<AwsSqsClient>,
+    inner: Option<PluginInner>,
 }
 
 impl Plugin {
-    fn with_sqs<F>(&self, f: F) -> PluginResult<()>
+    fn with_client<F>(&self, f: F) -> PluginResult<()>
     where
         F: FnOnce(&AwsSqsClient) -> SqsClientResult,
     {
-        let sqs = self.sqs.as_ref().expect("initialized");
-        f(sqs).map_err(|error| GeyserPluginError::Custom(Box::new(error)))
+        let inner = self.inner.as_ref().expect("initialized");
+        f(&inner.client).map_err(|error| GeyserPluginError::Custom(Box::new(error)))
     }
 }
 
@@ -32,21 +42,29 @@ impl GeyserPlugin for Plugin {
         let config = Config::load_from_file(config_file)?;
 
         // Setup logger
-        let log_level = config.log.level.as_deref().unwrap_or("info");
-        solana_logger::setup_with_default(log_level);
+        solana_logger::setup_with_default(&config.log.level);
+
+        // Create inner
+        let runtime = Runtime::new().map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
+        let prometheus = PrometheusService::new(&runtime, config.prometheus);
+        let client = runtime
+            .block_on(AwsSqsClient::new(config))
+            .map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
 
-        // Sqs client
-        self.sqs = Some(
-            AwsSqsClient::new(config)
-                .map_err(|error| GeyserPluginError::Custom(Box::new(error)))?,
-        );
+        self.inner = Some(PluginInner {
+            runtime,
+            client,
+            prometheus,
+        });
 
         Ok(())
     }
 
     fn on_unload(&mut self) {
-        if let Some(sqs) = self.sqs.take() {
-            sqs.shutdown();
+        if let Some(inner) = self.inner.take() {
+            inner.prometheus.shutdown();
+            inner.runtime.spawn(inner.client.shutdown());
+            inner.runtime.shutdown_timeout(Duration::from_secs(30));
         }
     }
 
@@ -56,11 +74,11 @@
     fn update_account(
         &mut self,
         account: ReplicaAccountInfoVersions,
         slot: u64,
         _is_startup: bool,
     ) -> PluginResult<()> {
-        self.with_sqs(|sqs| sqs.update_account(account, slot))
+        self.with_client(|sqs| sqs.update_account(account, slot))
     }
 
     fn notify_end_of_startup(&mut self) -> PluginResult<()> {
-        self.with_sqs(|sqs| sqs.startup_finished())
+        self.with_client(|sqs| sqs.startup_finished())
     }
 
     fn update_slot_status(
@@ -69,7 +87,7 @@
         &mut self,
         slot: u64,
         _parent: Option<u64>,
         status: SlotStatus,
     ) -> PluginResult<()> {
-        self.with_sqs(|sqs| sqs.update_slot(slot, status))
+        self.with_client(|sqs| sqs.update_slot(slot, status))
     }
 
     fn notify_transaction(
         &mut self,
         transaction: ReplicaTransactionInfoVersions,
         slot: u64,
     ) -> PluginResult<()> {
-        self.with_sqs(|sqs| sqs.notify_transaction(transaction, slot))
+        self.with_client(|sqs| sqs.notify_transaction(transaction, slot))
     }
 
     fn notify_block_metadata(&mut self, blockinfo: ReplicaBlockInfoVersions) -> PluginResult<()> {
-        self.with_sqs(|sqs| sqs.notify_block_metadata(blockinfo))
+        self.with_client(|sqs| sqs.notify_block_metadata(blockinfo))
     }
 
     fn transaction_notifications_enabled(&self) -> bool {
diff --git a/src/prom.rs b/src/prom.rs
new file mode 100644
index 0000000..315a60d
--- /dev/null
+++ b/src/prom.rs
@@ -0,0 +1,148 @@
+use {
+    super::config::ConfigPrometheus,
+    futures::FutureExt,
+    hyper::{
+        server::conn::AddrStream,
+        service::{make_service_fn, service_fn},
+        Body, Request, Response, Server, StatusCode,
+    },
+    log::*,
+    prometheus::{IntCounterVec, IntGauge, Opts, Registry, TextEncoder},
+    std::sync::Once,
+    tokio::{runtime::Runtime, sync::oneshot},
+};
+
+lazy_static::lazy_static! {
+    pub static ref REGISTRY: Registry = Registry::new();
+
+    pub static ref UPLOAD_QUEUE_SIZE: IntGauge = IntGauge::new(
+        "upload_queue_size",
+        "Number of messages in the queue for upload"
+    ).unwrap();
+
+    pub static ref UPLOAD_MESSAGES_TOTAL: IntCounterVec = IntCounterVec::new(
+        Opts::new("upload_messages_total", "Status of uploaded messages"),
+        &["status"]
+    ).unwrap();
+
+    pub static ref UPLOAD_SQS_REQUESTS: IntGauge = IntGauge::new(
+        "upload_sqs_requests",
+        "Number of active upload SQS requests"
+    ).unwrap();
+
+    pub static ref UPLOAD_SQS_TOTAL: IntCounterVec = IntCounterVec::new(
+        Opts::new("upload_sqs_total", "Status of uploaded SQS messages"),
+        &["status"]
+    ).unwrap();
+
+    pub static ref UPLOAD_S3_REQUESTS: IntGauge = IntGauge::new(
+        "upload_s3_requests",
+        "Number of active upload S3 requests"
+    ).unwrap();
+
+    pub static ref UPLOAD_S3_TOTAL: IntCounterVec = IntCounterVec::new(
+        Opts::new("upload_s3_total", "Status of uploaded S3 payloads"),
+        &["status"]
+    ).unwrap();
+}
+
+#[derive(Debug, Clone, Copy)]
+pub enum UploadMessagesStatus {
+    Success,
+    Failed,
+    Dropped,
+}
+
+impl UploadMessagesStatus {
+    pub fn as_str(&self) -> &str {
+        match *self {
+            UploadMessagesStatus::Success => "success",
+            UploadMessagesStatus::Failed => "failed",
+            UploadMessagesStatus::Dropped => "dropped",
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+pub enum UploadAwsStatus {
+    Success,
+    Failed,
+}
+
+impl UploadAwsStatus {
+    pub fn as_str(&self) -> &str {
+        match *self {
+            UploadAwsStatus::Success => "success",
+            UploadAwsStatus::Failed => "failed",
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct PrometheusService {
+    shutdown_signal: oneshot::Sender<()>,
+}
+
+impl PrometheusService {
+    pub fn new(runtime: &Runtime, config: Option<ConfigPrometheus>) -> Self {
+        static REGISTER: Once = Once::new();
+        REGISTER.call_once(|| {
+            macro_rules! register {
+                ($collector:ident) => {
+                    REGISTRY
+                        .register(Box::new($collector.clone()))
+                        .expect("collector can't be registered");
+                };
+            }
+            register!(UPLOAD_QUEUE_SIZE);
+            register!(UPLOAD_MESSAGES_TOTAL);
+            register!(UPLOAD_SQS_REQUESTS);
+            register!(UPLOAD_SQS_TOTAL);
+            register!(UPLOAD_S3_REQUESTS);
+            register!(UPLOAD_S3_TOTAL);
+        });
+
+        let (tx, rx) = oneshot::channel();
+        if let Some(ConfigPrometheus { address }) = config {
+            runtime.spawn(async move {
+                let make_service = make_service_fn(move |_: &AddrStream| async move {
+                    Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| async move {
+                        let response = match req.uri().path() {
+                            "/metrics" => metrics_handler(),
+                            _ => not_found_handler(),
+                        };
+                        Ok::<_, hyper::Error>(response)
+                    }))
+                });
+                let server = Server::bind(&address).serve(make_service);
+                if let Err(error) = tokio::try_join!(server, rx.map(|_| Ok(()))) {
+                    error!("prometheus service failed: {}", error);
+                }
+            });
+        }
+
+        PrometheusService {
+            shutdown_signal: tx,
+        }
+    }
+
+    pub fn shutdown(self) {
+        let _ = self.shutdown_signal.send(());
+    }
+}
+
+fn metrics_handler() -> Response<Body> {
+    let metrics = TextEncoder::new()
+        .encode_to_string(&REGISTRY.gather())
+        .unwrap_or_else(|error| {
+            error!("could not encode custom metrics: {}", error);
+            String::new()
+        });
+    Response::builder().body(Body::from(metrics)).unwrap()
+}
+
+fn not_found_handler() -> Response<Body> {
+    Response::builder()
+        .status(StatusCode::NOT_FOUND)
+        .body(Body::empty())
+        .unwrap()
+}
diff --git a/src/sqs.rs b/src/sqs.rs
index 9cc7b99..e0fa693 100644
--- a/src/sqs.rs
+++ b/src/sqs.rs
@@ -1,24 +1,19 @@
 use {
     super::{
-        config::{Config, ConfigAwsAuth, ConfigAwsSqs},
+        aws::{AwsError, S3Client, SqsClient},
+        config::{AccountsDataCompression, Config},
         filter::{AccountsFilter, TransactionsFilter},
+        prom::{UploadMessagesStatus, UPLOAD_MESSAGES_TOTAL, UPLOAD_QUEUE_SIZE},
     },
     arrayref::array_ref,
-    async_trait::async_trait,
-    futures::future::FutureExt,
-    humantime::format_duration,
-    log::*,
-    rusoto_core::{HttpClient, RusotoError},
-    rusoto_credential::{
-        AutoRefreshingProvider, AwsCredentials, ChainProvider, CredentialsError, ProfileProvider,
-        ProvideAwsCredentials, StaticProvider,
-    },
-    rusoto_sqs::{
-        GetQueueAttributesError, GetQueueAttributesRequest, SendMessageBatchError,
-        SendMessageBatchRequest, SendMessageBatchRequestEntry, Sqs, SqsClient as RusotoSqsClient,
+    futures::{
+        future::FutureExt,
+        stream::{self, StreamExt},
     },
+    log::*,
+    rusoto_sqs::{MessageAttributeValue, SendMessageBatchRequestEntry},
     serde::{Deserialize, Serialize},
-    serde_json::json,
+    serde_json::{json, Value},
     solana_geyser_plugin_interface::geyser_plugin_interface::{
         ReplicaAccountInfoVersions, ReplicaBlockInfoVersions, ReplicaTransactionInfoVersions,
         SlotStatus as GeyserSlotStatus,
@@ -34,18 +29,18 @@ use {
     solana_transaction_status::UiTransactionStatusMeta,
     spl_token::state::Account as SplTokenAccount,
     std::{
-        collections::{BTreeMap, BTreeSet, HashMap},
+        collections::{BTreeMap, BTreeSet, HashMap, LinkedList},
         convert::TryInto,
+        io::Result as IoResult,
         sync::{
             atomic::{AtomicBool, Ordering},
             Arc,
         },
         thread::sleep,
-        time::{Duration, Instant},
+        time::Duration,
     },
     thiserror::Error,
     tokio::{
-        runtime::Runtime,
         sync::{mpsc, Semaphore},
         time::sleep as sleep_async,
     },
@@ -176,22 +171,110 @@ enum SendMessage {
     Transaction(ReplicaTransactionInfo),
 }
 
+impl SendMessage {
+    fn payload(&self, compression: &AccountsDataCompression) -> IoResult<String> {
+        Ok(match self {
+            SendMessage::Slot((status, slot)) => json!({
+                "type": "slot",
+                "status": status,
+                "slot": slot,
+            }),
+            SendMessage::Account((account, filters)) => json!({
+                "type": "account",
+                "filters": filters,
+                "pubkey": account.pubkey.to_string(),
+                "lamports": account.lamports,
+                "owner": account.owner.to_string(),
+                "executable": account.executable,
+                "rent_epoch": account.rent_epoch,
+                "data": base64::encode(compression.compress(&account.data)?.as_ref()),
+                "write_version": account.write_version,
+                "slot": account.slot,
+            }),
+            SendMessage::Transaction(transaction) => json!({
+                "type": "transaction",
+                "signature": transaction.signature.to_string(),
+                "transaction": base64::encode(&bincode::serialize(&transaction.transaction).unwrap()),
+                "meta": transaction.meta,
+                "slot": transaction.slot,
+                "block_time": transaction.block_time.unwrap_or_default(),
+            }),
+        }
+        .to_string())
+    }
+
+    fn s3_key(&self) -> String {
+        match self {
+            Self::Slot((status, slot)) => {
+                warn!("Slot is not expected to be uploaded to S3");
+                format!("slot-{}-{:?}", slot, status)
+            }
+            Self::Account((account, _filters)) => {
+                format!(
+                    "account-{}-{}-{}",
+                    account.slot, account.pubkey, account.write_version
+                )
+            }
+            Self::Transaction(transaction) => {
+                warn!("Transaction is not expected to be uploaded to S3");
+                format!("transaction-{}-{}", transaction.slot, transaction.signature)
+            }
+        }
+    }
+
+    fn update_info(&self, mut value: Value) -> Value {
+        match self {
+            SendMessage::Slot((status, slot)) => {
+                value["status"] = json!(status);
+                value["slot"] = json!(slot);
+            }
+            SendMessage::Account((account, _filters)) => {
+                value["pubkey"] = json!(account.pubkey.to_string());
+                value["slot"] = json!(account.slot);
+                value["write_version"] = json!(account.write_version);
+            }
+            SendMessage::Transaction(transaction) => {
+                value["signature"] = json!(transaction.signature.to_string());
+                value["slot"] = json!(transaction.slot);
+            }
+        };
+        value
+    }
+}
+
+#[derive(Debug)]
+struct SendMessageWithPayload {
+    message: SendMessage,
+    s3: bool,
+    payload: String,
+}
+
+impl SendMessageWithPayload {
+    const S3_SIZE: usize = 250;
+
+    fn new(message: SendMessage, payload: String) -> Self {
+        Self {
+            message,
+            s3: payload.len() > SqsClient::REQUEST_LIMIT,
+            payload,
+        }
+    }
+
+    fn payload_size(&self) -> usize {
+        if self.s3 {
+            Self::S3_SIZE
+        } else {
+            self.payload.len()
+        }
+    }
+}
+
 #[derive(Debug, Error)]
 pub enum SqsClientError {
     #[error("invalid commitment_level: {0:?}")]
     InvalidCommitmentLevel(SlotStatus),
-    #[error("failed to create Runtime: {0}")]
-    RuntimeCreate(std::io::Error),
-    #[error("aws credential error: {0}")]
-    AwsCredentials(#[from] CredentialsError),
-    #[error("HttpClient error: {0}")]
-    AwsHttpClientTls(#[from] rusoto_core::request::TlsError),
-    #[error("failed to get queue attributes: {0}")]
-    SqsGetAttributes(#[from] RusotoError<GetQueueAttributesError>),
-    #[error("failed to send messages: {0}")]
-    SqsSendMessageBatch(#[from] RusotoError<SendMessageBatchError>),
-    #[error("failed to send some messages: {0}")]
-    SqsSendMessageBatchEntry(String),
+    #[error("aws error: {0}")]
+    Aws(#[from] AwsError),
     #[error("send message through send queue failed: channel is closed")]
     UpdateQueueChannelClosed,
 }
 
@@ -200,80 +283,55 @@ pub type SqsClientResult<T = ()> = Result<T, SqsClientError>;
 
 #[derive(Debug)]
 pub struct AwsSqsClient {
-    runtime: Runtime,
     send_queue: mpsc::UnboundedSender<Message>,
     startup_job: Arc<AtomicBool>,
     send_job: Arc<AtomicBool>,
 }
 
 impl AwsSqsClient {
-    pub fn new(config: Config) -> SqsClientResult<Self> {
+    pub async fn new(config: Config) -> SqsClientResult<Self> {
         if !matches!(
-            config.sqs.commitment_level,
+            config.messages.commitment_level,
             SlotStatus::Confirmed | SlotStatus::Finalized
         ) {
            return Err(SqsClientError::InvalidCommitmentLevel(
-                config.sqs.commitment_level,
+                config.messages.commitment_level,
             ));
         }
 
+        SqsClient::new(config.sqs.clone())?.check().await?;
+
         let startup_job = Arc::new(AtomicBool::new(true));
         let startup_job_loop = Arc::clone(&startup_job);
         let send_job = Arc::new(AtomicBool::new(true));
         let send_job_loop = Arc::clone(&send_job);
-        let runtime = Runtime::new().map_err(SqsClientError::RuntimeCreate)?;
-        let send_queue = runtime.block_on(async move {
-            // Check that SQS is available
-            let (client, queue_url) = Self::create_sqs(config.sqs.clone())?;
-            client
-                .get_queue_attributes(GetQueueAttributesRequest {
-                    attribute_names: None,
-                    queue_url: queue_url.clone(),
-                })
-                .await?;
-
-            let (tx, rx) = mpsc::unbounded_channel();
-            tokio::spawn(async move {
-                if let Err(error) =
-                    Self::send_loop(config, rx, startup_job_loop, send_job_loop).await
-                {
-                    error!("update_loop failed: {:?}", error);
-                }
-            });
-            Ok::<_, SqsClientError>(tx)
-        })?;
+        let (send_queue, rx) = mpsc::unbounded_channel();
+        tokio::spawn(async move {
+            if let Err(error) = Self::send_loop(config, rx, startup_job_loop, send_job_loop).await {
+                error!("update_loop failed: {:?}", error);
+            }
+        });
 
         Ok(Self {
-            runtime,
             send_queue,
             startup_job,
             send_job,
         })
     }
 
-    pub fn create_sqs(config: ConfigAwsSqs) -> SqsClientResult<(RusotoSqsClient, String)> {
-        let request_dispatcher = HttpClient::new()?;
-        let credentials_provider = AwsCredentialsProvider::new(config.auth)?;
-        let sqs =
-            RusotoSqsClient::new_with(request_dispatcher, credentials_provider, config.region);
-        Ok((sqs, config.url))
-    }
-
     fn send_message(&self, message: Message) -> SqsClientResult {
         self.send_queue
             .send(message)
             .map_err(|_| SqsClientError::UpdateQueueChannelClosed)
     }
 
-    pub fn shutdown(self) {
+    pub async fn shutdown(self) {
         if self.send_message(Message::Shutdown).is_ok() {
             while self.send_job.load(Ordering::Relaxed) {
-                sleep(Duration::from_micros(10));
+                sleep_async(Duration::from_micros(10)).await;
            }
         }
-
-        self.runtime.shutdown_timeout(Duration::from_secs(10));
     }
 
     pub fn startup_finished(&self) -> SqsClientResult {
@@ -335,9 +393,11 @@
         startup_job: Arc<AtomicBool>,
         send_job: Arc<AtomicBool>,
     ) -> SqsClientResult {
-        let max_requests = config.sqs.max_requests;
-        let commitment_level = config.sqs.commitment_level;
-        let (client, queue_url) = Self::create_sqs(config.sqs)?;
+        let sqs_max_requests = config.sqs.max_requests;
+        let commitment_level = config.messages.commitment_level;
+        let accounts_data_compression = config.messages.accounts_data_compression;
+        let sqs = SqsClient::new(config.sqs)?;
+        let s3 = S3Client::new(config.s3)?;
         let is_slot_messages_enabled = config.slots.enabled;
         let accounts_filter = AccountsFilter::new(config.accounts_filters);
         let transactions_filter = TransactionsFilter::new(config.transactions_filter);
@@ -382,65 +442,8 @@
         }
         info!("startup finished");
 
-        // Spawn simple stats
-        let send_job_stats = Arc::clone(&send_job);
-        let (stats_queued_tx, mut stats_queued_rx) = mpsc::unbounded_channel();
-        let (stats_inprocess_tx, mut stats_inprocess_rx) = mpsc::unbounded_channel();
-        let (stats_processed_tx, mut stats_processed_rx) = mpsc::unbounded_channel();
-        tokio::spawn(async move {
-            let mut last_update = Instant::now();
-            let mut queued = 0;
-            let mut inprocess = 0;
-            let mut processed = 0;
-            let mut processed_time = Duration::from_secs(0);
-            loop {
-                if !send_job_stats.load(Ordering::Relaxed) {
-                    break;
-                }
-
-                while let Ok(count) = stats_queued_rx.try_recv() {
-                    queued += count;
-                }
-                while let Ok(count) = stats_inprocess_rx.try_recv() {
-                    queued -= count;
-                    inprocess += count;
-                }
-                while let Ok((count, elapsed)) = stats_processed_rx.try_recv() {
-                    inprocess -= count;
-                    processed += count;
-                    processed_time += elapsed * count as u32;
-                }
-
-                if last_update.elapsed() < Duration::from_secs(10) {
-                    sleep_async(Duration::from_micros(1_000)).await;
-                    continue;
-                }
-
-                log!(
-                    if inprocess > 200 {
-                        Level::Warn
-                    } else {
-                        Level::Info
-                    },
-                    "queued: {}, in process: {}, processed: {}, avg processing time: {}",
-                    queued,
-                    inprocess,
-                    processed,
-                    format_duration(
-                        processed_time
-                            .checked_div(processed as u32)
-                            .unwrap_or_default()
-                    )
-                );
-
-                last_update = Instant::now();
-                processed = 0;
-                processed_time = Duration::from_secs(0);
-            }
-        });
-
         // Messages, accounts and tokenkeg history changes
-        let mut messages = vec![];
+        let mut messages: LinkedList<SendMessageWithPayload> = LinkedList::new();
         let mut accounts: BTreeMap<u64, BTreeSet<ReplicaAccountInfo>> = BTreeMap::new();
         type TokenkegHist = BTreeMap<u64, HashMap<Pubkey, Option<ReplicaAccountInfo>>>;
         let mut tokenkeg_owner_accounts_hist: TokenkegHist = BTreeMap::new();
@@ -448,6 +451,26 @@
         let mut transactions: BTreeMap<u64, Vec<ReplicaTransactionInfo>> = BTreeMap::new();
         let mut blocks: BTreeMap = BTreeMap::new();
 
+        // Add message to the tail and increase counter
+        fn add_message(
+            messages: &mut LinkedList<SendMessageWithPayload>,
+            message: SendMessage,
+            accounts_data_compression: &AccountsDataCompression,
+        ) {
+            match message.payload(accounts_data_compression) {
+                Ok(payload) => {
+                    messages.push_back(SendMessageWithPayload::new(message, payload));
+                    UPLOAD_QUEUE_SIZE.inc();
+                }
+                Err(error) => {
+                    error!("failed to create payload: {:?}", error);
+                    UPLOAD_MESSAGES_TOTAL
+                        .with_label_values(&[UploadMessagesStatus::Dropped.as_str()])
+                        .inc();
+                }
+            }
+        }
+
         // Remove outdated slots from `BTreeMap` (accounts, tokenkeg history)
         fn remove_outdated_slots<T>(map: &mut BTreeMap<u64, T>, min_slot: u64) {
             loop {
@@ -461,7 +484,7 @@
         // Add new messages for an accounts on commitment_level
         #[allow(clippy::too_many_arguments)]
         fn generate_messages(
-            messages: &mut Vec<SendMessage>,
+            messages: &mut LinkedList<SendMessageWithPayload>,
             accounts: &BTreeSet<ReplicaAccountInfo>,
             accounts_filter: &AccountsFilter,
             tokenkeg_owner_accounts: &mut HashMap<Pubkey, ReplicaAccountInfo>,
@@ -470,6 +493,7 @@
             tokenkeg_delegate_accounts_hist: &mut HashMap<Pubkey, Option<ReplicaAccountInfo>>,
             transactions: &[ReplicaTransactionInfo],
             transactions_filter: &TransactionsFilter,
+            accounts_data_compression: &AccountsDataCompression,
         ) {
             for account in accounts {
                 let mut filters = accounts_filter.create_match();
@@ -555,13 +579,21 @@
 
                 let filters = filters.get_filters();
                 if !filters.is_empty() {
-                    messages.push(SendMessage::Account((account.clone(), filters))));
+                    add_message(
+                        messages,
+                        SendMessage::Account((account.clone(), filters)),
+                        accounts_data_compression,
+                    );
                 }
             }
 
             for transaction in transactions {
                 if transactions_filter.contains(transaction) {
-                    messages.push(SendMessage::Transaction(transaction.clone()))
+                    add_message(
+                        messages,
+                        SendMessage::Transaction(transaction.clone()),
+                        accounts_data_compression,
+                    );
                }
             }
         }
@@ -580,27 +612,35 @@
         }
 
         // Handle messages
-        let send_jobs = Arc::new(Semaphore::new(max_requests));
+        let send_jobs = Arc::new(Semaphore::new(sqs_max_requests));
         let mut status_current_slot = 0;
         let mut account_current_slot = 0;
         let mut transaction_current_slot = 0;
         loop {
             if !messages.is_empty() && send_jobs.available_permits() > 0 {
-                let slice = 0..messages.len().min(10);
-                let messages = messages.drain(slice).collect::<Vec<_>>();
-                let messages_count = messages.len();
-                let _ = stats_inprocess_tx.send(messages_count);
-                let client = client.clone();
-                let queue_url = queue_url.clone();
-                let stats_processed_tx = stats_processed_tx.clone();
+                let mut messages_batch_size = 0;
+                let mut messages_batch = Vec::with_capacity(10);
+                while messages_batch.len() < 10 {
+                    if let Some(message) = messages.pop_front() {
+                        if messages_batch_size + message.payload_size() <= SqsClient::REQUEST_LIMIT
+                        {
+                            messages_batch_size += message.payload_size();
+                            messages_batch.push(message);
+                        } else {
+                            messages.push_front(message);
+                            break;
+                        }
+                    } else {
+                        break;
+                    }
+                }
+                UPLOAD_QUEUE_SIZE.sub(messages_batch.len() as i64);
+                let sqs = sqs.clone();
+                let s3 = s3.clone();
                 let send_jobs = Arc::clone(&send_jobs);
                 let send_permit = send_jobs.try_acquire_owned().expect("available permit");
                 tokio::spawn(async move {
-                    let ts = Instant::now();
-                    if let Err(error) = Self::send_messages(client, queue_url, messages).await {
-                        error!("failed to send data: {:?}", error);
-                    }
-                    let _ = stats_processed_tx.send((messages_count, ts.elapsed()));
+                    Self::send_messages(sqs, s3, messages_batch).await;
                     drop(send_permit);
                 });
                 continue;
@@ -617,8 +657,7 @@
                 message = rx.recv() => match message {
                     Some(Message::UpdateSlot((status, slot))) => {
                         if is_slot_messages_enabled {
-                            messages.push(SendMessage::Slot((status, slot)));
-                            let _ = stats_queued_tx.send(1);
+                            add_message(&mut messages, SendMessage::Slot((status, slot)), &accounts_data_compression);
                        }
 
                         if status == commitment_level {
@@ -652,7 +691,6 @@
                                     transaction.block_time = block.block_time;
                                 }
 
-                                let size = messages.len();
                                 generate_messages(
                                     &mut messages,
                                     accounts,
@@ -663,8 +701,8 @@
                                     tokenkeg_delegate_accounts_hist.entry(slot).or_default(),
                                     transactions,
                                     &transactions_filter,
+                                    &accounts_data_compression
                                 );
-                                let _ = stats_queued_tx.send(messages.len() - size);
                             }
                             _ => error!(
                                 "send_loop error: accounts/transactions/block for slot {} does not exists",
@@ -702,7 +740,7 @@
             }
         }
         let _ = send_jobs
-            .acquire_many(max_requests.try_into().expect("valid size"))
+            .acquire_many(sqs_max_requests.try_into().expect("valid size"))
            .await
            .expect("alive");
         send_job.store(false, Ordering::Relaxed);
@@ -711,139 +749,86 @@
 
         Ok(())
     }
 
-    async fn send_messages(
-        client: RusotoSqsClient,
-        queue_url: String,
-        messages: Vec<SendMessage>,
-    ) -> SqsClientResult {
-        let entries = messages
-            .iter()
-            .enumerate()
-            .map(|(id, message)| SendMessageBatchRequestEntry {
-                id: id.to_string(),
-                message_body: match message {
-                    SendMessage::Slot((status, slot)) => json!({
-                        "type": "slot",
-                        "status": status,
-                        "slot": slot,
-                    }),
-                    SendMessage::Account((account, filters)) => json!({
-                        "type": "account",
-                        "filters": filters,
-                        "pubkey": account.pubkey.to_string(),
-                        "lamports": account.lamports,
-                        "owner": account.owner.to_string(),
-                        "executable": account.executable,
-                        "rent_epoch": account.rent_epoch,
-                        "data": base64::encode(&account.data),
-                        "write_version": account.write_version,
-                        "slot": account.slot,
-                    }),
-                    SendMessage::Transaction(transaction) => json!({
-                        "type": "transaction",
-                        "signature": transaction.signature.to_string(),
-                        "transaction": base64::encode(&bincode::serialize(&transaction.transaction).unwrap()),
-                        "meta": transaction.meta,
-                        "slot": transaction.slot,
-                        "block_time": transaction.block_time.unwrap_or_default(),
-                    }),
-                }
-                .to_string(),
-                ..Default::default()
-            })
-            .collect::<Vec<_>>();
-
-        let failed = client
-            .send_message_batch(SendMessageBatchRequest { entries, queue_url })
-            .await?
-            .failed;
-
-        if failed.is_empty() {
-            Ok(())
-        } else {
-            Err(SqsClientError::SqsSendMessageBatchEntry(
-                serde_json::to_string(
-                    &failed
-                        .into_iter()
-                        .map(|entry| {
-                            let mut value = json!({
-                                "code": entry.code,
-                                "message": entry.message,
-                                "sender_fault": entry.sender_fault,
-                            });
-
-                            let index = entry.id.parse::<usize>().ok();
-                            match index.and_then(|index| messages.get(index)).unwrap() {
-                                SendMessage::Slot((status, slot)) => {
-                                    value["status"] = json!(status);
-                                    value["slot"] = json!(slot);
-                                }
-                                SendMessage::Account((account, _filters)) => {
-                                    value["pubkey"] = json!(account.pubkey.to_string());
-                                    value["slot"] = json!(account.slot);
-                                    value["write_version"] = json!(account.write_version);
+    async fn send_messages(sqs: SqsClient, s3: S3Client, messages: Vec<SendMessageWithPayload>) {
+        let mut success_count = 0;
+        let mut failed_count = 0;
+
+        let messages_initial_count = messages.len();
+        let (messages, entries): (Vec<SendMessage>, Vec<_>) =
+            stream::iter(messages.into_iter().enumerate())
+                .filter_map(|(id, message)| {
+                    let s3 = if message.s3 { Some(s3.clone()) } else { None };
+                    async move {
+                        let (message_body, message_attributes) = match s3 {
+                            Some(s3) => {
+                                let key = message.message.s3_key();
+                                if let Err(error) = s3
+                                    .put_object(key.clone(), message.payload.into_bytes())
+                                    .await
+                                {
+                                    let value = message.message.update_info(json!({}));
+                                    error!(
+                                        "failed to upload payload to s3 ({:?}): {:?}",
+                                        error,
+                                        serde_json::to_string(&value)
+                                    );
+                                    return None;
                                 }
+                                (
+                                    "s3".to_owned(),
+                                    Some(maplit::hashmap! {
+                                        "s3".to_owned() => MessageAttributeValue {
+                                            data_type: "String".to_owned(),
+                                            string_value: Some(key),
+                                            ..Default::default()
+                                        }
+                                    }),
+                                )
                             }
-
-                            value
-                        })
-                        .collect::<Vec<_>>(),
-                )
-                .unwrap(),
-            ))
-        }
-    }
-}
-
-#[allow(clippy::large_enum_variant)]
-enum AwsCredentialsProvider {
-    Static(StaticProvider),
-    Chain(AutoRefreshingProvider<ChainProvider>),
-}
-
-impl AwsCredentialsProvider {
-    fn new(config: ConfigAwsAuth) -> SqsClientResult<Self> {
-        match config {
-            ConfigAwsAuth::Static {
-                access_key_id,
-                secret_access_key,
-            } => Ok(Self::Static(StaticProvider::new_minimal(
-                access_key_id,
-                secret_access_key,
-            ))),
-            ConfigAwsAuth::Chain {
-                credentials_file,
-                profile,
-            } => {
-                let profile_provider = match (credentials_file, profile) {
-                    (Some(file_path), Some(profile)) => {
-                        ProfileProvider::with_configuration(file_path, profile)
-                    }
-                    (Some(file_path), None) => {
-                        ProfileProvider::with_default_configuration(file_path)
+                            None => (message.payload, None),
+                        };
+                        Some((
+                            message.message,
+                            SendMessageBatchRequestEntry {
+                                id: id.to_string(),
+                                message_body,
+                                message_attributes,
+                                ..Default::default()
+                            },
+                        ))
                     }
-                    (None, Some(profile)) => ProfileProvider::with_default_credentials(profile)?,
-                    (None, None) => ProfileProvider::new()?,
-                };
-                Ok(Self::Chain(AutoRefreshingProvider::new(
-                    ChainProvider::with_profile_provider(profile_provider),
-                )?))
+                })
+                .unzip()
+                .await;
+        failed_count += messages_initial_count - messages.len();
+
+        let failed = sqs.send_batch(entries).await;
+        success_count += messages.len() - failed.len();
+        failed_count += failed.len();
+        for entry in failed {
+            let index = entry.id.parse::<usize>().ok();
+            if let Some(message) = index.and_then(|index| messages.get(index)) {
+                let value = message.update_info(json!({
+                    "code": entry.code,
+                    "message": entry.message,
+                    "sender_fault": entry.sender_fault,
+                }));
+                error!(
+                    "failed to send sqs message: {:?}",
+                    serde_json::to_string(&value)
+                );
            }
         }
-    }
-}
 
-#[async_trait]
-impl ProvideAwsCredentials for AwsCredentialsProvider {
-    async fn credentials(&self) -> Result<AwsCredentials, CredentialsError> {
-        match self {
-            Self::Static(p) => p.credentials(),
-            Self::Chain(p) => p.credentials(),
+        for (status, count) in [
+            (UploadMessagesStatus::Success, success_count),
+            (UploadMessagesStatus::Failed, failed_count),
+        ] {
+            if count > 0 {
+                UPLOAD_MESSAGES_TOTAL
+                    .with_label_values(&[status.as_str()])
+                    .inc_by(count as u64);
+            }
         }
-        .await
     }
 }