diff --git a/.github/workflows/license_headers_check.yml b/.github/workflows/license_headers_check.yml
index c4bd8363..9b1baf9b 100644
--- a/.github/workflows/license_headers_check.yml
+++ b/.github/workflows/license_headers_check.yml
@@ -29,4 +29,6 @@ jobs:
-ignore '.github/workflows/*.yaml' \
-ignore '.github/*.yaml' \
-ignore 'migrations/*.sql' \
+ -ignore 'crates/dips/proto/*.proto' \
+ -ignore 'crates/dips/src/proto/*.rs' \
.
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index ec8242e8..2f265f3c 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -50,6 +50,9 @@ jobs:
- name: Run sccache-cache
uses: mozilla-actions/sccache-action@9e326ebed976843c9932b3aa0e021c6f50310eb4 # v0.0.6
if: ${{ !startsWith(github.head_ref, 'renovate/') }}
+ - name: Install protobuf
+ run: |
+ apt-get update && apt-get install protobuf-compiler -y
- name: Install sqlx
run: cargo install sqlx-cli --no-default-features --features postgres
- name: Run the test sqlx migrations
@@ -66,6 +69,9 @@ jobs:
DATABASE_URL: postgres://postgres@postgres:5432
SQLX_OFFLINE: true
steps:
+ - name: Install protobuf
+ run: |
+ apt-get update && apt-get install protobuf-compiler -y
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4
- name: Cache dependencies
uses: Swatinem/rust-cache@82a92a6e8fbeee089604da2575dc567ae9ddeaab # v2.7.5
diff --git a/Cargo.lock b/Cargo.lock
index b708a3b1..6309f0bf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2601,6 +2601,12 @@ dependencies = [
"static_assertions",
]
+[[package]]
+name = "fixedbitset"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
+
[[package]]
name = "flate2"
version = "1.0.35"
@@ -3601,8 +3607,16 @@ dependencies = [
"alloy-rlp",
"alloy-sol-types",
"anyhow",
+ "async-trait",
+ "base64 0.22.1",
+ "prost",
+ "prost-types",
"thegraph-core",
"thiserror 1.0.69",
+ "tokio",
+ "tonic",
+ "tonic-build",
+ "uuid",
]
[[package]]
@@ -3688,6 +3702,7 @@ dependencies = [
"tokio",
"tokio-test",
"tokio-util",
+ "tonic",
"tower 0.5.1",
"tower-http",
"tower-service",
@@ -4337,6 +4352,12 @@ dependencies = [
"version_check",
]
+[[package]]
+name = "multimap"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
+
[[package]]
name = "native-tls"
version = "0.2.12"
@@ -4818,6 +4839,16 @@ dependencies = [
"ucd-trie",
]
+[[package]]
+name = "petgraph"
+version = "0.6.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db"
+dependencies = [
+ "fixedbitset",
+ "indexmap 2.7.0",
+]
+
[[package]]
name = "pharos"
version = "0.5.3"
@@ -4938,6 +4969,16 @@ dependencies = [
"yansi",
]
+[[package]]
+name = "prettyplease"
+version = "0.2.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033"
+dependencies = [
+ "proc-macro2",
+ "syn 2.0.90",
+]
+
[[package]]
name = "primitive-types"
version = "0.12.2"
@@ -5070,6 +5111,27 @@ dependencies = [
"prost-derive",
]
+[[package]]
+name = "prost-build"
+version = "0.13.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
+dependencies = [
+ "bytes",
+ "heck 0.5.0",
+ "itertools 0.13.0",
+ "log",
+ "multimap",
+ "once_cell",
+ "petgraph",
+ "prettyplease",
+ "prost",
+ "prost-types",
+ "regex",
+ "syn 2.0.90",
+ "tempfile",
+]
+
[[package]]
name = "prost-derive"
version = "0.13.3"
@@ -7140,6 +7202,7 @@ dependencies = [
"axum",
"base64 0.22.1",
"bytes",
+ "flate2",
"h2 0.4.7",
"http 1.1.0",
"http-body 1.0.1",
@@ -7162,6 +7225,20 @@ dependencies = [
"tracing",
]
+[[package]]
+name = "tonic-build"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11"
+dependencies = [
+ "prettyplease",
+ "proc-macro2",
+ "prost-build",
+ "prost-types",
+ "quote",
+ "syn 2.0.90",
+]
+
[[package]]
name = "tower"
version = "0.4.13"
diff --git a/Cargo.toml b/Cargo.toml
index 21f073b5..9cbf689c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -38,7 +38,7 @@ tokio = "1.40"
prometheus = "0.13.3"
anyhow = { version = "1.0.72" }
thiserror = "1.0.49"
-async-trait = "0.1.72"
+async-trait = "0.1.83"
eventuals = "0.6.7"
base64 = "0.22.1"
reqwest = { version = "0.12", features = [
@@ -78,3 +78,7 @@ bip39 = "2.0.0"
rstest = "0.23.0"
wiremock = "0.6.1"
typed-builder = "0.20.0"
+tonic = { version = "0.12.3", features = ["tls-roots", "gzip"] }
+tonic-build = { version = "0.12.3", features = ["prost"] }
+prost = "0.13.3"
+prost-types = "0.13.3"
diff --git a/Dockerfile.indexer-service-rs b/Dockerfile.indexer-service-rs
index a6518f27..90a6454c 100644
--- a/Dockerfile.indexer-service-rs
+++ b/Dockerfile.indexer-service-rs
@@ -6,6 +6,8 @@ COPY . .
# Force SQLx to use the offline mode to statically check the database queries against
# the prepared files in the `.sqlx` directory.
ENV SQLX_OFFLINE=true
+RUN apt-get update && apt-get install -y --no-install-recommends \
+ protobuf-compiler && rm -rf /var/lib/apt/lists/*
RUN cargo build --release --bin indexer-service-rs
########################################################################################
@@ -13,7 +15,7 @@ RUN cargo build --release --bin indexer-service-rs
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
- openssl ca-certificates \
+ openssl ca-certificates protobuf-compiler \
&& rm -rf /var/lib/apt/lists/*
COPY --from=build /root/target/release/indexer-service-rs /usr/local/bin/indexer-service-rs
diff --git a/Dockerfile.indexer-tap-agent b/Dockerfile.indexer-tap-agent
index 306944b3..3663c9d4 100644
--- a/Dockerfile.indexer-tap-agent
+++ b/Dockerfile.indexer-tap-agent
@@ -6,6 +6,8 @@ COPY . .
# Force SQLx to use the offline mode to statically check the database queries against
# the prepared files in the `.sqlx` directory.
ENV SQLX_OFFLINE=true
+RUN apt-get update && apt-get install -y --no-install-recommends \
+ protobuf-compiler && rm -rf /var/lib/apt/lists/*
RUN cargo build --release --bin indexer-tap-agent
########################################################################################
diff --git a/crates/config/maximal-config-example.toml b/crates/config/maximal-config-example.toml
index 0efa9d57..b5c15332 100644
--- a/crates/config/maximal-config-example.toml
+++ b/crates/config/maximal-config-example.toml
@@ -147,4 +147,7 @@ max_receipts_per_request = 10000
0x0123456789abcdef0123456789abcdef01234567 = "https://other.example.com/aggregate-receipts"
[dips]
+host = "0.0.0.0"
+port = "7601"
allowed_payers = ["0x3333333333333333333333333333333333333333"]
+expected_payee = "0x0000000000000000000000000000000000000000"
diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs
index 1703d4e8..a74c90e0 100644
--- a/crates/config/src/config.rs
+++ b/crates/config/src/config.rs
@@ -386,10 +386,25 @@ pub struct TapConfig {
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct DipsConfig {
+ pub host: String,
+ pub port: String,
+ pub expected_payee: Address,
pub allowed_payers: Vec
,
pub cancellation_time_tolerance: Option,
}
+impl Default for DipsConfig {
+ fn default() -> Self {
+ DipsConfig {
+ host: "0.0.0.0".to_string(),
+ port: "7601".to_string(),
+ expected_payee: Address::ZERO,
+ allowed_payers: vec![],
+ cancellation_time_tolerance: None,
+ }
+ }
+}
+
impl TapConfig {
pub fn get_trigger_value(&self) -> u128 {
let grt_wei = self.max_amount_willing_to_lose_grt.get_value();
@@ -450,7 +465,8 @@ mod tests {
allowed_payers: vec![thegraph_core::Address(
FixedBytes::<20>::from_str("0x3333333333333333333333333333333333333333").unwrap(),
)],
- cancellation_time_tolerance: None,
+ expected_payee: thegraph_core::Address::ZERO,
+ ..Default::default()
});
let max_config_file: Config = toml::from_str(
diff --git a/crates/dips/Cargo.toml b/crates/dips/Cargo.toml
index 5ab10a85..c6d79e62 100644
--- a/crates/dips/Cargo.toml
+++ b/crates/dips/Cargo.toml
@@ -10,3 +10,13 @@ anyhow.workspace = true
alloy-sol-types = "=0.8.13"
alloy-rlp = "0.3.9"
thegraph-core.workspace = true
+tonic.workspace = true
+async-trait.workspace = true
+prost.workspace = true
+prost-types.workspace = true
+uuid.workspace = true
+base64.workspace = true
+tokio.workspace = true
+
+[build-dependencies]
+tonic-build = { workspace = true }
diff --git a/crates/dips/build.rs b/crates/dips/build.rs
new file mode 100644
index 00000000..2d5bfeb4
--- /dev/null
+++ b/crates/dips/build.rs
@@ -0,0 +1,9 @@
+fn main() {
+ println!("cargo:rerun-if-changed=proto");
+ tonic_build::configure()
+ .out_dir("src/proto")
+ .include_file("mod.rs")
+ .protoc_arg("--experimental_allow_proto3_optional")
+ .compile_protos(&["proto/dips.proto"], &["proto"])
+ .expect("Failed to compile dips proto(s)");
+}
diff --git a/crates/dips/proto/dips.proto b/crates/dips/proto/dips.proto
new file mode 100644
index 00000000..2f998f5e
--- /dev/null
+++ b/crates/dips/proto/dips.proto
@@ -0,0 +1,56 @@
+syntax = "proto3";
+
+package graphprotocol.indexer.dips;
+
+service AgreementService {
+ rpc CreateAgreement(CreateAgreementRequest) returns (CreateAgreementResponse);
+ rpc CancelAgreement(CancelAgreementRequest) returns (AgreementCanellationResponse);
+ rpc GetAgreementById(GetAgreementByIdRequest) returns (GetAgreementByIdResponse);
+ rpc GetPrice(PriceRequest) returns (PriceResponse);
+}
+
+message GetAgreementByIdRequest {
+
+}
+
+message GetAgreementByIdResponse {
+
+}
+
+message CreateAgreementRequest {
+ string id = 1;
+ bytes signed_voucher = 2;
+}
+
+message CancelAgreementRequest {
+ string id = 1;
+ bytes signed_voucher = 2;
+}
+
+message CreateAgreementResponse {
+ string uuid = 1;
+}
+
+message AgreementCanellationResponse {
+ string uuid = 1;
+}
+
+message PriceRequest {
+ ProtocolNetwork protocol = 1;
+ string chain_id = 2;
+}
+
+message PriceResponse {
+ optional Price price = 1;
+}
+
+message Price {
+ string price_per_block = 1;
+ string chain_id = 2;
+ ProtocolNetwork protocol = 3;
+}
+
+enum ProtocolNetwork {
+ UNKNOWN = 0;
+ EVM = 1;
+}
diff --git a/crates/dips/src/lib.rs b/crates/dips/src/lib.rs
index c073c439..4c103cd6 100644
--- a/crates/dips/src/lib.rs
+++ b/crates/dips/src/lib.rs
@@ -2,15 +2,21 @@
// SPDX-License-Identifier: Apache-2.0
use std::str::FromStr;
+use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use store::AgreementStore;
+use uuid::Uuid;
pub use alloy;
pub use alloy_rlp;
+pub mod proto;
+pub mod server;
+pub mod store;
use alloy::core::primitives::Address;
use alloy::rlp::{RlpDecodable, RlpEncodable};
use alloy::signers::{Signature, SignerSync};
-use alloy_rlp::Encodable;
+use alloy_rlp::{Decodable, Encodable};
use thegraph_core::alloy_sol_types::{sol, Eip712Domain, SolStruct};
use thiserror::Error;
@@ -75,14 +81,34 @@ sol! {
}
}
-#[derive(Error, Debug, PartialEq)]
-pub enum AgreementVoucherValidationError {
+#[derive(Error, Debug)]
+pub enum DipsError {
+ // agreement cration
#[error("signature is not valid, error: {0}")]
InvalidSignature(String),
#[error("payer {0} not authorised")]
PayerNotAuthorised(Address),
#[error("voucher payee {actual} does not match the expected address {expected}")]
UnexpectedPayee { expected: Address, actual: Address },
+ // cancellation
+ #[error("cancelled_by is expected to match the signer")]
+ UnexpectedSigner,
+ #[error("signer {0} not authorised")]
+ SignerNotAuthorised(Address),
+ #[error("cancellation request has expired")]
+ ExpiredRequest,
+ // misc
+ #[error("rlp (de)serialisation failed")]
+ RlpSerializationError(#[from] alloy_rlp::Error),
+ #[error("unknown error: {0}")]
+ UnknownError(#[from] anyhow::Error),
+}
+
+// TODO: send back messages
+impl Into for DipsError {
+ fn into(self) -> tonic::Status {
+ tonic::Status::internal("unknown errr")
+ }
}
impl IndexingAgreementVoucher {
@@ -107,22 +133,22 @@ impl SignedIndexingAgreementVoucher {
domain: &Eip712Domain,
expected_payee: &Address,
allowed_payers: impl AsRef<[Address]>,
- ) -> Result<(), AgreementVoucherValidationError> {
+ ) -> Result<(), DipsError> {
let sig = Signature::from_str(&self.signature.to_string())
- .map_err(|err| AgreementVoucherValidationError::InvalidSignature(err.to_string()))?;
+ .map_err(|err| DipsError::InvalidSignature(err.to_string()))?;
let payer = sig
.recover_address_from_prehash(&self.voucher.eip712_signing_hash(domain))
- .map_err(|err| AgreementVoucherValidationError::InvalidSignature(err.to_string()))?;
+ .map_err(|err| DipsError::InvalidSignature(err.to_string()))?;
if allowed_payers.as_ref().is_empty()
|| !allowed_payers.as_ref().iter().any(|addr| addr.eq(&payer))
{
- return Err(AgreementVoucherValidationError::PayerNotAuthorised(payer));
+ return Err(DipsError::PayerNotAuthorised(payer));
}
if !self.voucher.payee.eq(expected_payee) {
- return Err(AgreementVoucherValidationError::UnexpectedPayee {
+ return Err(DipsError::UnexpectedPayee {
expected: *expected_payee,
actual: self.voucher.payee,
});
@@ -139,18 +165,6 @@ impl SignedIndexingAgreementVoucher {
}
}
-#[derive(Error, Debug, PartialEq)]
-pub enum CancellationRequestValidationError {
- #[error("signature is not valid, error: {0}")]
- InvalidSignature(String),
- #[error("cancelled_by is expected to match the signer")]
- UnexpectedSigner,
- #[error("signer {0} not authorised")]
- SignerNotAuthorised(Address),
- #[error("cancellation request has expired")]
- ExpiredRequest,
-}
-
impl CancellationRequest {
pub fn sign(
&self,
@@ -172,30 +186,28 @@ impl SignedCancellationRequest {
&self,
domain: &Eip712Domain,
time_tolerance: Duration,
- ) -> Result<(), CancellationRequestValidationError> {
+ ) -> Result<(), DipsError> {
let sig = Signature::from_str(&self.signature.to_string())
- .map_err(|err| CancellationRequestValidationError::InvalidSignature(err.to_string()))?;
+ .map_err(|err| DipsError::InvalidSignature(err.to_string()))?;
let signer = sig
.recover_address_from_prehash(&self.request.eip712_signing_hash(domain))
- .map_err(|err| CancellationRequestValidationError::InvalidSignature(err.to_string()))?;
+ .map_err(|err| DipsError::InvalidSignature(err.to_string()))?;
if signer.ne(&self.request.cancellled_by) {
- return Err(CancellationRequestValidationError::UnexpectedSigner);
+ return Err(DipsError::UnexpectedSigner);
}
if signer.ne(&self.request.payer) && signer.ne(&self.request.payee) {
- return Err(CancellationRequestValidationError::SignerNotAuthorised(
- signer,
- ));
+ return Err(DipsError::SignerNotAuthorised(signer));
}
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
- .map_err(|_| CancellationRequestValidationError::ExpiredRequest)?
+ .map_err(|_| DipsError::ExpiredRequest)?
.as_secs();
if now - self.request.timestamp >= time_tolerance.as_secs() {
- return Err(CancellationRequestValidationError::ExpiredRequest);
+ return Err(DipsError::ExpiredRequest);
}
Ok(())
@@ -208,8 +220,46 @@ impl SignedCancellationRequest {
}
}
+pub async fn validate_and_create_agreement(
+ store: Arc,
+ domain: &Eip712Domain,
+ id: Uuid,
+ expected_payee: &Address,
+ allowed_payers: impl AsRef<[Address]>,
+ voucher: Vec,
+) -> Result {
+ let voucher = SignedIndexingAgreementVoucher::decode(&mut voucher.as_ref())?;
+ let metadata = SubgraphIndexingVoucherMetadata::decode(&mut voucher.voucher.metadata.as_ref())?;
+
+ voucher.validate(domain, expected_payee, allowed_payers)?;
+
+ store
+ .create_agreement(id, voucher, metadata.protocolNetwork.to_string())
+ .await?;
+
+ Ok(id)
+}
+
+pub async fn validate_and_cancel_agreement(
+ store: Arc,
+ domain: &Eip712Domain,
+ id: Uuid,
+ agreement: Vec,
+ time_tolerance: Duration,
+) -> Result {
+ let voucher = SignedCancellationRequest::decode(&mut agreement.as_ref())?;
+
+ voucher.validate(domain, time_tolerance)?;
+
+ store.cancel_agreement(id, voucher).await?;
+
+ Ok(id)
+}
+
#[cfg(test)]
mod test {
+ use std::sync::Arc;
+
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use alloy::primitives::{Address, FixedBytes, U256};
@@ -218,9 +268,66 @@ mod test {
use thegraph_core::attestation::eip712_domain;
use crate::{
- AgreementVoucherValidationError, CancellationRequest, CancellationRequestValidationError,
+ alloy_rlp::{self},
IndexingAgreementVoucher, SubgraphIndexingVoucherMetadata,
};
+ use crate::{CancellationRequest, DipsError};
+ use uuid::Uuid;
+
+ pub use crate::store::{AgreementStore, InMemoryAgreementStore};
+
+ #[tokio::test]
+ async fn test_validate_and_create_agreement() -> anyhow::Result<()> {
+ let deployment_id = "Qmbg1qF4YgHjiVfsVt6a13ddrVcRtWyJQfD4LA3CwHM29f".to_string();
+ let payee = PrivateKeySigner::random();
+ let payee_addr = payee.address();
+ let payer = PrivateKeySigner::random();
+ let payer_addr = payer.address();
+
+ let metadata = SubgraphIndexingVoucherMetadata {
+ pricePerBlock: U256::from(10000_u64),
+ protocolNetwork: FixedBytes::left_padding_from("arbitrum-one".as_bytes()),
+ chainId: FixedBytes::left_padding_from("mainnet".as_bytes()),
+ deployment_ipfs_hash: deployment_id,
+ };
+
+ let voucher = IndexingAgreementVoucher {
+ payer: payer_addr,
+ payee: payee_addr,
+ service: Address(FixedBytes::ZERO),
+ maxInitialAmount: U256::from(10000_u64),
+ maxOngoingAmountPerEpoch: U256::from(10000_u64),
+ deadline: 1000,
+ maxEpochsPerCollection: 1000,
+ minEpochsPerCollection: 1000,
+ durationEpochs: 1000,
+ metadata: alloy_rlp::encode(metadata).into(),
+ };
+ let domain = eip712_domain(0, Address::ZERO);
+
+ let voucher = voucher.sign(&domain, payer)?;
+ let rlp_voucher = alloy_rlp::encode(voucher.clone());
+ let id = Uuid::now_v7();
+
+ let store = Arc::new(InMemoryAgreementStore::default());
+
+ let actual_id = super::validate_and_create_agreement(
+ store.clone(),
+ &domain,
+ id,
+ &payee_addr,
+ vec![payer_addr],
+ rlp_voucher,
+ )
+ .await
+ .unwrap();
+
+ let actual = store.get_by_id(actual_id).await.unwrap();
+
+ let actual_voucher = actual.unwrap();
+ assert_eq!(voucher, actual_voucher);
+ Ok(())
+ }
#[test]
fn voucher_signature_verification() {
@@ -253,8 +360,11 @@ mod test {
let domain = eip712_domain(0, Address::ZERO);
let signed = voucher.sign(&domain, payer).unwrap();
assert_eq!(
- signed.validate(&domain, &payee_addr, vec![]).unwrap_err(),
- AgreementVoucherValidationError::PayerNotAuthorised(voucher.payer)
+ signed
+ .validate(&domain, &payee_addr, vec![])
+ .unwrap_err()
+ .to_string(),
+ DipsError::PayerNotAuthorised(voucher.payer).to_string()
);
assert!(signed
.validate(&domain, &payee_addr, vec![payer_addr])
@@ -297,7 +407,7 @@ mod test {
signed
.validate(&domain, &payee_addr, vec![payer_addr])
.unwrap_err(),
- AgreementVoucherValidationError::PayerNotAuthorised(_)
+ DipsError::PayerNotAuthorised(_)
));
}
@@ -325,7 +435,7 @@ mod test {
name: &'a str,
signer: PrivateKeySigner,
timestamp: u64,
- error: Option,
+ error: Option,
}
let cases: Vec = vec![
@@ -345,17 +455,13 @@ mod test {
name: "invalid signer",
signer: other_signer.clone(),
timestamp: now,
- error: Some(CancellationRequestValidationError::SignerNotAuthorised(
- other_signer.address(),
- )),
+ error: Some(DipsError::SignerNotAuthorised(other_signer.address())),
},
Case {
name: "expired timestamp",
signer: payee,
timestamp: 100,
- error: Some(CancellationRequestValidationError::SignerNotAuthorised(
- other_signer.address(),
- )),
+ error: Some(DipsError::SignerNotAuthorised(other_signer.address())),
},
];
diff --git a/crates/dips/src/proto/graphprotocol.indexer.dips.rs b/crates/dips/src/proto/graphprotocol.indexer.dips.rs
new file mode 100644
index 00000000..57fce10e
--- /dev/null
+++ b/crates/dips/src/proto/graphprotocol.indexer.dips.rs
@@ -0,0 +1,621 @@
+// This file is @generated by prost-build.
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct GetAgreementByIdRequest {}
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct GetAgreementByIdResponse {}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CreateAgreementRequest {
+ #[prost(string, tag = "1")]
+ pub id: ::prost::alloc::string::String,
+ #[prost(bytes = "vec", tag = "2")]
+ pub signed_voucher: ::prost::alloc::vec::Vec,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CancelAgreementRequest {
+ #[prost(string, tag = "1")]
+ pub id: ::prost::alloc::string::String,
+ #[prost(bytes = "vec", tag = "2")]
+ pub signed_voucher: ::prost::alloc::vec::Vec,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CreateAgreementResponse {
+ #[prost(string, tag = "1")]
+ pub uuid: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct AgreementCanellationResponse {
+ #[prost(string, tag = "1")]
+ pub uuid: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PriceRequest {
+ #[prost(enumeration = "ProtocolNetwork", tag = "1")]
+ pub protocol: i32,
+ #[prost(string, tag = "2")]
+ pub chain_id: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PriceResponse {
+ #[prost(message, optional, tag = "1")]
+ pub price: ::core::option::Option,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct Price {
+ #[prost(string, tag = "1")]
+ pub price_per_block: ::prost::alloc::string::String,
+ #[prost(string, tag = "2")]
+ pub chain_id: ::prost::alloc::string::String,
+ #[prost(enumeration = "ProtocolNetwork", tag = "3")]
+ pub protocol: i32,
+}
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
+#[repr(i32)]
+pub enum ProtocolNetwork {
+ Unknown = 0,
+ Evm = 1,
+}
+impl ProtocolNetwork {
+ /// String value of the enum field names used in the ProtoBuf definition.
+ ///
+ /// The values are not transformed in any way and thus are considered stable
+ /// (if the ProtoBuf definition does not change) and safe for programmatic use.
+ pub fn as_str_name(&self) -> &'static str {
+ match self {
+ Self::Unknown => "UNKNOWN",
+ Self::Evm => "EVM",
+ }
+ }
+ /// Creates an enum from field names used in the ProtoBuf definition.
+ pub fn from_str_name(value: &str) -> ::core::option::Option {
+ match value {
+ "UNKNOWN" => Some(Self::Unknown),
+ "EVM" => Some(Self::Evm),
+ _ => None,
+ }
+ }
+}
+/// Generated client implementations.
+pub mod agreement_service_client {
+ #![allow(
+ unused_variables,
+ dead_code,
+ missing_docs,
+ clippy::wildcard_imports,
+ clippy::let_unit_value,
+ )]
+ use tonic::codegen::*;
+ use tonic::codegen::http::Uri;
+ #[derive(Debug, Clone)]
+ pub struct AgreementServiceClient {
+ inner: tonic::client::Grpc,
+ }
+ impl AgreementServiceClient {
+ /// Attempt to create a new client by connecting to a given endpoint.
+ pub async fn connect(dst: D) -> Result
+ where
+ D: TryInto,
+ D::Error: Into,
+ {
+ let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
+ Ok(Self::new(conn))
+ }
+ }
+ impl AgreementServiceClient
+ where
+ T: tonic::client::GrpcService,
+ T::Error: Into,
+ T::ResponseBody: Body + std::marker::Send + 'static,
+ ::Error: Into + std::marker::Send,
+ {
+ pub fn new(inner: T) -> Self {
+ let inner = tonic::client::Grpc::new(inner);
+ Self { inner }
+ }
+ pub fn with_origin(inner: T, origin: Uri) -> Self {
+ let inner = tonic::client::Grpc::with_origin(inner, origin);
+ Self { inner }
+ }
+ pub fn with_interceptor(
+ inner: T,
+ interceptor: F,
+ ) -> AgreementServiceClient>
+ where
+ F: tonic::service::Interceptor,
+ T::ResponseBody: Default,
+ T: tonic::codegen::Service<
+ http::Request,
+ Response = http::Response<
+ >::ResponseBody,
+ >,
+ >,
+ ,
+ >>::Error: Into + std::marker::Send + std::marker::Sync,
+ {
+ AgreementServiceClient::new(InterceptedService::new(inner, interceptor))
+ }
+ /// Compress requests with the given encoding.
+ ///
+ /// This requires the server to support it otherwise it might respond with an
+ /// error.
+ #[must_use]
+ pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
+ self.inner = self.inner.send_compressed(encoding);
+ self
+ }
+ /// Enable decompressing responses.
+ #[must_use]
+ pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
+ self.inner = self.inner.accept_compressed(encoding);
+ self
+ }
+ /// Limits the maximum size of a decoded message.
+ ///
+ /// Default: `4MB`
+ #[must_use]
+ pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
+ self.inner = self.inner.max_decoding_message_size(limit);
+ self
+ }
+ /// Limits the maximum size of an encoded message.
+ ///
+ /// Default: `usize::MAX`
+ #[must_use]
+ pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
+ self.inner = self.inner.max_encoding_message_size(limit);
+ self
+ }
+ pub async fn create_agreement(
+ &mut self,
+ request: impl tonic::IntoRequest,
+ ) -> std::result::Result<
+ tonic::Response,
+ tonic::Status,
+ > {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::unknown(
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/graphprotocol.indexer.dips.AgreementService/CreateAgreement",
+ );
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(
+ GrpcMethod::new(
+ "graphprotocol.indexer.dips.AgreementService",
+ "CreateAgreement",
+ ),
+ );
+ self.inner.unary(req, path, codec).await
+ }
+ pub async fn cancel_agreement(
+ &mut self,
+ request: impl tonic::IntoRequest,
+ ) -> std::result::Result<
+ tonic::Response,
+ tonic::Status,
+ > {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::unknown(
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/graphprotocol.indexer.dips.AgreementService/CancelAgreement",
+ );
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(
+ GrpcMethod::new(
+ "graphprotocol.indexer.dips.AgreementService",
+ "CancelAgreement",
+ ),
+ );
+ self.inner.unary(req, path, codec).await
+ }
+ pub async fn get_agreement_by_id(
+ &mut self,
+ request: impl tonic::IntoRequest,
+ ) -> std::result::Result<
+ tonic::Response,
+ tonic::Status,
+ > {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::unknown(
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/graphprotocol.indexer.dips.AgreementService/GetAgreementById",
+ );
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(
+ GrpcMethod::new(
+ "graphprotocol.indexer.dips.AgreementService",
+ "GetAgreementById",
+ ),
+ );
+ self.inner.unary(req, path, codec).await
+ }
+ pub async fn get_price(
+ &mut self,
+ request: impl tonic::IntoRequest,
+ ) -> std::result::Result, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::unknown(
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/graphprotocol.indexer.dips.AgreementService/GetPrice",
+ );
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(
+ GrpcMethod::new(
+ "graphprotocol.indexer.dips.AgreementService",
+ "GetPrice",
+ ),
+ );
+ self.inner.unary(req, path, codec).await
+ }
+ }
+}
+/// Generated server implementations.
+pub mod agreement_service_server {
+ #![allow(
+ unused_variables,
+ dead_code,
+ missing_docs,
+ clippy::wildcard_imports,
+ clippy::let_unit_value,
+ )]
+ use tonic::codegen::*;
+ /// Generated trait containing gRPC methods that should be implemented for use with AgreementServiceServer.
+ #[async_trait]
+ pub trait AgreementService: std::marker::Send + std::marker::Sync + 'static {
+ async fn create_agreement(
+ &self,
+ request: tonic::Request,
+ ) -> std::result::Result<
+ tonic::Response,
+ tonic::Status,
+ >;
+ async fn cancel_agreement(
+ &self,
+ request: tonic::Request,
+ ) -> std::result::Result<
+ tonic::Response,
+ tonic::Status,
+ >;
+ async fn get_agreement_by_id(
+ &self,
+ request: tonic::Request,
+ ) -> std::result::Result<
+ tonic::Response,
+ tonic::Status,
+ >;
+ async fn get_price(
+ &self,
+ request: tonic::Request,
+ ) -> std::result::Result, tonic::Status>;
+ }
+ #[derive(Debug)]
+ pub struct AgreementServiceServer {
+ inner: Arc,
+ accept_compression_encodings: EnabledCompressionEncodings,
+ send_compression_encodings: EnabledCompressionEncodings,
+ max_decoding_message_size: Option,
+ max_encoding_message_size: Option,
+ }
+ impl AgreementServiceServer {
+ pub fn new(inner: T) -> Self {
+ Self::from_arc(Arc::new(inner))
+ }
+ pub fn from_arc(inner: Arc) -> Self {
+ Self {
+ inner,
+ accept_compression_encodings: Default::default(),
+ send_compression_encodings: Default::default(),
+ max_decoding_message_size: None,
+ max_encoding_message_size: None,
+ }
+ }
+ pub fn with_interceptor(
+ inner: T,
+ interceptor: F,
+ ) -> InterceptedService
+ where
+ F: tonic::service::Interceptor,
+ {
+ InterceptedService::new(Self::new(inner), interceptor)
+ }
+ /// Enable decompressing requests with the given encoding.
+ #[must_use]
+ pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
+ self.accept_compression_encodings.enable(encoding);
+ self
+ }
+ /// Compress responses with the given encoding, if the client supports it.
+ #[must_use]
+ pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
+ self.send_compression_encodings.enable(encoding);
+ self
+ }
+ /// Limits the maximum size of a decoded message.
+ ///
+ /// Default: `4MB`
+ #[must_use]
+ pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
+ self.max_decoding_message_size = Some(limit);
+ self
+ }
+ /// Limits the maximum size of an encoded message.
+ ///
+ /// Default: `usize::MAX`
+ #[must_use]
+ pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
+ self.max_encoding_message_size = Some(limit);
+ self
+ }
+ }
+ impl tonic::codegen::Service> for AgreementServiceServer
+ where
+ T: AgreementService,
+ B: Body + std::marker::Send + 'static,
+ B::Error: Into + std::marker::Send + 'static,
+ {
+ type Response = http::Response;
+ type Error = std::convert::Infallible;
+ type Future = BoxFuture;
+ fn poll_ready(
+ &mut self,
+ _cx: &mut Context<'_>,
+ ) -> Poll> {
+ Poll::Ready(Ok(()))
+ }
+ fn call(&mut self, req: http::Request) -> Self::Future {
+ match req.uri().path() {
+ "/graphprotocol.indexer.dips.AgreementService/CreateAgreement" => {
+ #[allow(non_camel_case_types)]
+ struct CreateAgreementSvc(pub Arc);
+ impl<
+ T: AgreementService,
+ > tonic::server::UnaryService
+ for CreateAgreementSvc {
+ type Response = super::CreateAgreementResponse;
+ type Future = BoxFuture<
+ tonic::Response,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request,
+ ) -> Self::Future {
+ let inner = Arc::clone(&self.0);
+ let fut = async move {
+ ::create_agreement(&inner, request)
+ .await
+ };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let max_decoding_message_size = self.max_decoding_message_size;
+ let max_encoding_message_size = self.max_encoding_message_size;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let method = CreateAgreementSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/graphprotocol.indexer.dips.AgreementService/CancelAgreement" => {
+ #[allow(non_camel_case_types)]
+ struct CancelAgreementSvc(pub Arc);
+ impl<
+ T: AgreementService,
+ > tonic::server::UnaryService
+ for CancelAgreementSvc {
+ type Response = super::AgreementCanellationResponse;
+ type Future = BoxFuture<
+ tonic::Response,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request,
+ ) -> Self::Future {
+ let inner = Arc::clone(&self.0);
+ let fut = async move {
+ ::cancel_agreement(&inner, request)
+ .await
+ };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let max_decoding_message_size = self.max_decoding_message_size;
+ let max_encoding_message_size = self.max_encoding_message_size;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let method = CancelAgreementSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/graphprotocol.indexer.dips.AgreementService/GetAgreementById" => {
+ #[allow(non_camel_case_types)]
+ struct GetAgreementByIdSvc(pub Arc);
+ impl<
+ T: AgreementService,
+ > tonic::server::UnaryService
+ for GetAgreementByIdSvc {
+ type Response = super::GetAgreementByIdResponse;
+ type Future = BoxFuture<
+ tonic::Response,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request,
+ ) -> Self::Future {
+ let inner = Arc::clone(&self.0);
+ let fut = async move {
+ ::get_agreement_by_id(
+ &inner,
+ request,
+ )
+ .await
+ };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let max_decoding_message_size = self.max_decoding_message_size;
+ let max_encoding_message_size = self.max_encoding_message_size;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let method = GetAgreementByIdSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/graphprotocol.indexer.dips.AgreementService/GetPrice" => {
+ #[allow(non_camel_case_types)]
+ struct GetPriceSvc(pub Arc);
+ impl<
+ T: AgreementService,
+ > tonic::server::UnaryService
+ for GetPriceSvc {
+ type Response = super::PriceResponse;
+ type Future = BoxFuture<
+ tonic::Response,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request,
+ ) -> Self::Future {
+ let inner = Arc::clone(&self.0);
+ let fut = async move {
+ ::get_price(&inner, request).await
+ };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let max_decoding_message_size = self.max_decoding_message_size;
+ let max_encoding_message_size = self.max_encoding_message_size;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let method = GetPriceSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ _ => {
+ Box::pin(async move {
+ let mut response = http::Response::new(empty_body());
+ let headers = response.headers_mut();
+ headers
+ .insert(
+ tonic::Status::GRPC_STATUS,
+ (tonic::Code::Unimplemented as i32).into(),
+ );
+ headers
+ .insert(
+ http::header::CONTENT_TYPE,
+ tonic::metadata::GRPC_CONTENT_TYPE,
+ );
+ Ok(response)
+ })
+ }
+ }
+ }
+ }
+ impl Clone for AgreementServiceServer {
+ fn clone(&self) -> Self {
+ let inner = self.inner.clone();
+ Self {
+ inner,
+ accept_compression_encodings: self.accept_compression_encodings,
+ send_compression_encodings: self.send_compression_encodings,
+ max_decoding_message_size: self.max_decoding_message_size,
+ max_encoding_message_size: self.max_encoding_message_size,
+ }
+ }
+ }
+ /// Generated gRPC service name
+ pub const SERVICE_NAME: &str = "graphprotocol.indexer.dips.AgreementService";
+ impl tonic::server::NamedService for AgreementServiceServer {
+ const NAME: &'static str = SERVICE_NAME;
+ }
+}
diff --git a/crates/dips/src/proto/mod.rs b/crates/dips/src/proto/mod.rs
new file mode 100644
index 00000000..8edb4be2
--- /dev/null
+++ b/crates/dips/src/proto/mod.rs
@@ -0,0 +1,8 @@
+// This file is @generated by prost-build.
+pub mod graphprotocol {
+ pub mod indexer {
+ pub mod dips {
+ include!("graphprotocol.indexer.dips.rs");
+ }
+ }
+}
diff --git a/crates/dips/src/server.rs b/crates/dips/src/server.rs
new file mode 100644
index 00000000..3cd5dc58
--- /dev/null
+++ b/crates/dips/src/server.rs
@@ -0,0 +1,88 @@
+// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
+// SPDX-License-Identifier: Apache-2.0
+
+use std::{str::FromStr, sync::Arc, time::Duration};
+
+use crate::{
+ proto::graphprotocol::indexer::dips::*, store::AgreementStore, validate_and_cancel_agreement,
+ validate_and_create_agreement, DipsError,
+};
+use alloy_sol_types::Eip712Domain;
+use anyhow::anyhow;
+use async_trait::async_trait;
+use thegraph_core::Address;
+use uuid::Uuid;
+
+#[derive(Debug)]
+pub struct DipsServer {
+ pub agreement_store: Arc,
+ pub expected_payee: Address,
+ pub allowed_payers: Vec,
+ pub domain: Eip712Domain,
+ pub cancel_voucher_time_tolerance: Duration,
+}
+
+#[async_trait]
+impl agreement_service_server::AgreementService for DipsServer {
+ async fn create_agreement(
+ &self,
+ request: tonic::Request,
+ ) -> std::result::Result, tonic::Status> {
+ let CreateAgreementRequest { id, signed_voucher } = request.into_inner();
+ let uid = Uuid::from_str(&id).map_err(|_| {
+ Into::::into(DipsError::from(anyhow!("failed to parse uuid")))
+ })?;
+
+ validate_and_create_agreement(
+ self.agreement_store.clone(),
+ &self.domain,
+ uid,
+ &self.expected_payee,
+ &self.allowed_payers,
+ signed_voucher,
+ )
+ .await
+ .map_err(Into::::into)?;
+
+ Ok(tonic::Response::new(CreateAgreementResponse {
+ uuid: uid.to_string(),
+ }))
+ }
+
+ async fn cancel_agreement(
+ &self,
+ request: tonic::Request,
+ ) -> std::result::Result, tonic::Status> {
+ let CancelAgreementRequest { id, signed_voucher } = request.into_inner();
+ let uid = Uuid::from_str(&id).map_err(|_| {
+ Into::::into(DipsError::from(anyhow!("failed to parse uuid")))
+ })?;
+
+ validate_and_cancel_agreement(
+ self.agreement_store.clone(),
+ &self.domain,
+ uid,
+ signed_voucher,
+ self.cancel_voucher_time_tolerance,
+ )
+ .await
+ .map_err(Into::::into)?;
+
+ Ok(tonic::Response::new(AgreementCanellationResponse {
+ uuid: uid.to_string(),
+ }))
+ }
+ async fn get_agreement_by_id(
+ &self,
+ _request: tonic::Request,
+ ) -> std::result::Result, tonic::Status> {
+ todo!()
+ }
+
+ async fn get_price(
+ &self,
+ _request: tonic::Request,
+ ) -> std::result::Result, tonic::Status> {
+ todo!()
+ }
+}
diff --git a/crates/dips/src/store.rs b/crates/dips/src/store.rs
new file mode 100644
index 00000000..7453b484
--- /dev/null
+++ b/crates/dips/src/store.rs
@@ -0,0 +1,56 @@
+// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
+// SPDX-License-Identifier: Apache-2.0
+
+use std::collections::HashMap;
+
+use async_trait::async_trait;
+use uuid::Uuid;
+
+use crate::{SignedCancellationRequest, SignedIndexingAgreementVoucher};
+
+#[async_trait]
+pub trait AgreementStore: Sync + Send + std::fmt::Debug {
+ async fn get_by_id(&self, id: Uuid) -> anyhow::Result