diff --git a/Cargo.lock b/Cargo.lock index ac20a725..2966987f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2442,8 +2442,8 @@ dependencies = [ "bhttp", "bip21", "bitcoin 0.32.2", + "bitcoin-hpke", "bitcoin-ohttp", - "chacha20poly1305 0.10.1", "http 1.1.0", "log", "reqwest 0.12.7", diff --git a/src/api/server/mod.rs b/src/api/server/mod.rs index 0f273611..3828684e 100644 --- a/src/api/server/mod.rs +++ b/src/api/server/mod.rs @@ -374,7 +374,7 @@ impl BriaService for Bria { ) -> Result, Status> { crate::tracing::record_error(|| async move { extract_tracing(&request); - println!("REQ"); + dbg!("REQ"); let key = extract_api_token(&request)?; let profile = self.app.authenticate(key).await?; diff --git a/src/app/mod.rs b/src/app/mod.rs index a68471d2..2bbe9364 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -530,7 +530,7 @@ impl App { let keychain_wallet = wallet.current_keychain_wallet(&self.pool); let addr = keychain_wallet.new_external_address().await?; let address = Address::from(addr.address.clone()); - println!("got address: {:?}", addr.address); + dbg!("got address: {:?}", addr.address); let mut builder = NewAddress::builder(); builder .address(address.clone()) @@ -546,11 +546,10 @@ impl App { } let new_address = builder.build().expect("Couldn't build NewUri"); self.addresses.persist_new_address(new_address).await?; - println!("init payjoin"); - let (session, ohttp_keys) = crate::payjoin::init_payjoin_session(payjoin::bitcoin::Address::from_str(&address.to_string()).unwrap().assume_checked(), self.pj.clone(), profile.account_id).await?; - println!("init'd payjoin"); - // TODO save session to DB - let uri = session.pj_uri_builder().amount(payjoin::bitcoin::Amount::from_sat(600_000)).build().to_string(); + dbg!("init payjoin"); + let (session, ohttp_keys) = self.pj.init_payjoin_session(&profile.account_id, payjoin::bitcoin::Address::from_str(&address.to_string()).unwrap().assume_checked()).await?; + dbg!("init'd payjoin"); + let uri = session.session.pj_uri_builder().amount(payjoin::bitcoin::Amount::from_sat(600_000)).build().to_string(); Ok((wallet.id, uri)) } diff --git a/src/batch/entity.rs b/src/batch/entity.rs index 358b377e..e931191f 100644 --- a/src/batch/entity.rs +++ b/src/batch/entity.rs @@ -12,6 +12,7 @@ pub struct Batch { pub wallet_summaries: HashMap, pub unsigned_psbt: bitcoin::psbt::PartiallySignedTransaction, pub signed_tx: Option, + pub provisional_proposal: Option, } impl Batch { @@ -31,6 +32,7 @@ pub struct NewBatch { pub(super) total_fee_sats: Satoshis, pub(super) unsigned_psbt: bitcoin::psbt::PartiallySignedTransaction, pub(super) wallet_summaries: HashMap, + pub(super) provisional_proposal: Option, } impl NewBatch { diff --git a/src/job/batch_signing.rs b/src/job/batch_signing.rs index 0af00a7e..4c3a58c3 100644 --- a/src/job/batch_signing.rs +++ b/src/job/batch_signing.rs @@ -44,6 +44,7 @@ pub async fn execute( let mut stalled = false; let mut last_err = None; let mut current_keychain = None; + // get provisional proposal psbt to replace batch.unsigned_psbt out with an mpsc channel, sign it, and replace it with the result with a channel back into the finalize_psbt wallet_process_psbt closure let (mut sessions, mut account_xpub_cache) = if let Some(batch_session) = signing_sessions .list_for_batch(data.account_id, data.batch_id) .await? @@ -112,6 +113,7 @@ pub async fn execute( continue; } }; + // switch session.unsigned_psbt to provisional_proposal.finalize_psbt(|psbt|) match client.sign_psbt(&session.unsigned_psbt).await { Ok(psbt) => { session.remote_signing_complete(psbt); diff --git a/src/job/mod.rs b/src/job/mod.rs index f40edba3..c1502c7d 100644 --- a/src/job/mod.rs +++ b/src/job/mod.rs @@ -240,21 +240,6 @@ pub async fn spawn_process_payout_queue( .await } -pub async fn spawn_payjoin_payout_queue( - pool: &sqlx::PgPool, - data: impl Into, -) -> Result { - let data = data.into(); - onto_account_main_channel( - pool, - data.account_id, - Uuid::new_v4(), - "payjoin_payout_queue", - data, - ) - .await -} - #[job(name = "schedule_process_payout_queue")] async fn schedule_process_payout_queue(mut current_job: CurrentJob) -> Result<(), JobError> { let pool = current_job.pool().clone(); @@ -716,7 +701,19 @@ impl From<(AccountId, PayoutQueueId)> for ProcessPayoutQueueData { payout_queue_id, account_id, batch_id: BatchId::new(), - payjoin: None, + payjoin_session: None, + tracing_data: crate::tracing::extract_tracing_data(), + } + } +} + +impl From<(AccountId, PayoutQueueId, payjoin::receive::v2::WantsOutputs)> for ProcessPayoutQueueData { + fn from((account_id, payout_queue_id, session): (AccountId, PayoutQueueId, payjoin::receive::v2::WantsOutputs)) -> Self { + Self { + payout_queue_id, + account_id, + batch_id: BatchId::new(), + payjoin_session: Some(session), tracing_data: crate::tracing::extract_tracing_data(), } } diff --git a/src/job/process_payout_queue.rs b/src/job/process_payout_queue.rs index a8972fae..011ce9e3 100644 --- a/src/job/process_payout_queue.rs +++ b/src/job/process_payout_queue.rs @@ -1,6 +1,6 @@ -use payjoin::receive::{v2::{ActiveSession, WantsOutputs}, ProvisionalProposal}; +use payjoin::receive::v2::{ActiveSession, ProvisionalProposal, UncheckedProposal, WantsOutputs}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::{collections::HashMap, str::FromStr}; use tracing::instrument; use super::error::JobError; @@ -13,7 +13,7 @@ pub struct ProcessPayoutQueueData { pub(super) payout_queue_id: PayoutQueueId, pub(super) account_id: AccountId, pub(super) batch_id: BatchId, - pub(super) payjoin_session: Option, // find by id? + pub(super) payjoin_session: Option, // find by id? #[serde(flatten)] pub(super) tracing_data: HashMap, } @@ -63,6 +63,9 @@ pub(super) async fn let fee_rate = fees_client .fee_rate(payout_queue.config.tx_priority) .await?; + + // simplification: only payjoin when there is just one wallet + let is_payjoin_eligible = data.payjoin_session.is_some() && unbatched_payouts.wallet_ids().len() == 1; let FinishedPsbtBuild { psbt, included_payouts, @@ -70,18 +73,35 @@ pub(super) async fn wallet_totals, tx_id, fee_satoshis, + provisional_proposal, .. - } = construct_psbt( - &pool, - &mut tx, - &unbatched_payouts, - &utxos, - &wallets, - payout_queue, - fee_rate, - false, - ) - .await?; + } = if is_payjoin_eligible { + let wants_outputs = data.payjoin_session.clone().unwrap(); + construct_payjoin_psbt( + &pool, + &mut tx, + &unbatched_payouts, + &utxos, + &wallets, + payout_queue, + fee_rate, + false, + wants_outputs, + ) + .await? + } else { + construct_psbt( + &pool, + &mut tx, + &unbatched_payouts, + &utxos, + &wallets, + payout_queue, + fee_rate, + false, + ) + .await? + }; let span = tracing::Span::current(); if let (Some(tx_id), Some(psbt)) = (tx_id, psbt) { @@ -114,6 +134,7 @@ pub(super) async fn .tx_id(tx_id) .unsigned_psbt(psbt) .total_fee_sats(fee_satoshis) + .provisional_proposal(provisional_proposal) .wallet_summaries( wallet_totals .into_iter() @@ -171,151 +192,6 @@ pub(super) async fn } } -// #[allow(clippy::type_complexity, clippy::too_many_arguments)] -// pub(super) async fn execute_payjoin<'a>( -// pool: sqlx::PgPool, -// payouts: Payouts, -// wallets: Wallets, -// payout_queues: PayoutQueues, -// batches: Batches, -// utxos: Utxos, -// data: ProcessPayoutQueueData, -// fees_client: FeesClient, -// provisional_proposal: &mut ProvisionalProposal, -// ) -> Result< -// ( -// ProcessPayoutQueueData, -// Option<(sqlx::Transaction<'a, sqlx::Postgres>, Vec)>, -// ), -// JobError, -// > { -// let payout_queue = payout_queues -// .find_by_id(data.account_id, data.payout_queue_id) -// .await?; -// let mut unbatched_payouts = payouts -// .list_unbatched(data.account_id, data.payout_queue_id) -// .await?; - -// let fee_rate = fees_client -// .fee_rate(payout_queue.config.tx_priority) -// .await?; -// let mut tx = pool.begin().await?; - -// // TODO FIRST add their payjoin input to Utxos -// utxos. -// // TODO THEN add their payjoin output to unbatched payout -// let payjoin_payouts = provisional_proposal.; -// payouts.update_unbatched(tx, payouts) - -// let FinishedPsbtBuild { -// psbt, -// included_payouts, -// included_utxos, -// wallet_totals, -// tx_id, -// fee_satoshis, -// .. -// } = construct_payjoin_psbt( -// &pool, -// &mut tx, -// &unbatched_payouts, -// &utxos, -// &wallets, -// payout_queue, -// fee_rate, -// false, -// provisional_proposal, -// ) -// .await?; - -// let span = tracing::Span::current(); -// if let (Some(tx_id), Some(psbt)) = (tx_id, psbt) { -// span.record("tx_id", &tracing::field::display(tx_id)); -// span.record("psbt", &tracing::field::display(&psbt)); - -// let wallet_ids = wallet_totals.keys().copied().collect(); -// span.record("batch_id", &tracing::field::display(data.batch_id)); -// span.record("total_fee_sats", &tracing::field::display(fee_satoshis)); -// span.record( -// "total_change_sats", -// &tracing::field::display( -// wallet_totals -// .values() -// .fold(Satoshis::ZERO, |acc, v| acc + v.change_satoshis), -// ), -// ); -// span.record( -// "cpfp_fee_sats", -// &tracing::field::display( -// wallet_totals -// .values() -// .fold(Satoshis::ZERO, |acc, v| acc + v.cpfp_fee_satoshis), -// ), -// ); -// let batch = NewBatch::builder() -// .account_id(data.account_id) -// .id(data.batch_id) -// .payout_queue_id(data.payout_queue_id) -// .tx_id(tx_id) -// .unsigned_psbt(psbt) -// .total_fee_sats(fee_satoshis) -// .wallet_summaries( -// wallet_totals -// .into_iter() -// .map(|(wallet_id, total)| (wallet_id, WalletSummary::from(total))) -// .collect(), -// ) -// .build() -// .expect("Couldn't build batch"); - -// // Not using a Box here causes an interesting compile error with rustc 1.69.0 -// let included_utxos: Box + Send> = -// Box::new(included_utxos.into_iter().flat_map(|(_, keychain_map)| { -// keychain_map -// .into_iter() -// .flat_map(|(keychain_id, outpoints)| { -// outpoints -// .into_iter() -// .map(move |outpoint| (keychain_id, outpoint)) -// }) -// })); - -// let batch_id = batch.id; -// batches.create_in_tx(&mut tx, batch).await?; -// utxos -// .reserve_utxos_in_batch( -// &mut tx, -// data.account_id, -// batch_id, -// data.payout_queue_id, -// fee_rate, -// included_utxos, -// ) -// .await?; - -// unbatched_payouts.commit_to_batch( -// tx_id, -// batch_id, -// included_payouts -// .into_values() -// .flat_map(|payouts| payouts.into_iter().map(|((id, _, _), vout)| (id, vout))), -// ); - -// if unbatched_payouts.n_not_batched() > 0 { -// queue_drain_error(unbatched_payouts.n_not_batched()); -// } - -// payouts.update_unbatched(&mut tx, unbatched_payouts).await?; - -// Ok((data, Some((tx, wallet_ids)))) -// } else { -// if unbatched_payouts.n_not_batched() > 0 { -// queue_drain_error(unbatched_payouts.n_not_batched()); -// } -// Ok((data, None)) -// } -// } - #[allow(clippy::too_many_arguments)] pub async fn construct_psbt( pool: &sqlx::Pool, @@ -389,106 +265,81 @@ pub async fn construct_psbt( .await?) } -// pub async fn sign_payjoin_psbt( -// psbt: bdk::bitcoin::psbt::Psbt, -// pool: &sqlx::Pool, -// tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, -// unbatched_payouts: &UnbatchedPayouts, -// utxos: &Utxos, -// wallets: &Wallets, -// payout_queue: PayoutQueue, -// fee_rate: bitcoin::FeeRate, -// for_estimation: bool, -// ) -> Result { - -// } - -// #[allow(clippy::too_many_arguments)] -// pub async fn construct_payjoin_psbt( -// pool: &sqlx::Pool, -// tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, -// unbatched_payouts: &UnbatchedPayouts, -// utxos: &Utxos, -// wallets: &Wallets, -// payout_queue: PayoutQueue, -// fee_rate: bitcoin::FeeRate, -// for_estimation: bool, -// provisional_proposal: &mut ProvisionalProposal, -// ) -> Result { -// let span = tracing::Span::current(); -// let PayoutQueue { -// id: queue_id, -// config: queue_cfg, -// name: queue_name, -// .. -// } = payout_queue; -// span.record("payout_queue_name", queue_name); -// span.record("payout_queue_id", &tracing::field::display(queue_id)); -// span.record("n_unbatched_payouts", unbatched_payouts.n_payouts()); - -// let wallets = wallets.find_by_ids(unbatched_payouts.wallet_ids()).await?; -// let reserved_utxos = { -// let keychain_ids = wallets.values().flat_map(|w| w.keychain_ids()); -// utxos -// .outpoints_bdk_should_not_select(tx, keychain_ids) -// .await? -// }; -// span.record( -// "n_reserved_utxos", -// reserved_utxos.values().fold(0, |acc, v| acc + v.len()), -// ); - -// span.record("n_cpfp_utxos", 0); +#[allow(clippy::too_many_arguments)] +pub async fn construct_payjoin_psbt( + pool: &sqlx::Pool, + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + unbatched_payouts: &UnbatchedPayouts, + utxos: &Utxos, + wallets: &Wallets, // FIXME invariant where unbatched_payouts.wallet_ids().len() == 1 + payout_queue: PayoutQueue, + fee_rate: bitcoin::FeeRate, + for_estimation: bool, + wants_outputs: WantsOutputs, +) -> Result { + let span = tracing::Span::current(); + let PayoutQueue { + id: queue_id, + config: queue_cfg, + name: queue_name, + .. + } = payout_queue; + span.record("payout_queue_name", queue_name); + span.record("payout_queue_id", &tracing::field::display(queue_id)); + span.record("n_unbatched_payouts", unbatched_payouts.n_payouts()); -// let mut cfg = PsbtBuilderConfig::builder() -// .consolidate_deprecated_keychains(queue_cfg.consolidate_deprecated_keychains) -// .fee_rate(fee_rate) -// .reserved_utxos(reserved_utxos) -// .force_min_change_output(queue_cfg.force_min_change_sats); -// if !for_estimation && queue_cfg.should_cpfp() { -// let keychain_ids = wallets.values().flat_map(|w| w.keychain_ids()); -// let utxos = utxos -// .find_cpfp_utxos( -// tx, -// keychain_ids, -// queue_id, -// queue_cfg.cpfp_payouts_detected_before(), -// queue_cfg -// .cpfp_payouts_detected_before_block(crate::bdk::last_sync_time(pool).await?), -// ) -// .await?; -// span.record( -// "n_cpfp_utxos", -// utxos.values().fold(0, |acc, v| acc + v.len()), -// ); -// cfg = cfg.cpfp_utxos(utxos); -// } + let wallets = wallets.find_by_ids(unbatched_payouts.wallet_ids()).await?; + // inputs + let reserved_utxos = { + let keychain_ids = wallets.values().flat_map(|w| w.keychain_ids()); + utxos + .outpoints_bdk_should_not_select(tx, keychain_ids) + .await? + }; + span.record( + "n_reserved_utxos", + reserved_utxos.values().fold(0, |acc, v| acc + v.len()), + ); -// let tx_payouts = unbatched_payouts.into_tx_payouts(); -// // TODO add proposal tx_payouts -// let finished_psbt_build = PsbtBuilder::construct_psbt( -// pool, -// cfg.for_estimation(for_estimation) -// .build() -// .expect("Couldn't build PsbtBuilderConfig"), -// tx_payouts, -// wallets, -// ) -// .await?; + span.record("n_cpfp_utxos", 0); -// let FinishedPsbtBuild { -// psbt, -// included_payouts, -// included_utxos, -// wallet_totals, -// tx_id, -// fee_satoshis, -// .. -// } = finished_psbt_build; + let mut cfg = PsbtBuilderConfig::builder() + .consolidate_deprecated_keychains(queue_cfg.consolidate_deprecated_keychains) + .fee_rate(fee_rate) + .reserved_utxos(reserved_utxos) + .force_min_change_output(queue_cfg.force_min_change_sats); + if !for_estimation && queue_cfg.should_cpfp() { + let keychain_ids = wallets.values().flat_map(|w| w.keychain_ids()); + let utxos = utxos + .find_cpfp_utxos( + tx, + keychain_ids, + queue_id, + queue_cfg.cpfp_payouts_detected_before(), + queue_cfg + .cpfp_payouts_detected_before_block(crate::bdk::last_sync_time(pool).await?), + ) + .await?; + span.record( + "n_cpfp_utxos", + utxos.values().fold(0, |acc, v| acc + v.len()), + ); + cfg = cfg.cpfp_utxos(utxos); + } -// provisional_proposal. -// Ok(finished_psbt_build) -// } + let tx_payouts = unbatched_payouts.into_tx_payouts(); + // TODO add proposal tx_payouts + Ok(PsbtBuilder::construct_psbt( + pool, + cfg.for_estimation(for_estimation) + .wants_outputs(Some(wants_outputs)) + .build() + .expect("Couldn't build PsbtBuilderConfig"), + tx_payouts, + wallets, + ) + .await?) +} #[instrument(name = "job.queue_drain_error", fields(error = true, error.level, error.message))] fn queue_drain_error(n_not_batched: usize) { diff --git a/src/job/process_payout_queue_payjoin-notes.md b/src/job/process_payout_queue_payjoin-notes.md new file mode 100644 index 00000000..21b9857f --- /dev/null +++ b/src/job/process_payout_queue_payjoin-notes.md @@ -0,0 +1,280 @@ +```rs + //-- THIS IS THE old process_payout_queue payjoin code + //let wallet_id = unbatched_payouts.wallet_ids().into_iter()..first().unwrap(); // we know the length is one from the is_payjoin_eligible check + // DEFINE OUTPUTS ------- + // ---------------------- + use rust_decimal::prelude::ToPrimitive; + let replacement_outputs: Vec = unbatched_payouts + .into_iter() + .flat_map(|(_wallet_id, payouts)| payouts.into_iter()) + .map(|(_, address, sats)| { + payjoin::bitcoin::TxOut { + value: payjoin::bitcoin::Amount::from_btc(sats.to_btc().to_f64().unwrap()).unwrap(), + script_pubkey: payjoin::bitcoin::ScriptBuf::from_bytes(address.script_pubkey().to_bytes()), + } + }) + .collect(); + + // FIXME STUPID SIMPLIFICATION: pick first availabledrain address + // FIXME bria can have multiple drain scripts since a queue 'receiver' is actually multiple wallets + let drain_script = replacement_outputs.first().expect("no outputs to replace with").script_pubkey; + let wants_inputs = wants_outputs.replace_receiver_outputs(replacement_outputs, &drain_script).unwrap().commit_outputs(); + + // CONTRIBUTE INPUTS ------- + // ------------------------- + // payout queue config, batch signing job + println!("contribute"); + // Don't throw an error. Continue optimistic process even if we can't contribute inputs. + + let available_wallets = wallets + .list_by_account_id(data.account_id) + .await + .expect("Failed to list wallets"); + let keychain_ids = available_wallets + .iter() + .flat_map(|wallet| wallet.keychain_ids()); + let mut keychain_utxos = utxos.find_keychain_utxos(keychain_ids).await.expect("failed to find keychain utxos"); + let keychain_utxos = keychain_utxos + .drain() + .map(|(_, keychain_utxos)| keychain_utxos) + .collect::>(); + + let mut available_inputs = keychain_utxos + .iter() + .flat_map(|keychain_utxos| keychain_utxos.utxos.iter()); + + let candidate_inputs: HashMap = available_inputs + .clone() + // Why is a utxo output value NOT saved in bitcoin::Amount? How can it be partial satoshis? + .map(|i| { + let txid = payjoin::bitcoin::Txid::from_str(&i.outpoint.txid.to_string()).unwrap(); + ( + payjoin::bitcoin::Amount::from_sat(i.value.into()), + payjoin::bitcoin::OutPoint::new(txid, i.outpoint.vout), + ) + }) + .collect(); + let selected_outpoint = wants_inputs + .try_preserving_privacy(candidate_inputs) + .expect("no privacy preserving utxo found"); + let selected_utxo = available_inputs + .find(|i| { + let txid = payjoin::bitcoin::Txid::from_str(&i.outpoint.txid.to_string()).unwrap(); + payjoin::bitcoin::OutPoint::new(txid, i.outpoint.vout) == selected_outpoint + }) + .expect("This shouldn't happen. Failed to retrieve the privacy preserving utxo from those we provided to the seclector."); + + let txo_to_contribute = payjoin::bitcoin::TxOut { + value: payjoin::bitcoin::Amount::from_sat(selected_utxo.value.into()), + script_pubkey: payjoin::bitcoin::ScriptBuf::from_bytes(selected_utxo + .address + .clone() + .expect("selected_utxo missing script") + .script_pubkey().to_bytes()), + }; + let provisional_proposal = wants_inputs.contribute_witness_inputs(vec![(selected_outpoint, txo_to_contribute)]).expect("failed to contribute inputs").commit_inputs(); + // -- +``` + +```rs +use std::sync::{Arc, Mutex}; + use std::sync::mpsc::{self, Sender, Receiver}; + use std::thread; + use std::time::Duration; + use crate::payjoin::ProcessPsbtControl; + + let (tx, rx): (Sender, Receiver) = mpsc::channel(); + provisional_proposal.finalize_proposal(|psbt| { + let psbt = crate::payjoin::wallet_process_psbt(psbt.clone()).unwrap(); + Ok(psbt.clone()) + }, None, payjoin::bitcoin::FeeRate::from_sat_per_vb(100).unwrap()); + // TODO + // TODO + // TODO + + + ``` + ```rs + + #[instrument(name = "psbt_builder.construct_payjoin_psbt", skip_all)] + pub async fn construct_payjoin_psbt( + pool: &sqlx::PgPool, + cfg: PsbtBuilderConfig, + wants_outputs: payjoin::receive::v2::WantsOutputs, + unbatched_payouts: HashMap>, + mut wallets: HashMap, // FIXME invariant where unbatched_payouts.wallet_ids().len() == 1 + ) -> Result { + let mut outer_builder: PsbtBuilder = PsbtBuilder::new(cfg); + + let wallet_id = unbatched_payouts.keys().next().expect("unbatched_payouts must be non-empty"); + let payouts = unbatched_payouts.values().next().expect("unbatched_payouts must be non-empty"); + let wallet = wallets.remove(&wallet_id.clone()).expect("Wallet not found"); + + let mut builder = outer_builder.wallet_payouts(*wallet_id, payouts.to_vec()); + for keychain in wallet.deprecated_keychain_wallets(pool.clone()) { + builder = keychain.dispatch_bdk_wallet(builder).await?; + } + // include inputs and outputs: + outer_builder = wallet + .current_keychain_wallet(pool) + .dispatch_bdk_wallet(builder.accept_current_keychain()) + .await? + .next_wallet(); + + //-- + //let wallet_id = unbatched_payouts.wallet_ids().into_iter()..first().unwrap(); // we know the length is one from the is_payjoin_eligible check + // DEFINE OUTPUTS ------- + // ---------------------- + use rust_decimal::prelude::ToPrimitive; + let replacement_outputs: Vec = unbatched_payouts + .into_iter() + .flat_map(|(_wallet_id, payouts)| payouts.into_iter()) + .map(|(_, address, sats)| { + payjoin::bitcoin::TxOut { + value: payjoin::bitcoin::Amount::from_btc(sats.to_btc().to_f64().unwrap()).unwrap(), + script_pubkey: payjoin::bitcoin::ScriptBuf::from_bytes(address.script_pubkey().to_bytes()), + } + }) + .collect(); + + // FIXME STUPID SIMPLIFICATION: pick first availabledrain address + // FIXME bria can have multiple drain scripts since a queue 'receiver' is actually multiple wallets + let drain_script = replacement_outputs.first().expect("no outputs to replace with").script_pubkey; + let wants_inputs = wants_outputs.replace_receiver_outputs(replacement_outputs, &drain_script).unwrap().commit_outputs(); + + // CONTRIBUTE INPUTS ------- + // ------------------------- + // payout queue config, batch signing job + println!("contribute"); + // Don't throw an error. Continue optimistic process even if we can't contribute inputs. + + let available_wallets = wallets + .list_by_account_id(data.account_id) + .await + .expect("Failed to list wallets"); + let keychain_ids = available_wallets + .iter() + .flat_map(|wallet| wallet.keychain_ids()); + let mut keychain_utxos = utxos.find_keychain_utxos(keychain_ids).await.expect("failed to find keychain utxos"); + let keychain_utxos = keychain_utxos + .drain() + .map(|(_, keychain_utxos)| keychain_utxos) + .collect::>(); + + let mut available_inputs = keychain_utxos + .iter() + .flat_map(|keychain_utxos| keychain_utxos.utxos.iter()); + + let candidate_inputs: HashMap = available_inputs + .clone() + // Why is a utxo output value NOT saved in bitcoin::Amount? How can it be partial satoshis? + .map(|i| { + let txid = payjoin::bitcoin::Txid::from_str(&i.outpoint.txid.to_string()).unwrap(); + ( + payjoin::bitcoin::Amount::from_sat(i.value.into()), + payjoin::bitcoin::OutPoint::new(txid, i.outpoint.vout), + ) + }) + .collect(); + let selected_outpoint = wants_inputs + .try_preserving_privacy(candidate_inputs) + .expect("no privacy preserving utxo found"); + let selected_utxo = available_inputs + .find(|i| { + let txid = payjoin::bitcoin::Txid::from_str(&i.outpoint.txid.to_string()).unwrap(); + payjoin::bitcoin::OutPoint::new(txid, i.outpoint.vout) == selected_outpoint + }) + .expect("This shouldn't happen. Failed to retrieve the privacy preserving utxo from those we provided to the seclector."); + + let txo_to_contribute = payjoin::bitcoin::TxOut { + value: payjoin::bitcoin::Amount::from_sat(selected_utxo.value.into()), + script_pubkey: payjoin::bitcoin::ScriptBuf::from_bytes(selected_utxo + .address + .clone() + .expect("selected_utxo missing script") + .script_pubkey().to_bytes()), + }; + let provisional_proposal = wants_inputs.contribute_witness_inputs(vec![(selected_outpoint, txo_to_contribute)]).expect("failed to contribute inputs").commit_inputs(); + // -- + Ok(outer_builder.finish()) + } +``` + +```rs + +#[allow(clippy::too_many_arguments)] +pub async fn construct_payjoin_psbt( + pool: &sqlx::Pool, + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + unbatched_payouts: &UnbatchedPayouts, + utxos: &Utxos, + wallets: &Wallets, // FIXME invariant where unbatched_payouts.wallet_ids().len() == 1 + payout_queue: PayoutQueue, + fee_rate: bitcoin::FeeRate, + for_estimation: bool, + wants_outputs: WantsOutputs, +) -> Result { + let span = tracing::Span::current(); + let PayoutQueue { + id: queue_id, + config: queue_cfg, + name: queue_name, + .. + } = payout_queue; + span.record("payout_queue_name", queue_name); + span.record("payout_queue_id", &tracing::field::display(queue_id)); + span.record("n_unbatched_payouts", unbatched_payouts.n_payouts()); + + let wallets = wallets.find_by_ids(unbatched_payouts.wallet_ids()).await?; + // inputs + let reserved_utxos = { + let keychain_ids = wallets.values().flat_map(|w| w.keychain_ids()); + utxos + .outpoints_bdk_should_not_select(tx, keychain_ids) + .await? + }; + span.record( + "n_reserved_utxos", + reserved_utxos.values().fold(0, |acc, v| acc + v.len()), + ); + + span.record("n_cpfp_utxos", 0); + + let mut cfg = PsbtBuilderConfig::builder() + .consolidate_deprecated_keychains(queue_cfg.consolidate_deprecated_keychains) + .fee_rate(fee_rate) + .reserved_utxos(reserved_utxos) + .force_min_change_output(queue_cfg.force_min_change_sats); + if !for_estimation && queue_cfg.should_cpfp() { + let keychain_ids = wallets.values().flat_map(|w| w.keychain_ids()); + let utxos = utxos + .find_cpfp_utxos( + tx, + keychain_ids, + queue_id, + queue_cfg.cpfp_payouts_detected_before(), + queue_cfg + .cpfp_payouts_detected_before_block(crate::bdk::last_sync_time(pool).await?), + ) + .await?; + span.record( + "n_cpfp_utxos", + utxos.values().fold(0, |acc, v| acc + v.len()), + ); + cfg = cfg.cpfp_utxos(utxos); + } + + let tx_payouts = unbatched_payouts.into_tx_payouts(); + // TODO add proposal tx_payouts + Ok(PsbtBuilder::construct_psbt( + pool, + cfg.for_estimation(for_estimation) + .wants_outputs(Some(wants_outputs)) + .build() + .expect("Couldn't build PsbtBuilderConfig"), + tx_payouts, + wallets, + ) + .await?) +} +``` \ No newline at end of file diff --git a/src/payjoin/mod.rs b/src/payjoin/mod.rs index 91ad22c3..a8a7eb02 100644 --- a/src/payjoin/mod.rs +++ b/src/payjoin/mod.rs @@ -1,9 +1,11 @@ pub mod config; pub mod error; + +use payjoin::bitcoin; use crate::{ - address::error::AddressError, app::error::ApplicationError, job::{self, process_payout_queue}, payjoin::config::*, payout_queue::PayoutQueues, primitives::AccountId + address::error::AddressError, app::error::ApplicationError, job::{self, process_payout_queue::{self, ProcessPayoutQueueData}}, payjoin::config::*, payout_queue::PayoutQueues, primitives::{AccountId, ProfileId}, profile::Profile }; -use std::{collections::HashMap, str::FromStr, time::Duration}; +use std::{any::Any, collections::HashMap, str::FromStr, time::Duration}; use anyhow::{anyhow, Result, Context}; use bdk::bitcoin::{psbt::Psbt, Address, Transaction, Txid}; @@ -27,7 +29,7 @@ use crate::{ wallet::Wallets, }; -#[derive(Clone)] +/// A representation of a payjoin receiver "service" pub struct PayjoinReceiver { rt: Handle, pool: sqlx::PgPool, @@ -61,110 +63,68 @@ impl PayjoinReceiver { } } - pub async fn sanity_check( - self, - session: RecvSession, - proposal: UncheckedProposal, - ) -> Result> { - // in a payment processor where the sender could go offline, this is where you schedule to broadcast the original_tx - let _to_broadcast_in_failure_case = proposal.extract_tx_to_schedule_broadcast(); - // we have to look up the output address from a list of payjoin addresses that should NOT contain change addresses - // if we hit 2x payjoin addresses, we should abort - - // The network is used for checks later - let network = self.network; - let account_id = session.account_id; - - // Receive Check 1: Can Broadcast - let proposal = proposal.check_broadcast_suitability(None, |_tx| { - // TODO test_mempool_accept e.g.: - // - // Fulcrum does not yet support this, so we need to devise a way to check this to the best of our ability - // Probably by using bitcoind directly and deprecating Fulcrum - Ok(true) - }).expect("check1 failed"); - println!("check2"); - let network = network.clone(); - let (tx, rx) = std::sync::mpsc::channel(); - // Receive Check 2: receiver can't sign for proposal inputs - let proposal = proposal.check_inputs_not_owned(|input| { - // Spawn a new thread for each input check - let tx = tx.clone(); - let addresses = self.addresses.clone(); - let input = input.to_string(); - println!("check2"); - std::thread::spawn(move || { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - println!("check2"); - let result = match bitcoin::BdkAddress::from_str(&input) { - Ok(address) => { - match addresses.find_by_address(account_id, address.require_network(self.network).unwrap().to_string()).await { - Ok(_) => Ok(true), - Err(AddressError::AddressNotFound(_)) => Ok(false), - Err(e) => { - println!("ERROR! {:?}", e.to_string()); - Err(e.to_string()) - }, - } - }, - Err(e) => Err(e.to_string()), - }; - println!("check2"); - tx.send(result.unwrap()).unwrap(); - }); - }); - - // This will block until the async operation is complete - Ok(rx.recv().unwrap()) - }).expect("check2 failed"); - println!("check3"); - - // Receive Check 3: receiver can't sign for proposal inputs - let proposal = proposal.check_no_mixed_input_scripts()?; - - // Receive Check 4: have we seen this input before? More of a check for non-interactive i.e. payment processor receivers. - let payjoin = proposal.check_no_inputs_seen_before(|input| { - // TODO implement input_seen_before database check - // Ok(!self.insert_input_seen_before(*input).map_err(|e| Error::Server(e.into()))?) - Ok(false) - }).expect("check4 failed"); + /// Initializes a payjoin session and listens for a payjoin request on a background thread. + /// TODO save the session to the database so it can be resumed after a shutdown + pub async fn init_payjoin_session(&self, account_id: &AccountId, address: payjoin::bitcoin::Address) -> Result<(RecvSession, payjoin::OhttpKeys), anyhow::Error> { + let payjoin_dir = Url::parse("https://payjo.in").expect("Invalid URL"); + let ohttp_relays: [Url; 2] = [ + Url::parse("https://pj.bobspacebkk.com").expect("Invalid URL"), + Url::parse("https://ohttp.payjoin.org").expect("Invalid URL"), + ]; + dbg!("fetch"); + let payjoin_dir_clone = payjoin_dir.clone(); + let ohttp_relay_clone = ohttp_relays[0].clone(); + let ohttp_keys = tokio::task::spawn_blocking(move || { + payjoin::io::fetch_ohttp_keys(ohttp_relay_clone, payjoin_dir_clone) + }).await?.await?; + let http_client = reqwest::Client::builder().build()?; + dbg!("fetched"); + fn random_ohttp_relay(ohttp_relays: [Url; 2]) -> Url { + use rand::seq::SliceRandom; + use rand::thread_rng; + ohttp_relays.choose(&mut thread_rng()).unwrap().clone() + } + dbg!("enroll"); + let mut enroller = payjoin::receive::v2::SessionInitializer::new( + address, + payjoin_dir.to_owned(), + ohttp_keys.clone(), + ohttp_relays[0].to_owned(), + None, + ); + dbg!("req"); + let (req, context) = enroller.extract_req().map_err(|e| anyhow::anyhow!(e.to_string()))?; + let ohttp_response = http_client + .post(req.url) + .header("Content-Type", "message/ohttp-req") + .body(req.body) + .send() + .await?; + let ohttp_response = ohttp_response.bytes().await?; + dbg!("res"); + let session = enroller.process_res(ohttp_response.as_ref(), context).map_err(|e| anyhow::anyhow!(e.to_string()))?; + let recv_session = RecvSession { account_id: account_id.clone(), session: session.clone(), expiry: std::time::Duration::from_secs(60 * 60 * 24), payjoin_tx: None }; + self.spawn_recv_session(recv_session.clone()); + // ^^ ABOVE DOES THIS + // tokio::task::spawn(move || { + // let wants_outputs = self.sanity_check(recv_session, proposal).await?; - // Receive Check 4: receiver can't sign for proposal inputs - let network = network.clone(); - let (tx2, rx2) = std::sync::mpsc::channel(); - let mut payjoin = payjoin.identify_receiver_outputs(|output_script| { - // Clone transmitter for each output_script - let tx2 = tx2.clone(); - let addresses = self.addresses.clone(); - let output_script = output_script.to_string(); - // Spawn a new thread for each output_script check - std::thread::spawn(move || { - println!("check4"); - let rt = tokio::runtime::Runtime::new().unwrap(); // Create a new runtime for the thread - rt.block_on(async { - let result = match bitcoin::BdkAddress::from_str(&output_script) { - Ok(address) => { - match addresses.find_by_address(account_id, address.assume_checked().to_string()).await { - Ok(_) => Ok(true), // TODO: Confirm ownership logic if needed - Err(AddressError::AddressNotFound(_)) => Ok(false), - Err(e) => { - println!("ERROR!"); - Err(e.to_string()) - }, - } - }, - Err(e) => Err(e.to_string()), - }; - println!("check4"); - tx2.send(result).unwrap(); // Send the result back to the main thread - }); - }); - - // Block until the async operation is complete - rx2.recv().unwrap().map_err(|e| payjoin::Error::Server(e.into())) - }).expect("check5 failed"); - Ok(payjoin) + // // let rt = tokio::runtime::Runtime::new().unwrap(); + // // rt.block_on(async { + // // let proposal = poll_for_fallback_psbt(&http_client, &mut recv_session).await?; + // // // TODO start listening, on a job? + // // }) + // // TODO start listening, on a job? + // // TODO listen on thread for a payjoin request + // // spawn_recv_session(recv_session, pj).await?; + // }); + // TODO save session to DB before returning + // TODO start listening, on a job? + dbg!("made sesh"); + Ok(( + recv_session, + ohttp_keys, + )) } // fn complete_payjoin(self, payjoin: WantsOutputs) -> Result { @@ -254,132 +214,79 @@ impl PayjoinReceiver { Ok(()) } - async fn trigger_payout_queue( - &self, - account_id: AccountId, - name: String, - ) -> Result<(), ApplicationError> { - let payout_queue = self - .payout_queues - .find_by_name(account_id, name) - .await?; - job::spawn_payjoin_payout_queue(&self.pool, (payout_queue.account_id, payout_queue.id)) - .await?; + pub async fn spawn_recv_session(&self, mut session: RecvSession) -> Result<()> { + let payout_queues = self.payout_queues.clone(); + let pool = self.pool.clone(); + let addresses = self.addresses.clone(); + let network = self.network.clone(); + tokio::spawn(async move { + let qs = payout_queues.clone().list_by_account_id(session.account_id).await.unwrap(); + let payout_queue_id = &qs.first().unwrap().id; + let http_client = reqwest::Client::builder().build().unwrap(); + let proposal = poll_for_fallback_psbt(session.clone(), &http_client).await.unwrap(); + let wants_outputs = sanity_check(session.clone(), proposal, network, addresses.clone()).await.unwrap(); + job::spawn_process_payout_queue(&pool.clone(), (session.account_id, *payout_queue_id, wants_outputs)).await.unwrap(); + // let _ = self.resume_recv_session(session).await.unwrap();d + }); Ok(()) } -} - -pub async fn init_payjoin_session(address: payjoin::bitcoin::Address, pj: PayjoinReceiver, account_id: AccountId) -> Result<(ActiveSession, payjoin::OhttpKeys), anyhow::Error> { - let payjoin_dir = Url::parse("https://payjo.in").expect("Invalid URL"); - let ohttp_relays: [Url; 2] = [ - Url::parse("https://pj.bobspacebkk.com").expect("Invalid URL"), - Url::parse("https://ohttp-relay.obscuravpn.io").expect("Invalid URL"), - ]; - println!("fetch"); - let payjoin_dir_clone = payjoin_dir.clone(); - let ohttp_relay_clone = ohttp_relays[0].clone(); - let ohttp_keys = tokio::task::spawn_blocking(move || { - payjoin::io::fetch_ohttp_keys(ohttp_relay_clone, payjoin_dir_clone) - }).await?.await?; - let http_client = reqwest::Client::builder().build()?; - println!("fetched"); - fn random_ohttp_relay(ohttp_relays: [Url; 2]) -> Url { - use rand::seq::SliceRandom; - use rand::thread_rng; - ohttp_relays.choose(&mut thread_rng()).unwrap().clone() - } - println!("enroll"); - let mut enroller = payjoin::receive::v2::SessionInitializer::new( - address, - payjoin_dir.to_owned(), - ohttp_keys.clone(), - ohttp_relays[0].to_owned(), - None, - ); - println!("req"); - let (req, context) = enroller.extract_req().map_err(|e| anyhow::anyhow!(e.to_string()))?; - let ohttp_response = http_client - .post(req.url) - .header("Content-Type", "message/ohttp-req") - .body(req.body) - .send() - .await?; - let ohttp_response = ohttp_response.bytes().await?; - println!("res"); - let enrolled = enroller.process_res(ohttp_response.as_ref(), context).map_err(|e| anyhow::anyhow!(e.to_string()))?; - let recv_session = RecvSession { enrolled: enrolled.clone(), expiry: std::time::Duration::from_secs(60 * 60 * 24), payjoin_tx: None, account_id }; - // TODO listen on thread for a payjoin request - println!("made sesh"); - // TODO spawn job to listen for payjoin - // spawn_recv_session(recv_session, pj).await?; - Ok(( - enrolled, - ohttp_keys, - )) -} - -// pub async fn spawn_recv_session(session: RecvSession, pj: PayjoinReceiver) -> Result<()> { -// tokio::spawn(async move { -// let _ = resume_recv_session(session, pj).await; -// }); -// Ok(()) -// } -// async fn resume_recv_session(mut session: RecvSession, pj: PayjoinReceiver) -> Result { -// println!("RESUME RECEIVE SESSION"); -// let http_client = reqwest::Client::builder() -// .build()?; -// let proposal: UncheckedProposal = poll_for_fallback_psbt( -// &http_client, -// &mut session, -// ) -// .await?; -// println!("POLLED RECEIVE SESSION"); -// let _original_tx = proposal.extract_tx_to_schedule_broadcast(); -// let mut payjoin_proposal = match pj -// .sanity_check(session, proposal) -// .await -// .map_err(|e| anyhow::anyhow!(e.to_string())) -// { -// Ok(p) => p, -// Err(e) => { -// // TODO pj.wallet.broadcast_transaction(original_tx).await?; -// return Err(e.into()); -// } -// }; - -// let (req, ohttp_ctx) = payjoin_proposal -// .extract_v2_req().map_err(|e| anyhow::anyhow!(e.to_string()))?; -// let res = http_client -// .post(req.url) -// .header("Content-Type", "message/ohttp-req") -// .body(req.body) -// .send() +// async fn resume_recv_session(self, mut session: RecvSession) -> Result { +// dbg!("RESUME RECEIVE SESSION"); +// let http_client = reqwest::Client::builder() +// .build()?; +// let proposal: UncheckedProposal = poll_for_fallback_psbt( +// session, +// &http_client, +// ) // .await?; +// dbg!("POLLED RECEIVE SESSION"); +// let _original_tx = proposal.extract_tx_to_schedule_broadcast(); +// let mut payjoin_proposal = match sanity_check(session, proposal, self.network, self.addresses) +// .await +// .map_err(|e| anyhow::anyhow!(e.to_string())) +// { +// Ok(p) => p, +// Err(e) => { +// // TODO pj.wallet.broadcast_transaction(original_tx).await?; +// return Err(e.into()); +// } +// }; -// let res = res.bytes().await?; -// // enroll must succeed -// let _res = payjoin_proposal -// .deserialize_res(res.to_vec(), ohttp_ctx).map_err(|e| anyhow::anyhow!(e.to_string()))?; -// let payjoin_tx = payjoin_proposal.psbt().clone().extract_tx(); -// let payjoin_txid = payjoin_tx.txid(); -// // TODO -// // wallet -// // .insert_tx( -// // payjoin_tx.clone(), -// // ConfirmationTime::unconfirmed(utils::now().as_secs()), -// // None, -// // ) -// // .await?; -// // session.payjoin_tx = Some(payjoin_tx); -// // storage.update_recv_session(session)?; -// Ok(payjoin_txid) -// } +// let (req, ohttp_ctx) = payjoin_proposal +// .extract_v2_req().map_err(|e| anyhow::anyhow!(e.to_string()))?; +// let res = http_client +// .post(req.url) +// .header("Content-Type", "message/ohttp-req") +// .body(req.body) +// .send() +// .await?; -async fn poll_for_fallback_psbt( +// let res = res.bytes().await?; +// // enroll must succeed +// let _res = payjoin_proposal +// .deserialize_res(res.to_vec(), ohttp_ctx).map_err(|e| anyhow::anyhow!(e.to_string()))?; +// let payjoin_tx = payjoin_proposal.psbt().clone().extract_tx(); +// let payjoin_txid = payjoin_tx.txid(); +// // TODO +// // wallet +// // .insert_tx( +// // payjoin_tx.clone(), +// // ConfirmationTime::unconfirmed(utils::now().as_secs()), +// // None, +// // ) +// // .await?; +// // session.payjoin_tx = Some(payjoin_tx); +// // storage.update_recv_session(session)?; +// Ok(payjoin_txid) +// } +} + +pub async fn poll_for_fallback_psbt( + session: RecvSession, client: &reqwest::Client, - session: &mut crate::payjoin::RecvSession, ) -> Result { + let mut session = session.session; loop { // if stop.load(Ordering::Relaxed) { // return Err(crate::payjoin::Error::Shutdown); @@ -395,7 +302,7 @@ async fn poll_for_fallback_psbt( // return Err(crate::payjoin::Error::SessionExpired); // } println!("POLLING RECEIVE SESSION"); - let (req, context) = session.enrolled.extract_req().map_err(|e| anyhow::anyhow!(e.to_string()))?; + let (req, context) = session.extract_req().map_err(|e| anyhow::anyhow!(e.to_string()))?; let ohttp_response = client .post(req.url) .header("Content-Type", "message/ohttp-req") @@ -404,7 +311,6 @@ async fn poll_for_fallback_psbt( .await?; let ohttp_response = ohttp_response.bytes().await?; let proposal = session - .enrolled .process_res(ohttp_response.as_ref(), context).map_err(|e| anyhow::anyhow!(e.to_string()))?; match proposal { Some(proposal) => return Ok(proposal), @@ -413,12 +319,126 @@ async fn poll_for_fallback_psbt( } } +pub async fn sanity_check( + session: RecvSession, + proposal: UncheckedProposal, + network: Network, + addresses: Addresses, +) -> Result> { + // in a payment processor where the sender could go offline, this is where you schedule to broadcast the original_tx + let _to_broadcast_in_failure_case = proposal.extract_tx_to_schedule_broadcast(); + // we have to look up the output address from a list of payjoin addresses that should NOT contain change addresses + // if we hit 2x payjoin addresses, we should abort + let account_id = session.account_id; + + // Receive Check 1: Can Broadcast + let proposal = proposal.check_broadcast_suitability(None, |_tx| { + // TODO test_mempool_accept e.g.: + // + // Fulcrum does not yet support this, so we need to devise a way to check this to the best of our ability + // Probably by using bitcoind directly and deprecating Fulcrum + Ok(true) + }).expect("check1 failed"); + dbg!("check2"); + let network = network.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + // Receive Check 2: receiver can't sign for proposal inputs + let proposal = proposal.check_inputs_not_owned(|input| { + // Spawn a new thread for each input check + let tx = tx.clone(); + let addresses = addresses.clone(); + let input = input.to_string(); + let network = network.clone(); + tokio::spawn(async move { + let result = match bitcoin::BdkAddress::from_str(&input) { + Ok(address) => { + match addresses.find_by_address(account_id, address.require_network(network).unwrap().to_string()).await { + Ok(_) => Ok(true), + Err(AddressError::AddressNotFound(_)) => Ok(false), + Err(e) => { + eprintln!("ERROR: {}", e); + Err(e.to_string()) + }, + } + }, + Err(e) => Err(e.to_string()), + }; + tx.send(result).unwrap(); + }); + + // This will block until the async operation is complete + rx.recv().unwrap().map_err(|e| payjoin::Error::Server(e.into())) + }).expect("check2 failed"); + dbg!("check3"); + + // Receive Check 3: receiver can't sign for proposal inputs + let proposal = proposal.check_no_mixed_input_scripts()?; + + // Receive Check 4: have we seen this input before? More of a check for non-interactive i.e. payment processor receivers. + let payjoin = proposal.check_no_inputs_seen_before(|input| { + // TODO implement input_seen_before database check + // Ok(!self.insert_input_seen_before(*input).map_err(|e| Error::Server(e.into()))?) + Ok(false) + }).expect("check4 failed"); + + // Receive Check 4: receiver can't sign for proposal inputs + let network = network.clone(); + let (tx2, rx2) = std::sync::mpsc::channel(); + let mut payjoin = payjoin.identify_receiver_outputs(|output_script| { + // Clone transmitter for each output_script + let tx2 = tx2.clone(); + let addresses = addresses.clone(); + let output_script = output_script.to_string(); + // Spawn a new thread for each output_script check + std::thread::spawn(move || { + dbg!("check4"); + let rt = tokio::runtime::Runtime::new().unwrap(); // Create a new runtime for the thread + rt.block_on(async { + let result = match bitcoin::BdkAddress::from_str(&output_script) { + Ok(address) => { + match addresses.find_by_address(account_id, address.assume_checked().to_string()).await { + Ok(_) => Ok(true), // TODO: Confirm ownership logic if needed + Err(AddressError::AddressNotFound(_)) => Ok(false), + Err(e) => { + dbg!("ERROR!"); + Err(e.to_string()) + }, + } + }, + Err(e) => Err(e.to_string()), + }; + dbg!("check4"); + tx2.send(result).unwrap(); // Send the result back to the main thread + }); + }); + + // Block until the async operation is complete + rx2.recv().unwrap().map_err(|e| payjoin::Error::Server(e.into())) + }).expect("check5 failed"); + Ok(payjoin) +} + +use std::sync::{Arc, Mutex}; +use std::sync::mpsc::{self, Sender, Receiver}; +use std::thread; + +pub(crate) enum ProcessPsbtControl { + Pause, + Resume, + Stop, +} + +/// sign and finalize the proposal psbt +pub fn wallet_process_psbt(psbt: bitcoin::Psbt) -> Result { + Ok(psbt) +} + #[derive(Debug, Clone, PartialEq)] pub struct RecvSession { - pub enrolled: ActiveSession, + pub account_id: AccountId, + pub session: ActiveSession, pub expiry: Duration, pub payjoin_tx: Option, - pub account_id: AccountId, } // impl RecvSession { diff --git a/src/wallet/psbt_builder.rs b/src/wallet/psbt_builder.rs index 17300958..777ab46a 100644 --- a/src/wallet/psbt_builder.rs +++ b/src/wallet/psbt_builder.rs @@ -1,7 +1,5 @@ use bdk::{ - database::BatchDatabase, - wallet::{tx_builder::TxOrdering, AddressIndex, AddressInfo}, - FeeRate, Wallet, + bitcoin::hashes::Hash, database::BatchDatabase, wallet::{tx_builder::TxOrdering, AddressIndex, AddressInfo}, FeeRate, Wallet }; use derive_builder::Builder; use std::{ @@ -44,6 +42,7 @@ pub struct FinishedPsbtBuild { pub fee_satoshis: Satoshis, pub tx_id: Option, pub psbt: Option, + pub provisional_proposal: Option, } impl FinishedPsbtBuild { @@ -76,8 +75,8 @@ pub struct PsbtBuilderConfig { for_estimation: bool, #[builder(default)] force_min_change_output: Option, - // #[builder(default)] - // payjoin_proposal: Option, + #[builder(default)] + wants_outputs: Option, } impl PsbtBuilderConfig { @@ -129,6 +128,7 @@ pub struct PsbtBuilder { result: FinishedPsbtBuild, input_weights: HashMap, all_included_utxos: HashSet, + provisional_proposal: Option, _phantom: PhantomData, } @@ -193,7 +193,7 @@ impl PsbtBuilder { sum.keychains_with_inputs .extend(keychain_utxos.keys().copied()); } - + ret.provisional_proposal = self.provisional_proposal; ret } } @@ -243,7 +243,9 @@ impl PsbtBuilder { fee_satoshis: Satoshis::from(0), tx_id: None, psbt: None, + provisional_proposal: None, }, + provisional_proposal: None, _phantom: PhantomData, } } @@ -267,6 +269,7 @@ impl PsbtBuilder { all_included_utxos: self.all_included_utxos, input_weights: self.input_weights, result: self.result, + provisional_proposal: self.provisional_proposal, _phantom: PhantomData, } } @@ -337,6 +340,7 @@ impl PsbtBuilder { all_included_utxos: self.all_included_utxos, input_weights: self.input_weights, result: self.result, + provisional_proposal: self.provisional_proposal, _phantom: PhantomData, } } @@ -403,6 +407,24 @@ impl BdkWalletVisitor for PsbtBuilder { } } + // add foreign payjoin utxos + // *try* Handle payjoin to see what happens. visit_bdk_wallet will actually use the state machine + if let Some(wants_outputs) = self.cfg.wants_outputs { + use std::str::FromStr; + let mut payjoin_original_psbt = psbt::Psbt::from_str(&wants_outputs.original_psbt().to_string()).expect("failed to parse payjoin original psbt"); + let current_wallet_owned_vouts = wants_outputs.owned_vouts(); + for i in (0..payjoin_original_psbt.unsigned_tx.output.len()).rev() { + if !current_wallet_owned_vouts.contains(&i) { + payjoin_original_psbt.outputs.remove(i); + payjoin_original_psbt.unsigned_tx.output.remove(i); + } + } + // for include each remaining payjoin output + for output in payjoin_original_psbt.unsigned_tx.output.iter() { + builder.add_recipient(output.script_pubkey, output.value); + } + } + let mut total_output_satoshis = Satoshis::from(0); for (payout_id, destination, satoshis) in self.current_payouts.drain(..max_payout) { total_output_satoshis += satoshis; @@ -464,6 +486,25 @@ impl BdkWalletVisitor for PsbtBuilder { builder.ordering(TxOrdering::Bip69Lexicographic); match builder.finish() { Ok((psbt, details)) => { + if let Some(wants_outputs) = self.cfg.wants_outputs { + use std::str::FromStr; + // convert psbt unsigned_tx.output to payjoin::bitcoin::TxOut + let replacement_outputs: Vec = psbt.unsigned_tx.output.into_iter().map(|out| payjoin::bitcoin::TxOut { + value: payjoin::bitcoin::Amount::from_sat(out.value.into()), + script_pubkey: payjoin::bitcoin::ScriptBuf::from_bytes(out.script_pubkey.to_bytes()), + }).collect(); + let payjoin_drain_script = payjoin::bitcoin::ScriptBuf::from_bytes(change_address.script_pubkey().to_bytes()); + let wants_inputs = wants_outputs.replace_receiver_outputs(replacement_outputs, &payjoin_drain_script).unwrap().commit_outputs(); + + let inputs: Vec<_> = psbt.unsigned_tx.input.into_iter().zip(psbt.inputs).map(|(txin, psbt_input)| ( + payjoin::bitcoin::OutPoint::new(payjoin::bitcoin::Txid::from_str(&txin.previous_output.txid.to_string()).unwrap(), txin.previous_output.vout), + payjoin::bitcoin::TxOut { + value: payjoin::bitcoin::Amount::from_sat(psbt_input.witness_utxo.unwrap().value.into()), + script_pubkey: payjoin::bitcoin::ScriptBuf::from_bytes(psbt_input.witness_utxo.unwrap().script_pubkey.to_bytes()), + } + )).collect(); + self.provisional_proposal = Some(wants_inputs.contribute_witness_inputs(inputs).unwrap().commit_inputs()); + } // FIXME I think the fee is definitely wrong since proposal.apply_fee has not been called let fee_satoshis = Satoshis::from(details.fee.expect("fee must be present")); let current_wallet_fee = fee_satoshis - self.result.fee_satoshis; let wallet_id = self.current_wallet.expect("current wallet must be set"); @@ -546,6 +587,7 @@ impl PsbtBuilder { all_included_utxos: self.all_included_utxos, input_weights: self.input_weights, result: self.result, + provisional_proposal: self.provisional_proposal, _phantom: PhantomData, } } @@ -596,6 +638,27 @@ impl PsbtBuilder { } let mut foreign_utxos = HashSet::new(); + // add foreign payjoin utxos + // *try* Handle payjoin to see what happens. visit_bdk_wallet will actually use the state machine + if let Some(wants_outputs) = self.cfg.wants_outputs { + use std::str::FromStr; + let mut payjoin_original_psbt = psbt::Psbt::from_str(&wants_outputs.original_psbt().to_string()).expect("failed to parse payjoin original psbt"); + let current_wallet_owned_vouts = wants_outputs.owned_vouts(); + for i in (0..payjoin_original_psbt.unsigned_tx.output.len()).rev() { + if !current_wallet_owned_vouts.contains(&i) { + payjoin_original_psbt.outputs.remove(i); + payjoin_original_psbt.unsigned_tx.output.remove(i); + } + } + // for each remaining output, still pay that change + for output in payjoin_original_psbt.unsigned_tx.output.iter() { + builder.add_recipient(output.script_pubkey, output.value); + } + + // add inputs in following loop + self.current_wallet_psbts.push((keychain_id, payjoin_original_psbt)); + } + for (_, psbt) in self.current_wallet_psbts.iter() { for (input, psbt_input) in psbt.unsigned_tx.input.iter().zip(psbt.inputs.iter()) { foreign_utxos.insert(input.previous_output); @@ -643,6 +706,7 @@ impl PsbtBuilder { true, )) } + // TODO different case for payjoin? Err(bdk::Error::InsufficientFunds { .. }) => Ok((0, Vec::new(), false)), Err(e) => Err(e.into()), }