Skip to content

Commit

Permalink
Merge branch 'helius-labs:main' into triton
Browse files Browse the repository at this point in the history
  • Loading branch information
golebiewsky authored Nov 5, 2024
2 parents 801beca + 15495a9 commit 4f88e39
Show file tree
Hide file tree
Showing 14 changed files with 89 additions and 123 deletions.
16 changes: 6 additions & 10 deletions src/api/api.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::sync::Arc;

use sea_orm::{ConnectionTrait, DatabaseConnection, Statement};
use solana_client::nonblocking::rpc_client::RpcClient;
use utoipa::openapi::{ObjectBuilder, RefOr, Schema, SchemaType};
use utoipa::ToSchema;

use crate::api::method::utils::GetNonPaginatedSignaturesResponse;
use crate::common::typedefs::rpc_client_with_uri::RpcClientWithUri;
use crate::common::typedefs::unsigned_integer::UnsignedInteger;

use super::method::get_compressed_account::AccountResponse;
Expand Down Expand Up @@ -78,14 +78,14 @@ use super::{

pub struct PhotonApi {
db_conn: Arc<DatabaseConnection>,
rpc_client: Arc<RpcClientWithUri>,
rpc_client: Arc<RpcClient>,
prover_url: String,
}

impl PhotonApi {
pub fn new(
db_conn: Arc<DatabaseConnection>,
rpc_client: Arc<RpcClientWithUri>,
rpc_client: Arc<RpcClient>,
prover_url: String,
) -> Self {
Self {
Expand Down Expand Up @@ -203,7 +203,7 @@ impl PhotonApi {
}

pub async fn get_indexer_health(&self) -> Result<String, PhotonApiError> {
get_indexer_health(self.db_conn.as_ref(), &self.rpc_client.client).await
get_indexer_health(self.db_conn.as_ref(), &self.rpc_client).await
}

pub async fn get_indexer_slot(&self) -> Result<UnsignedInteger, PhotonApiError> {
Expand Down Expand Up @@ -256,12 +256,8 @@ impl PhotonApi {
&self,
request: GetTransactionRequest,
) -> Result<GetTransactionResponse, PhotonApiError> {
get_transaction_with_compression_info(
self.db_conn.as_ref(),
&self.rpc_client.client,
request,
)
.await
get_transaction_with_compression_info(self.db_conn.as_ref(), &self.rpc_client, request)
.await
}

pub async fn get_validity_proof(
Expand Down
10 changes: 9 additions & 1 deletion src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::fmt;
use std::{env, net::UdpSocket, path::PathBuf, thread::sleep, time::Duration};
use std::{env, net::UdpSocket, path::PathBuf, sync::Arc, thread::sleep, time::Duration};

use cadence::{BufferedUdpMetricSink, QueuingMetricSink, StatsdClient};
use cadence_macros::set_global_default;
Expand Down Expand Up @@ -144,3 +144,11 @@ pub async fn fetch_current_slot_with_infinite_retry(client: &RpcClient) -> u64 {
}
}
}

pub fn get_rpc_client(rpc_url: &str) -> Arc<RpcClient> {
Arc::new(RpcClient::new_with_timeout_and_commitment(
rpc_url.to_string(),
Duration::from_secs(90),
CommitmentConfig::confirmed(),
))
}
1 change: 0 additions & 1 deletion src/common/typedefs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ pub mod account;
pub mod bs58_string;
pub mod bs64_string;
pub mod hash;
pub mod rpc_client_with_uri;
pub mod serializable_pubkey;
pub mod serializable_signature;
pub mod token_data;
Expand Down
21 changes: 0 additions & 21 deletions src/common/typedefs/rpc_client_with_uri.rs

This file was deleted.

11 changes: 9 additions & 2 deletions src/ingester/fetchers/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use futures::{pin_mut, Stream, StreamExt};
use log::info;
use rand::distributions::Alphanumeric;
use rand::Rng;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use tokio::time::sleep;
Expand All @@ -27,7 +28,6 @@ use yellowstone_grpc_proto::solana::storage::confirmed_block::InnerInstructions;

use crate::api::method::get_indexer_health::HEALTH_CHECK_SLOT_DISTANCE;
use crate::common::typedefs::hash::Hash;
use crate::common::typedefs::rpc_client_with_uri::RpcClientWithUri;
use crate::ingester::fetchers::poller::get_block_poller_stream;
use crate::ingester::typedefs::block_info::{
BlockInfo, BlockMetadata, Instruction, InstructionGroup, TransactionInfo,
Expand All @@ -39,7 +39,7 @@ use crate::monitor::{start_latest_slot_updater, LATEST_SLOT};
pub fn get_grpc_stream_with_rpc_fallback(
endpoint: String,
auth_header: String,
rpc_client: Arc<RpcClientWithUri>,
rpc_client: Arc<RpcClient>,
mut last_indexed_slot: u64,
max_concurrent_block_fetches: usize,
) -> impl Stream<Item = Vec<BlockInfo>> {
Expand Down Expand Up @@ -78,6 +78,13 @@ pub fn get_grpc_stream_with_rpc_fallback(
panic!("gRPC stream ended unexpectedly");
}
Either::Right((Some(rpc_blocks), _)) => {
let rpc_blocks: Vec<BlockInfo> = rpc_blocks
.into_iter()
.filter(|b| b.metadata.slot > last_indexed_slot)
.collect();
if rpc_blocks.is_empty() {
continue;
}
let blocks_len = rpc_blocks.len();
let parent_slot = rpc_blocks.first().unwrap().metadata.parent_slot;
let last_slot = rpc_blocks.last().unwrap().metadata.slot;
Expand Down
4 changes: 2 additions & 2 deletions src/ingester/fetchers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::sync::Arc;

use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};
use solana_client::nonblocking::rpc_client::RpcClient;

use crate::common::typedefs::rpc_client_with_uri::RpcClientWithUri;

use super::typedefs::block_info::BlockInfo;

Expand All @@ -14,7 +14,7 @@ use grpc::get_grpc_stream_with_rpc_fallback;
use poller::get_block_poller_stream;

pub struct BlockStreamConfig {
pub rpc_client: Arc<RpcClientWithUri>,
pub rpc_client: Arc<RpcClient>,
pub geyser_url: Option<String>,
pub max_concurrent_block_fetches: usize,
pub last_indexed_slot: u64,
Expand Down
25 changes: 9 additions & 16 deletions src/ingester/fetchers/poller.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{
collections::BTreeMap,
sync::{atomic::Ordering, Arc},
time::Duration,
};

use async_stream::stream;
Expand All @@ -15,15 +14,14 @@ use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};

use crate::{
common::typedefs::rpc_client_with_uri::RpcClientWithUri,
ingester::typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo},
metric,
monitor::{start_latest_slot_updater, LATEST_SLOT},
};

const SKIPPED_BLOCK_ERRORS: [i64; 2] = [-32007, -32009];

fn get_slot_stream(rpc_client: Arc<RpcClientWithUri>, start_slot: u64) -> impl Stream<Item = u64> {
fn get_slot_stream(rpc_client: Arc<RpcClient>, start_slot: u64) -> impl Stream<Item = u64> {
stream! {
start_latest_slot_updater(rpc_client.clone()).await;
let mut next_slot_to_fetch = start_slot;
Expand All @@ -39,7 +37,7 @@ fn get_slot_stream(rpc_client: Arc<RpcClientWithUri>, start_slot: u64) -> impl S
}

pub fn get_block_poller_stream(
rpc_client: Arc<RpcClientWithUri>,
rpc_client: Arc<RpcClient>,
mut last_indexed_slot: u64,
max_concurrent_block_fetches: usize,
) -> impl Stream<Item = Vec<BlockInfo>> {
Expand All @@ -53,7 +51,7 @@ pub fn get_block_poller_stream(
let block_stream = slot_stream
.map(|slot| {
let rpc_client = rpc_client.clone();
async move { fetch_block_with_infinite_retries(rpc_client.uri.clone(), slot).await }
async move { fetch_block_with_infinite_retries(rpc_client.clone(), slot).await }
})
.buffer_unordered(max_concurrent_block_fetches);
pin_mut!(block_stream);
Expand Down Expand Up @@ -102,17 +100,12 @@ fn pop_cached_blocks_to_index(
(blocks, last_indexed_slot)
}

pub async fn fetch_block_with_infinite_retries(rpc_uri: String, slot: u64) -> Option<BlockInfo> {
let mut attempt_counter = 0;
pub async fn fetch_block_with_infinite_retries(
rpc_client: Arc<RpcClient>,
slot: u64,
) -> Option<BlockInfo> {
loop {
let timeout_sec = if attempt_counter <= 1 { 5 } else { 30 };
attempt_counter += 1;
let client = RpcClient::new_with_timeout_and_commitment(
rpc_uri.clone(),
Duration::from_secs(timeout_sec),
CommitmentConfig::confirmed(),
);
match client
match rpc_client
.get_block_with_config(
slot,
RpcBlockConfig {
Expand Down Expand Up @@ -144,7 +137,7 @@ pub async fn fetch_block_with_infinite_retries(rpc_uri: String, slot: u64) -> Op
return None;
}
}
log::debug!("Failed to fetch block: {}. {}", slot, e.to_string());
log::info!("Failed to fetch block: {}. {}", slot, e.to_string());
metric! {
statsd_count!("rpc_block_fetch_failed", 1);
}
Expand Down
5 changes: 3 additions & 2 deletions src/ingester/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ pub async fn fetch_last_indexed_slot_with_infinite_retry(
pub async fn index_block_stream(
block_stream: impl Stream<Item = Vec<BlockInfo>>,
db: Arc<DatabaseConnection>,
rpc_client: &RpcClient,
rpc_client: Arc<RpcClient>,
last_indexed_slot_at_start: u64,
end_slot: Option<u64>,
) {
pin_mut!(block_stream);
let current_slot = end_slot.unwrap_or(fetch_current_slot_with_infinite_retry(rpc_client).await);
let current_slot =
end_slot.unwrap_or(fetch_current_slot_with_infinite_retry(&rpc_client).await);
let number_of_blocks_to_backfill = if current_slot > last_indexed_slot_at_start {
current_slot - last_indexed_slot_at_start
} else {
Expand Down
28 changes: 12 additions & 16 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ use jsonrpsee::server::ServerHandle;
use log::{error, info};
use photon_indexer::api::{self, api::PhotonApi};

use photon_indexer::common::typedefs::rpc_client_with_uri::RpcClientWithUri;
use photon_indexer::common::{
fetch_block_parent_slot, fetch_current_slot_with_infinite_retry, get_network_start_slot,
setup_logging, setup_metrics, setup_pg_pool, LoggingFormat,
get_rpc_client, setup_logging, setup_metrics, setup_pg_pool, LoggingFormat,
};

use photon_indexer::ingester::fetchers::BlockStreamConfig;
Expand All @@ -27,6 +26,7 @@ use photon_indexer::monitor::continously_monitor_photon;
use photon_indexer::snapshot::{
get_snapshot_files_with_metadata, load_block_stream_from_directory_adapter, DirectoryAdapter,
};
use solana_client::nonblocking::rpc_client::RpcClient;
use sqlx::{
sqlite::{SqliteConnectOptions, SqlitePoolOptions},
SqlitePool,
Expand Down Expand Up @@ -98,7 +98,7 @@ struct Args {

async fn start_api_server(
db: Arc<DatabaseConnection>,
rpc_client: Arc<RpcClientWithUri>,
rpc_client: Arc<RpcClient>,
prover_url: String,
api_port: u16,
) -> ServerHandle {
Expand Down Expand Up @@ -168,15 +168,15 @@ async fn setup_database_connection(
fn continously_index_new_blocks(
block_stream_config: BlockStreamConfig,
db: Arc<DatabaseConnection>,
rpc_client: Arc<RpcClientWithUri>,
rpc_client: Arc<RpcClient>,
last_indexed_slot: u64,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let block_stream = block_stream_config.load_block_stream();
index_block_stream(
block_stream,
db,
&rpc_client.client,
rpc_client.clone(),
last_indexed_slot,
None,
)
Expand All @@ -195,7 +195,8 @@ async fn main() {
info!("Running migrations...");
Migrator::up(db_conn.as_ref(), None).await.unwrap();
}
let rpc_client = Arc::new(RpcClientWithUri::new(args.rpc_url));
let is_rpc_node_local = args.rpc_url.contains("127.0.0.1");
let rpc_client = get_rpc_client(&args.rpc_url);

if let Some(snapshot_dir) = args.snapshot_dir {
let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_dir));
Expand All @@ -219,16 +220,14 @@ async fn main() {
index_block_stream(
block_stream,
db_conn.clone(),
&rpc_client.client,
rpc_client.clone(),
last_indexed_slot,
Some(last_slot),
)
.await;
}
}

let is_rpc_node_local = rpc_client.uri.contains("127.0.0.1");

let (indexer_handle, monitor_handle) = match args.disable_indexing {
true => {
info!("Indexing is disabled");
Expand All @@ -249,19 +248,16 @@ async fn main() {
};
let last_indexed_slot = match args.start_slot {
Some(start_slot) => match start_slot.as_str() {
"latest" => fetch_current_slot_with_infinite_retry(&rpc_client.client).await,
"latest" => fetch_current_slot_with_infinite_retry(&rpc_client).await,
_ => {
fetch_block_parent_slot(
&rpc_client.client,
start_slot.parse::<u64>().unwrap(),
)
.await
fetch_block_parent_slot(&rpc_client, start_slot.parse::<u64>().unwrap())
.await
}
},
None => fetch_last_indexed_slot_with_infinite_retry(db_conn.as_ref())
.await
.unwrap_or(
get_network_start_slot(&rpc_client.client)
get_network_start_slot(&rpc_client)
.await
.try_into()
.unwrap(),
Expand Down
Loading

0 comments on commit 4f88e39

Please sign in to comment.