Skip to content

Commit

Permalink
Try swap-in on liquidity policy changes (#548)
Browse files Browse the repository at this point in the history
We previously tried swapping funds in:

- at start-up
- when our wallet state was updated
- when a new block was found

We now also check for swaps when the liquidity policy changes or the swap
feerate changes.
  • Loading branch information
t-bast authored Oct 19, 2023
1 parent 1137cfe commit 56ade7e
Showing 1 changed file with 22 additions and 13 deletions.
35 changes: 22 additions & 13 deletions src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ import fr.acinq.lightning.channel.*
import fr.acinq.lightning.channel.states.*
import fr.acinq.lightning.crypto.noise.*
import fr.acinq.lightning.db.*
import fr.acinq.lightning.payment.IncomingPaymentHandler
import fr.acinq.lightning.payment.OutgoingPaymentFailure
import fr.acinq.lightning.payment.OutgoingPaymentHandler
import fr.acinq.lightning.payment.PaymentRequest
import fr.acinq.lightning.payment.*
import fr.acinq.lightning.serialization.Encryption.from
import fr.acinq.lightning.serialization.Serialization.DeserializationResult
import fr.acinq.lightning.transactions.Transactions
Expand Down Expand Up @@ -411,30 +408,36 @@ class Peer(
receiveLoop() // This suspends until the coroutines is cancelled or the socket is closed
}

/** We try swapping funds in whenever one of those fields is updated. */
data class TrySwapInFlow(val currentBlockHeight: Int, val walletState: WalletState, val feerate: FeeratePerKw, val liquidityPolicy: LiquidityPolicy)

/**
* This function needs to be called after [Peer] is initialized, to start watching the swap-in wallet
* and trigger swap-ins.
* Warning: not thread-safe!
*/
suspend fun startWatchSwapInWallet() {
logger.info { "starting swap-in watch job" }
if (swapInJob != null) return
// wait to have a swap-in feerate available
logger.info { "waiting for feerates" }
swapInFeeratesFlow.filterNotNull().first()
if (swapInJob != null) {
logger.info { "swap-in watch job already started" }
return
}
logger.info { "waiting for peer to be ready" }
waitForPeerReady()
swapInJob = launch {
swapInWallet.walletStateFlow.combine(currentTipFlow.filterNotNull()) { walletState, currentTip -> currentTip.first to walletState }
.filter { (_, walletState) -> walletState.consistent }
.collect { (currentBlockHeight, walletState) ->
swapInWallet.walletStateFlow
.filter { it.consistent }
.combine(currentTipFlow.filterNotNull()) { walletState, currentTip -> Pair(walletState, currentTip.first) }
.combine(swapInFeeratesFlow.filterNotNull()) { (walletState, currentTip), feerate -> Triple(walletState, currentTip, feerate) }
.combine(nodeParams.liquidityPolicy) { (walletState, currentTip, feerate), policy -> TrySwapInFlow(currentTip, walletState, feerate, policy) }
.collect { w ->
// Local mutual close txs from pre-splice channels can be used as zero-conf inputs for swap-in to facilitate migration
val mutualCloseTxs = channels.values
.filterIsInstance<Closing>()
.filterNot { it.commitments.params.channelFeatures.hasFeature(Feature.DualFunding) }
.flatMap { state -> state.mutualClosePublished.map { closingTx -> closingTx.tx.txid } }
val trustedTxs = trustedSwapInTxs + mutualCloseTxs
swapInCommands.send(SwapInCommand.TrySwapIn(currentBlockHeight, walletState, walletParams.swapInParams, trustedTxs))
swapInCommands.send(SwapInCommand.TrySwapIn(w.currentBlockHeight, w.walletState, walletParams.swapInParams, trustedTxs))
}
}
}
Expand Down Expand Up @@ -768,6 +771,7 @@ class Peer(
// MUST ONLY BE SET BY processEvent()
private var peerConnection: PeerConnection? = null

@OptIn(ExperimentalCoroutinesApi::class)
private suspend fun processEvent(cmd: PeerCommand, logger: MDCLogger) {
when (cmd) {
is Connected -> {
Expand Down Expand Up @@ -1008,7 +1012,6 @@ class Peer(
val (feerate, fee) = watcher.client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, logger)

logger.info { "requesting splice-in using balance=${cmd.walletInputs.balance} feerate=$feerate fee=$fee" }

nodeParams.liquidityPolicy.value.maybeReject(cmd.walletInputs.balance.toMilliSatoshi(), fee.toMilliSatoshi(), LiquidityEvents.Source.OnChainWallet, logger)?.let { rejected ->
logger.info { "rejecting splice: reason=${rejected.reason}" }
nodeParams._nodeEvents.emit(rejected)
Expand All @@ -1022,6 +1025,12 @@ class Peer(
spliceOut = null,
feerate = feerate
)
// If the splice fails, we immediately unlock the utxos to reuse them in the next attempt.
spliceCommand.replyTo.invokeOnCompletion { ex ->
if (ex == null && spliceCommand.replyTo.getCompleted() is ChannelCommand.Commitment.Splice.Response.Failure) {
swapInCommands.trySend(SwapInCommand.UnlockWalletInputs(cmd.walletInputs.map { it.outPoint }.toSet()))
}
}
input.send(WrappedChannelCommand(channel.channelId, spliceCommand))
}
else -> {
Expand Down

0 comments on commit 56ade7e

Please sign in to comment.