diff --git a/src/commonMain/kotlin/fr/acinq/lightning/channel/Commitments.kt b/src/commonMain/kotlin/fr/acinq/lightning/channel/Commitments.kt index d4e1f17c3..7223f77c5 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/channel/Commitments.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/channel/Commitments.kt @@ -263,9 +263,8 @@ data class Commitment( * Otherwise when we get close to the timeout, we risk an on-chain race condition between their HTLC timeout * and our HTLC success in case of a force-close. */ - fun almostTimedOutIncomingHtlcs(blockHeight: Long, fulfillSafety: CltvExpiryDelta, changes: CommitmentChanges): Set { - val relayedFulfills = changes.localChanges.all.filterIsInstance().map { it.id }.toSet() - return localCommit.spec.htlcs.incomings().filter { relayedFulfills.contains(it.id) && blockHeight >= (it.cltvExpiry - fulfillSafety).toLong() }.toSet() + fun almostTimedOutIncomingHtlcs(blockHeight: Long, fulfillSafety: CltvExpiryDelta): Set { + return localCommit.spec.htlcs.incomings().filter { blockHeight >= (it.cltvExpiry - fulfillSafety).toLong() }.toSet() } fun getOutgoingHtlcCrossSigned(htlcId: Long): UpdateAddHtlc? { @@ -566,11 +565,55 @@ data class Commitments( fun hasPendingOrProposedHtlcs(): Boolean = active.first().hasPendingOrProposedHtlcs(changes) fun timedOutOutgoingHtlcs(currentHeight: Long): Set = active.first().timedOutOutgoingHtlcs(currentHeight) - fun almostTimedOutIncomingHtlcs(currentHeight: Long, fulfillSafety: CltvExpiryDelta): Set = active.first().almostTimedOutIncomingHtlcs(currentHeight, fulfillSafety, changes) + fun almostTimedOutIncomingHtlcs(currentHeight: Long, fulfillSafety: CltvExpiryDelta): Set = active.first().almostTimedOutIncomingHtlcs(currentHeight, fulfillSafety) fun getOutgoingHtlcCrossSigned(htlcId: Long): UpdateAddHtlc? = active.first().getOutgoingHtlcCrossSigned(htlcId) fun getIncomingHtlcCrossSigned(htlcId: Long): UpdateAddHtlc? = active.first().getIncomingHtlcCrossSigned(htlcId) // @formatter:on + fun processIncomingHtlcs(htlcs: List): List { + // Since htlcs are shared across all commitments, we generate the actions only once based on the first commitment. + val remoteCommit = active.first().remoteCommit + val actions = mutableListOf() + htlcs.forEach { + when (it) { + is UpdateAddHtlc -> actions += ChannelAction.ProcessIncomingHtlc(it) + is UpdateFailHtlc -> { + val paymentId = payments[it.id] + val add = remoteCommit.spec.findIncomingHtlcById(it.id)?.add + if (paymentId != null && add != null) { + actions += ChannelAction.ProcessCmdRes.AddSettledFail(paymentId, add, ChannelAction.HtlcResult.Fail.RemoteFail(it)) + } + } + is UpdateFailMalformedHtlc -> { + val paymentId = payments[it.id] + val add = remoteCommit.spec.findIncomingHtlcById(it.id)?.add + if (paymentId != null && add != null) { + actions += ChannelAction.ProcessCmdRes.AddSettledFail(paymentId, add, ChannelAction.HtlcResult.Fail.RemoteFailMalformed(it)) + } + } + else -> Unit + } + } + return actions + } + + // When a channel is reestablished after a wallet restarts, we need to reprocess incoming HTLCs that may have been only partially processed + // (either because they didn't reach the payment handler, or because the payment handler response didn't reach the channel). + // Otherwise these HTLCs will stay in our commitment until they timeout and our peer closes the channel. + // + // We are interested in incoming HTLCs, that have been *cross-signed* (otherwise they wouldn't have been forwarded to the payment handler). + // They signed it first, so the HTLC will first appear in our commitment tx, and later on in their commitment when we subsequently sign it. + // That's why we need to look in *their* commitment with direction=OUT. + // + // We also need to filter out htlcs that we already settled and signed (the settlement messages are being retransmitted). + fun reprocessIncomingHtlcs(log: MDCLogger): List { + val alreadySettled = changes.localChanges.signed.filterIsInstance().map { it.id }.toSet() + val htlcsToReprocess = latest.remoteCommit.spec.htlcs.outgoings().filter { !alreadySettled.contains(it.id) } + log.debug { "re-processing signed IN: $htlcsToReprocess" } + + return processIncomingHtlcs(htlcsToReprocess) + } + fun sendAdd(cmd: ChannelCommand.Htlc.Add, paymentId: UUID, blockHeight: Long): Either> { val maxExpiry = Channel.MAX_CLTV_EXPIRY_DELTA.toCltvExpiry(blockHeight) // we don't want to use too high a refund timeout, because our funds will be locked during that time if the payment is never fulfilled @@ -766,27 +809,9 @@ data class Commitments( } // we remove the newly completed htlcs from the payments map val payments1 = payments - completedOutgoingHtlcs.toSet() - val actions = mutableListOf() - changes.remoteChanges.signed.forEach { - when (it) { - is UpdateAddHtlc -> actions += ChannelAction.ProcessIncomingHtlc(it) - is UpdateFailHtlc -> { - val paymentId = payments[it.id] - val add = remoteCommit.spec.findIncomingHtlcById(it.id)?.add - if (paymentId != null && add != null) { - actions += ChannelAction.ProcessCmdRes.AddSettledFail(paymentId, add, ChannelAction.HtlcResult.Fail.RemoteFail(it)) - } - } - is UpdateFailMalformedHtlc -> { - val paymentId = payments[it.id] - val add = remoteCommit.spec.findIncomingHtlcById(it.id)?.add - if (paymentId != null && add != null) { - actions += ChannelAction.ProcessCmdRes.AddSettledFail(paymentId, add, ChannelAction.HtlcResult.Fail.RemoteFailMalformed(it)) - } - } - else -> Unit - } - } + + val actions = processIncomingHtlcs(changes.remoteChanges.signed) + val active1 = active.map { it.copy(remoteCommit = it.nextRemoteCommit!!.commit, nextRemoteCommit = null) } val commitments1 = this.copy( active = active1, @@ -799,7 +824,7 @@ data class Commitments( payments = payments1, remoteChannelData = revocation.channelData ) - return Either.Right(Pair(commitments1, actions.toList())) + return Either.Right(Pair(commitments1, actions)) } private fun ChannelContext.updateFundingStatus(fundingTxId: ByteVector32, updateMethod: (Commitment, Long) -> Commitment): Either> { diff --git a/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Channel.kt b/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Channel.kt index f8b5f2378..cd250cf3d 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Channel.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Channel.kt @@ -552,6 +552,7 @@ sealed class ChannelStateWithCommitments : PersistedChannelState() { } Pair(nextState, buildList { + addAll(commitments.reprocessIncomingHtlcs(logger)) add(ChannelAction.Storage.StoreState(nextState)) addAll(localCommitPublished.run { doPublish(channelId, staticParams.nodeParams.minDepthBlocks.toLong()) }) }) @@ -585,6 +586,7 @@ sealed class ChannelStateWithCommitments : PersistedChannelState() { mutualClosePublished = listOfNotNull(bestUnpublishedClosingTx) ) Pair(nexState, buildList { + addAll(commitments.reprocessIncomingHtlcs(logger)) add(ChannelAction.Storage.StoreState(nexState)) addAll(doPublish(bestUnpublishedClosingTx, nexState.channelId)) }) diff --git a/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Syncing.kt b/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Syncing.kt index a5fe34cc1..a64f133b8 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Syncing.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Syncing.kt @@ -425,20 +425,7 @@ data class Syncing(val state: PersistedChannelState, val channelReestablishSent: sendQueue.add(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign)) } - // When a channel is reestablished after a wallet restarts, we need to reprocess incoming HTLCs that may have been only partially processed - // (either because they didn't reach the payment handler, or because the payment handler response didn't reach the channel). - // Otherwise these HTLCs will stay in our commitment until they timeout and our peer closes the channel. - // - // We are interested in incoming HTLCs, that have been *cross-signed* (otherwise they wouldn't have been forwarded to the payment handler). - // They signed it first, so the HTLC will first appear in our commitment tx, and later on in their commitment when we subsequently sign it. - // That's why we need to look in *their* commitment with direction=OUT. - // - // We also need to filter out htlcs that we already settled and signed (the settlement messages are being retransmitted). - val alreadySettled = commitments1.changes.localChanges.signed.filterIsInstance().map { it.id }.toSet() - val htlcsToReprocess = commitments1.latest.remoteCommit.spec.htlcs.outgoings().filter { !alreadySettled.contains(it.id) } - logger.debug { "re-processing signed IN: $htlcsToReprocess" } - sendQueue.addAll(htlcsToReprocess.map { ChannelAction.ProcessIncomingHtlc(it) }) - // TODO: do we need to replay settlement commands that came in while quiescent? or are they replayed here? + sendQueue.addAll(commitments1.reprocessIncomingHtlcs(logger)) return Pair(commitments1, sendQueue) }