Skip to content

Commit

Permalink
Merge branch 'helius-labs:main' into triton
Browse files Browse the repository at this point in the history
  • Loading branch information
bruswejn authored Nov 14, 2024
2 parents 4f88e39 + 380ea2a commit 50ea928
Show file tree
Hide file tree
Showing 40 changed files with 653 additions and 174 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "photon-indexer"
publish = true
readme = "README.md"
repository = "https://github.com/helius-labs/photon"
version = "0.45.0"
version = "0.48.0"

[[bin]]
name = "photon"
Expand Down
15 changes: 15 additions & 0 deletions src/api/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ use super::method::get_compressed_account::AccountResponse;
use super::method::get_compressed_balance_by_owner::{
get_compressed_balance_by_owner, GetCompressedBalanceByOwnerRequest,
};
use super::method::get_compressed_mint_token_holders::{
get_compressed_mint_token_holders, GetCompressedMintTokenHoldersRequest, OwnerBalancesResponse,
};
use super::method::get_compressed_token_balances_by_owner::{
get_compressed_token_balances_by_owner, GetCompressedTokenBalancesByOwnerRequest,
TokenBalancesResponse,
Expand Down Expand Up @@ -217,6 +220,13 @@ impl PhotonApi {
get_compressed_accounts_by_owner(self.db_conn.as_ref(), request).await
}

pub async fn get_compressed_mint_token_holders(
&self,
request: GetCompressedMintTokenHoldersRequest,
) -> Result<OwnerBalancesResponse, PhotonApiError> {
get_compressed_mint_token_holders(self.db_conn.as_ref(), request).await
}

pub async fn get_multiple_compressed_accounts(
&self,
request: GetMultipleCompressedAccountsRequest,
Expand Down Expand Up @@ -313,6 +323,11 @@ impl PhotonApi {
request: Some(GetCompressedAccountsByOwnerRequest::schema().1),
response: GetCompressedAccountsByOwnerResponse::schema().1,
},
OpenApiSpec {
name: "getCompressedMintTokenHolders".to_string(),
request: Some(GetCompressedMintTokenHoldersRequest::schema().1),
response: OwnerBalancesResponse::schema().1,
},
OpenApiSpec {
name: "getMultipleCompressedAccounts".to_string(),
request: Some(GetMultipleCompressedAccountsRequest::adjusted_schema()),
Expand Down
112 changes: 112 additions & 0 deletions src/api/method/get_compressed_mint_token_holders.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use byteorder::{ByteOrder, LittleEndian};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

use crate::common::typedefs::bs58_string::Base58String;
use crate::common::typedefs::serializable_pubkey::SerializablePubkey;
use crate::common::typedefs::unsigned_integer::UnsignedInteger;
use crate::dao::generated::token_owner_balances;

use super::super::error::PhotonApiError;
use super::utils::{parse_decimal, Context, Limit, PAGE_LIMIT};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct OwnerBalance {
pub owner: SerializablePubkey,
pub balance: UnsignedInteger,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct OwnerBalanceList {
pub items: Vec<OwnerBalance>,
pub cursor: Option<Base58String>,
}

// We do not use generics to simplify documentation generation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
pub struct OwnerBalancesResponse {
pub context: Context,
pub value: OwnerBalanceList,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema, Default)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
pub struct GetCompressedMintTokenHoldersRequest {
pub mint: SerializablePubkey,
pub cursor: Option<Base58String>,
pub limit: Option<Limit>,
}

pub async fn get_compressed_mint_token_holders(
conn: &DatabaseConnection,
request: GetCompressedMintTokenHoldersRequest,
) -> Result<OwnerBalancesResponse, PhotonApiError> {
let context = Context::extract(conn).await?;
let GetCompressedMintTokenHoldersRequest {
mint,
cursor,
limit,
} = request;
let mut filter = token_owner_balances::Column::Mint.eq::<Vec<u8>>(mint.into());

if let Some(cursor) = cursor {
let bytes = cursor.0;
let expected_cursor_length = 40;
let (balance, owner) = if bytes.len() == expected_cursor_length {
let (balance, owner) = bytes.split_at(8);
(balance, owner)
} else {
return Err(PhotonApiError::ValidationError(format!(
"Invalid cursor length. Expected {}. Received {}.",
expected_cursor_length,
bytes.len()
)));
};
let balance = LittleEndian::read_u64(&balance);

filter = filter.and(
token_owner_balances::Column::Amount.lt(balance).or(
token_owner_balances::Column::Amount
.eq(balance)
.and(token_owner_balances::Column::Owner.lt::<Vec<u8>>(owner.into())),
),
);
}
let limit = limit.map(|l| l.value()).unwrap_or(PAGE_LIMIT);

let items = token_owner_balances::Entity::find()
.filter(filter)
.order_by_desc(token_owner_balances::Column::Amount)
.order_by_desc(token_owner_balances::Column::Owner)
.limit(limit)
.all(conn)
.await?
.drain(..)
.map(|token_owner_balance| {
Ok(OwnerBalance {
owner: token_owner_balance.owner.try_into()?,
balance: UnsignedInteger(parse_decimal(token_owner_balance.amount)?),
})
})
.collect::<Result<Vec<OwnerBalance>, PhotonApiError>>()?;

let mut cursor = items.last().map(|item| {
Base58String({
let item = item.clone();
let mut bytes: Vec<u8> = Vec::new();
bytes.extend_from_slice(&item.balance.0.to_le_bytes());
bytes.extend_from_slice(&item.owner.0.to_bytes());
bytes
})
});
if items.len() < limit as usize {
cursor = None;
}

Ok(OwnerBalancesResponse {
value: OwnerBalanceList { items, cursor },
context,
})
}
81 changes: 54 additions & 27 deletions src/api/method/get_compressed_token_balances_by_owner.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use std::collections::HashMap;

use sea_orm::DatabaseConnection;
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

use crate::common::typedefs::bs58_string::Base58String;
use crate::common::typedefs::serializable_pubkey::SerializablePubkey;
use crate::common::typedefs::unsigned_integer::UnsignedInteger;
use crate::dao::generated::token_owner_balances;

use super::utils::{Authority, Context, GetCompressedTokenAccountsByAuthorityOptions, Limit};
use super::{super::error::PhotonApiError, utils::fetch_token_accounts};
use super::utils::{
parse_decimal, Context, Limit,
PAGE_LIMIT,
};
use super::super::error::PhotonApiError;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct TokenBalance {
Expand All @@ -20,7 +23,7 @@ pub struct TokenBalance {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct TokenBalanceList {
pub token_balances: Vec<TokenBalance>,
pub cursor: Option<String>,
pub cursor: Option<Base58String>,
}

// We do not use generics to simplify documentation generation.
Expand All @@ -44,40 +47,64 @@ pub async fn get_compressed_token_balances_by_owner(
conn: &DatabaseConnection,
request: GetCompressedTokenBalancesByOwnerRequest,
) -> Result<TokenBalancesResponse, PhotonApiError> {
let context = Context::extract(conn).await?;
let GetCompressedTokenBalancesByOwnerRequest {
owner,
mint,
cursor,
limit,
} = request;
let mut filter = token_owner_balances::Column::Owner.eq::<Vec<u8>>(owner.into());
if let Some(mint) = mint {
filter = filter.and(token_owner_balances::Column::Mint.eq::<Vec<u8>>(mint.into()));
}
if let Some(cursor) = cursor {
let bytes = cursor.0;
let expected_cursor_length = 32;
let mint = if bytes.len() == expected_cursor_length {
bytes.to_vec()
} else {
return Err(PhotonApiError::ValidationError(format!(
"Invalid cursor length. Expected {}. Received {}.",
expected_cursor_length,
bytes.len()
)));
};
filter = filter.and(token_owner_balances::Column::Mint.gte::<Vec<u8>>(mint.into()));
}
let limit = limit.map(|l| l.value()).unwrap_or(PAGE_LIMIT);

let options = GetCompressedTokenAccountsByAuthorityOptions {
mint,
cursor,
limit,
};
let token_accounts = fetch_token_accounts(conn, Authority::Owner(owner), options).await?;
let mut mint_to_balance: HashMap<SerializablePubkey, u64> = HashMap::new();
let items = token_owner_balances::Entity::find()
.filter(filter)
.order_by_asc(token_owner_balances::Column::Mint)
.limit(limit)
.all(conn)
.await?
.drain(..)
.map(|token_owner_balance| {
Ok(TokenBalance {
mint: token_owner_balance.mint.try_into()?,
balance: UnsignedInteger(parse_decimal(token_owner_balance.amount)?),
})
})
.collect::<Result<Vec<TokenBalance>, PhotonApiError>>()?;

for token_account in token_accounts.value.items.iter() {
let balance = mint_to_balance
.entry(token_account.token_data.mint)
.or_insert(0);
*balance += token_account.token_data.amount.0;
}
let token_balances: Vec<TokenBalance> = mint_to_balance
.into_iter()
.map(|(mint, balance)| TokenBalance {
mint,
balance: UnsignedInteger(balance),
let mut cursor = items.last().map(|item| {
Base58String({
let item = item.clone();
let bytes: Vec<u8> = item.mint.into();
bytes
})
.collect();
});
if items.len() < limit as usize {
cursor = None;
}

Ok(TokenBalancesResponse {
context: token_accounts.context,
value: TokenBalanceList {
token_balances,
cursor: None,
token_balances: items,
cursor,
},
context,
})
}
1 change: 1 addition & 0 deletions src/api/method/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod get_compressed_account_balance;
pub mod get_compressed_account_proof;
pub mod get_compressed_accounts_by_owner;
pub mod get_compressed_balance_by_owner;
pub mod get_compressed_mint_token_holders;
pub mod get_compressed_token_account_balance;
pub mod get_compressed_token_accounts_by_delegate;
pub mod get_compressed_token_accounts_by_owner;
Expand Down
24 changes: 15 additions & 9 deletions src/api/method/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,14 @@ pub async fn fetch_token_accounts(
)));
}
let (mint, hash) = bytes.split_at(32);
filter = filter
.and(token_accounts::Column::Mint.gte::<Vec<u8>>(mint.into()))
.and(token_accounts::Column::Hash.gt::<Vec<u8>>(hash.into()));

filter = filter.and(
token_accounts::Column::Mint.gt::<Vec<u8>>(mint.into()).or(
token_accounts::Column::Mint
.eq::<Vec<u8>>(mint.into())
.and(token_accounts::Column::Hash.gt::<Vec<u8>>(hash.into())),
),
);
}
if let Some(l) = options.limit {
limit = l.value();
Expand All @@ -282,9 +287,11 @@ pub async fn fetch_token_accounts(
let items = token_accounts::Entity::find()
.find_also_related(accounts::Entity)
.filter(filter)
.order_by_asc(token_accounts::Column::Mint)
.order_by_asc(token_accounts::Column::Hash)
.order_by(token_accounts::Column::Mint, sea_orm::Order::Asc)
.order_by(token_accounts::Column::Hash, sea_orm::Order::Asc)
.limit(limit)
.order_by(token_accounts::Column::Mint, sea_orm::Order::Asc)
.order_by(token_accounts::Column::Hash, sea_orm::Order::Asc)
.all(conn)
.await?
.drain(..)
Expand Down Expand Up @@ -572,15 +579,14 @@ fn compute_cursor_filter(
let signature = Signature::try_from(signature).map_err(|_| {
PhotonApiError::ValidationError("Invalid signature in cursor".to_string())
})?;
let (slot_arg_index, signature_arg_index) =
(num_preceding_args + 1, num_preceding_args + 2);

Ok((
format!(
"AND transactions.slot <= ${} AND transactions.signature < ${}",
slot_arg_index, signature_arg_index
"AND (transactions.slot < ${} OR (transactions.slot = ${} AND transactions.signature < ${}))",
num_preceding_args + 1, num_preceding_args + 2, num_preceding_args + 3
),
vec![
slot.into(),
slot.into(),
Into::<Vec<u8>>::into(Into::<[u8; 64]>::into(signature)).into(),
],
Expand Down
10 changes: 10 additions & 0 deletions src/api/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,16 @@ fn build_rpc_module(api_and_indexer: PhotonApi) -> Result<RpcModule<PhotonApi>,
.map_err(Into::into)
},
)?;
module.register_async_method(
"getCompressedMintTokenHolders",
|rpc_params, rpc_context| async move {
let api = rpc_context.as_ref();
let payload = rpc_params.parse()?;
api.get_compressed_mint_token_holders(payload)
.await
.map_err(Into::into)
},
)?;

Ok(module)
}
5 changes: 1 addition & 4 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ pub fn setup_metrics(metrics_endpoint: Option<String>) {
let udp_sink = BufferedUdpMetricSink::from((host, port), socket).unwrap();
let queuing_sink = QueuingMetricSink::from(udp_sink);
let builder = StatsdClient::builder("photon", queuing_sink);
let client = builder
.with_tag("env", env)
.with_tag("version", env!("CARGO_PKG_VERSION"))
.build();
let client = builder.with_tag("env", env).build();
set_global_default(client);
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/common/typedefs/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ impl<'__s> ToSchema<'__s> for Hash {
.schema_type(SchemaType::String)
.description(Some("A 32-byte hash represented as a base58 string."))
.example(Some(serde_json::Value::String(
Hash::new_unique().to_base58(),
Hash::try_from("11111112cMQwSC9qirWGjZM6gLGwW69X22mqwLLGP")
.unwrap()
.to_base58(),
)))
.build(),
);
Expand Down
Loading

0 comments on commit 50ea928

Please sign in to comment.