diff --git a/Cargo.lock b/Cargo.lock
index 328ff47..2087bcf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1271,12 +1271,6 @@ dependencies = [
  "cfg-if",
 ]

-[[package]]
-name = "maplit"
-version = "1.0.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
-
 [[package]]
 name = "matches"
 version = "0.1.9"
@@ -2354,7 +2348,6 @@ dependencies = [
  "hyper",
  "lazy_static",
  "log",
- "maplit",
  "prometheus",
  "rusoto_core",
  "rusoto_credential",
diff --git a/Cargo.toml b/Cargo.toml
index 686f212..e483e1d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,7 +21,6 @@ humantime = "2.1.0"
 hyper = { version = "0.14", features = ["server"] }
 lazy_static = "1"
 log = "0.4.14"
-maplit = "1"
 prometheus = "0.13"
 rusoto_core = "0.48"
 rusoto_credential = "0.48"
diff --git a/src/aws.rs b/src/aws.rs
index ee7a2ac..8c1eb75 100644
--- a/src/aws.rs
+++ b/src/aws.rs
@@ -14,10 +14,10 @@ use {
     rusoto_s3::{PutObjectError, PutObjectRequest, S3Client as RusotoS3Client, S3},
     rusoto_sqs::{
         BatchResultErrorEntry, GetQueueAttributesError, GetQueueAttributesRequest,
-        SendMessageBatchError, SendMessageBatchRequest, SendMessageBatchRequestEntry, Sqs,
-        SqsClient as RusotoSqsClient,
+        MessageAttributeValue, SendMessageBatchError, SendMessageBatchRequest,
+        SendMessageBatchRequestEntry, Sqs, SqsClient as RusotoSqsClient,
     },
-    std::sync::Arc,
+    std::{collections::HashMap, sync::Arc},
     thiserror::Error,
     tokio::sync::Semaphore,
 };
@@ -38,6 +38,35 @@ pub enum AwsError {

 pub type AwsResult<T = ()> = Result<T, AwsError>;

+#[derive(Debug, Default)]
+pub struct SqsMessageAttributes {
+    map: HashMap<String, MessageAttributeValue>,
+}
+
+impl SqsMessageAttributes {
+    pub fn new<S1: Into<String>, S2: Into<String>>(key: S1, value: S2) -> Self {
+        let mut attributes = Self::default();
+        attributes.insert(key, value);
+        attributes
+    }
+
+    pub fn insert<S1: Into<String>, S2: Into<String>>(&mut self, key: S1, value: S2) -> &Self {
+        self.map.insert(
+            key.into(),
+            MessageAttributeValue {
+                data_type: "String".to_owned(),
+                string_value: Some(value.into()),
+                ..Default::default()
+            },
+        );
+        self
+    }
+
+    pub fn into_inner(self) -> HashMap<String, MessageAttributeValue> {
+        self.map
+    }
+}
+
 #[derive(derivative::Derivative)]
 #[derivative(Debug, Clone)]
 pub struct SqsClient {
diff --git a/src/bin/consumer.rs b/src/bin/consumer.rs
index a1f3b79..ab20f1b 100644
--- a/src/bin/consumer.rs
+++ b/src/bin/consumer.rs
@@ -5,14 +5,13 @@ use {
     humantime::format_rfc3339_millis,
     rusoto_s3::{DeleteObjectRequest, GetObjectOutput, GetObjectRequest, PutObjectRequest, S3},
     rusoto_sqs::{
-        DeleteMessageBatchRequest, DeleteMessageBatchRequestEntry, MessageAttributeValue,
-        ReceiveMessageRequest, ReceiveMessageResult, SendMessageBatchRequest,
-        SendMessageBatchRequestEntry, Sqs,
+        DeleteMessageBatchRequest, DeleteMessageBatchRequestEntry, ReceiveMessageRequest,
+        ReceiveMessageResult, SendMessageBatchRequest, SendMessageBatchRequestEntry, Sqs,
     },
     serde::Deserialize,
     solana_geyser_sqs::{
-        aws::{S3Client, SqsClient},
-        config::Config,
+        aws::{S3Client, SqsClient, SqsMessageAttributes},
+        config::{AccountsDataCompression, Config},
     },
     std::{
         collections::hash_map::DefaultHasher,
@@ -88,20 +87,17 @@ async fn send_loop(config: Config) -> anyhow::Result<()> {
             .await;
         println!("Put s3 object ({}): {:?}", key, result);

+        let mut message_attributes =
+            SqsMessageAttributes::new("compression", AccountsDataCompression::None.as_str());
+        message_attributes.insert("s3", key);
+
         let result = sqs
             .client
             .send_message_batch(SendMessageBatchRequest {
                 entries: vec![SendMessageBatchRequestEntry {
                     id: "0".to_owned(),
                     message_body: "s3".to_owned(),
-                    message_attributes: Some(maplit::hashmap! {
-                        "s3".to_owned() =>
-                            MessageAttributeValue {
-                                data_type: "String".to_owned(),
-                                string_value: Some(key),
-                                ..Default::default()
-                            },
-                    }),
+                    message_attributes: Some(message_attributes.into_inner()),
                     ..Default::default()
                 }],
                 queue_url: sqs.queue_url.clone(),
diff --git a/src/config.rs b/src/config.rs
index e2cc822..6ca12e4 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -215,6 +215,10 @@ pub enum AccountsDataCompression {
 }

 impl AccountsDataCompression {
+    fn gzip_default_level() -> u32 {
+        GzCompression::default().level()
+    }
+
     #[allow(clippy::ptr_arg)]
     pub fn compress<'a>(&self, data: &'a Vec<u8>) -> IoResult<Cow<'a, Vec<u8>>> {
         Ok(match self {
@@ -230,8 +234,12 @@
         })
     }

-    fn gzip_default_level() -> u32 {
-        GzCompression::default().level()
+    pub fn as_str(&self) -> &str {
+        match *self {
+            AccountsDataCompression::None => "none",
+            AccountsDataCompression::Zstd { .. } => "zstd",
+            AccountsDataCompression::Gzip { .. } => "gzip",
+        }
     }
 }

diff --git a/src/sqs.rs b/src/sqs.rs
index e0fa693..90bf261 100644
--- a/src/sqs.rs
+++ b/src/sqs.rs
@@ -1,6 +1,6 @@
 use {
     super::{
-        aws::{AwsError, S3Client, SqsClient},
+        aws::{AwsError, S3Client, SqsClient, SqsMessageAttributes},
         config::{AccountsDataCompression, Config},
         filter::{AccountsFilter, TransactionsFilter},
         prom::{UploadMessagesStatus, UPLOAD_MESSAGES_TOTAL, UPLOAD_QUEUE_SIZE},
@@ -11,7 +11,7 @@ use {
         stream::{self, StreamExt},
     },
     log::*,
-    rusoto_sqs::{MessageAttributeValue, SendMessageBatchRequestEntry},
+    rusoto_sqs::SendMessageBatchRequestEntry,
     serde::{Deserialize, Serialize},
     serde_json::{json, Value},
     solana_geyser_plugin_interface::geyser_plugin_interface::{
@@ -247,17 +247,27 @@ struct SendMessageWithPayload {
     message: SendMessage,
     s3: bool,
     payload: String,
+    message_attributes: SqsMessageAttributes,
 }

 impl SendMessageWithPayload {
     const S3_SIZE: usize = 250;

-    fn new(message: SendMessage, payload: String) -> Self {
-        Self {
-            message,
-            s3: payload.len() > SqsClient::REQUEST_LIMIT,
-            payload,
-        }
+    fn new(
+        message: SendMessage,
+        accounts_data_compression: &AccountsDataCompression,
+    ) -> IoResult<Self> {
+        message
+            .payload(accounts_data_compression)
+            .map(|payload| Self {
+                message,
+                s3: payload.len() > SqsClient::REQUEST_LIMIT,
+                payload,
+                message_attributes: SqsMessageAttributes::new(
+                    "compression",
+                    accounts_data_compression.as_str(),
+                ),
+            })
     }

     fn payload_size(&self) -> usize {
@@ -457,9 +467,9 @@
         message: SendMessage,
         accounts_data_compression: &AccountsDataCompression,
     ) {
-        match message.payload(accounts_data_compression) {
-            Ok(payload) => {
-                messages.push_back(SendMessageWithPayload::new(message, payload));
+        match SendMessageWithPayload::new(message, accounts_data_compression) {
+            Ok(message) => {
+                messages.push_back(message);
                 UPLOAD_QUEUE_SIZE.inc();
             }
             Err(error) => {
@@ -759,7 +769,8 @@
             .filter_map(|(id, message)| {
                 let s3 = if message.s3 { Some(s3.clone()) } else { None };
                 async move {
-                    let (message_body, message_attributes) = match s3 {
+                    let mut message_attributes = message.message_attributes;
+                    let message_body = match s3 {
                         Some(s3) => {
                             let key = message.message.s3_key();
                             if let Err(error) = s3
@@ -774,25 +785,17 @@
                                 );
                                 return None;
                             }
-                            (
-                                "s3".to_owned(),
-                                Some(maplit::hashmap! {
-                                    "s3".to_owned() => MessageAttributeValue {
-                                        data_type: "String".to_owned(),
-                                        string_value: Some(key),
-                                        ..Default::default()
-                                    }
-                                }),
-                            )
+                            message_attributes.insert("s3", key);
+                            "s3".to_owned()
                         }
-                        None => (message.payload, None),
+                        None => message.payload,
                     };
                     Some((
                         message.message,
                         SendMessageBatchRequestEntry {
                             id: id.to_string(),
                             message_body,
-                            message_attributes: Some(message_attributes.into_inner()),
+                            message_attributes: Some(message_attributes.into_inner()),
                             ..Default::default()
                         },
                     ))