Skip to content

Commit

Permalink
[Draft] run PayjoinApp
Browse files Browse the repository at this point in the history
  • Loading branch information
DanGould committed Jan 11, 2024
1 parent 78733ca commit 67c8d89
Show file tree
Hide file tree
Showing 10 changed files with 199 additions and 170 deletions.
3 changes: 3 additions & 0 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod config;
mod server;

use crate::app::{error::*, *};
use crate::payjoin::{error::*, *};

pub use config::*;
pub use server::*;
Expand All @@ -26,6 +27,7 @@ pub async fn run_dev(
let profile = Profiles::new(&pool)
.find_by_key(dev_constants::BRIA_DEV_KEY)
.await?;
let pj = PayjoinApp::run(app.clone(), profile.clone()).await;
let (_, xpubs) = app
.create_wpkh_wallet(
&profile,
Expand Down Expand Up @@ -67,6 +69,7 @@ pub async fn run(
app_cfg: AppConfig,
) -> Result<(), ApplicationError> {
let app = App::run(pool, app_cfg).await?;
// let pj = PayjoinApp::run(app.clone()).await;
server::start(config, app).await?;
Ok(())
}
2 changes: 2 additions & 0 deletions src/api/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use proto::{bria_service_server::BriaService, *};
use super::config::*;
use crate::{
app::{error::ApplicationError, *},
payjoin::PayjoinApp,
payout_queue,
primitives::*,
profile,
Expand Down Expand Up @@ -882,6 +883,7 @@ pub(crate) async fn start(server_config: ApiConfig, app: App) -> Result<(), Appl
.add_service(BriaServiceServer::new(bria))
.serve(([0, 0, 0, 0], server_config.listen_port).into())
.await?;

Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::{
xpub::*,
};

#[derive(Clone)]
#[allow(dead_code)]
pub struct App {
_runner: JobRunnerHandle,
Expand Down
7 changes: 0 additions & 7 deletions src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1048,13 +1048,6 @@ async fn run_cmd(
});
}));
let payjoin_send = send.clone();
handles.push(tokio::spawn(async move {
let _ = payjoin_send.try_send(
super::payjoin::run(pool, app)
.await
.context("Payjoin server error")
);
}));

let reason = receive.recv().await.expect("Didn't receive msg");
for handle in handles {
Expand Down
225 changes: 145 additions & 80 deletions src/payjoin/app.rs
Original file line number Diff line number Diff line change
@@ -1,66 +1,109 @@
use std::{str::FromStr, collections::HashMap};
use std::collections::HashMap;

use anyhow::{anyhow, Result};
use payjoin::{receive::{PayjoinProposal, UncheckedProposal, ProvisionalProposal}, Error};
use anyhow::{anyhow, Context, Result};
use payjoin::{
receive::{PayjoinProposal, ProvisionalProposal, UncheckedProposal},
Error,
};
use tracing::instrument;

use super::error::*;
use crate::primitives::bitcoin;

const BOOTSTRAP_KEY_NAME: &str = "payjoin_bootstrap_key";
type ProtoClient =
crate::api::proto::bria_service_client::BriaServiceClient<tonic::transport::Channel>;
use crate::{
app::{error::ApplicationError, App},
primitives::bitcoin,
profile::Profile,
};

#[derive(Clone)]
pub struct PayjoinApp {
// config: bitcoind ledger, wallet, pj_host, pj_endpoint
// ledger: Ledger,
// network: bitcoin::Network,
pool: sqlx::PgPool,
app: App,
profile: Profile,
}

impl PayjoinApp {
pub fn new(pool: sqlx::PgPool) -> Self {
Self {
pool,
}
pub async fn run(app: App, profile: Profile) -> Self {
Self { app, profile }
}

#[instrument(name = "payjoin_app.process_proposal", skip(self), err)]
fn process_proposal(&self, proposal: UncheckedProposal) -> Result<PayjoinProposal, Error> {
let bitcoind = self.bitcoind().map_err(|e| Error::Server(e.into()))?;

pub async fn process_proposal(
self,
proposal: UncheckedProposal,
) -> Result<PayjoinProposal, Error> {
// in a payment processor where the sender could go offline, this is where you schedule to broadcast the original_tx
let _to_broadcast_in_failure_case = proposal.extract_tx_to_schedule_broadcast();

// The network is used for checks later
let network =
bitcoind.get_blockchain_info().map_err(|e| Error::Server(e.into())).and_then(
|info| bitcoin::Network::from_str(&info.chain).map_err(|e| Error::Server(e.into())),
)?;
let network = self.app.network();

// Receive Check 1: Can Broadcast
let proposal = proposal.check_broadcast_suitability(None, |tx| {
let raw_tx = bitcoin::consensus::encode::serialize_hex(&tx);
let mempool_results =
bitcoind.test_mempool_accept(&[raw_tx]).map_err(|e| Error::Server(e.into()))?;
match mempool_results.first() {
Some(result) => Ok(result.allowed),
None => Err(Error::Server(
anyhow!("No mempool results returned on broadcast check").into(),
)),
}
let _raw_tx = bitcoin::consensus::encode::serialize_hex(&tx);
// TODO test_mempool_accept e.g.:
//
// let mempool_results =
// bitcoind.test_mempool_accept(&[raw_tx]).map_err(|e| Error::Server(e.into()))?;
// match mempool_results.first() {
// Some(result) => Ok(result.allowed),
// None => Err(Error::Server(
// anyhow!("No mempool results returned on broadcast check").into(),
// )),
// }
Ok(true)
})?;
tracing::trace!("check1");

use bdk::bitcoin::{Script, ScriptBuf};
use std::sync::mpsc;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::sync::Mutex;

// Create a channel
let (tx, rx) = mpsc::channel::<(ScriptBuf, Sender<Result<bool, ApplicationError>>)>();
let tx = Arc::new(Mutex::new(tx));

let app = self.app.clone();
let profile = self.profile.clone();
let network = network.clone();

// Receive Check 2: receiver can't sign for proposal inputs
tokio::spawn(async move {
while let Ok((input, response_tx)) = rx.recv() {
let result = check_script_owned(&app, &profile, network, &input).await;
let _ = response_tx.send(result);
}
});

async fn check_script_owned(
app: &App,
profile: &Profile,
network: bitcoin::Network,
input: &Script,
) -> Result<bool, ApplicationError> {
let address = bitcoin::BdkAddress::from_script(&input, network)?;
match app
.find_address(&profile.clone(), address.to_string())
.await
{
Err(ApplicationError::AddressError(e)) => Err(ApplicationError::AddressError(e)),
Err(_) => Ok(false), // good address script, but not found
Ok(_) => Ok(true),
}
}

let proposal = proposal.check_inputs_not_owned(|input| {
if let Ok(address) = bitcoin::BdkAddress::from_script(input, network) {
bitcoind
.get_address_info(&address)
.map(|info| info.is_mine.unwrap_or(false))
.map_err(|e| Error::Server(e.into()))
} else {
Ok(false)
let (response_tx, response_rx) = mpsc::channel::<Result<bool, ApplicationError>>();
{
let tx = tx.lock().expect("lock"); // TODO Handle lock error if needed
tx.send((input.to_owned(), response_tx))
.map_err(|e| Error::Server(e.into()))?;
}
let recv_res = response_rx.recv().map_err(|e| Error::Server(e.into()))?;
Ok(recv_res.map_err(|e| Error::Server(e.into()))?)
})?;

tracing::trace!("check2");
// Receive Check 3: receiver can't sign for proposal inputs
let proposal = proposal.check_no_mixed_input_scripts()?;
Expand All @@ -74,35 +117,47 @@ impl PayjoinApp {
})?;
tracing::trace!("check4");

// Receive Check 4: receiver can't sign for proposal inputs
let (tx, rx) = mpsc::channel::<(ScriptBuf, Sender<Result<bool, ApplicationError>>)>();
let tx = Arc::new(Mutex::new(tx));

let app = self.app.clone();
let profile = self.profile.clone();
let network = network.clone();
tokio::spawn(async move {
while let Ok((input, response_tx)) = rx.recv() {
let result = check_script_owned(&app, &profile, network, &input).await;
let _ = response_tx.send(result);
}
});

let mut provisional_payjoin = payjoin.identify_receiver_outputs(|output_script| {
if let Ok(address) = bitcoin::BdkAddress::from_script(output_script, network) {
bitcoind
.get_address_info(&address)
.map(|info| info.is_mine.unwrap_or(false))
.map_err(|e| Error::Server(e.into()))
} else {
Ok(false)
let (response_tx, response_rx) = mpsc::channel::<Result<bool, ApplicationError>>();
{
let tx = tx.lock().expect("lock"); // TODO Handle lock error if needed
tx.send((output_script.to_owned(), response_tx))
.map_err(|e| Error::Server(e.into()))?;
}
let recv_res = response_rx.recv().map_err(|e| Error::Server(e.into()))?;
Ok(recv_res.map_err(|e| Error::Server(e.into()))?)
})?;

if !self.config.sub_only {
// Select receiver payjoin inputs.
_ = try_contributing_inputs(&mut provisional_payjoin, &bitcoind)
.map_err(|e| tracing::warn!("Failed to contribute inputs: {}", e));
}
// Don't throw an error. Continue optimistic process even if we can't contribute inputs.
self.try_contributing_inputs(&mut provisional_payjoin)
.await
.map_err(|e| tracing::warn!("Failed to contribute inputs: {}", e));

let receiver_substitute_address = bitcoind
.get_new_address(None, None)
.map_err(|e| Error::Server(e.into()))?
.assume_checked();
provisional_payjoin.substitute_output_address(receiver_substitute_address);
// Output substitution could go here

let payjoin_proposal = provisional_payjoin.finalize_proposal(
|psbt: &bitcoin::psbt::Psbt| {
bitcoind
.wallet_process_psbt(&base64::encode(psbt.serialize()), None, None, Some(false))
.map(|res| bitcoin::psbt::Psbt::from_str(&res.psbt).map_err(|e| Error::Server(e.into())))
.map_err(|e| Error::Server(e.into()))?
Err(Error::Server(anyhow!("TODO sign psbt").into()))
// TODO sign proposal psbt with our inputs & subbed outputs e.g.:
//
// bitcoind
// .wallet_process_psbt(&base64::encode(psbt.serialize()), None, None, Some(false))
// .map(|res| bitcoin::psbt::Psbt::from_str(&res.psbt).map_err(|e| Error::Server(e.into())))
// .map_err(|e| Error::Server(e.into()))?
},
None, // TODO set to bitcoin::FeeRate::MIN or similar
)?;
Expand All @@ -114,36 +169,46 @@ impl PayjoinApp {
Ok(payjoin_proposal)
}


fn try_contributing_inputs(
payjoin: &mut ProvisionalProposal,
bitcoind: &bitcoincore_rpc::Client,
) -> Result<()> {
async fn try_contributing_inputs(self, payjoin: &mut ProvisionalProposal) -> Result<()> {
use bitcoin::OutPoint;

let available_inputs = bitcoind
.list_unspent(None, None, None, None, None)
.context("Failed to list unspent from bitcoind")?;
let candidate_inputs: HashMap<bitcoin::Amount, OutPoint> = available_inputs
let available_inputs = match self.app.list_utxos(&self.profile, "".to_owned()).await {
Err(e) => {
tracing::warn!("Failed to list utxos: {}", e);
return Ok(());
}
Ok((_, utxos)) => utxos,
};
let mut available_inputs = available_inputs
.iter()
.map(|i| (i.amount, OutPoint { txid: i.txid, vout: i.vout }))
.flat_map(|keychain_utxos| keychain_utxos.utxos.iter());
let candidate_inputs: HashMap<bitcoin::Amount, OutPoint> = available_inputs
.clone()
// Why is a utxo output value NOT saved in bitcoin::Amount? How can it be partial satoshis?
.map(|i| {
(
bitcoin::Amount::from_sat(i.value.into()),
i.outpoint.clone(),
)
})
.collect();

let selected_outpoint = payjoin.try_preserving_privacy(candidate_inputs).expect("gg");
let selected_outpoint = payjoin
.try_preserving_privacy(candidate_inputs)
.expect("gg");
let selected_utxo = available_inputs
.iter()
.find(|i| i.txid == selected_outpoint.txid && i.vout == selected_outpoint.vout)
.find(|i| i.outpoint == selected_outpoint)
.context("This shouldn't happen. Failed to retrieve the privacy preserving utxo from those we provided to the seclector.")?;
tracing::debug!("selected utxo: {:#?}", selected_utxo);

// calculate receiver payjoin outputs given receiver payjoin inputs and original_psbt,
let txo_to_contribute = bitcoin::TxOut {
value: selected_utxo.amount.to_sat(),
script_pubkey: selected_utxo.script_pub_key.clone(),
value: selected_utxo.value.into(),
script_pubkey: selected_utxo
.address
.clone()
.ok_or_else(|| anyhow!("selected_utxo missing script"))?
.script_pubkey(),
};
let outpoint_to_contribute =
bitcoin::OutPoint { txid: selected_utxo.txid, vout: selected_utxo.vout };
payjoin.contribute_witness_input(txo_to_contribute, outpoint_to_contribute);
payjoin.contribute_witness_input(txo_to_contribute, selected_outpoint);
Ok(())
}
}
}
Loading

0 comments on commit 67c8d89

Please sign in to comment.