Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Alexis Asseman <[email protected]>
  • Loading branch information
aasseman committed Oct 21, 2023
1 parent 2e682a4 commit b12ddb0
Show file tree
Hide file tree
Showing 16 changed files with 444 additions and 171 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions migrations/20230915230734_tap_ravs.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ CREATE TABLE IF NOT EXISTS scalar_tap_latest_ravs (
allocation_id CHAR(40) NOT NULL,
sender_address CHAR(40) NOT NULL,
rav JSON NOT NULL,
is_last BOOLEAN DEFAULT FALSE NOT NULL,
PRIMARY KEY (allocation_id, sender_address)
);
2 changes: 2 additions & 0 deletions tap_agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ log = "0.4.19"
reqwest = "0.11.20"
serde = "1.0.188"
serde_json = "1.0.104"
serde_yaml = "0.9.25"
sqlx = { version = "0.7.1", features = ["postgres", "runtime-tokio", "bigdecimal", "rust_decimal"] }
tap_aggregator = "0.1.6"
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol.git", rev = "882ca394444b451538908b9996bf7d45869a1bb9" }
Expand All @@ -47,3 +48,4 @@ ethers-signers = "2.0.8"
faux = "0.1.10"
indexer-common = { path = "../common", features = ["mock"] }
rstest = "0.18.1"
tempfile = "3.8.0"
9 changes: 8 additions & 1 deletion tap_agent/src/agent.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::time::Duration;

use crate::{config, database, tap::managers};
use alloy_primitives::Address;
use indexer_common::prelude::{escrow_accounts, indexer_allocations, SubgraphClient};

use crate::{aggregator_endpoints, config, database, tap::managers};

pub async fn start_agent(config: &'static config::Cli) {
let pgpool = database::connect(&config.postgres).await;

Expand Down Expand Up @@ -38,6 +39,11 @@ pub async fn start_agent(config: &'static config::Cli) {
Duration::from_secs(config.escrow_subgraph.escrow_syncing_interval),
);

// TODO: replace with a proper implementation once the gateway registry is ready
let sender_aggregator_endpoints = aggregator_endpoints::load_aggregator_endpoints(
config.tap.sender_aggregator_endpoints_file.clone(),
);

let _managers = managers::TapManagers::new(
config,
pgpool,
Expand All @@ -50,6 +56,7 @@ pub async fn start_agent(config: &'static config::Cli) {
String::from("0"),
// TODO: tap eip712 verifying contract config
Address::ZERO,
sender_aggregator_endpoints,
)
.await;
}
44 changes: 44 additions & 0 deletions tap_agent/src/aggregator_endpoints.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/// Load a hashmap of sender addresses and their corresponding aggregator endpoints
/// from a yaml file. We're using serde_yaml.
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use std::path::PathBuf;

use alloy_primitives::Address;

/// Load a hashmap of sender addresses and their corresponding aggregator endpoints
/// from a yaml file. We're using serde_yaml.
pub fn load_aggregator_endpoints(file_path: PathBuf) -> HashMap<Address, String> {
let file = File::open(file_path).unwrap();
let reader = BufReader::new(file);
let endpoints: HashMap<Address, String> = serde_yaml::from_reader(reader).unwrap();
endpoints
}

#[cfg(test)]
mod tests {
use std::{io::Write, str::FromStr};

use super::*;

/// Test that we can load the aggregator endpoints from a yaml file.
/// The test is going to create a temporary yaml file using tempfile, load it, and
/// check that the endpoints are loaded correctly.
#[test]
fn test_load_aggregator_endpoints() {
let named_temp_file = tempfile::NamedTempFile::new().unwrap();
let mut temp_file = named_temp_file.reopen().unwrap();
let yaml = r#"
0xdeadbeefcafebabedeadbeefcafebabedeadbeef: https://example.com/aggregate-receipts
0x0123456789abcdef0123456789abcdef01234567: https://other.example.com/aggregate-receipts
"#;
temp_file.write_all(yaml.as_bytes()).unwrap();
let endpoints = load_aggregator_endpoints(named_temp_file.path().to_path_buf());
assert_eq!(
endpoints
.get(&Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap()),
Some(&"https://example.com/aggregate-receipts".to_string())
);
}
}
18 changes: 14 additions & 4 deletions tap_agent/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright 2023-, GraphOps and Semiotic Labs. SPDX-License-Identifier: Apache-2.0
use std::path::PathBuf;

use alloy_primitives::Address;
use clap::{command, Args, Parser, ValueEnum};
use dotenvy::dotenv;
Expand Down Expand Up @@ -261,8 +262,8 @@ pub struct Tap {
long,
value_name = "rav-request-trigger-value",
env = "RAV_REQUEST_TRIGGER_VALUE",
help = "Value of unaggregated fees that triggers a RAV request (in GRT).",
default_value_t = 10
help = "Value of unaggregated fees that triggers a RAV request (in GRT wei).",
default_value_t = 10_000_000_000_000_000_000 // 10 GRT
)]
pub rav_request_trigger_value: u64,
#[clap(
Expand All @@ -271,9 +272,18 @@ pub struct Tap {
env = "RAV_REQUEST_TIMESTAMP_BUFFER",
help = "Buffer (in ns) to add between the current time and the timestamp of the \
last unaggregated fee when triggering a RAV request.",
default_value_t = 1000
default_value_t = 1_000_000_000 // 1 second
)]
pub rav_request_timestamp_buffer_ns: u64,

// TODO: Remove this whenever the the gateway registry is ready
#[clap(
long,
value_name = "sender-aggregator-endpoints",
env = "SENDER_AGGREGATOR_ENDPOINTS",
help = "YAML file with a map of sender addresses to aggregator endpoints."
)]
pub sender_aggregator_endpoints_file: PathBuf,
}

/// Sets up tracing, allows log level to be set from the environment variables
Expand Down
9 changes: 5 additions & 4 deletions tap_agent/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// Copyright 2023-, GraphOps and Semiotic Labs. SPDX-License-Identifier:
// Apache-2.0 TODO: DEDUPLICATE
use crate::config;
use sqlx::{postgres::PgPoolOptions, PgPool};
// TODO: DEDUPLICATE
use std::time::Duration;

use sqlx::{postgres::PgPoolOptions, PgPool};
use tracing::debug;

use crate::config;

pub async fn connect(config: &config::Postgres) -> PgPool {
let url = format!(
"postgresql://{}:{}@{}:{}/{}",
Expand Down
5 changes: 3 additions & 2 deletions tap_agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// Copyright 2023-, GraphOps and Semiotic Labs. SPDX-License-Identifier: Apache-2.0
use crate::config::Cli;
use anyhow::Result;
use lazy_static::lazy_static;
use log::{debug, info};
use tokio::signal::unix::{signal, SignalKind};

use crate::config::Cli;

mod agent;
mod aggregator_endpoints;
mod config;
mod database;
mod tap;
Expand Down
5 changes: 2 additions & 3 deletions tap_agent/src/tap/escrow_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::{collections::HashMap, sync::Arc};

use alloy_primitives::Address;
use async_trait::async_trait;
use ethereum_types::U256;
use eventuals::Eventual;

/// TODO: Implement the escrow adapter. This is only a basic mock implementation.
use std::{collections::HashMap, sync::Arc};
use tap_core::adapters::escrow_adapter::EscrowAdapter as EscrowAdapterTrait;
use thiserror::Error;
use tokio::sync::RwLock;
Expand Down
Loading

0 comments on commit b12ddb0

Please sign in to comment.