From 69cedf3ad3d759629d729930de65d50abc98ae50 Mon Sep 17 00:00:00 2001 From: Filipe Azevedo Date: Mon, 2 Dec 2024 21:47:47 +0000 Subject: [PATCH] feat: dips grpc --- .github/workflows/license_headers_check.yml | 1 + .github/workflows/tests.yml | 8 + Cargo.lock | 77 +++ Cargo.toml | 6 +- Dockerfile.indexer-service-rs | 5 +- Dockerfile.indexer-tap-agent | 2 + crates/config/maximal-config-example.toml | 3 + crates/config/src/config.rs | 18 +- crates/dips/Cargo.toml | 10 + crates/dips/build.rs | 12 + crates/dips/proto/dips.proto | 59 ++ crates/dips/src/lib.rs | 186 ++++-- .../src/proto/graphprotocol.indexer.dips.rs | 621 ++++++++++++++++++ crates/dips/src/proto/mod.rs | 8 + crates/dips/src/server.rs | 88 +++ crates/dips/src/store.rs | 56 ++ crates/service/Cargo.toml | 1 + crates/service/src/database/dips.rs | 55 +- crates/service/src/routes/dips.rs | 334 ---------- crates/service/src/routes/mod.rs | 1 - crates/service/src/service.rs | 60 +- crates/service/src/service/router.rs | 32 +- crates/test-assets/src/lib.rs | 1 - 23 files changed, 1179 insertions(+), 465 deletions(-) create mode 100644 crates/dips/build.rs create mode 100644 crates/dips/proto/dips.proto create mode 100644 crates/dips/src/proto/graphprotocol.indexer.dips.rs create mode 100644 crates/dips/src/proto/mod.rs create mode 100644 crates/dips/src/server.rs create mode 100644 crates/dips/src/store.rs delete mode 100644 crates/service/src/routes/dips.rs diff --git a/.github/workflows/license_headers_check.yml b/.github/workflows/license_headers_check.yml index c4bd8363..1e588284 100644 --- a/.github/workflows/license_headers_check.yml +++ b/.github/workflows/license_headers_check.yml @@ -29,4 +29,5 @@ jobs: -ignore '.github/workflows/*.yaml' \ -ignore '.github/*.yaml' \ -ignore 'migrations/*.sql' \ + -ignore 'crates/dips/src/proto/*' \ . diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ec8242e8..c70232e1 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -50,6 +50,8 @@ jobs: - name: Run sccache-cache uses: mozilla-actions/sccache-action@9e326ebed976843c9932b3aa0e021c6f50310eb4 # v0.0.6 if: ${{ !startsWith(github.head_ref, 'renovate/') }} + - name: Install protobuf compiler + run: apt-get update && apt-get install protobuf-compiler -y - name: Install sqlx run: cargo install sqlx-cli --no-default-features --features postgres - name: Run the test sqlx migrations @@ -78,6 +80,8 @@ jobs: - name: Run sccache-cache uses: mozilla-actions/sccache-action@9e326ebed976843c9932b3aa0e021c6f50310eb4 # v0.0.6 if: ${{ !startsWith(github.head_ref, 'renovate/') }} + - name: Install protobuf compiler + run: apt-get update && apt-get install protobuf-compiler -y - run: | rustup component add clippy # Temporarily allowing dead-code, while denying all other warnings @@ -116,6 +120,8 @@ jobs: echo "RUSTC_WRAPPER=sccache" >> $GITHUB_ENV echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV if: ${{ !startsWith(github.head_ref, 'renovate/') }} + - name: Install protobuf compiler + run: apt-get update && apt-get install protobuf-compiler -y - name: Run sccache-cache uses: mozilla-actions/sccache-action@9e326ebed976843c9932b3aa0e021c6f50310eb4 # v0.0.6 if: ${{ !startsWith(github.head_ref, 'renovate/') }} @@ -166,6 +172,8 @@ jobs: echo "RUSTC_WRAPPER=sccache" >> $GITHUB_ENV echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV if: ${{ !startsWith(github.head_ref, 'renovate/') }} + - name: Install protobuf compiler + run: apt-get update && apt-get install protobuf-compiler -y - name: Run sccache-cache uses: mozilla-actions/sccache-action@9e326ebed976843c9932b3aa0e021c6f50310eb4 # v0.0.6 if: ${{ !startsWith(github.head_ref, 'renovate/') }} diff --git a/Cargo.lock b/Cargo.lock index b708a3b1..6309f0bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2601,6 +2601,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flate2" version = "1.0.35" @@ -3601,8 +3607,16 @@ dependencies = [ "alloy-rlp", "alloy-sol-types", "anyhow", + "async-trait", + "base64 0.22.1", + "prost", + "prost-types", "thegraph-core", "thiserror 1.0.69", + "tokio", + "tonic", + "tonic-build", + "uuid", ] [[package]] @@ -3688,6 +3702,7 @@ dependencies = [ "tokio", "tokio-test", "tokio-util", + "tonic", "tower 0.5.1", "tower-http", "tower-service", @@ -4337,6 +4352,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "multimap" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" + [[package]] name = "native-tls" version = "0.2.12" @@ -4818,6 +4839,16 @@ dependencies = [ "ucd-trie", ] +[[package]] +name = "petgraph" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset", + "indexmap 2.7.0", +] + [[package]] name = "pharos" version = "0.5.3" @@ -4938,6 +4969,16 @@ dependencies = [ "yansi", ] +[[package]] +name = "prettyplease" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" +dependencies = [ + "proc-macro2", + "syn 2.0.90", +] + [[package]] name = "primitive-types" version = "0.12.2" @@ -5070,6 +5111,27 @@ dependencies = [ "prost-derive", ] +[[package]] +name = "prost-build" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" +dependencies = [ + "bytes", + "heck 0.5.0", + "itertools 0.13.0", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.90", + "tempfile", +] + [[package]] name = "prost-derive" version = "0.13.3" @@ -7140,6 +7202,7 @@ dependencies = [ "axum", "base64 0.22.1", "bytes", + "flate2", "h2 0.4.7", "http 1.1.0", "http-body 1.0.1", @@ -7162,6 +7225,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic-build" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "prost-types", + "quote", + "syn 2.0.90", +] + [[package]] name = "tower" version = "0.4.13" diff --git a/Cargo.toml b/Cargo.toml index 21f073b5..9cbf689c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,7 @@ tokio = "1.40" prometheus = "0.13.3" anyhow = { version = "1.0.72" } thiserror = "1.0.49" -async-trait = "0.1.72" +async-trait = "0.1.83" eventuals = "0.6.7" base64 = "0.22.1" reqwest = { version = "0.12", features = [ @@ -78,3 +78,7 @@ bip39 = "2.0.0" rstest = "0.23.0" wiremock = "0.6.1" typed-builder = "0.20.0" +tonic = { version = "0.12.3", features = ["tls-roots", "gzip"] } +tonic-build = { version = "0.12.3", features = ["prost"] } +prost = "0.13.3" +prost-types = "0.13.3" diff --git a/Dockerfile.indexer-service-rs b/Dockerfile.indexer-service-rs index a6518f27..39eb32d8 100644 --- a/Dockerfile.indexer-service-rs +++ b/Dockerfile.indexer-service-rs @@ -6,6 +6,9 @@ COPY . . # Force SQLx to use the offline mode to statically check the database queries against # the prepared files in the `.sqlx` directory. ENV SQLX_OFFLINE=true + +RUN apt-get update && apt-get install -y --no-install-recommends \ + protobuf-compiler && rm -rf /var/lib/apt/lists/* RUN cargo build --release --bin indexer-service-rs ######################################################################################## @@ -13,7 +16,7 @@ RUN cargo build --release --bin indexer-service-rs FROM debian:bookworm-slim RUN apt-get update && apt-get install -y --no-install-recommends \ - openssl ca-certificates \ + openssl ca-certificates protobuf-compiler \ && rm -rf /var/lib/apt/lists/* COPY --from=build /root/target/release/indexer-service-rs /usr/local/bin/indexer-service-rs diff --git a/Dockerfile.indexer-tap-agent b/Dockerfile.indexer-tap-agent index 306944b3..3663c9d4 100644 --- a/Dockerfile.indexer-tap-agent +++ b/Dockerfile.indexer-tap-agent @@ -6,6 +6,8 @@ COPY . . # Force SQLx to use the offline mode to statically check the database queries against # the prepared files in the `.sqlx` directory. ENV SQLX_OFFLINE=true +RUN apt-get update && apt-get install -y --no-install-recommends \ + protobuf-compiler && rm -rf /var/lib/apt/lists/* RUN cargo build --release --bin indexer-tap-agent ######################################################################################## diff --git a/crates/config/maximal-config-example.toml b/crates/config/maximal-config-example.toml index 0efa9d57..b5c15332 100644 --- a/crates/config/maximal-config-example.toml +++ b/crates/config/maximal-config-example.toml @@ -147,4 +147,7 @@ max_receipts_per_request = 10000 0x0123456789abcdef0123456789abcdef01234567 = "https://other.example.com/aggregate-receipts" [dips] +host = "0.0.0.0" +port = "7601" allowed_payers = ["0x3333333333333333333333333333333333333333"] +expected_payee = "0x0000000000000000000000000000000000000000" diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs index 1703d4e8..a74c90e0 100644 --- a/crates/config/src/config.rs +++ b/crates/config/src/config.rs @@ -386,10 +386,25 @@ pub struct TapConfig { #[derive(Debug, Deserialize)] #[cfg_attr(test, derive(PartialEq))] pub struct DipsConfig { + pub host: String, + pub port: String, + pub expected_payee: Address, pub allowed_payers: Vec
, pub cancellation_time_tolerance: Option, } +impl Default for DipsConfig { + fn default() -> Self { + DipsConfig { + host: "0.0.0.0".to_string(), + port: "7601".to_string(), + expected_payee: Address::ZERO, + allowed_payers: vec![], + cancellation_time_tolerance: None, + } + } +} + impl TapConfig { pub fn get_trigger_value(&self) -> u128 { let grt_wei = self.max_amount_willing_to_lose_grt.get_value(); @@ -450,7 +465,8 @@ mod tests { allowed_payers: vec![thegraph_core::Address( FixedBytes::<20>::from_str("0x3333333333333333333333333333333333333333").unwrap(), )], - cancellation_time_tolerance: None, + expected_payee: thegraph_core::Address::ZERO, + ..Default::default() }); let max_config_file: Config = toml::from_str( diff --git a/crates/dips/Cargo.toml b/crates/dips/Cargo.toml index 5ab10a85..c6d79e62 100644 --- a/crates/dips/Cargo.toml +++ b/crates/dips/Cargo.toml @@ -10,3 +10,13 @@ anyhow.workspace = true alloy-sol-types = "=0.8.13" alloy-rlp = "0.3.9" thegraph-core.workspace = true +tonic.workspace = true +async-trait.workspace = true +prost.workspace = true +prost-types.workspace = true +uuid.workspace = true +base64.workspace = true +tokio.workspace = true + +[build-dependencies] +tonic-build = { workspace = true } diff --git a/crates/dips/build.rs b/crates/dips/build.rs new file mode 100644 index 00000000..1d761845 --- /dev/null +++ b/crates/dips/build.rs @@ -0,0 +1,12 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +fn main() { + println!("cargo:rerun-if-changed=proto"); + tonic_build::configure() + .out_dir("src/proto") + .include_file("mod.rs") + .protoc_arg("--experimental_allow_proto3_optional") + .compile_protos(&["proto/dips.proto"], &["proto"]) + .expect("Failed to compile dips proto(s)"); +} diff --git a/crates/dips/proto/dips.proto b/crates/dips/proto/dips.proto new file mode 100644 index 00000000..400c484a --- /dev/null +++ b/crates/dips/proto/dips.proto @@ -0,0 +1,59 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +syntax = "proto3"; + +package graphprotocol.indexer.dips; + +service AgreementService { + rpc CreateAgreement(CreateAgreementRequest) returns (CreateAgreementResponse); + rpc CancelAgreement(CancelAgreementRequest) returns (AgreementCanellationResponse); + rpc GetAgreementById(GetAgreementByIdRequest) returns (GetAgreementByIdResponse); + rpc GetPrice(PriceRequest) returns (PriceResponse); +} + +message GetAgreementByIdRequest { + +} + +message GetAgreementByIdResponse { + +} + +message CreateAgreementRequest { + string id = 1; + bytes signed_voucher = 2; +} + +message CancelAgreementRequest { + string id = 1; + bytes signed_voucher = 2; +} + +message CreateAgreementResponse { + string uuid = 1; +} + +message AgreementCanellationResponse { + string uuid = 1; +} + +message PriceRequest { + ProtocolNetwork protocol = 1; + string chain_id = 2; +} + +message PriceResponse { + optional Price price = 1; +} + +message Price { + string price_per_block = 1; + string chain_id = 2; + ProtocolNetwork protocol = 3; +} + +enum ProtocolNetwork { + UNKNOWN = 0; + EVM = 1; +} diff --git a/crates/dips/src/lib.rs b/crates/dips/src/lib.rs index c073c439..e1eb4b84 100644 --- a/crates/dips/src/lib.rs +++ b/crates/dips/src/lib.rs @@ -2,15 +2,21 @@ // SPDX-License-Identifier: Apache-2.0 use std::str::FromStr; +use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use store::AgreementStore; +use uuid::Uuid; pub use alloy; pub use alloy_rlp; +pub mod proto; +pub mod server; +pub mod store; use alloy::core::primitives::Address; use alloy::rlp::{RlpDecodable, RlpEncodable}; use alloy::signers::{Signature, SignerSync}; -use alloy_rlp::Encodable; +use alloy_rlp::{Decodable, Encodable}; use thegraph_core::alloy_sol_types::{sol, Eip712Domain, SolStruct}; use thiserror::Error; @@ -75,14 +81,34 @@ sol! { } } -#[derive(Error, Debug, PartialEq)] -pub enum AgreementVoucherValidationError { +#[derive(Error, Debug)] +pub enum DipsError { + // agreement cration #[error("signature is not valid, error: {0}")] InvalidSignature(String), #[error("payer {0} not authorised")] PayerNotAuthorised(Address), #[error("voucher payee {actual} does not match the expected address {expected}")] UnexpectedPayee { expected: Address, actual: Address }, + // cancellation + #[error("cancelled_by is expected to match the signer")] + UnexpectedSigner, + #[error("signer {0} not authorised")] + SignerNotAuthorised(Address), + #[error("cancellation request has expired")] + ExpiredRequest, + // misc + #[error("rlp (de)serialisation failed")] + RlpSerializationError(#[from] alloy_rlp::Error), + #[error("unknown error: {0}")] + UnknownError(#[from] anyhow::Error), +} + +// TODO: send back messages +impl From for tonic::Status { + fn from(_val: DipsError) -> Self { + tonic::Status::internal("unknown errr") + } } impl IndexingAgreementVoucher { @@ -107,22 +133,22 @@ impl SignedIndexingAgreementVoucher { domain: &Eip712Domain, expected_payee: &Address, allowed_payers: impl AsRef<[Address]>, - ) -> Result<(), AgreementVoucherValidationError> { + ) -> Result<(), DipsError> { let sig = Signature::from_str(&self.signature.to_string()) - .map_err(|err| AgreementVoucherValidationError::InvalidSignature(err.to_string()))?; + .map_err(|err| DipsError::InvalidSignature(err.to_string()))?; let payer = sig .recover_address_from_prehash(&self.voucher.eip712_signing_hash(domain)) - .map_err(|err| AgreementVoucherValidationError::InvalidSignature(err.to_string()))?; + .map_err(|err| DipsError::InvalidSignature(err.to_string()))?; if allowed_payers.as_ref().is_empty() || !allowed_payers.as_ref().iter().any(|addr| addr.eq(&payer)) { - return Err(AgreementVoucherValidationError::PayerNotAuthorised(payer)); + return Err(DipsError::PayerNotAuthorised(payer)); } if !self.voucher.payee.eq(expected_payee) { - return Err(AgreementVoucherValidationError::UnexpectedPayee { + return Err(DipsError::UnexpectedPayee { expected: *expected_payee, actual: self.voucher.payee, }); @@ -139,18 +165,6 @@ impl SignedIndexingAgreementVoucher { } } -#[derive(Error, Debug, PartialEq)] -pub enum CancellationRequestValidationError { - #[error("signature is not valid, error: {0}")] - InvalidSignature(String), - #[error("cancelled_by is expected to match the signer")] - UnexpectedSigner, - #[error("signer {0} not authorised")] - SignerNotAuthorised(Address), - #[error("cancellation request has expired")] - ExpiredRequest, -} - impl CancellationRequest { pub fn sign( &self, @@ -172,30 +186,28 @@ impl SignedCancellationRequest { &self, domain: &Eip712Domain, time_tolerance: Duration, - ) -> Result<(), CancellationRequestValidationError> { + ) -> Result<(), DipsError> { let sig = Signature::from_str(&self.signature.to_string()) - .map_err(|err| CancellationRequestValidationError::InvalidSignature(err.to_string()))?; + .map_err(|err| DipsError::InvalidSignature(err.to_string()))?; let signer = sig .recover_address_from_prehash(&self.request.eip712_signing_hash(domain)) - .map_err(|err| CancellationRequestValidationError::InvalidSignature(err.to_string()))?; + .map_err(|err| DipsError::InvalidSignature(err.to_string()))?; if signer.ne(&self.request.cancellled_by) { - return Err(CancellationRequestValidationError::UnexpectedSigner); + return Err(DipsError::UnexpectedSigner); } if signer.ne(&self.request.payer) && signer.ne(&self.request.payee) { - return Err(CancellationRequestValidationError::SignerNotAuthorised( - signer, - )); + return Err(DipsError::SignerNotAuthorised(signer)); } let now = SystemTime::now() .duration_since(UNIX_EPOCH) - .map_err(|_| CancellationRequestValidationError::ExpiredRequest)? + .map_err(|_| DipsError::ExpiredRequest)? .as_secs(); if now - self.request.timestamp >= time_tolerance.as_secs() { - return Err(CancellationRequestValidationError::ExpiredRequest); + return Err(DipsError::ExpiredRequest); } Ok(()) @@ -208,8 +220,46 @@ impl SignedCancellationRequest { } } +pub async fn validate_and_create_agreement( + store: Arc, + domain: &Eip712Domain, + id: Uuid, + expected_payee: &Address, + allowed_payers: impl AsRef<[Address]>, + voucher: Vec, +) -> Result { + let voucher = SignedIndexingAgreementVoucher::decode(&mut voucher.as_ref())?; + let metadata = SubgraphIndexingVoucherMetadata::decode(&mut voucher.voucher.metadata.as_ref())?; + + voucher.validate(domain, expected_payee, allowed_payers)?; + + store + .create_agreement(id, voucher, metadata.protocolNetwork.to_string()) + .await?; + + Ok(id) +} + +pub async fn validate_and_cancel_agreement( + store: Arc, + domain: &Eip712Domain, + id: Uuid, + agreement: Vec, + time_tolerance: Duration, +) -> Result { + let voucher = SignedCancellationRequest::decode(&mut agreement.as_ref())?; + + voucher.validate(domain, time_tolerance)?; + + store.cancel_agreement(id, voucher).await?; + + Ok(id) +} + #[cfg(test)] mod test { + use std::sync::Arc; + use std::time::{Duration, SystemTime, UNIX_EPOCH}; use alloy::primitives::{Address, FixedBytes, U256}; @@ -218,9 +268,66 @@ mod test { use thegraph_core::attestation::eip712_domain; use crate::{ - AgreementVoucherValidationError, CancellationRequest, CancellationRequestValidationError, + alloy_rlp::{self}, IndexingAgreementVoucher, SubgraphIndexingVoucherMetadata, }; + use crate::{CancellationRequest, DipsError}; + use uuid::Uuid; + + pub use crate::store::{AgreementStore, InMemoryAgreementStore}; + + #[tokio::test] + async fn test_validate_and_create_agreement() -> anyhow::Result<()> { + let deployment_id = "Qmbg1qF4YgHjiVfsVt6a13ddrVcRtWyJQfD4LA3CwHM29f".to_string(); + let payee = PrivateKeySigner::random(); + let payee_addr = payee.address(); + let payer = PrivateKeySigner::random(); + let payer_addr = payer.address(); + + let metadata = SubgraphIndexingVoucherMetadata { + pricePerBlock: U256::from(10000_u64), + protocolNetwork: FixedBytes::left_padding_from("arbitrum-one".as_bytes()), + chainId: FixedBytes::left_padding_from("mainnet".as_bytes()), + deployment_ipfs_hash: deployment_id, + }; + + let voucher = IndexingAgreementVoucher { + payer: payer_addr, + payee: payee_addr, + service: Address(FixedBytes::ZERO), + maxInitialAmount: U256::from(10000_u64), + maxOngoingAmountPerEpoch: U256::from(10000_u64), + deadline: 1000, + maxEpochsPerCollection: 1000, + minEpochsPerCollection: 1000, + durationEpochs: 1000, + metadata: alloy_rlp::encode(metadata).into(), + }; + let domain = eip712_domain(0, Address::ZERO); + + let voucher = voucher.sign(&domain, payer)?; + let rlp_voucher = alloy_rlp::encode(voucher.clone()); + let id = Uuid::now_v7(); + + let store = Arc::new(InMemoryAgreementStore::default()); + + let actual_id = super::validate_and_create_agreement( + store.clone(), + &domain, + id, + &payee_addr, + vec![payer_addr], + rlp_voucher, + ) + .await + .unwrap(); + + let actual = store.get_by_id(actual_id).await.unwrap(); + + let actual_voucher = actual.unwrap(); + assert_eq!(voucher, actual_voucher); + Ok(()) + } #[test] fn voucher_signature_verification() { @@ -253,8 +360,11 @@ mod test { let domain = eip712_domain(0, Address::ZERO); let signed = voucher.sign(&domain, payer).unwrap(); assert_eq!( - signed.validate(&domain, &payee_addr, vec![]).unwrap_err(), - AgreementVoucherValidationError::PayerNotAuthorised(voucher.payer) + signed + .validate(&domain, &payee_addr, vec![]) + .unwrap_err() + .to_string(), + DipsError::PayerNotAuthorised(voucher.payer).to_string() ); assert!(signed .validate(&domain, &payee_addr, vec![payer_addr]) @@ -297,7 +407,7 @@ mod test { signed .validate(&domain, &payee_addr, vec![payer_addr]) .unwrap_err(), - AgreementVoucherValidationError::PayerNotAuthorised(_) + DipsError::PayerNotAuthorised(_) )); } @@ -325,7 +435,7 @@ mod test { name: &'a str, signer: PrivateKeySigner, timestamp: u64, - error: Option, + error: Option, } let cases: Vec = vec![ @@ -345,17 +455,13 @@ mod test { name: "invalid signer", signer: other_signer.clone(), timestamp: now, - error: Some(CancellationRequestValidationError::SignerNotAuthorised( - other_signer.address(), - )), + error: Some(DipsError::SignerNotAuthorised(other_signer.address())), }, Case { name: "expired timestamp", signer: payee, timestamp: 100, - error: Some(CancellationRequestValidationError::SignerNotAuthorised( - other_signer.address(), - )), + error: Some(DipsError::SignerNotAuthorised(other_signer.address())), }, ]; diff --git a/crates/dips/src/proto/graphprotocol.indexer.dips.rs b/crates/dips/src/proto/graphprotocol.indexer.dips.rs new file mode 100644 index 00000000..57fce10e --- /dev/null +++ b/crates/dips/src/proto/graphprotocol.indexer.dips.rs @@ -0,0 +1,621 @@ +// This file is @generated by prost-build. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct GetAgreementByIdRequest {} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct GetAgreementByIdResponse {} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CreateAgreementRequest { + #[prost(string, tag = "1")] + pub id: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "2")] + pub signed_voucher: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CancelAgreementRequest { + #[prost(string, tag = "1")] + pub id: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "2")] + pub signed_voucher: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CreateAgreementResponse { + #[prost(string, tag = "1")] + pub uuid: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AgreementCanellationResponse { + #[prost(string, tag = "1")] + pub uuid: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PriceRequest { + #[prost(enumeration = "ProtocolNetwork", tag = "1")] + pub protocol: i32, + #[prost(string, tag = "2")] + pub chain_id: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PriceResponse { + #[prost(message, optional, tag = "1")] + pub price: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Price { + #[prost(string, tag = "1")] + pub price_per_block: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub chain_id: ::prost::alloc::string::String, + #[prost(enumeration = "ProtocolNetwork", tag = "3")] + pub protocol: i32, +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ProtocolNetwork { + Unknown = 0, + Evm = 1, +} +impl ProtocolNetwork { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Unknown => "UNKNOWN", + Self::Evm => "EVM", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "UNKNOWN" => Some(Self::Unknown), + "EVM" => Some(Self::Evm), + _ => None, + } + } +} +/// Generated client implementations. +pub mod agreement_service_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct AgreementServiceClient { + inner: tonic::client::Grpc, + } + impl AgreementServiceClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl AgreementServiceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> AgreementServiceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + AgreementServiceClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn create_agreement( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/graphprotocol.indexer.dips.AgreementService/CreateAgreement", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "graphprotocol.indexer.dips.AgreementService", + "CreateAgreement", + ), + ); + self.inner.unary(req, path, codec).await + } + pub async fn cancel_agreement( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/graphprotocol.indexer.dips.AgreementService/CancelAgreement", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "graphprotocol.indexer.dips.AgreementService", + "CancelAgreement", + ), + ); + self.inner.unary(req, path, codec).await + } + pub async fn get_agreement_by_id( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/graphprotocol.indexer.dips.AgreementService/GetAgreementById", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "graphprotocol.indexer.dips.AgreementService", + "GetAgreementById", + ), + ); + self.inner.unary(req, path, codec).await + } + pub async fn get_price( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/graphprotocol.indexer.dips.AgreementService/GetPrice", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "graphprotocol.indexer.dips.AgreementService", + "GetPrice", + ), + ); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod agreement_service_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with AgreementServiceServer. + #[async_trait] + pub trait AgreementService: std::marker::Send + std::marker::Sync + 'static { + async fn create_agreement( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn cancel_agreement( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_agreement_by_id( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_price( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + } + #[derive(Debug)] + pub struct AgreementServiceServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl AgreementServiceServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for AgreementServiceServer + where + T: AgreementService, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/graphprotocol.indexer.dips.AgreementService/CreateAgreement" => { + #[allow(non_camel_case_types)] + struct CreateAgreementSvc(pub Arc); + impl< + T: AgreementService, + > tonic::server::UnaryService + for CreateAgreementSvc { + type Response = super::CreateAgreementResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::create_agreement(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = CreateAgreementSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/graphprotocol.indexer.dips.AgreementService/CancelAgreement" => { + #[allow(non_camel_case_types)] + struct CancelAgreementSvc(pub Arc); + impl< + T: AgreementService, + > tonic::server::UnaryService + for CancelAgreementSvc { + type Response = super::AgreementCanellationResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::cancel_agreement(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = CancelAgreementSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/graphprotocol.indexer.dips.AgreementService/GetAgreementById" => { + #[allow(non_camel_case_types)] + struct GetAgreementByIdSvc(pub Arc); + impl< + T: AgreementService, + > tonic::server::UnaryService + for GetAgreementByIdSvc { + type Response = super::GetAgreementByIdResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_agreement_by_id( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetAgreementByIdSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/graphprotocol.indexer.dips.AgreementService/GetPrice" => { + #[allow(non_camel_case_types)] + struct GetPriceSvc(pub Arc); + impl< + T: AgreementService, + > tonic::server::UnaryService + for GetPriceSvc { + type Response = super::PriceResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_price(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetPriceSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for AgreementServiceServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "graphprotocol.indexer.dips.AgreementService"; + impl tonic::server::NamedService for AgreementServiceServer { + const NAME: &'static str = SERVICE_NAME; + } +} diff --git a/crates/dips/src/proto/mod.rs b/crates/dips/src/proto/mod.rs new file mode 100644 index 00000000..8edb4be2 --- /dev/null +++ b/crates/dips/src/proto/mod.rs @@ -0,0 +1,8 @@ +// This file is @generated by prost-build. +pub mod graphprotocol { + pub mod indexer { + pub mod dips { + include!("graphprotocol.indexer.dips.rs"); + } + } +} diff --git a/crates/dips/src/server.rs b/crates/dips/src/server.rs new file mode 100644 index 00000000..3cd5dc58 --- /dev/null +++ b/crates/dips/src/server.rs @@ -0,0 +1,88 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::{str::FromStr, sync::Arc, time::Duration}; + +use crate::{ + proto::graphprotocol::indexer::dips::*, store::AgreementStore, validate_and_cancel_agreement, + validate_and_create_agreement, DipsError, +}; +use alloy_sol_types::Eip712Domain; +use anyhow::anyhow; +use async_trait::async_trait; +use thegraph_core::Address; +use uuid::Uuid; + +#[derive(Debug)] +pub struct DipsServer { + pub agreement_store: Arc, + pub expected_payee: Address, + pub allowed_payers: Vec
, + pub domain: Eip712Domain, + pub cancel_voucher_time_tolerance: Duration, +} + +#[async_trait] +impl agreement_service_server::AgreementService for DipsServer { + async fn create_agreement( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let CreateAgreementRequest { id, signed_voucher } = request.into_inner(); + let uid = Uuid::from_str(&id).map_err(|_| { + Into::::into(DipsError::from(anyhow!("failed to parse uuid"))) + })?; + + validate_and_create_agreement( + self.agreement_store.clone(), + &self.domain, + uid, + &self.expected_payee, + &self.allowed_payers, + signed_voucher, + ) + .await + .map_err(Into::::into)?; + + Ok(tonic::Response::new(CreateAgreementResponse { + uuid: uid.to_string(), + })) + } + + async fn cancel_agreement( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let CancelAgreementRequest { id, signed_voucher } = request.into_inner(); + let uid = Uuid::from_str(&id).map_err(|_| { + Into::::into(DipsError::from(anyhow!("failed to parse uuid"))) + })?; + + validate_and_cancel_agreement( + self.agreement_store.clone(), + &self.domain, + uid, + signed_voucher, + self.cancel_voucher_time_tolerance, + ) + .await + .map_err(Into::::into)?; + + Ok(tonic::Response::new(AgreementCanellationResponse { + uuid: uid.to_string(), + })) + } + async fn get_agreement_by_id( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + todo!() + } + + async fn get_price( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + todo!() + } +} diff --git a/crates/dips/src/store.rs b/crates/dips/src/store.rs new file mode 100644 index 00000000..7453b484 --- /dev/null +++ b/crates/dips/src/store.rs @@ -0,0 +1,56 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::collections::HashMap; + +use async_trait::async_trait; +use uuid::Uuid; + +use crate::{SignedCancellationRequest, SignedIndexingAgreementVoucher}; + +#[async_trait] +pub trait AgreementStore: Sync + Send + std::fmt::Debug { + async fn get_by_id(&self, id: Uuid) -> anyhow::Result>; + async fn create_agreement( + &self, + id: Uuid, + agreement: SignedIndexingAgreementVoucher, + protocol: String, + ) -> anyhow::Result<()>; + async fn cancel_agreement( + &self, + id: Uuid, + signed_cancellation: SignedCancellationRequest, + ) -> anyhow::Result; +} + +#[derive(Default, Debug)] +pub struct InMemoryAgreementStore { + pub data: tokio::sync::RwLock>, +} + +#[async_trait] +impl AgreementStore for InMemoryAgreementStore { + async fn get_by_id(&self, id: Uuid) -> anyhow::Result> { + Ok(self.data.try_read()?.get(&id).cloned()) + } + async fn create_agreement( + &self, + id: Uuid, + agreement: SignedIndexingAgreementVoucher, + _protocol: String, + ) -> anyhow::Result<()> { + self.data.try_write()?.insert(id, agreement.clone()); + + Ok(()) + } + async fn cancel_agreement( + &self, + id: Uuid, + _signed_cancellation: SignedCancellationRequest, + ) -> anyhow::Result { + self.data.try_write()?.remove(&id); + + Ok(id) + } +} diff --git a/crates/service/Cargo.toml b/crates/service/Cargo.toml index cbec46af..feac3b10 100644 --- a/crates/service/Cargo.toml +++ b/crates/service/Cargo.toml @@ -57,6 +57,7 @@ cost-model = { git = "https://github.com/graphprotocol/agora", rev = "3ed34ca" } bip39.workspace = true tower = "0.5.1" pin-project = "1.1.7" +tonic.workspace = true [dev-dependencies] hex-literal = "0.4.1" diff --git a/crates/service/src/database/dips.rs b/crates/service/src/database/dips.rs index 317b341f..80db7828 100644 --- a/crates/service/src/database/dips.rs +++ b/crates/service/src/database/dips.rs @@ -1,67 +1,20 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::collections::HashMap; - use alloy::rlp::Decodable; use anyhow::bail; use axum::async_trait; use build_info::chrono::Utc; -use indexer_dips::{SignedCancellationRequest, SignedIndexingAgreementVoucher}; +use indexer_dips::{ + store::AgreementStore, SignedCancellationRequest, SignedIndexingAgreementVoucher, +}; use sqlx::PgPool; use uuid::Uuid; -#[async_trait] -pub trait AgreementStore: Sync + Send { - async fn get_by_id(&self, id: Uuid) -> anyhow::Result>; - async fn create_agreement( - &self, - id: Uuid, - agreement: SignedIndexingAgreementVoucher, - protocol: String, - ) -> anyhow::Result<()>; - async fn cancel_agreement( - &self, - id: Uuid, - signed_cancellation: SignedCancellationRequest, - ) -> anyhow::Result; -} - -#[derive(Default)] -pub struct InMemoryAgreementStore { - pub data: tokio::sync::RwLock>, -} - -#[async_trait] -impl AgreementStore for InMemoryAgreementStore { - async fn get_by_id(&self, id: Uuid) -> anyhow::Result> { - Ok(self.data.try_read()?.get(&id).cloned()) - } - async fn create_agreement( - &self, - id: Uuid, - agreement: SignedIndexingAgreementVoucher, - _protocol: String, - ) -> anyhow::Result<()> { - self.data.try_write()?.insert(id, agreement.clone()); - - Ok(()) - } - async fn cancel_agreement( - &self, - id: Uuid, - _signed_cancellation: SignedCancellationRequest, - ) -> anyhow::Result { - self.data.try_write()?.remove(&id); - - Ok(id) - } -} - #[derive(Debug)] pub struct PsqlAgreementStore { - pool: PgPool, + pub pool: PgPool, } #[async_trait] diff --git a/crates/service/src/routes/dips.rs b/crates/service/src/routes/dips.rs deleted file mode 100644 index 378be816..00000000 --- a/crates/service/src/routes/dips.rs +++ /dev/null @@ -1,334 +0,0 @@ -// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use std::time::Duration; -use std::{str::FromStr, sync::Arc}; - -use anyhow::bail; -use async_graphql::{Context, EmptySubscription, FieldResult, Object, Schema, SimpleObject}; -use base64::{engine::general_purpose::STANDARD, Engine}; -use indexer_config::{BlockchainConfig, DipsConfig}; -use indexer_dips::alloy::dyn_abi::Eip712Domain; -use indexer_dips::SignedCancellationRequest; -use indexer_dips::{ - alloy::core::primitives::Address, alloy_rlp::Decodable, SignedIndexingAgreementVoucher, - SubgraphIndexingVoucherMetadata, -}; -use thegraph_core::attestation::eip712_domain; -use uuid::Uuid; - -use crate::database::dips::AgreementStore; - -pub type DipsSchema = Schema; -pub type DipsStore = Arc; - -pub fn build_schema( - indexer_address: Address, - DipsConfig { - allowed_payers, - cancellation_time_tolerance, - }: &DipsConfig, - BlockchainConfig { - chain_id, - receipts_verifier_address, - }: &BlockchainConfig, - agreement_store: DipsStore, - prices: Vec, -) -> DipsSchema { - Schema::build( - AgreementQuery {}, - AgreementMutation { - expected_payee: indexer_address, - allowed_payers: allowed_payers.clone(), - domain: eip712_domain(*chain_id as u64, *receipts_verifier_address), - cancel_voucher_time_tolerance: cancellation_time_tolerance - .unwrap_or(Duration::from_secs(5)), - }, - EmptySubscription, - ) - .data(agreement_store) - .data(prices) - .finish() -} - -pub enum NetworkProtocol { - ArbitrumMainnet, -} - -impl FromStr for NetworkProtocol { - type Err = anyhow::Error; - - fn from_str(s: &str) -> Result { - let p = match s { - "arbitrum-mainnet" => NetworkProtocol::ArbitrumMainnet, - _ => bail!("unknown network protocol"), - }; - - Ok(p) - } -} - -#[derive(SimpleObject, Debug, Clone, PartialEq)] -pub struct Agreement { - pub id: String, - pub signed_payload: String, - pub protocol_network: String, -} - -#[derive(SimpleObject, Debug, Clone, PartialEq)] -pub struct Cancellation { - pub signed_payload: String, - pub protocol_network: String, -} - -impl TryInto for (Uuid, SignedIndexingAgreementVoucher) { - type Error = anyhow::Error; - - fn try_into(self) -> Result { - let signed_payload = STANDARD.encode(self.1.encode_vec()); - let metadata = self.1.voucher.metadata; - let metadata: SubgraphIndexingVoucherMetadata = - SubgraphIndexingVoucherMetadata::decode(&mut metadata.as_ref())?; - - Ok(Agreement { - id: self.0.to_string(), - signed_payload, - protocol_network: metadata.protocolNetwork.to_string(), - }) - } -} - -impl TryInto for Agreement { - type Error = anyhow::Error; - - fn try_into(self) -> Result { - let rlp_bytes = STANDARD.decode(self.signed_payload)?; - let signed_voucher = SignedIndexingAgreementVoucher::decode(&mut rlp_bytes.as_ref())?; - - Ok(signed_voucher) - } -} - -#[derive(SimpleObject, Debug, Clone)] -pub struct Price { - price_per_block: String, - chain_id: String, - protocol_network: String, -} - -#[derive(Debug)] -pub struct AgreementQuery {} - -#[Object] -impl AgreementQuery { - pub async fn get_agreement_by_id<'a>( - &self, - ctx: &'a Context<'_>, - id: String, - ) -> FieldResult> { - let store: &Arc = ctx.data()?; - let id = Uuid::from_str(&id)?; - - match store - .get_by_id(id) - .await - .map_err(async_graphql::Error::from)? - { - Some(a) => (id, a) - .try_into() - .map(Some) - .map_err(async_graphql::Error::from), - None => Ok(None), - } - } - - pub async fn get_price<'a>( - &self, - ctx: &'a Context<'_>, - protocol_network: String, - chain_id: String, - ) -> FieldResult> { - let prices: &Vec = ctx.data()?; - - let p = prices - .iter() - .find(|p| p.protocol_network.eq(&protocol_network) && p.chain_id.eq(&chain_id)); - - Ok(p.cloned()) - } - - pub async fn get_all_prices<'a>(&self, ctx: &'a Context<'_>) -> FieldResult> { - let prices: &Vec = ctx.data()?; - - Ok(prices.clone()) - } -} - -#[derive(Debug)] -pub struct AgreementMutation { - pub expected_payee: Address, - pub allowed_payers: Vec
, - pub domain: Eip712Domain, - pub cancel_voucher_time_tolerance: Duration, -} - -#[Object] -impl AgreementMutation { - // create_agreements returns the UUID under which the agreement was stored. - pub async fn create_agreement<'a>( - &self, - ctx: &'a Context<'_>, - // uuid v7 that this agreement should use. - id: String, - // data should be the signed voucher, eip712 signed, rlp and base64 encoded. - signed_voucher: String, - ) -> FieldResult { - let store: &Arc = ctx.data()?; - let uid = Uuid::from_str(&id)?; - - validate_and_create_agreement( - store.clone(), - &self.domain, - uid, - &self.expected_payee, - &self.allowed_payers, - signed_voucher, - ) - .await - .map_err(async_graphql::Error::from)?; - - Ok(id) - } - - pub async fn cancel_agreement<'a>( - &self, - ctx: &'a Context<'_>, - id: String, - // data should be the signed voucher, eip712 signed, rlp and base64 encoded. - signed_request: String, - ) -> FieldResult { - let store: &Arc = ctx.data()?; - let uid = Uuid::from_str(&id)?; - - validate_and_cancel_agreement( - store.clone(), - &self.domain, - uid, - signed_request.clone(), - self.cancel_voucher_time_tolerance, - ) - .await - .map_err(async_graphql::Error::from)?; - - Ok(id) - } -} - -async fn validate_and_create_agreement( - store: Arc, - domain: &Eip712Domain, - id: Uuid, - expected_payee: &Address, - allowed_payers: impl AsRef<[Address]>, - agreement: String, -) -> anyhow::Result { - let rlp_bs = STANDARD.decode(agreement.clone())?; - let voucher = SignedIndexingAgreementVoucher::decode(&mut rlp_bs.as_ref())?; - let metadata = SubgraphIndexingVoucherMetadata::decode(&mut voucher.voucher.metadata.as_ref())?; - - voucher.validate(domain, expected_payee, allowed_payers)?; - - store - .create_agreement(id, voucher, metadata.protocolNetwork.to_string()) - .await?; - - Ok(id) -} - -async fn validate_and_cancel_agreement( - store: Arc, - domain: &Eip712Domain, - id: Uuid, - agreement: String, - time_tolerance: Duration, -) -> anyhow::Result { - let rlp_bs = STANDARD.decode(agreement.clone())?; - let voucher = SignedCancellationRequest::decode(&mut rlp_bs.as_ref())?; - - voucher.validate(domain, time_tolerance)?; - - store.cancel_agreement(id, voucher).await?; - - Ok(id) -} - -#[cfg(test)] -mod test { - use std::sync::Arc; - - use alloy::signers::local::PrivateKeySigner; - use base64::{engine::general_purpose::STANDARD, Engine}; - use indexer_dips::{ - alloy::core::primitives::{Address, FixedBytes, U256}, - alloy_rlp::{self}, - IndexingAgreementVoucher, SubgraphIndexingVoucherMetadata, - }; - use thegraph_core::attestation::eip712_domain; - use uuid::Uuid; - - use crate::database::dips::{AgreementStore, InMemoryAgreementStore}; - - #[tokio::test] - async fn test_validate_and_create_agreement() -> anyhow::Result<()> { - let deployment_id = "Qmbg1qF4YgHjiVfsVt6a13ddrVcRtWyJQfD4LA3CwHM29f".to_string(); - let payee = PrivateKeySigner::random(); - let payee_addr = payee.address(); - let payer = PrivateKeySigner::random(); - let payer_addr = payer.address(); - - let metadata = SubgraphIndexingVoucherMetadata { - pricePerBlock: U256::from(10000_u64), - protocolNetwork: FixedBytes::left_padding_from("arbitrum-one".as_bytes()), - chainId: FixedBytes::left_padding_from("mainnet".as_bytes()), - deployment_ipfs_hash: deployment_id, - }; - - let voucher = IndexingAgreementVoucher { - payer: payer_addr, - payee: payee_addr, - service: Address(FixedBytes::ZERO), - maxInitialAmount: U256::from(10000_u64), - maxOngoingAmountPerEpoch: U256::from(10000_u64), - deadline: 1000, - maxEpochsPerCollection: 1000, - minEpochsPerCollection: 1000, - durationEpochs: 1000, - metadata: alloy_rlp::encode(metadata).into(), - }; - let domain = eip712_domain(0, Address::ZERO); - - let voucher = voucher.sign(&domain, payer)?; - let rlp_voucher = alloy_rlp::encode(voucher.clone()); - let b64 = STANDARD.encode(rlp_voucher); - let id = Uuid::now_v7(); - - let store = Arc::new(InMemoryAgreementStore::default()); - - let actual_id = super::validate_and_create_agreement( - store.clone(), - &domain, - id, - &payee_addr, - vec![payer_addr], - b64, - ) - .await - .unwrap(); - - let actual = store.get_by_id(actual_id).await.unwrap(); - - let actual_voucher = actual.unwrap(); - assert_eq!(voucher, actual_voucher); - Ok(()) - } -} diff --git a/crates/service/src/routes/mod.rs b/crates/service/src/routes/mod.rs index c68fef04..c9a4a50a 100644 --- a/crates/service/src/routes/mod.rs +++ b/crates/service/src/routes/mod.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 pub mod cost; -pub mod dips; mod health; mod request_handler; mod static_subgraph; diff --git a/crates/service/src/service.rs b/crates/service/src/service.rs index e44b661a..49e38c99 100644 --- a/crates/service/src/service.rs +++ b/crates/service/src/service.rs @@ -1,11 +1,18 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::time::Duration; +use core::time; +use std::{net::SocketAddr, sync::Arc, time::Duration}; use anyhow::anyhow; use axum::{extract::Request, serve, ServiceExt}; -use indexer_config::{Config, GraphNodeConfig, SubgraphConfig}; +use indexer_config::{Config, DipsConfig, GraphNodeConfig, SubgraphConfig}; +use indexer_dips::{ + proto::graphprotocol::indexer::dips::agreement_service_server::{ + AgreementService, AgreementServiceServer, + }, + server::DipsServer, +}; use indexer_monitor::{DeploymentDetails, SubgraphClient}; use release::IndexerServiceRelease; use reqwest::Url; @@ -13,7 +20,11 @@ use tap_core::tap_eip712_domain; use tokio::{net::TcpListener, signal}; use tower_http::normalize_path::NormalizePath; -use crate::{cli::Cli, database, metrics::serve_metrics}; +use crate::{ + cli::Cli, + database::{self, dips::PsqlAgreementStore}, + metrics::serve_metrics, +}; use clap::Parser; use tracing::{error, info}; @@ -92,14 +103,13 @@ pub async fn run() -> anyhow::Result<()> { let host_and_port = config.service.host_and_port; let router = ServiceRouter::builder() - .database(database) - .domain_separator(domain_separator) + .database(database.clone()) + .domain_separator(domain_separator.clone()) .graph_node(config.graph_node) .http_client(http_client) .release(release) .indexer(config.indexer) .service(config.service) - .dips(config.dips) .blockchain(config.blockchain) .timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs) .network_subgraph(network_subgraph, config.subgraphs.network) @@ -108,6 +118,37 @@ pub async fn run() -> anyhow::Result<()> { serve_metrics(config.metrics.get_socket_addr()); + if let Some(dips) = config.dips.as_ref() { + let DipsConfig { + host, + port, + allowed_payers, + cancellation_time_tolerance, + expected_payee, + } = dips; + + let addr = format!("{}:{}", host, port) + .parse() + .expect("invalid dips host port"); + + let dips = DipsServer { + agreement_store: Arc::new(PsqlAgreementStore { pool: database }), + expected_payee: *expected_payee, + allowed_payers: allowed_payers.clone(), + domain: domain_separator, + cancel_voucher_time_tolerance: cancellation_time_tolerance + .unwrap_or(time::Duration::from_secs(60 * 5)), + }; + + info!("starting dips grpc server on {}", addr); + + tokio::spawn(async move { + info!("starting dips grpc server on {}", addr); + + start_dips_server(addr, dips).await; + }); + } + info!( address = %host_and_port, "Serving requests", @@ -124,6 +165,13 @@ pub async fn run() -> anyhow::Result<()> { .with_graceful_shutdown(shutdown_handler()) .await?) } +async fn start_dips_server(addr: SocketAddr, service: impl AgreementService) { + tonic::transport::Server::builder() + .add_service(AgreementServiceServer::new(service)) + .serve(addr) + .await + .expect("unable to start dips grpc"); +} async fn create_subgraph_client( http_client: reqwest::Client, diff --git a/crates/service/src/service/router.rs b/crates/service/src/service/router.rs index 3ecd957c..fa01c6c9 100644 --- a/crates/service/src/service/router.rs +++ b/crates/service/src/service/router.rs @@ -14,8 +14,8 @@ use axum::{ }; use governor::{clock::QuantaInstant, middleware::NoOpMiddleware}; use indexer_config::{ - BlockchainConfig, DipsConfig, EscrowSubgraphConfig, GraphNodeConfig, IndexerConfig, - NetworkSubgraphConfig, ServiceConfig, ServiceTapConfig, + BlockchainConfig, EscrowSubgraphConfig, GraphNodeConfig, IndexerConfig, NetworkSubgraphConfig, + ServiceConfig, ServiceTapConfig, }; use indexer_monitor::{ attestation_signers, deployment_to_allocation, dispute_manager, escrow_accounts, @@ -38,7 +38,6 @@ use tracing::{info, info_span, warn}; use typed_builder::TypedBuilder; use crate::{ - database::dips::{AgreementStore, InMemoryAgreementStore}, metrics::{FAILED_RECEIPT, HANDLER_HISTOGRAM}, middleware::{ allocation_middleware, attestation_middleware, @@ -47,11 +46,7 @@ use crate::{ sender_middleware, signer_middleware, AllocationState, AttestationState, PrometheusMetricsMiddlewareLayer, SenderState, }, - routes::{ - self, - dips::{self, Price}, - health, request_handler, static_subgraph_request_handler, - }, + routes::{self, health, request_handler, static_subgraph_request_handler}, tap::IndexerTapContext, wallet::public_key, }; @@ -76,8 +71,6 @@ pub struct ServiceRouter { service: ServiceConfig, blockchain: BlockchainConfig, timestamp_buffer_secs: Duration, - #[builder(default)] - dips: Option, // either provide subgraph or watcher #[builder(default, setter(transform = @@ -135,24 +128,6 @@ impl ServiceRouter { // STATUS let post_status = post(routes::status); - // DIPS - let agreement_store: Arc = Arc::new(InMemoryAgreementStore::default()); - let prices: Vec = vec![]; - - let dips = match self.dips.as_ref() { - Some(dips_config) => { - let schema = dips::build_schema( - indexer_address, - dips_config, - &self.blockchain, - agreement_store, - prices, - ); - Router::new().route(DEFAULT_ROUTE, post_service(GraphQL::new(schema))) - } - None => Router::new(), - }; - // Monitor the indexer's own allocations // if not provided, create monitor from subgraph let allocations = match (self.allocations, self.network_subgraph.as_ref()) { @@ -409,7 +384,6 @@ impl ServiceRouter { .nest("/version", version) .nest("/escrow", serve_escrow_subgraph) .nest("/network", serve_network_subgraph) - .nest("/dips", dips) .route( "/subgraph/health/:deployment_id", get(health).with_state(graphnode_state.clone()), diff --git a/crates/test-assets/src/lib.rs b/crates/test-assets/src/lib.rs index bd766383..47499b17 100644 --- a/crates/test-assets/src/lib.rs +++ b/crates/test-assets/src/lib.rs @@ -65,7 +65,6 @@ macro_rules! assert_while_retry { /// - (createdAtEpoch-1, 1) /// /// Using https://github.com/graphprotocol/indexer/blob/f8786c979a8ed0fae93202e499f5ce25773af473/packages/indexer-common/src/allocations/keys.ts#L41-L71 - pub const ESCROW_QUERY_RESPONSE: &str = r#" { "data": {