Skip to content

Commit

Permalink
Add compression to message attributes (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Jun 1, 2022
1 parent 2ffa458 commit 99b64c6
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 50 deletions.
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ humantime = "2.1.0"
hyper = { version = "0.14", features = ["server"] }
lazy_static = "1"
log = "0.4.14"
maplit = "1"
prometheus = "0.13"
rusoto_core = "0.48"
rusoto_credential = "0.48"
Expand Down
35 changes: 32 additions & 3 deletions src/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ use {
rusoto_s3::{PutObjectError, PutObjectRequest, S3Client as RusotoS3Client, S3},
rusoto_sqs::{
BatchResultErrorEntry, GetQueueAttributesError, GetQueueAttributesRequest,
SendMessageBatchError, SendMessageBatchRequest, SendMessageBatchRequestEntry, Sqs,
SqsClient as RusotoSqsClient,
MessageAttributeValue, SendMessageBatchError, SendMessageBatchRequest,
SendMessageBatchRequestEntry, Sqs, SqsClient as RusotoSqsClient,
},
std::sync::Arc,
std::{collections::HashMap, sync::Arc},
thiserror::Error,
tokio::sync::Semaphore,
};
Expand All @@ -38,6 +38,35 @@ pub enum AwsError {

pub type AwsResult<T = ()> = Result<T, AwsError>;

/// Collection of SQS message attributes keyed by attribute name.
///
/// Every attribute inserted through this type is stored as a
/// `MessageAttributeValue` whose `data_type` is `"String"`.
#[derive(Debug, Default)]
pub struct SqsMessageAttributes {
    map: HashMap<String, MessageAttributeValue>,
}

impl SqsMessageAttributes {
    /// Creates a collection that initially holds the single `key` / `value` attribute.
    pub fn new<S1: Into<String>, S2: Into<String>>(key: S1, value: S2) -> Self {
        let mut attributes = Self::default();
        attributes.insert(key, value);
        attributes
    }

    /// Adds a string attribute, replacing any existing attribute stored under `key`.
    ///
    /// Returns `&mut Self` so that several insertions can be chained:
    /// `attributes.insert("a", "1").insert("b", "2")`.
    pub fn insert<S1: Into<String>, S2: Into<String>>(
        &mut self,
        key: S1,
        value: S2,
    ) -> &mut Self {
        let attribute = MessageAttributeValue {
            data_type: "String".to_owned(),
            string_value: Some(value.into()),
            ..Default::default()
        };
        self.map.insert(key.into(), attribute);
        self
    }

    /// Consumes the collection and returns the underlying map, which has the
    /// type expected by the `message_attributes` field of a batch request entry.
    pub fn into_inner(self) -> HashMap<String, MessageAttributeValue> {
        self.map
    }
}

#[derive(derivative::Derivative)]
#[derivative(Debug, Clone)]
pub struct SqsClient {
Expand Down
22 changes: 9 additions & 13 deletions src/bin/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ use {
humantime::format_rfc3339_millis,
rusoto_s3::{DeleteObjectRequest, GetObjectOutput, GetObjectRequest, PutObjectRequest, S3},
rusoto_sqs::{
DeleteMessageBatchRequest, DeleteMessageBatchRequestEntry, MessageAttributeValue,
ReceiveMessageRequest, ReceiveMessageResult, SendMessageBatchRequest,
SendMessageBatchRequestEntry, Sqs,
DeleteMessageBatchRequest, DeleteMessageBatchRequestEntry, ReceiveMessageRequest,
ReceiveMessageResult, SendMessageBatchRequest, SendMessageBatchRequestEntry, Sqs,
},
serde::Deserialize,
solana_geyser_sqs::{
aws::{S3Client, SqsClient},
config::Config,
aws::{S3Client, SqsClient, SqsMessageAttributes},
config::{AccountsDataCompression, Config},
},
std::{
collections::hash_map::DefaultHasher,
Expand Down Expand Up @@ -88,20 +87,17 @@ async fn send_loop(config: Config) -> anyhow::Result<()> {
.await;
println!("Put s3 object ({}): {:?}", key, result);

let mut message_attributes =
SqsMessageAttributes::new("compression", AccountsDataCompression::None.as_str());
message_attributes.insert("s3", key);

let result = sqs
.client
.send_message_batch(SendMessageBatchRequest {
entries: vec![SendMessageBatchRequestEntry {
id: "0".to_owned(),
message_body: "s3".to_owned(),
message_attributes: Some(maplit::hashmap! {
"s3".to_owned() =>
MessageAttributeValue {
data_type: "String".to_owned(),
string_value: Some(key),
..Default::default()
},
}),
message_attributes: Some(message_attributes.into_inner()),
..Default::default()
}],
queue_url: sqs.queue_url.clone(),
Expand Down
12 changes: 10 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ pub enum AccountsDataCompression {
}

impl AccountsDataCompression {
/// Returns the level reported by `GzCompression::default()`.
// NOTE(review): presumably used as the default gzip level in the config — confirm against callers.
fn gzip_default_level() -> u32 {
GzCompression::default().level()
}

#[allow(clippy::ptr_arg)]
pub fn compress<'a>(&self, data: &'a Vec<u8>) -> IoResult<Cow<'a, Vec<u8>>> {
Ok(match self {
Expand All @@ -230,8 +234,12 @@ impl AccountsDataCompression {
})
}

fn gzip_default_level() -> u32 {
GzCompression::default().level()
pub fn as_str(&self) -> &str {
match *self {
AccountsDataCompression::None => "none",
AccountsDataCompression::Zstd { .. } => "zstd",
AccountsDataCompression::Gzip { .. } => "gzip",
}
}
}

Expand Down
51 changes: 27 additions & 24 deletions src/sqs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
super::{
aws::{AwsError, S3Client, SqsClient},
aws::{AwsError, S3Client, SqsClient, SqsMessageAttributes},
config::{AccountsDataCompression, Config},
filter::{AccountsFilter, TransactionsFilter},
prom::{UploadMessagesStatus, UPLOAD_MESSAGES_TOTAL, UPLOAD_QUEUE_SIZE},
Expand All @@ -11,7 +11,7 @@ use {
stream::{self, StreamExt},
},
log::*,
rusoto_sqs::{MessageAttributeValue, SendMessageBatchRequestEntry},
rusoto_sqs::SendMessageBatchRequestEntry,
serde::{Deserialize, Serialize},
serde_json::{json, Value},
solana_geyser_plugin_interface::geyser_plugin_interface::{
Expand Down Expand Up @@ -247,17 +247,27 @@ struct SendMessageWithPayload {
message: SendMessage,
s3: bool,
payload: String,
message_attributes: SqsMessageAttributes,
}

impl SendMessageWithPayload {
const S3_SIZE: usize = 250;

fn new(message: SendMessage, payload: String) -> Self {
Self {
message,
s3: payload.len() > SqsClient::REQUEST_LIMIT,
payload,
}
/// Serializes `message` with the given compression and wraps it for sending.
///
/// Returns the error from `SendMessage::payload` unchanged if the payload
/// cannot be produced. On success the result carries a `compression`
/// message attribute set to `accounts_data_compression.as_str()`.
fn new(
    message: SendMessage,
    accounts_data_compression: &AccountsDataCompression,
) -> IoResult<Self> {
    let payload = message.payload(accounts_data_compression)?;
    // Flag payloads whose length exceeds `SqsClient::REQUEST_LIMIT`.
    let s3 = payload.len() > SqsClient::REQUEST_LIMIT;
    let message_attributes =
        SqsMessageAttributes::new("compression", accounts_data_compression.as_str());

    Ok(Self {
        message,
        s3,
        payload,
        message_attributes,
    })
}

fn payload_size(&self) -> usize {
Expand Down Expand Up @@ -457,9 +467,9 @@ impl AwsSqsClient {
message: SendMessage,
accounts_data_compression: &AccountsDataCompression,
) {
match message.payload(accounts_data_compression) {
Ok(payload) => {
messages.push_back(SendMessageWithPayload::new(message, payload));
match SendMessageWithPayload::new(message, accounts_data_compression) {
Ok(message) => {
messages.push_back(message);
UPLOAD_QUEUE_SIZE.inc();
}
Err(error) => {
Expand Down Expand Up @@ -759,7 +769,8 @@ impl AwsSqsClient {
.filter_map(|(id, message)| {
let s3 = if message.s3 { Some(s3.clone()) } else { None };
async move {
let (message_body, message_attributes) = match s3 {
let mut message_attributes = message.message_attributes;
let message_body = match s3 {
Some(s3) => {
let key = message.message.s3_key();
if let Err(error) = s3
Expand All @@ -774,25 +785,17 @@ impl AwsSqsClient {
);
return None;
}
(
"s3".to_owned(),
Some(maplit::hashmap! {
"s3".to_owned() => MessageAttributeValue {
data_type: "String".to_owned(),
string_value: Some(key),
..Default::default()
}
}),
)
message_attributes.insert("s3", key);
"s3".to_owned()
}
None => (message.payload, None),
None => message.payload,
};
Some((
message.message,
SendMessageBatchRequestEntry {
id: id.to_string(),
message_body,
message_attributes,
message_attributes: Some(message_attributes.into_inner()),
..Default::default()
},
))
Expand Down

0 comments on commit 99b64c6

Please sign in to comment.