From f2016989a2df0d93b815509d390ff293abcf3024 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Thu, 30 Nov 2023 11:26:21 +0100 Subject: [PATCH] feat: update tap agent to latest subgraph client --- tap-agent/Cargo.toml | 10 ++- tap-agent/src/tap/receipt_checks_adapter.rs | 71 ++++++++++----------- 2 files changed, 42 insertions(+), 39 deletions(-) diff --git a/tap-agent/Cargo.toml b/tap-agent/Cargo.toml index 2eb3d4a18..d65de7388 100644 --- a/tap-agent/Cargo.toml +++ b/tap-agent/Cargo.toml @@ -26,12 +26,18 @@ reqwest = "0.11.20" serde = "1.0.188" serde_json = "1.0.104" serde_yaml = "0.9.25" -sqlx = { version = "0.7.2", features = ["postgres", "runtime-tokio", "bigdecimal", "rust_decimal"] } +sqlx = { version = "0.7.2", features = [ + "postgres", + "runtime-tokio", + "bigdecimal", + "rust_decimal", +] } tap_aggregator = "0.1.6" tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol.git", rev = "882ca394444b451538908b9996bf7d45869a1bb9" } thiserror = "1.0.44" tokio = { version = "1.33.0" } -toolshed = { git = "https://github.com/edgeandnode/toolshed", branch = "main", features = ["graphql"] } +toolshed = { git = "https://github.com/edgeandnode/toolshed", branch = "main" } +graphql-http = { git = "https://github.com/edgeandnode/toolshed", branch = "main" } tracing = "0.1.37" tracing-subscriber = { version = "0.3", features = [ "env-filter", diff --git a/tap-agent/src/tap/receipt_checks_adapter.rs b/tap-agent/src/tap/receipt_checks_adapter.rs index 0c0b09662..2da4d5ea4 100644 --- a/tap-agent/src/tap/receipt_checks_adapter.rs +++ b/tap-agent/src/tap/receipt_checks_adapter.rs @@ -7,7 +7,7 @@ use alloy_primitives::Address; use async_trait::async_trait; use ethereum_types::U256; use eventuals::{timer, Eventual, EventualExt}; -use indexer_common::subgraph_client::SubgraphClient; +use indexer_common::subgraph_client::{Query, SubgraphClient}; use serde_json::json; use sqlx::PgPool; use tap_core::adapters::receipt_checks_adapter::ReceiptChecksAdapter as ReceiptChecksAdapterTrait; @@ -153,45 +153,42 @@ impl ReceiptChecksAdapter { timer(Duration::from_millis(escrow_subgraph_polling_interval_ms)).map_with_retry( move |_| async move { let response = escrow_subgraph - .query::(&json!({ - "query": r#" - query ( - $sender_id: ID!, - $receiver_id: ID!, - $allocation_id: String! - ) { - transactions( - where: { - and: [ - { type: "redeem" } - { sender_: { id: $sender_id } } - { receiver_: { id: $receiver_id } } - { allocationID: $allocation_id } - ] - } - ) { - allocationID - sender { - id - } - } + .query::(Query::new_with_variables( + r#" + query ( + $sender_id: ID!, + $receiver_id: ID!, + $allocation_id: String! + ) { + transactions( + where: { + and: [ + { type: "redeem" } + { sender_: { id: $sender_id } } + { receiver_: { id: $receiver_id } } + { allocationID: $allocation_id } + ] } - "#, - "variables": { - "sender_id": sender_address.to_string(), - "receiver_id": indexer_address.to_string(), - "allocation_id": allocation_id.to_string(), - } - })) + ) { + allocationID + sender { + id + } + } + } + "#, + [ + ("sender_id", sender_address.to_string().into()), + ("receiver_id", indexer_address.to_string().into()), + ("allocation_id", allocation_id.to_string().into()), + ], + )) .await .map_err(|e| e.to_string())?; - let response = response.data.ok_or_else(|| { - format!( - "No data found in escrow subgraph response for allocation {} and sender {}", - allocation_id, sender_address - ) - })?; - Ok(!response.transactions.is_empty()) + + response + .map_err(|e| e.to_string()) + .map(|data| !data.transactions.is_empty()) }, move |error: String| { error!(