Skip to content

Commit

Permalink
feat: update tap agent to latest subgraph client
Browse files Browse the repository at this point in the history
  • Loading branch information
Jannis committed Nov 30, 2023
1 parent 348742a commit f201698
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 39 deletions.
10 changes: 8 additions & 2 deletions tap-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,18 @@ reqwest = "0.11.20"
serde = "1.0.188"
serde_json = "1.0.104"
serde_yaml = "0.9.25"
sqlx = { version = "0.7.2", features = ["postgres", "runtime-tokio", "bigdecimal", "rust_decimal"] }
sqlx = { version = "0.7.2", features = [
"postgres",
"runtime-tokio",
"bigdecimal",
"rust_decimal",
] }
tap_aggregator = "0.1.6"
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol.git", rev = "882ca394444b451538908b9996bf7d45869a1bb9" }
thiserror = "1.0.44"
tokio = { version = "1.33.0" }
toolshed = { git = "https://github.com/edgeandnode/toolshed", branch = "main", features = ["graphql"] }
toolshed = { git = "https://github.com/edgeandnode/toolshed", branch = "main" }
graphql-http = { git = "https://github.com/edgeandnode/toolshed", branch = "main" }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3", features = [
"env-filter",
Expand Down
71 changes: 34 additions & 37 deletions tap-agent/src/tap/receipt_checks_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use alloy_primitives::Address;
use async_trait::async_trait;
use ethereum_types::U256;
use eventuals::{timer, Eventual, EventualExt};
use indexer_common::subgraph_client::SubgraphClient;
use indexer_common::subgraph_client::{Query, SubgraphClient};
use serde_json::json;
use sqlx::PgPool;
use tap_core::adapters::receipt_checks_adapter::ReceiptChecksAdapter as ReceiptChecksAdapterTrait;
Expand Down Expand Up @@ -153,45 +153,42 @@ impl ReceiptChecksAdapter {
timer(Duration::from_millis(escrow_subgraph_polling_interval_ms)).map_with_retry(
move |_| async move {
let response = escrow_subgraph
.query::<TransactionsResponse>(&json!({
"query": r#"
query (
$sender_id: ID!,
$receiver_id: ID!,
$allocation_id: String!
) {
transactions(
where: {
and: [
{ type: "redeem" }
{ sender_: { id: $sender_id } }
{ receiver_: { id: $receiver_id } }
{ allocationID: $allocation_id }
]
}
) {
allocationID
sender {
id
}
}
.query::<TransactionsResponse>(Query::new_with_variables(
r#"
query (
$sender_id: ID!,
$receiver_id: ID!,
$allocation_id: String!
) {
transactions(
where: {
and: [
{ type: "redeem" }
{ sender_: { id: $sender_id } }
{ receiver_: { id: $receiver_id } }
{ allocationID: $allocation_id }
]
}
"#,
"variables": {
"sender_id": sender_address.to_string(),
"receiver_id": indexer_address.to_string(),
"allocation_id": allocation_id.to_string(),
}
}))
) {
allocationID
sender {
id
}
}
}
"#,
[
("sender_id", sender_address.to_string().into()),
("receiver_id", indexer_address.to_string().into()),
("allocation_id", allocation_id.to_string().into()),
],
))
.await
.map_err(|e| e.to_string())?;
let response = response.data.ok_or_else(|| {
format!(
"No data found in escrow subgraph response for allocation {} and sender {}",
allocation_id, sender_address
)
})?;
Ok(!response.transactions.is_empty())

response
.map_err(|e| e.to_string())
.map(|data| !data.transactions.is_empty())
},
move |error: String| {
error!(
Expand Down

0 comments on commit f201698

Please sign in to comment.