Add prometheus, compression, s3 for large payloads (#6)
fanatid authored May 29, 2022
1 parent a3737b3 commit 2ffa458
Showing 11 changed files with 1,189 additions and 438 deletions.
190 changes: 143 additions & 47 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 12 additions & 5 deletions Cargo.toml
@@ -13,14 +13,20 @@ arrayref = "0.3.6"
async-trait = "0.1"
base64 = "0.13.0"
bincode = "1.3"
clap = { version = "3.1.8", features = ["cargo"] }
clap = { version = "3.1.8", features = ["cargo", "derive"] }
derivative = "2"
flate2 = "1"
futures = "0.3"
humantime = "2.1.0"
hyper = { version = "0.14", features = ["server"] }
lazy_static = "1"
log = "0.4.14"
rusoto_core = "0.47"
rusoto_credential = "0.47"
rusoto_sqs = "0.47"
maplit = "1"
prometheus = "0.13"
rusoto_core = "0.48"
rusoto_credential = "0.48"
rusoto_s3 = "0.48"
rusoto_sqs = "0.48"
serde = { version = "1.0.132", features = ["derive"] }
serde_json = "1.0.73"
solana-geyser-plugin-interface = "=1.9.22"
@@ -29,7 +35,8 @@ solana-sdk = "=1.9.22"
solana-transaction-status = "=1.9.22"
spl-token = "3.2.0"
thiserror = "1.0.30"
tokio = { version = "1.15.0", features = ["rt-multi-thread", "time", "macros"] }
tokio = { version = "1.15.0", features = ["rt-multi-thread", "time", "macros", "io-util"] }
zstd = "0.9"

[profile.release]
codegen-units = 1
22 changes: 21 additions & 1 deletion README.md
@@ -16,6 +16,27 @@ Now you can run validator with plugin:
$ solana-validator --geyser-plugin-config ./config.json
```

### Accounts data compression

Since an SQS payload can be at most 256 KB while Solana accounts can be up to 10 MB, not every account fits into an SQS message. Messages carrying such accounts are currently dropped, but the number of dropped messages can be reduced by compressing the account data. Right now `zstd` and `gzip` are supported.

```json
"messages": {
"commitment_level": "confirmed",
"accounts_data_compression": {"algo": "none"}
}
"messages": {
"commitment_level": "confirmed",
"accounts_data_compression": {"algo": "zstd", "level": 3}
}
"messages": {
"commitment_level": "confirmed",
"accounts_data_compression": {"algo": "gzip", "level": 6}
}
```

The `level` field is optional for both compression algorithms.
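
A minimal sketch of how this compression step could be implemented with the `zstd` and `flate2` crates added in Cargo.toml above; the `CompressionAlgo` enum and `compress` helper are hypothetical illustrations, not the plugin's actual types:

```rust
// Hypothetical sketch of an account-data compression step matching the
// `algo`/`level` options above.
use std::io::Write;

pub enum CompressionAlgo {
    None,
    Zstd { level: i32 },
    Gzip { level: u32 },
}

pub fn compress(algo: &CompressionAlgo, data: &[u8]) -> std::io::Result<Vec<u8>> {
    match algo {
        CompressionAlgo::None => Ok(data.to_vec()),
        // zstd: encode_all accepts any `Read` source plus a compression level.
        CompressionAlgo::Zstd { level } => zstd::encode_all(data, *level),
        // flate2: gzip via a streaming encoder that writes into a Vec<u8>.
        CompressionAlgo::Gzip { level } => {
            let mut encoder =
                flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::new(*level));
            encoder.write_all(data)?;
            encoder.finish()
        }
    }
}
```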

### AWS Credentials

**Required permissions:**
@@ -71,7 +92,6 @@ All fields in filter are optional but at least 1 is required. Fields works as lo

##### Examples


Filter accounts:

```json
25 changes: 20 additions & 5 deletions config.json
@@ -4,15 +4,30 @@
"level": "info",
"filters": true
},
"prometheus": {
"address": "0.0.0.0:8999"
},
"sqs": {
"url": "https://sqs.us-east-2.amazonaws.com/<account_id>/<accounts_queue>",
"region": "us-east-2",
"commitment_level": "finalized",
"max_requests": 10000,
"auth": {
"access_key_id": "access_key_id",
"secret_access_key": "secret_access_key"
}
},
"region": "us-east-2",
"url": "https://sqs.us-east-2.amazonaws.com/<account_id>/<accounts_queue>",
"max_requests": "10_000"
},
"s3": {
"auth": {
"access_key_id": "access_key_id",
"secret_access_key": "secret_access_key"
},
"region": "us-east-2",
"bucket": "solana-geyser",
"max_requests": "10_000"
},
"messages": {
"commitment_level": "confirmed",
"accounts_data_compression": {"algo": "none"}
},
"slots": {
"enabled": false
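
The new `prometheus` section in config.json exposes metrics on the configured address, and src/aws.rs below imports counters and gauges from `crate::prom` (src/prom.rs is part of this commit but not shown in this excerpt). A minimal sketch of how such metrics might be declared and rendered with the `prometheus` and `lazy_static` crates from Cargo.toml; the metric names and the `render_metrics` helper are assumptions:

```rust
// Hypothetical sketch of the metrics referenced from crate::prom in src/aws.rs;
// the real src/prom.rs from this commit is not shown in this excerpt.
use prometheus::{Encoder, IntCounterVec, IntGauge, TextEncoder};

lazy_static::lazy_static! {
    // In-flight request gauges and per-status upload counters.
    pub static ref UPLOAD_SQS_REQUESTS: IntGauge = prometheus::register_int_gauge!(
        "upload_sqs_requests", "Number of SQS requests currently in flight"
    ).unwrap();
    pub static ref UPLOAD_SQS_TOTAL: IntCounterVec = prometheus::register_int_counter_vec!(
        "upload_sqs_total", "Messages sent to SQS, labeled by status", &["status"]
    ).unwrap();
    pub static ref UPLOAD_S3_REQUESTS: IntGauge = prometheus::register_int_gauge!(
        "upload_s3_requests", "Number of S3 requests currently in flight"
    ).unwrap();
    pub static ref UPLOAD_S3_TOTAL: IntCounterVec = prometheus::register_int_counter_vec!(
        "upload_s3_total", "Objects uploaded to S3, labeled by status", &["status"]
    ).unwrap();
}

/// Render every metric from the default registry in the Prometheus text format,
/// suitable for serving from the address configured under "prometheus".
pub fn render_metrics() -> Vec<u8> {
    let mut buffer = Vec::new();
    TextEncoder::new()
        .encode(&prometheus::gather(), &mut buffer)
        .expect("text encoding of gathered metrics");
    buffer
}
```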
217 changes: 217 additions & 0 deletions src/aws.rs
@@ -0,0 +1,217 @@
use {
crate::{
config::{ConfigAwsAuth, ConfigAwsS3, ConfigAwsSqs},
prom::{
UploadAwsStatus, UPLOAD_S3_REQUESTS, UPLOAD_S3_TOTAL, UPLOAD_SQS_REQUESTS,
UPLOAD_SQS_TOTAL,
},
},
rusoto_core::{request::TlsError, ByteStream, Client as RusotoClient, HttpClient, RusotoError},
rusoto_credential::{
AutoRefreshingProvider, AwsCredentials, ChainProvider, CredentialsError, ProfileProvider,
ProvideAwsCredentials, StaticProvider,
},
rusoto_s3::{PutObjectError, PutObjectRequest, S3Client as RusotoS3Client, S3},
rusoto_sqs::{
BatchResultErrorEntry, GetQueueAttributesError, GetQueueAttributesRequest,
SendMessageBatchError, SendMessageBatchRequest, SendMessageBatchRequestEntry, Sqs,
SqsClient as RusotoSqsClient,
},
std::sync::Arc,
thiserror::Error,
tokio::sync::Semaphore,
};

#[derive(Debug, Error)]
pub enum AwsError {
#[error("credential error: {0}")]
Credentials(#[from] CredentialsError),
#[error("http client error: {0}")]
HttpClientTls(#[from] TlsError),
#[error("failed to get sqs queue attributes: {0}")]
SqsGetAttributes(#[from] RusotoError<GetQueueAttributesError>),
#[error("failed to send messages to sqs: {0}")]
SqsSendMessageBatch(#[from] RusotoError<SendMessageBatchError>),
#[error("failed to upload payload to s3: {0}")]
S3PutObject(#[from] RusotoError<PutObjectError>),
}

pub type AwsResult<T = ()> = Result<T, AwsError>;

#[derive(derivative::Derivative)]
#[derivative(Debug, Clone)]
pub struct SqsClient {
#[derivative(Debug = "ignore")]
pub client: RusotoSqsClient,
pub queue_url: String,
}

impl SqsClient {
// The maximum allowed individual message size and the maximum total payload size (the sum of the
// individual lengths of all of the batched messages) are both 256 KB (262,144 bytes).
pub const REQUEST_LIMIT: usize = 250_000;

pub fn new(config: ConfigAwsSqs) -> AwsResult<Self> {
let client = aws_create_client(config.auth)?;
Ok(Self {
client: RusotoSqsClient::new_with_client(client, config.region),
queue_url: config.url,
})
}

pub async fn check(self) -> AwsResult {
UPLOAD_SQS_REQUESTS.inc();
let result = self
.client
.get_queue_attributes(GetQueueAttributesRequest {
attribute_names: None,
queue_url: self.queue_url,
})
.await;
UPLOAD_SQS_REQUESTS.dec();

result.map_err(Into::into).map(|_| ())
}

pub async fn send_batch(
self,
entries: Vec<SendMessageBatchRequestEntry>,
) -> Vec<BatchResultErrorEntry> {
let entries_count = entries.len();

UPLOAD_SQS_REQUESTS.inc();
let result = self
.client
.send_message_batch(SendMessageBatchRequest {
entries,
queue_url: self.queue_url,
})
.await;
UPLOAD_SQS_REQUESTS.dec();

let failed = match result {
Ok(rusoto_sqs::SendMessageBatchResult { successful, failed }) => {
if !successful.is_empty() {
UPLOAD_SQS_TOTAL
.with_label_values(&[UploadAwsStatus::Success.as_str()])
.inc_by(successful.len() as u64);
}
failed
}
Err(_error) => (0..entries_count)
.map(|id| BatchResultErrorEntry {
id: id.to_string(),
..Default::default()
})
.collect(),
};
if !failed.is_empty() {
UPLOAD_SQS_TOTAL
.with_label_values(&[UploadAwsStatus::Failed.as_str()])
.inc_by(failed.len() as u64);
}
failed
}
}

#[derive(derivative::Derivative)]
#[derivative(Debug, Clone)]
pub struct S3Client {
#[derivative(Debug = "ignore")]
pub client: RusotoS3Client,
pub bucket: String,
pub permits: Arc<Semaphore>,
}

impl S3Client {
pub fn new(config: ConfigAwsS3) -> AwsResult<Self> {
let client = aws_create_client(config.auth)?;
Ok(Self {
client: RusotoS3Client::new_with_client(client, config.region),
bucket: config.bucket,
permits: Arc::new(Semaphore::new(config.max_requests)),
})
}

pub async fn put_object<B: Into<ByteStream>>(self, key: String, body: B) -> AwsResult {
let permit = self.permits.acquire().await.expect("alive");
UPLOAD_S3_REQUESTS.inc();
let result = self
.client
.put_object(PutObjectRequest {
body: Some(body.into()),
bucket: self.bucket,
key,
..Default::default()
})
.await;
UPLOAD_S3_REQUESTS.dec();
drop(permit);

let status = match result {
Ok(_) => UploadAwsStatus::Success,
Err(_) => UploadAwsStatus::Failed,
};
UPLOAD_S3_TOTAL.with_label_values(&[status.as_str()]).inc();

result.map_err(Into::into).map(|_| ())
}
}

fn aws_create_client(config: ConfigAwsAuth) -> AwsResult<RusotoClient> {
let request_dispatcher = HttpClient::new()?;
let credentials_provider = AwsCredentialsProvider::new(config)?;
Ok(RusotoClient::new_with(
credentials_provider,
request_dispatcher,
))
}

#[allow(clippy::large_enum_variant)]
enum AwsCredentialsProvider {
Static(StaticProvider),
Chain(AutoRefreshingProvider<ChainProvider>),
}

impl AwsCredentialsProvider {
pub fn new(config: ConfigAwsAuth) -> AwsResult<Self> {
match config {
ConfigAwsAuth::Static {
access_key_id,
secret_access_key,
} => Ok(Self::Static(StaticProvider::new_minimal(
access_key_id,
secret_access_key,
))),
ConfigAwsAuth::Chain {
credentials_file,
profile,
} => {
let profile_provider = match (credentials_file, profile) {
(Some(file_path), Some(profile)) => {
ProfileProvider::with_configuration(file_path, profile)
}
(Some(file_path), None) => {
ProfileProvider::with_default_configuration(file_path)
}
(None, Some(profile)) => ProfileProvider::with_default_credentials(profile)?,
(None, None) => ProfileProvider::new()?,
};
Ok(Self::Chain(AutoRefreshingProvider::new(
ChainProvider::with_profile_provider(profile_provider),
)?))
}
}
}
}

#[async_trait::async_trait]
impl ProvideAwsCredentials for AwsCredentialsProvider {
async fn credentials(&self) -> Result<AwsCredentials, CredentialsError> {
match self {
Self::Static(p) => p.credentials(),
Self::Chain(p) => p.credentials(),
}
.await
}
}
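
The commit title mentions S3 for large payloads; a minimal sketch of how `SqsClient` and `S3Client` above might be combined for that, assuming bodies over `SqsClient::REQUEST_LIMIT` are uploaded to S3 and only a reference is enqueued (the message format and key naming here are illustrative assumptions, as is the helper itself):

```rust
// Hypothetical routing helper, assuming the items from src/aws.rs above are in
// scope: small bodies go directly to SQS, larger ones are stored in S3 and only
// a short pointer to the object is enqueued.
async fn send_payload(sqs: SqsClient, s3: S3Client, id: String, body: String) -> AwsResult {
    let body = if body.len() <= SqsClient::REQUEST_LIMIT {
        body
    } else {
        // Too large for a single SQS message: upload the body to S3 instead
        // and send a reference to the object as the message body.
        let key = format!("payloads/{}", id);
        s3.put_object(key.clone(), body.into_bytes()).await?;
        format!("{{\"s3\":\"{}\"}}", key)
    };
    let entry = SendMessageBatchRequestEntry {
        id,
        message_body: body,
        ..Default::default()
    };
    // Failed entries are returned for the caller to retry; ignored in this sketch.
    let _failed = sqs.send_batch(vec![entry]).await;
    Ok(())
}
```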