Skip to content

Commit

Permalink
Introduce webhook callback in account sync (#998)
Browse files Browse the repository at this point in the history
* Introduce webhook callback in account sync

Signed-off-by: sugargoat <[email protected]>

* Plumbing to introduce webhook thread

Signed-off-by: sugargoat <[email protected]>

* Webhook reqwest functioning

Signed-off-by: sugargoat <[email protected]>

* Clean up

Signed-off-by: sugargoat <[email protected]>

* Updating to use URL + documentation

Signed-off-by: sugargoat <[email protected]>

* Smaller cargo.lock footprint

Signed-off-by: sugargoat <[email protected]>

* Fix linter - sort dependencies

Signed-off-by: sugargoat <[email protected]>

* Plumbing to introduce webhook thread

Signed-off-by: sugargoat <[email protected]>

* Clean up logging

Signed-off-by: sugargoat <[email protected]>

* Rebased on main, some more logging fixups

Signed-off-by: sugargoat <[email protected]>

* Webhook now receives post of list of accounts, and only when they are fully synced

Signed-off-by: sugargoat <[email protected]>

* Cleaning up logging and fixing up documentation

Signed-off-by: sugargoat <[email protected]>

* Move WebhookThread to its own file

Signed-off-by: sugargoat <[email protected]>

* Set poll_interval to match config for full-service polling

Signed-off-by: sugargoat <[email protected]>

* Refactor test_utils to be a little less invasive

Signed-off-by: sugargoat <[email protected]>

* Clean up test documentation

Signed-off-by: sugargoat <[email protected]>

* minor fixups and remove webhook restart notification

---------

Signed-off-by: sugargoat <[email protected]>
Co-authored-by: Henry Holtzman <[email protected]>
  • Loading branch information
sugargoat and holtzman authored Jun 7, 2024
1 parent 65bb71e commit ba25891
Show file tree
Hide file tree
Showing 25 changed files with 1,236 additions and 87 deletions.
644 changes: 636 additions & 8 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions full-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ mc-fog-report-validation = { path = "../mobilecoin/fog/report/validation", featu
mc-fog-report-validation-test-utils = { path = "../mobilecoin/fog/report/validation/test-utils" }

bs58 = "0.5.0"
httpmock = "0.7.0"
tempdir = "0.3"
tokio = "1.27"
url = "2.3"
Expand Down
21 changes: 19 additions & 2 deletions full-service/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use mc_consensus_scp::QuorumSet;
use mc_fog_report_resolver::FogResolver;
use mc_full_service::{
check_host,
config::{APIConfig, NetworkConfig},
config::{APIConfig, NetworkConfig, WebhookConfig},
wallet::{consensus_backed_rocket, validator_backed_rocket, APIKeyState, WalletState},
ValidatorLedgerSyncThread, WalletDb, WalletService,
};
Expand Down Expand Up @@ -117,17 +117,30 @@ fn rocket() -> Rocket<Build> {
tx_sources,
};

let webhook_config = config.deposits_webhook_url.clone().map(|wu| WebhookConfig {
url: wu,
poll_interval: config.poll_interval.clone(),
});

let rocket = if let Some(validator_uri) = config.validator.as_ref() {
validator_backed_full_service(
validator_uri,
&config,
network_config,
wallet_db,
rocket_config,
webhook_config,
logger,
)
} else {
consensus_backed_full_service(&config, network_config, wallet_db, rocket_config, logger)
consensus_backed_full_service(
&config,
network_config,
wallet_db,
rocket_config,
webhook_config,
logger,
)
};

let api_key = env::var("MC_API_KEY").unwrap_or_default();
Expand All @@ -139,6 +152,7 @@ fn consensus_backed_full_service(
network_config: NetworkConfig,
wallet_db: Option<WalletDb>,
rocket_config: rocket::Config,
webhook_config: Option<WebhookConfig>,
logger: Logger,
) -> Rocket<Build> {
// Create enclave trusted identity.
Expand Down Expand Up @@ -245,6 +259,7 @@ fn consensus_backed_full_service(
config.get_fog_resolver_factory(logger.clone()),
config.offline,
config.t3_sync_config.clone(),
webhook_config,
logger,
);

Expand All @@ -260,6 +275,7 @@ fn validator_backed_full_service(
network_config: NetworkConfig,
wallet_db: Option<WalletDb>,
rocket_config: rocket::Config,
webhook_config: Option<WebhookConfig>,
logger: Logger,
) -> Rocket<Build> {
if config.watcher_db.is_some() {
Expand Down Expand Up @@ -342,6 +358,7 @@ fn validator_backed_full_service(
}),
false,
config.t3_sync_config.clone(),
webhook_config,
logger,
);

Expand Down
35 changes: 35 additions & 0 deletions full-service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use mc_util_uri::{ConnectionUri, ConsensusClientUri, FogUri};
use mc_validator_api::ValidatorUri;

use clap::Parser;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::{
convert::TryFrom,
Expand Down Expand Up @@ -89,6 +90,33 @@ pub struct APIConfig {
/// T3 Server to connect to and the api key to use for authorization.
#[clap(flatten)]
pub t3_sync_config: T3Config,

/// Webhook configuration to notify an external server listening for
/// deposit notifications.
///
/// The format of the webhook is a POST request with the following query
/// parameters:
///
/// POST /webhook -H "Content-Type: application/json" \
/// -d '{"accounts": [A,B,C], "restart": false}'
///
/// The first time full-service is caught up with the network ledger,
/// it will send a webhook with {"restart": true, "accounts": [A,]}
/// where "accounts" may be empty.
///
/// Where the num_txos provided indicate how many txos were received
/// in the last scan period for any account in the wallet.
///
/// The expected action to take in response to the webhook is to call
/// the `get_txos` API endpoint for the given accounts to retrieve more
/// details about the TXOs received.
///
/// We expect a 200 response code to indicate that the webhook was
/// received, and we do not further inspect the response body. Even if
/// not a 200 response, we will continue to attempt to reach the webhook
/// on subsequent deposits.
#[clap(long, value_parser = Url::parse, env = "MC_DEPOSITS_WEBHOOK_URL")]
pub deposits_webhook_url: Option<Url>,
}

fn parse_quorum_set_from_json(src: &str) -> Result<QuorumSet<ResponderId>, String> {
Expand Down Expand Up @@ -392,3 +420,10 @@ impl LedgerDbConfig {
ledger_db
}
}

/// The Webhook Setup object.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct WebhookConfig {
pub url: Url,
pub poll_interval: Duration,
}
3 changes: 2 additions & 1 deletion full-service/src/db/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ use mc_core::slip10::Slip10KeyGenerator;
use mc_crypto_digestible::{Digestible, MerlinTranscript};
use mc_crypto_keys::{RistrettoPrivate, RistrettoPublic};
use mc_transaction_core::{get_tx_out_shared_secret, TokenId};
use serde_derive::Serialize;
use std::fmt;

#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize)]
pub struct AccountID(pub String);

impl From<&AccountKey> for AccountID {
Expand Down
55 changes: 47 additions & 8 deletions full-service/src/db/transaction_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,11 @@ impl TransactionLogModel for TransactionLog {

#[cfg(test)]
mod tests {
use std::ops::DerefMut;
use std::{
collections::HashMap,
ops::DerefMut,
sync::{Arc, Mutex},
};

use mc_account_keys::{PublicAddress, CHANGE_SUBADDRESS_INDEX};
use mc_common::logger::{async_test_with_logger, Logger};
Expand Down Expand Up @@ -892,7 +896,12 @@ mod tests {
let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng);

// Start sync thread
let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone());
let _sync_thread = SyncThread::start(
ledger_db.clone(),
wallet_db.clone(),
Arc::new(Mutex::new(HashMap::<AccountID, bool>::new())),
logger.clone(),
);

let account_key = random_account_with_seed_values(
&wallet_db,
Expand Down Expand Up @@ -1075,7 +1084,12 @@ mod tests {
let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng);

// Start sync thread
let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone());
let _sync_thread = SyncThread::start(
ledger_db.clone(),
wallet_db.clone(),
Arc::new(Mutex::new(HashMap::<AccountID, bool>::new())),
logger.clone(),
);

let account_key = random_account_with_seed_values(
&wallet_db,
Expand Down Expand Up @@ -1166,7 +1180,12 @@ mod tests {
let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng);

// Start sync thread
let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone());
let _sync_thread = SyncThread::start(
ledger_db.clone(),
wallet_db.clone(),
Arc::new(Mutex::new(HashMap::<AccountID, bool>::new())),
logger.clone(),
);

let account_key = random_account_with_seed_values(
&wallet_db,
Expand Down Expand Up @@ -1275,7 +1294,12 @@ mod tests {
let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng);

// Start sync thread
let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone());
let _sync_thread = SyncThread::start(
ledger_db.clone(),
wallet_db.clone(),
Arc::new(Mutex::new(HashMap::<AccountID, bool>::new())),
logger.clone(),
);

let account_key = random_account_with_seed_values(
&wallet_db,
Expand Down Expand Up @@ -1350,7 +1374,12 @@ mod tests {
let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng);

// Start sync thread
let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone());
let _sync_thread = SyncThread::start(
ledger_db.clone(),
wallet_db.clone(),
Arc::new(Mutex::new(HashMap::<AccountID, bool>::new())),
logger.clone(),
);

let account_key = random_account_with_seed_values(
&wallet_db,
Expand Down Expand Up @@ -1582,7 +1611,12 @@ mod tests {
let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng);

// Start sync thread
let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone());
let _sync_thread = SyncThread::start(
ledger_db.clone(),
wallet_db.clone(),
Arc::new(Mutex::new(HashMap::<AccountID, bool>::new())),
logger.clone(),
);

let account_key = random_account_with_seed_values(
&wallet_db,
Expand Down Expand Up @@ -1813,7 +1847,12 @@ mod tests {
let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng);

// Start sync thread
let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone());
let _sync_thread = SyncThread::start(
ledger_db.clone(),
wallet_db.clone(),
Arc::new(Mutex::new(HashMap::<AccountID, bool>::new())),
logger.clone(),
);

let account_key = random_account_with_seed_values(
&wallet_db,
Expand Down
1 change: 1 addition & 0 deletions full-service/src/json_rpc/v1/api/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ pub fn create_test_setup(
get_resolver_factory(rng).unwrap(),
false,
T3Config::default(),
None,
logger,
);

Expand Down
33 changes: 29 additions & 4 deletions full-service/src/json_rpc/v2/api/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use rocket::{
use tempdir::TempDir;
use url::Url;

use crate::config::WebhookConfig;
use std::{
convert::TryFrom,
sync::{
Expand Down Expand Up @@ -138,6 +139,7 @@ pub fn create_test_setup(
mut rng: &mut StdRng,
use_wallet_db: bool,
use_watcher_db: bool,
webhook_config: Option<WebhookConfig>,
logger: Logger,
) -> (
rocket::Rocket<Build>,
Expand Down Expand Up @@ -178,6 +180,7 @@ pub fn create_test_setup(
get_resolver_factory(rng).unwrap(),
false,
T3Config::default(),
webhook_config,
logger,
);

Expand All @@ -201,7 +204,29 @@ pub fn setup(
Arc<RwLock<PollingNetworkState<MockBlockchainConnection<LedgerDB>>>>,
) {
let (rocket_instance, ledger_db, db_test_context, network_state) =
create_test_setup(rng, true, false, logger);
create_test_setup(rng, true, false, None, logger);

let rocket = rocket_instance.manage(APIKeyState("".to_string()));
(
Client::untracked(rocket).expect("valid rocket instance"),
ledger_db,
db_test_context,
network_state,
)
}

pub fn setup_with_webhook(
rng: &mut StdRng,
webhook_config: WebhookConfig,
logger: Logger,
) -> (
Client,
LedgerDB,
WalletDbTestContext,
Arc<RwLock<PollingNetworkState<MockBlockchainConnection<LedgerDB>>>>,
) {
let (rocket_instance, ledger_db, db_test_context, network_state) =
create_test_setup(rng, true, false, Some(webhook_config), logger);

let rocket = rocket_instance.manage(APIKeyState("".to_string()));
(
Expand All @@ -222,7 +247,7 @@ pub fn setup_with_watcher(
Arc<RwLock<PollingNetworkState<MockBlockchainConnection<LedgerDB>>>>,
) {
let (rocket_instance, ledger_db, db_test_context, network_state) =
create_test_setup(rng, true, true, logger);
create_test_setup(rng, true, true, None, logger);

let rocket = rocket_instance.manage(APIKeyState("".to_string()));
(
Expand All @@ -243,7 +268,7 @@ pub fn setup_no_wallet_db(
Arc<RwLock<PollingNetworkState<MockBlockchainConnection<LedgerDB>>>>,
) {
let (rocket_instance, ledger_db, db_test_context, network_state) =
create_test_setup(rng, false, false, logger);
create_test_setup(rng, false, false, None, logger);

let rocket = rocket_instance.manage(APIKeyState("".to_string()));
(
Expand All @@ -265,7 +290,7 @@ pub fn setup_with_api_key(
Arc<RwLock<PollingNetworkState<MockBlockchainConnection<LedgerDB>>>>,
) {
let (rocket_instance, ledger_db, db_test_context, network_state) =
create_test_setup(rng, true, false, logger);
create_test_setup(rng, true, false, None, logger);

let rocket = rocket_instance.manage(APIKeyState(api_key));

Expand Down
1 change: 1 addition & 0 deletions full-service/src/json_rpc/v2/e2e_tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod account;
mod other;
mod transaction;
mod webhook;
Loading

0 comments on commit ba25891

Please sign in to comment.