Skip to content

Commit

Permalink
solana: bump to v2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Oct 14, 2024
1 parent 3a85f57 commit 0dfa88a
Show file tree
Hide file tree
Showing 12 changed files with 902 additions and 606 deletions.
1,349 changes: 805 additions & 544 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 11 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ publish = false
crate-type = ["cdylib", "rlib"]

[dependencies]
agave-geyser-plugin-interface = "~2.0.13"
anyhow = "1.0.44"
arrayref = "0.3.6"
async-trait = "0.1.73"
Expand All @@ -20,8 +21,12 @@ enumflags2 = "0.6.4"
flate2 = "1.0.27"
futures = "0.3.28"
hex = "0.4.3"
http = "1.1.0"
http-body-util = "0.1.2"
humantime = "2.1.0"
hyper = { version = "0.14.27", features = ["server"] }
hyper0 = { package = "hyper", version = "0.14.27" }
hyper = "1.4.1"
hyper-util = { version = "0.1.7", features = ["server-auto", "tokio"] }
hyper-tls = "0.5.0"
lazy_static = "1.4.0"
log = "0.4.14"
Expand All @@ -38,11 +43,10 @@ safe-transmute = "0.11.2"
serde = { version = "1.0.132", features = ["derive"] }
serde_json = "1.0.73"
serum_dex = "0.5.4"
solana-geyser-plugin-interface = "~1.18.23"
solana-logger = "~1.18.23"
solana-sdk = "~1.18.23"
solana-transaction-status = "~1.18.23"
spl-token = { version = "4.0.0", features = ["no-entrypoint"] }
solana-logger = "~2.0.13"
solana-sdk = "~2.0.13"
solana-transaction-status = "~2.0.13"
spl-token = { version = "6.0.0", features = ["no-entrypoint"] }
thiserror = "1.0.30"
tokio = { version = "1.32.0", features = ["rt-multi-thread", "time", "macros", "io-util"] }
zstd = "0.12.4"
Expand All @@ -51,7 +55,7 @@ zstd = "0.12.4"
anyhow = "1.0.62"
cargo-lock = "9.0.0"
git-version = "0.3.5"
vergen = { version = "8.2.1", features = ["build", "rustc"] }
vergen = { version = "9.0.1", features = ["build", "rustc"] }

[lints.clippy]
clone_on_ref_ptr = "deny"
Expand Down
7 changes: 4 additions & 3 deletions build.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use {cargo_lock::Lockfile, std::collections::HashSet};

fn main() -> anyhow::Result<()> {
let mut envs = vergen::EmitBuilder::builder();
envs.all_build().all_rustc();
envs.emit()?;
vergen::Emitter::default()
.add_instructions(&vergen::BuildBuilder::all_build()?)?
.add_instructions(&vergen::RustcBuilder::all_rustc()?)?
.emit()?;

    // vergen git version does not look cool
println!(
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[toolchain]
channel = "1.75.0"
channel = "1.78.0"
components = ["clippy", "rustfmt"]
targets = []
profile = "minimal"
2 changes: 1 addition & 1 deletion src/admin.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::{
config::{ConfigFilters, ConfigRedis, PubkeyWithSource, PubkeyWithSourceError},
prom::health::{set_health, HealthInfoType},
metrics::health::{set_health, HealthInfoType},
},
futures::stream::{Stream, StreamExt},
log::*,
Expand Down
5 changes: 2 additions & 3 deletions src/aws.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use {
crate::{
config::{ConfigAwsAuth, ConfigAwsS3, ConfigAwsSqs},
prom::{
metrics::{
UploadAwsStatus, UPLOAD_S3_REQUESTS, UPLOAD_S3_TOTAL, UPLOAD_SQS_REQUESTS,
UPLOAD_SQS_TOTAL,
},
sqs::SendMessageType,
},
futures::future::{try_join_all, BoxFuture},
hyper::Client,
hyper_tls::HttpsConnector,
log::*,
rusoto_core::{request::TlsError, ByteStream, Client as RusotoClient, HttpClient, RusotoError},
Expand Down Expand Up @@ -417,7 +416,7 @@ where
}

fn aws_create_client(config: ConfigAwsAuth) -> AwsResult<RusotoClient> {
let mut builder = Client::builder();
let mut builder = hyper0::Client::builder();
builder.pool_idle_timeout(Duration::from_secs(10));
builder.pool_max_idle_per_host(10);
    // Fix: `connection closed before message completed` but introduces `dns error: failed to lookup address information`
Expand Down
6 changes: 3 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ use {
serum::EventFlag,
sqs::SlotStatus,
},
agave_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPluginError, Result as PluginResult,
},
enumflags2::BitFlags,
flate2::{write::GzEncoder, Compression as GzCompression},
redis::{
aio::Connection as RedisConnection, AsyncCommands, Pipeline as RedisPipeline, RedisError,
},
rusoto_core::Region,
serde::{de, ser::SerializeSeq, Deserialize, Deserializer, Serialize, Serializer},
solana_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPluginError, Result as PluginResult,
},
solana_sdk::pubkey::Pubkey,
std::{
borrow::Cow,
Expand Down
6 changes: 3 additions & 3 deletions src/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
ConfigSlotsFilter, ConfigTransactionsAccountsFilter, ConfigTransactionsFilter,
PubkeyWithSource,
},
prom::health::{set_health, HealthInfoType},
metrics::health::{set_health, HealthInfoType},
serum::{self, EventFlag},
sqs::{ReplicaAccountInfo, ReplicaTransactionInfo},
version::VERSION,
Expand Down Expand Up @@ -678,11 +678,11 @@ impl<'a> AccountsFilterMatch<'a> {
}

pub fn contains_tokenkeg_owner(&self, owner: &Pubkey) -> bool {
self.accounts_filter.tokenkeg_owner.get(owner).is_some()
self.accounts_filter.tokenkeg_owner.contains_key(owner)
}

pub fn contains_tokenkeg_delegate(&self, owner: &Pubkey) -> bool {
self.accounts_filter.tokenkeg_delegate.get(owner).is_some()
self.accounts_filter.tokenkeg_delegate.contains_key(owner)
}

fn extend<Q: Hash + Eq>(
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ pub mod admin;
pub mod aws;
pub mod config;
pub mod filters;
pub mod metrics;
pub mod plugin;
pub mod prom;
pub mod serum;
pub mod sqs;
pub mod version;
86 changes: 55 additions & 31 deletions src/prom.rs → src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
use {
crate::{config::ConfigPrometheus, version::VERSION as VERSION_INFO},
futures::FutureExt,
http_body_util::{combinators::BoxBody, BodyExt, Empty as BodyEmpty, Full as BodyFull},
hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
Body, Request, Response, Server, StatusCode,
body::{Bytes, Incoming as BodyIncoming},
service::service_fn,
Request, Response, StatusCode,
},
hyper_util::{
rt::tokio::{TokioExecutor, TokioIo},
server::conn::auto::Builder as ServerBuilder,
},
log::*,
prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder},
std::sync::Once,
tokio::{runtime::Runtime, sync::oneshot},
std::{
convert::Infallible,
sync::{Arc, Once},
},
tokio::{net::TcpListener, sync::Notify},
};

lazy_static::lazy_static! {
Expand Down Expand Up @@ -126,11 +133,11 @@ impl UploadAwsStatus {

#[derive(Debug)]
pub struct PrometheusService {
shutdown_signal: oneshot::Sender<()>,
shutdown: Arc<Notify>,
}

impl PrometheusService {
pub fn new(runtime: &Runtime, config: Option<ConfigPrometheus>) -> Self {
pub async fn new(config: Option<ConfigPrometheus>) -> std::io::Result<Self> {
static REGISTER: Once = Once::new();
REGISTER.call_once(|| {
macro_rules! register {
Expand Down Expand Up @@ -161,48 +168,65 @@ impl PrometheusService {
.inc()
});

let (tx, rx) = oneshot::channel();
let shutdown = Arc::new(Notify::new());
if let Some(ConfigPrometheus { address }) = config {
runtime.spawn(async move {
let make_service = make_service_fn(move |_: &AddrStream| async move {
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| async move {
let response = match req.uri().path() {
"/metrics" => metrics_handler(),
_ => not_found_handler(),
};
Ok::<_, hyper::Error>(response)
}))
});
let server = Server::bind(&address).serve(make_service);
if let Err(error) = tokio::try_join!(server, rx.map(|_| Ok(()))) {
error!("prometheus service failed: {}", error);
let shutdown = Arc::clone(&shutdown);
let listener = TcpListener::bind(&address).await?;
tokio::spawn(async move {
loop {
let stream = tokio::select! {
() = shutdown.notified() => break,
maybe_conn = listener.accept() => match maybe_conn {
Ok((stream, _addr)) => stream,
Err(error) => {
error!("failed to accept new connection: {error}");
break;
}
}
};

tokio::spawn(async move {
if let Err(error) = ServerBuilder::new(TokioExecutor::new())
.serve_connection(
TokioIo::new(stream),
service_fn(move |req: Request<BodyIncoming>| async move {
match req.uri().path() {
"/metrics" => metrics_handler(),
_ => not_found_handler(),
}
}),
)
.await
{
error!("failed to handle metrics request: {error}");
}
});
}
});
}

PrometheusService {
shutdown_signal: tx,
}
Ok(PrometheusService { shutdown })
}

pub fn shutdown(self) {
let _ = self.shutdown_signal.send(());
self.shutdown.notify_one();
}
}

fn metrics_handler() -> Response<Body> {
fn metrics_handler() -> http::Result<Response<BoxBody<Bytes, Infallible>>> {
let metrics = TextEncoder::new()
.encode_to_string(&REGISTRY.gather())
.unwrap_or_else(|error| {
error!("could not encode custom metrics: {}", error);
String::new()
});
Response::builder().body(Body::from(metrics)).unwrap()
Response::builder()
.status(StatusCode::OK)
.body(BodyFull::new(Bytes::from(metrics)).boxed())
}

fn not_found_handler() -> Response<Body> {
fn not_found_handler() -> http::Result<Response<BoxBody<Bytes, Infallible>>> {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.unwrap()
.body(BodyEmpty::new().boxed())
}
10 changes: 7 additions & 3 deletions src/plugin.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use {
crate::{
config::Config,
prom::PrometheusService,
metrics::PrometheusService,
sqs::{AwsSqsClient, SqsClientResult},
},
solana_geyser_plugin_interface::geyser_plugin_interface::{
agave_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
},
Expand Down Expand Up @@ -62,7 +62,11 @@ impl GeyserPlugin for Plugin {
.build()
.map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;

let prometheus = PrometheusService::new(&runtime, config.prometheus);
let prometheus = runtime.block_on(async move {
PrometheusService::new(config.prometheus)
.await
.map_err(|error| GeyserPluginError::Custom(Box::new(error)))
})?;
let client = runtime
.block_on(AwsSqsClient::new(config))
.map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
Expand Down
15 changes: 9 additions & 6 deletions src/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ use {
aws::{AwsError, S3Client, SqsClient},
config::{AccountsDataCompression, Config},
filters::{Filters, FiltersError},
prom::{
metrics::{
health::{set_health, HealthInfoType},
UploadMessagesStatus, SLOTS_LAST_PROCESSED, UPLOAD_MESSAGES_TOTAL, UPLOAD_MISSIED_INFO,
UPLOAD_QUEUE_SIZE,
},
},
agave_geyser_plugin_interface::geyser_plugin_interface::{
ReplicaAccountInfoVersions, ReplicaBlockInfoVersions, ReplicaTransactionInfoVersions,
SlotStatus as GeyserSlotStatus,
},
arrayref::array_ref,
base64::{engine::general_purpose, Engine as _},
futures::{
Expand All @@ -20,10 +24,6 @@ use {
rusoto_sqs::SendMessageBatchRequestEntry,
serde::{Deserialize, Serialize},
serde_json::{json, Value},
solana_geyser_plugin_interface::geyser_plugin_interface::{
ReplicaAccountInfoVersions, ReplicaBlockInfoVersions, ReplicaTransactionInfoVersions,
SlotStatus as GeyserSlotStatus,
},
solana_sdk::{
clock::UnixTimestamp,
program_pack::Pack,
Expand Down Expand Up @@ -204,7 +204,10 @@ impl From<ReplicaBlockInfoVersions<'_>> for ReplicaBlockMetadata {
ReplicaBlockInfoVersions::V0_0_2(_info) => {
unreachable!("ReplicaBlockInfoVersions::V0_0_2 is not supported")
}
ReplicaBlockInfoVersions::V0_0_3(info) => Self {
ReplicaBlockInfoVersions::V0_0_3(_info) => {
unreachable!("ReplicaBlockInfoVersions::V0_0_3 is not supported")
}
ReplicaBlockInfoVersions::V0_0_4(info) => Self {
slot: info.slot,
block_time: info.block_time,
},
Expand Down

0 comments on commit 0dfa88a

Please sign in to comment.