From 67c8d8982b9ce33bb50302a4401daf017b507b7f Mon Sep 17 00:00:00 2001 From: DanGould Date: Thu, 11 Jan 2024 12:19:53 -0500 Subject: [PATCH] [Draft] run PayjoinApp --- src/api/mod.rs | 3 + src/api/server/mod.rs | 2 + src/app/mod.rs | 1 + src/cli/mod.rs | 7 -- src/payjoin/app.rs | 225 ++++++++++++++++++++++++-------------- src/payjoin/mod.rs | 60 +--------- src/payjoin/server/mod.rs | 65 +++++++---- src/primitives/mod.rs | 3 +- src/profile/entity.rs | 2 +- src/profile/repo.rs | 1 + 10 files changed, 199 insertions(+), 170 deletions(-) diff --git a/src/api/mod.rs b/src/api/mod.rs index 00f55cf7..ae47430b 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -2,6 +2,7 @@ mod config; mod server; use crate::app::{error::*, *}; +use crate::payjoin::{error::*, *}; pub use config::*; pub use server::*; @@ -26,6 +27,7 @@ pub async fn run_dev( let profile = Profiles::new(&pool) .find_by_key(dev_constants::BRIA_DEV_KEY) .await?; + let pj = PayjoinApp::run(app.clone(), profile.clone()).await; let (_, xpubs) = app .create_wpkh_wallet( &profile, @@ -67,6 +69,7 @@ pub async fn run( app_cfg: AppConfig, ) -> Result<(), ApplicationError> { let app = App::run(pool, app_cfg).await?; + // let pj = PayjoinApp::run(app.clone()).await; server::start(config, app).await?; Ok(()) } diff --git a/src/api/server/mod.rs b/src/api/server/mod.rs index 57dbe305..c7cac361 100644 --- a/src/api/server/mod.rs +++ b/src/api/server/mod.rs @@ -17,6 +17,7 @@ use proto::{bria_service_server::BriaService, *}; use super::config::*; use crate::{ app::{error::ApplicationError, *}, + payjoin::PayjoinApp, payout_queue, primitives::*, profile, @@ -882,6 +883,7 @@ pub(crate) async fn start(server_config: ApiConfig, app: App) -> Result<(), Appl .add_service(BriaServiceServer::new(bria)) .serve(([0, 0, 0, 0], server_config.listen_port).into()) .await?; + Ok(()) } diff --git a/src/app/mod.rs b/src/app/mod.rs index e3adea71..a8f7f7ba 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -29,6 +29,7 @@ use crate::{ xpub::*, }; +#[derive(Clone)] #[allow(dead_code)] pub struct App { _runner: JobRunnerHandle, diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 3730f25c..70a339e2 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -1048,13 +1048,6 @@ async fn run_cmd( }); })); let payjoin_send = send.clone(); - handles.push(tokio::spawn(async move { - let _ = payjoin_send.try_send( - super::payjoin::run(pool, app) - .await - .context("Payjoin server error") - ); - })); let reason = receive.recv().await.expect("Didn't receive msg"); for handle in handles { diff --git a/src/payjoin/app.rs b/src/payjoin/app.rs index eaf2f60b..2342d1c3 100644 --- a/src/payjoin/app.rs +++ b/src/payjoin/app.rs @@ -1,66 +1,109 @@ -use std::{str::FromStr, collections::HashMap}; +use std::collections::HashMap; -use anyhow::{anyhow, Result}; -use payjoin::{receive::{PayjoinProposal, UncheckedProposal, ProvisionalProposal}, Error}; +use anyhow::{anyhow, Context, Result}; +use payjoin::{ + receive::{PayjoinProposal, ProvisionalProposal, UncheckedProposal}, + Error, +}; use tracing::instrument; -use super::error::*; -use crate::primitives::bitcoin; - -const BOOTSTRAP_KEY_NAME: &str = "payjoin_bootstrap_key"; +type ProtoClient = + crate::api::proto::bria_service_client::BriaServiceClient; +use crate::{ + app::{error::ApplicationError, App}, + primitives::bitcoin, + profile::Profile, +}; +#[derive(Clone)] pub struct PayjoinApp { - // config: bitcoind ledger, wallet, pj_host, pj_endpoint - // ledger: Ledger, - // network: bitcoin::Network, - pool: sqlx::PgPool, + app: App, + profile: Profile, } impl PayjoinApp { - pub fn new(pool: sqlx::PgPool) -> Self { - Self { - pool, - } + pub async fn run(app: App, profile: Profile) -> Self { + Self { app, profile } } #[instrument(name = "payjoin_app.process_proposal", skip(self), err)] - fn process_proposal(&self, proposal: UncheckedProposal) -> Result { - let bitcoind = self.bitcoind().map_err(|e| Error::Server(e.into()))?; - + pub async fn process_proposal( + self, + proposal: UncheckedProposal, + ) -> Result { // in a payment processor where the sender could go offline, this is where you schedule to broadcast the original_tx let _to_broadcast_in_failure_case = proposal.extract_tx_to_schedule_broadcast(); // The network is used for checks later - let network = - bitcoind.get_blockchain_info().map_err(|e| Error::Server(e.into())).and_then( - |info| bitcoin::Network::from_str(&info.chain).map_err(|e| Error::Server(e.into())), - )?; + let network = self.app.network(); // Receive Check 1: Can Broadcast let proposal = proposal.check_broadcast_suitability(None, |tx| { - let raw_tx = bitcoin::consensus::encode::serialize_hex(&tx); - let mempool_results = - bitcoind.test_mempool_accept(&[raw_tx]).map_err(|e| Error::Server(e.into()))?; - match mempool_results.first() { - Some(result) => Ok(result.allowed), - None => Err(Error::Server( - anyhow!("No mempool results returned on broadcast check").into(), - )), - } + let _raw_tx = bitcoin::consensus::encode::serialize_hex(&tx); + // TODO test_mempool_accept e.g.: + // + // let mempool_results = + // bitcoind.test_mempool_accept(&[raw_tx]).map_err(|e| Error::Server(e.into()))?; + // match mempool_results.first() { + // Some(result) => Ok(result.allowed), + // None => Err(Error::Server( + // anyhow!("No mempool results returned on broadcast check").into(), + // )), + // } + Ok(true) })?; tracing::trace!("check1"); + use bdk::bitcoin::{Script, ScriptBuf}; + use std::sync::mpsc; + use std::sync::mpsc::Sender; + use std::sync::Arc; + use std::sync::Mutex; + + // Create a channel + let (tx, rx) = mpsc::channel::<(ScriptBuf, Sender>)>(); + let tx = Arc::new(Mutex::new(tx)); + + let app = self.app.clone(); + let profile = self.profile.clone(); + let network = network.clone(); + // Receive Check 2: receiver can't sign for proposal inputs + tokio::spawn(async move { + while let Ok((input, response_tx)) = rx.recv() { + let result = check_script_owned(&app, &profile, network, &input).await; + let _ = response_tx.send(result); + } + }); + + async fn check_script_owned( + app: &App, + profile: &Profile, + network: bitcoin::Network, + input: &Script, + ) -> Result { + let address = bitcoin::BdkAddress::from_script(&input, network)?; + match app + .find_address(&profile.clone(), address.to_string()) + .await + { + Err(ApplicationError::AddressError(e)) => Err(ApplicationError::AddressError(e)), + Err(_) => Ok(false), // good address script, but not found + Ok(_) => Ok(true), + } + } + let proposal = proposal.check_inputs_not_owned(|input| { - if let Ok(address) = bitcoin::BdkAddress::from_script(input, network) { - bitcoind - .get_address_info(&address) - .map(|info| info.is_mine.unwrap_or(false)) - .map_err(|e| Error::Server(e.into())) - } else { - Ok(false) + let (response_tx, response_rx) = mpsc::channel::>(); + { + let tx = tx.lock().expect("lock"); // TODO Handle lock error if needed + tx.send((input.to_owned(), response_tx)) + .map_err(|e| Error::Server(e.into()))?; } + let recv_res = response_rx.recv().map_err(|e| Error::Server(e.into()))?; + Ok(recv_res.map_err(|e| Error::Server(e.into()))?) })?; + tracing::trace!("check2"); // Receive Check 3: receiver can't sign for proposal inputs let proposal = proposal.check_no_mixed_input_scripts()?; @@ -74,35 +117,47 @@ impl PayjoinApp { })?; tracing::trace!("check4"); + // Receive Check 4: receiver can't sign for proposal inputs + let (tx, rx) = mpsc::channel::<(ScriptBuf, Sender>)>(); + let tx = Arc::new(Mutex::new(tx)); + + let app = self.app.clone(); + let profile = self.profile.clone(); + let network = network.clone(); + tokio::spawn(async move { + while let Ok((input, response_tx)) = rx.recv() { + let result = check_script_owned(&app, &profile, network, &input).await; + let _ = response_tx.send(result); + } + }); + let mut provisional_payjoin = payjoin.identify_receiver_outputs(|output_script| { - if let Ok(address) = bitcoin::BdkAddress::from_script(output_script, network) { - bitcoind - .get_address_info(&address) - .map(|info| info.is_mine.unwrap_or(false)) - .map_err(|e| Error::Server(e.into())) - } else { - Ok(false) + let (response_tx, response_rx) = mpsc::channel::>(); + { + let tx = tx.lock().expect("lock"); // TODO Handle lock error if needed + tx.send((output_script.to_owned(), response_tx)) + .map_err(|e| Error::Server(e.into()))?; } + let recv_res = response_rx.recv().map_err(|e| Error::Server(e.into()))?; + Ok(recv_res.map_err(|e| Error::Server(e.into()))?) })?; - if !self.config.sub_only { - // Select receiver payjoin inputs. - _ = try_contributing_inputs(&mut provisional_payjoin, &bitcoind) - .map_err(|e| tracing::warn!("Failed to contribute inputs: {}", e)); - } + // Don't throw an error. Continue optimistic process even if we can't contribute inputs. + self.try_contributing_inputs(&mut provisional_payjoin) + .await + .map_err(|e| tracing::warn!("Failed to contribute inputs: {}", e)); - let receiver_substitute_address = bitcoind - .get_new_address(None, None) - .map_err(|e| Error::Server(e.into()))? - .assume_checked(); - provisional_payjoin.substitute_output_address(receiver_substitute_address); + // Output substitution could go here let payjoin_proposal = provisional_payjoin.finalize_proposal( |psbt: &bitcoin::psbt::Psbt| { - bitcoind - .wallet_process_psbt(&base64::encode(psbt.serialize()), None, None, Some(false)) - .map(|res| bitcoin::psbt::Psbt::from_str(&res.psbt).map_err(|e| Error::Server(e.into()))) - .map_err(|e| Error::Server(e.into()))? + Err(Error::Server(anyhow!("TODO sign psbt").into())) + // TODO sign proposal psbt with our inputs & subbed outputs e.g.: + // + // bitcoind + // .wallet_process_psbt(&base64::encode(psbt.serialize()), None, None, Some(false)) + // .map(|res| bitcoin::psbt::Psbt::from_str(&res.psbt).map_err(|e| Error::Server(e.into()))) + // .map_err(|e| Error::Server(e.into()))? }, None, // TODO set to bitcoin::FeeRate::MIN or similar )?; @@ -114,36 +169,46 @@ impl PayjoinApp { Ok(payjoin_proposal) } - - fn try_contributing_inputs( - payjoin: &mut ProvisionalProposal, - bitcoind: &bitcoincore_rpc::Client, - ) -> Result<()> { + async fn try_contributing_inputs(self, payjoin: &mut ProvisionalProposal) -> Result<()> { use bitcoin::OutPoint; - let available_inputs = bitcoind - .list_unspent(None, None, None, None, None) - .context("Failed to list unspent from bitcoind")?; - let candidate_inputs: HashMap = available_inputs + let available_inputs = match self.app.list_utxos(&self.profile, "".to_owned()).await { + Err(e) => { + tracing::warn!("Failed to list utxos: {}", e); + return Ok(()); + } + Ok((_, utxos)) => utxos, + }; + let mut available_inputs = available_inputs .iter() - .map(|i| (i.amount, OutPoint { txid: i.txid, vout: i.vout })) + .flat_map(|keychain_utxos| keychain_utxos.utxos.iter()); + let candidate_inputs: HashMap = available_inputs + .clone() + // Why is a utxo output value NOT saved in bitcoin::Amount? How can it be partial satoshis? + .map(|i| { + ( + bitcoin::Amount::from_sat(i.value.into()), + i.outpoint.clone(), + ) + }) .collect(); - let selected_outpoint = payjoin.try_preserving_privacy(candidate_inputs).expect("gg"); + let selected_outpoint = payjoin + .try_preserving_privacy(candidate_inputs) + .expect("gg"); let selected_utxo = available_inputs - .iter() - .find(|i| i.txid == selected_outpoint.txid && i.vout == selected_outpoint.vout) + .find(|i| i.outpoint == selected_outpoint) .context("This shouldn't happen. Failed to retrieve the privacy preserving utxo from those we provided to the seclector.")?; - tracing::debug!("selected utxo: {:#?}", selected_utxo); - // calculate receiver payjoin outputs given receiver payjoin inputs and original_psbt, let txo_to_contribute = bitcoin::TxOut { - value: selected_utxo.amount.to_sat(), - script_pubkey: selected_utxo.script_pub_key.clone(), + value: selected_utxo.value.into(), + script_pubkey: selected_utxo + .address + .clone() + .ok_or_else(|| anyhow!("selected_utxo missing script"))? + .script_pubkey(), }; - let outpoint_to_contribute = - bitcoin::OutPoint { txid: selected_utxo.txid, vout: selected_utxo.vout }; - payjoin.contribute_witness_input(txo_to_contribute, outpoint_to_contribute); + payjoin.contribute_witness_input(txo_to_contribute, selected_outpoint); Ok(()) } -} \ No newline at end of file +} diff --git a/src/payjoin/mod.rs b/src/payjoin/mod.rs index b2be0036..15e0de26 100644 --- a/src/payjoin/mod.rs +++ b/src/payjoin/mod.rs @@ -6,63 +6,9 @@ mod server; pub use app::*; pub use config::*; pub use error::*; -pub use server::*; -// pub async fn run_dev( -// pool: sqlx::PgPool, -// config: ApiConfig, -// app_cfg: AppConfig, -// xpub: Option<(String, String)>, -// derivation_path: Option, -// ) -> Result<(), ApplicationError> { -// use crate::{ -// dev_constants, -// payout_queue::*, -// profile::Profiles, -// xpub::{BitcoindSignerConfig, SignerConfig}, -// }; - -// let app = App::run(pool.clone(), app_cfg).await?; -// if let Some((xpub, signer_endpoint)) = xpub { -// println!("Creating dev entities"); -// let profile = Profiles::new(&pool) -// .find_by_key(dev_constants::BRIA_DEV_KEY) -// .await?; -// let (_, xpubs) = app -// .create_wpkh_wallet( -// &profile, -// dev_constants::DEV_WALLET_NAME.to_string(), -// xpub, -// derivation_path, -// ) -// .await?; -// app.set_signer_config( -// &profile, -// xpubs[0].to_string(), -// SignerConfig::Bitcoind(BitcoindSignerConfig { -// endpoint: signer_endpoint, -// rpc_user: dev_constants::DEFAULT_BITCOIND_RPC_USER.to_string(), -// rpc_password: dev_constants::DEFAULT_BITCOIND_RPC_PASSWORD.to_string(), -// }), -// ) -// .await?; -// app.create_payout_queue( -// &profile, -// dev_constants::DEV_QUEUE_NAME.to_string(), -// None, -// Some(PayoutQueueConfig { -// trigger: PayoutQueueTrigger::Payjoin, -// ..PayoutQueueConfig::default() -// }), -// ) -// .await?; -// } -// server::start(config, app).await?; +// pub async fn run(pool: sqlx::PgPool, config: PayjoinConfig) -> Result<(), PayjoinError> { +// let app = PayjoinApp::run(pool).await; +// server::start(config, app).await; // Ok(()) // } - -pub async fn run(pool: sqlx::PgPool, config: PayjoinConfig) -> Result<(), PayjoinError> { - let app = PayjoinApp::new(pool); - server::start(config, app).await?; - Ok(()) -} diff --git a/src/payjoin/server/mod.rs b/src/payjoin/server/mod.rs index d47ad8ef..d5c77095 100644 --- a/src/payjoin/server/mod.rs +++ b/src/payjoin/server/mod.rs @@ -1,28 +1,34 @@ - -use hyper::{Server, service::{make_service_fn, service_fn}, Response, Request, Method, Body, StatusCode}; +use anyhow::Result; +use hyper::{ + service::{make_service_fn, service_fn}, + Body, Method, Request, Response, Server, StatusCode, +}; use payjoin::Error; use tracing::instrument; use super::{config::*, error::*, PayjoinApp}; +#[derive(Clone)] pub struct Payjoin { app: PayjoinApp, } impl Payjoin { #[instrument(skip_all, err)] - async fn handle_web_request(self, req: Request) -> Result, Error> { + async fn handle_web_request(self, req: Request) -> Result> { let mut response = match (req.method(), req.uri().path()) { (&Method::POST, _) => self .handle_payjoin_post(req) .await .map_err(|e| match e { - Error::BadRequest(e) => { - Response::builder().status(400).body(Body::from(e.to_string())).unwrap() - } - e => { - Response::builder().status(500).body(Body::from(e.to_string())).unwrap() - } + Error::BadRequest(e) => Response::builder() + .status(400) + .body(Body::from(e.to_string())) + .unwrap(), + e => Response::builder() + .status(500) + .body(Body::from(e.to_string())) + .unwrap(), }) .unwrap_or_else(|err_resp| err_resp), _ => Response::builder() @@ -30,9 +36,10 @@ impl Payjoin { .body(Body::from("Not found")) .unwrap(), }; - response - .headers_mut() - .insert("Access-Control-Allow-Origin", hyper::header::HeaderValue::from_static("*")); + response.headers_mut().insert( + "Access-Control-Allow-Origin", + hyper::header::HeaderValue::from_static("*"), + ); Ok(response) } @@ -42,7 +49,10 @@ impl Payjoin { let headers = Headers(&parts.headers); let query_string = parts.uri.query().unwrap_or(""); let body = std::io::Cursor::new( - hyper::body::to_bytes(body).await.map_err(|e| Error::Server(e.into()))?.to_vec(), + hyper::body::to_bytes(body) + .await + .map_err(|e| Error::Server(e.into()))? + .to_vec(), ); let proposal = payjoin::receive::UncheckedProposal::from_request(body, query_string, headers)?; @@ -50,25 +60,29 @@ impl Payjoin { let payjoin_proposal = self.app.process_proposal(proposal)?; let psbt = payjoin_proposal.psbt(); let body = base64::encode(psbt.serialize()); - println!("Responded with Payjoin proposal {}", psbt.clone().extract_tx().txid()); + println!( + "Responded with Payjoin proposal {}", + psbt.clone().extract_tx().txid() + ); Ok(Response::new(Body::from(body))) } } -pub(crate) async fn start( - server_config: PayjoinConfig, - app: PayjoinApp, -) -> Result<(), PayjoinError> { +pub(crate) async fn start(server_config: PayjoinConfig, app: PayjoinApp) -> Result<()> { let payjoin = Payjoin { app }; println!( "Starting payjoin server on port {}", server_config.listen_port ); - - let server = Server::bind(([0, 0, 0, 0], server_config.listen_port).into()); + let bind_addr = std::net::SocketAddr::new( + std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)), + server_config.listen_port, + ); + let server = Server::bind(&bind_addr); let make_svc = make_service_fn(|_| { + let payjoin = payjoin.clone(); async move { - let handler = move |req| payjoin.handle_web_request(req); + let handler = move |req| payjoin.clone().handle_web_request(req); Ok::<_, hyper::Error>(service_fn(handler)) } }); @@ -80,6 +94,11 @@ struct Headers<'a>(&'a hyper::HeaderMap); impl payjoin::receive::Headers for Headers<'_> { fn get_header(&self, key: &str) -> Option<&str> { - self.0.get(key).map(|v| v.to_str()).transpose().ok().flatten() + self.0 + .get(key) + .map(|v| v.to_str()) + .transpose() + .ok() + .flatten() } -} \ No newline at end of file +} diff --git a/src/primitives/mod.rs b/src/primitives/mod.rs index 807af863..7764995f 100644 --- a/src/primitives/mod.rs +++ b/src/primitives/mod.rs @@ -80,7 +80,6 @@ impl std::ops::Deref for XPubId { pub mod bitcoin { pub use bdk::{ bitcoin::{ - Amount, address::{Error as AddressError, NetworkChecked, NetworkUnchecked}, bip32::{self, DerivationPath, ExtendedPubKey, Fingerprint}, blockdata::{ @@ -89,7 +88,7 @@ pub mod bitcoin { }, consensus, hash_types::Txid, - psbt, Address as BdkAddress, Network, + psbt, Address as BdkAddress, Amount, Network, }, descriptor::ExtendedDescriptor, BlockTime, FeeRate, KeychainKind, diff --git a/src/profile/entity.rs b/src/profile/entity.rs index d40dffa1..d39d47d2 100644 --- a/src/profile/entity.rs +++ b/src/profile/entity.rs @@ -18,7 +18,7 @@ pub enum ProfileEvent { }, } -#[derive(Debug, Builder)] +#[derive(Debug, Clone, Builder)] #[builder(pattern = "owned", build_fn(error = "EntityError"))] pub struct Profile { pub id: ProfileId, diff --git a/src/profile/repo.rs b/src/profile/repo.rs index 0778ea06..2fedc228 100644 --- a/src/profile/repo.rs +++ b/src/profile/repo.rs @@ -6,6 +6,7 @@ use uuid::Uuid; use super::{entity::*, error::ProfileError}; use crate::{dev_constants, entity::*, primitives::*}; +#[derive(Clone)] pub struct Profiles { pool: Pool, }