Skip to content

Commit

Permalink
feat: add tap v2 receipt parser
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo Inacio <[email protected]>
  • Loading branch information
gusinacio committed Dec 27, 2024
1 parent 83334d5 commit f86b213
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 81 deletions.
216 changes: 152 additions & 64 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ uuid = { version = "1.11.0", features = ["v7"] }
tracing = { version = "0.1.40", default-features = false }
bigdecimal = "0.4.3"
build-info = "0.0.39"
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "1c6e29f", default-features = false }
tap_aggregator = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "1c6e29f", default-features = false }
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "3c56018", default-features = false }
tap_core_v2 = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "1dada3e", package = "tap_core" }
tap_aggregator = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "3c56018", default-features = false }
tracing-subscriber = { version = "0.3", features = [
"json",
"env-filter",
"ansi",
], default-features = false }
thegraph-core = { version = "0.9.6", features = [
thegraph-core = { git = "https://github.com/edgeandnode/toolshed", rev= "d710e05", features = [
"attestation",
"alloy-eip712",
"alloy-sol-types",
Expand Down
1 change: 1 addition & 0 deletions crates/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ async-graphql-axum = "7.0.11"
base64.workspace = true
graphql = { git = "https://github.com/edgeandnode/toolshed", tag = "graphql-v0.3.0" }
tap_core.workspace = true
tap_core_v2.workspace = true
uuid.workspace = true
typed-builder.workspace = true
tower_governor = { version = "0.5.0", features = ["axum"] }
Expand Down
13 changes: 9 additions & 4 deletions crates/service/src/middleware/tap_receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@ use crate::service::TapReceipt;
///
/// This is useful to not deserialize multiple times the same receipt
pub async fn receipt_middleware(mut request: Request, next: Next) -> Response {
if let Ok(TypedHeader(TapReceipt(receipt))) =
request.extract_parts::<TypedHeader<TapReceipt>>().await
{
request.extensions_mut().insert(receipt);
if let Ok(TypedHeader(receipt)) = request.extract_parts::<TypedHeader<TapReceipt>>().await {
match receipt {
TapReceipt::V1(receipt) => {
request.extensions_mut().insert(receipt);
}
TapReceipt::V2(receipt) => {
request.extensions_mut().insert(receipt);
}
}
}
next.run(request).await
}
Expand Down
121 changes: 113 additions & 8 deletions crates/service/src/service/tap_receipt_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,81 @@
use axum_extra::headers::{self, Header, HeaderName, HeaderValue};
use lazy_static::lazy_static;
use prometheus::{register_counter, Counter};
use tap_core::receipt::SignedReceipt;
use serde::de;
use serde_json::Value;
use tap_core::receipt::SignedReceipt as SignedReceiptV1;
use tap_core_v2::receipt::SignedReceipt as SignedReceiptV2;

#[derive(Debug, PartialEq)]
pub struct TapReceipt(pub SignedReceipt);
#[derive(Debug, PartialEq, Clone, serde::Serialize)]
#[serde(untagged)]
pub enum TapReceipt {
V1(SignedReceiptV1),
V2(SignedReceiptV2),
}

impl<'de> serde::Deserialize<'de> for TapReceipt {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let temp = Value::deserialize(deserializer)?;

let is_v1 = temp
.as_object()
.ok_or(de::Error::custom("Didn't receive an object"))?
.get("message")
.ok_or(de::Error::custom("There's no message in the object"))?
.as_object()
.ok_or(de::Error::custom("Message is not an object"))?
.contains_key("allocation_id");

if is_v1 {
// Try V1 first
serde_json::from_value::<SignedReceiptV1>(temp)
.map(TapReceipt::V1)
.map_err(de::Error::custom)
} else {
// Fall back to V2
serde_json::from_value::<SignedReceiptV2>(temp)
.map(TapReceipt::V2)
.map_err(de::Error::custom)
}
}
}

impl From<SignedReceiptV1> for TapReceipt {
fn from(value: SignedReceiptV1) -> Self {
Self::V1(value)
}
}

impl From<SignedReceiptV2> for TapReceipt {
fn from(value: SignedReceiptV2) -> Self {
Self::V2(value)
}
}

impl TryFrom<TapReceipt> for SignedReceiptV1 {
type Error = anyhow::Error;

fn try_from(value: TapReceipt) -> Result<Self, Self::Error> {
match value {
TapReceipt::V2(_) => Err(anyhow::anyhow!("TapReceipt is V2")),
TapReceipt::V1(receipt) => Ok(receipt),
}
}
}

impl TryFrom<TapReceipt> for SignedReceiptV2 {
type Error = anyhow::Error;

fn try_from(value: TapReceipt) -> Result<Self, Self::Error> {
match value {
TapReceipt::V1(_) => Err(anyhow::anyhow!("TapReceipt is V1")),
TapReceipt::V2(receipt) => Ok(receipt),
}
}
}

lazy_static! {
static ref TAP_RECEIPT: HeaderName = HeaderName::from_static("tap-receipt");
Expand All @@ -30,9 +101,9 @@ impl Header for TapReceipt {
let raw_receipt = raw_receipt
.to_str()
.map_err(|_| headers::Error::invalid())?;
let parsed_receipt =
let parsed_receipt: TapReceipt =
serde_json::from_str(raw_receipt).map_err(|_| headers::Error::invalid())?;
Ok(TapReceipt(parsed_receipt))
Ok(parsed_receipt)
};
execute().inspect_err(|_| TAP_RECEIPT_INVALID.inc())
}
Expand All @@ -49,20 +120,54 @@ impl Header for TapReceipt {
mod test {
use axum::http::HeaderValue;
use axum_extra::headers::Header;
use test_assets::{create_signed_receipt, SignedReceiptRequest};
use test_assets::{
create_signed_receipt, create_signed_receipt_v2, SignedReceiptRequest,
SignedReceiptV2Request,
};

use super::TapReceipt;

#[tokio::test]
async fn test_decode_valid_tap_receipt_header() {
let original_receipt = create_signed_receipt(SignedReceiptRequest::builder().build()).await;
let serialized_receipt = serde_json::to_string(&original_receipt).unwrap();
let header_value = HeaderValue::from_str(&serialized_receipt).unwrap();

let original_receipt_v1: TapReceipt = original_receipt.clone().into();
let serialized_receipt_v1 = serde_json::to_string(&original_receipt_v1).unwrap();

assert_eq!(serialized_receipt, serialized_receipt_v1);

println!("Was able to serialize properly: {serialized_receipt_v1:?}");
let deserialized: TapReceipt = serde_json::from_str(&serialized_receipt_v1).unwrap();
println!("Was able to deserialize properly: {deserialized:?}");
let header_value = HeaderValue::from_str(&serialized_receipt_v1).unwrap();
let header_values = vec![&header_value];
let decoded_receipt = TapReceipt::decode(&mut header_values.into_iter())
.expect("tap receipt header value should be valid");

assert_eq!(decoded_receipt, original_receipt.into());
}

#[tokio::test]
async fn test_decode_valid_tap_v2_receipt_header() {
let original_receipt =
create_signed_receipt_v2(SignedReceiptV2Request::builder().build()).await;
let serialized_receipt = serde_json::to_string(&original_receipt).unwrap();

let original_receipt_v1: TapReceipt = original_receipt.clone().into();
let serialized_receipt_v1 = serde_json::to_string(&original_receipt_v1).unwrap();

assert_eq!(serialized_receipt, serialized_receipt_v1);

println!("Was able to serialize properly: {serialized_receipt_v1:?}");
let deserialized: TapReceipt = serde_json::from_str(&serialized_receipt_v1).unwrap();
println!("Was able to deserialize properly: {deserialized:?}");
let header_value = HeaderValue::from_str(&serialized_receipt_v1).unwrap();
let header_values = vec![&header_value];
let decoded_receipt = TapReceipt::decode(&mut header_values.into_iter())
.expect("tap receipt header value should be valid");

assert_eq!(decoded_receipt, TapReceipt(original_receipt));
assert_eq!(decoded_receipt, original_receipt.into());
}

#[test]
Expand Down
3 changes: 1 addition & 2 deletions crates/tap-agent/src/agent/sender_allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1258,8 +1258,7 @@ pub mod tests {
));

// Stop the TAP aggregator server.
handle.stop().unwrap();
handle.stopped().await;
handle.abort();
}

#[sqlx::test(migrations = "../../migrations")]
Expand Down
1 change: 1 addition & 0 deletions crates/test-assets/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ indexer-allocation = { path = "../allocation" }
bip39 = "2.0.0"
lazy_static.workspace = true
tap_core.workspace = true
tap_core_v2.workspace = true
thegraph-core.workspace = true
typed-builder.workspace = true
tokio.workspace = true
53 changes: 53 additions & 0 deletions crates/test-assets/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ use tap_core::{
signed_message::EIP712SignedMessage,
tap_eip712_domain,
};

use tap_core_v2::{
receipt::{Receipt as ReceiptV2, SignedReceipt as SignedReceiptV2},
signed_message::EIP712SignedMessage as EIP712SignedMessageV2,
// tap_eip712_domain as tap_eip712_domain_v2,
};
use thegraph_core::{
alloy::{
primitives::{address, Address, U256},
Expand Down Expand Up @@ -353,6 +359,53 @@ pub async fn create_signed_receipt(
.unwrap()
}

#[derive(TypedBuilder)]
pub struct SignedReceiptV2Request {
#[builder(default = Address::ZERO)]
payer: Address,
#[builder(default = Address::ZERO)]
data_service: Address,
#[builder(default = Address::ZERO)]
service_provider: Address,
#[builder(default)]
nonce: u64,
#[builder(default_code = r#"SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos() as u64"#)]
timestamp_ns: u64,
#[builder(default = 1)]
value: u128,
}

pub async fn create_signed_receipt_v2(
SignedReceiptV2Request {
payer,
data_service,
service_provider,
nonce,
timestamp_ns,
value,
}: SignedReceiptV2Request,
) -> SignedReceiptV2 {
let (wallet, _) = &*self::TAP_SIGNER;

EIP712SignedMessageV2::new(
&self::TAP_EIP712_DOMAIN,
ReceiptV2 {
payer,
data_service,
service_provider,
nonce,
timestamp_ns,
value,
},
wallet,
)
.unwrap()
}


pub async fn flush_messages(notify: &Notify) {
loop {
if tokio::time::timeout(Duration::from_millis(10), notify.notified())
Expand Down

0 comments on commit f86b213

Please sign in to comment.