Skip to content

Commit

Permalink
Refactoring, test clean-up and shutdown handling
Browse files Browse the repository at this point in the history
This commit contains mostly refactoring and test improvements.
It also contains a fix for the case where `stfu` and `shutdown` are sent
concurrently.
  • Loading branch information
t-bast authored and remyers committed Nov 14, 2023
1 parent 89f8949 commit 9fcc63b
Show file tree
Hide file tree
Showing 9 changed files with 464 additions and 440 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,5 @@ data class InvalidFailureCode (override val channelId: Byte
data class PleasePublishYourCommitment (override val channelId: ByteVector32) : ChannelException(channelId, "please publish your local commitment")
data class CommandUnavailableInThisState (override val channelId: ByteVector32, val state: String) : ChannelException(channelId, "cannot execute command in state=$state")
data class ForbiddenDuringSplice (override val channelId: ByteVector32, val command: String?) : ChannelException(channelId, "cannot process $command while splicing")
data class ForbiddenDuringQuiescence (override val channelId: ByteVector32, val command: String?) : ChannelException(channelId, "cannot process $command while quiescent")
data class InvalidSpliceRequest (override val channelId: ByteVector32) : ChannelException(channelId, "invalid splice request")
// @formatter:on
13 changes: 9 additions & 4 deletions src/commonMain/kotlin/fr/acinq/lightning/channel/Commitments.kt
Original file line number Diff line number Diff line change
Expand Up @@ -576,11 +576,16 @@ data class Commitments(
// @formatter:on

/**
* Whenever we're not sure the `IncomingPaymentHandler` has received our previous
* `ChannelAction.ProcessIncomingHtlcs`, or when we may have ignored the responses from the
* `IncomingPaymentHandler` (eg. while quiescent), we need to reprocess those incoming HTLCs.
* Whenever we're not sure the `IncomingPaymentHandler` has received our previous `ChannelAction.ProcessIncomingHtlcs`,
* or when we may have ignored the responses from the `IncomingPaymentHandler` (eg. while quiescent or disconnected),
* we need to reprocess those incoming HTLCs.
*/
fun reprocessIncomingHtlcs(): List<ChannelAction> {
fun reprocessIncomingHtlcs(): List<ChannelAction.ProcessIncomingHtlc> {
// We are interested in incoming HTLCs, that have been *cross-signed* (otherwise they wouldn't have been forwarded to the payment handler).
// They signed it first, so the HTLC will first appear in our commitment tx, and later on in their commitment when we subsequently sign it.
// That's why we need to look in *their* commitment with direction=OUT.
//
// We also need to filter out htlcs that we already settled and signed (the settlement messages are being retransmitted).
val alreadySettled = changes.localChanges.signed.filterIsInstance<HtlcSettlementMessage>().map { it.id }.toSet()
return latest.remoteCommit.spec.htlcs.outgoings().filter { !alreadySettled.contains(it.id) }.map { ChannelAction.ProcessIncomingHtlc(it) }
}
Expand Down
61 changes: 27 additions & 34 deletions src/commonMain/kotlin/fr/acinq/lightning/channel/states/Normal.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,17 @@ data class Normal(
override fun updateCommitments(input: Commitments): ChannelStateWithCommitments = this.copy(commitments = input)

override fun ChannelContext.processInternal(cmd: ChannelCommand): Pair<ChannelState, List<ChannelAction>> {
if (cmd is ChannelCommand.ForbiddenDuringQuiescence && spliceStatus is QuiescenceNegotiation) {
val error = ForbiddenDuringQuiescence(channelId, cmd::class.simpleName)
return when (cmd) {
is ChannelCommand.Htlc.Settlement -> {
// Htlc settlement commands are ignored and will be replayed when not quiescent.
// This could create issues if we're keeping htlcs that should be settled pending for too long, as they could timeout.
logger.warning { "ignoring ${cmd::class.simpleName} for htlc #${cmd.id} during quiescence: will be replayed once quiescence ends" }
Pair(this@Normal, listOf())
}
else -> handleCommandError(cmd, error, channelUpdate)
}
}
if (cmd is ChannelCommand.ForbiddenDuringSplice && spliceStatus is QuiescentSpliceStatus) {
val error = ForbiddenDuringSplice(channelId, cmd::class.simpleName)
val forbiddenPreSplice = cmd is ChannelCommand.ForbiddenDuringQuiescence && spliceStatus is QuiescenceNegotiation
val forbiddenDuringSplice = cmd is ChannelCommand.ForbiddenDuringSplice && spliceStatus is QuiescentSpliceStatus
if (forbiddenPreSplice || forbiddenDuringSplice) {
return when (cmd) {
is ChannelCommand.Htlc.Settlement -> {
// Htlc settlement commands are ignored and will be replayed when the splice completes.
// This could create issues if we're keeping htlcs that should be settled pending for too long, as they could timeout.
logger.warning { "ignoring ${cmd::class.simpleName} for htlc #${cmd.id} during splice: will be replayed once splice is complete" }
Pair(this@Normal, listOf())
}
else -> handleCommandError(cmd, error, channelUpdate)
else -> handleCommandError(cmd, ForbiddenDuringSplice(channelId, cmd::class.simpleName), channelUpdate)
}
}
return when (cmd) {
Expand Down Expand Up @@ -132,16 +121,14 @@ data class Normal(
}
is ChannelCommand.MessageReceived -> when {
cmd.message is ForbiddenMessageDuringSplice && spliceStatus is QuiescentSpliceStatus -> {
logger.warning {"received forbidden message ${cmd::class.simpleName} during splicing with status ${spliceStatus}" }
val error = ForbiddenDuringSplice(channelId, cmd.message::class.simpleName)
val warn = ChannelAction.Message.Send(Warning(channelId, error.message))
logger.warning { "received forbidden message ${cmd::class.simpleName} during splicing with status ${spliceStatus::class.simpleName}" }
// Instead of force-closing (which would cost us on-chain fees), we try to resolve this issue by disconnecting.
// This will abort the splice attempt if it hasn't been signed yet, and restore the channel to a clean state.
// If the splice attempt was signed, it gives us an opportunity to re-exchange signatures on reconnection before
// the forbidden message. It also provides the opportunity for our peer to update their node to get rid of that
// bug and resume normal execution.
val actions = buildList {
add(warn)
add(ChannelAction.Message.Send(Warning(channelId, ForbiddenDuringSplice(channelId, cmd.message::class.simpleName).message)))
add(ChannelAction.Disconnect)
}
Pair(this@Normal, actions)
Expand Down Expand Up @@ -357,9 +344,21 @@ data class Normal(
}
}
}

is Stfu -> if (commitments.remoteIsQuiescent()) {
when (spliceStatus) {
is Stfu -> when {
localShutdown != null -> {
logger.warning { "our peer sent stfu but we sent shutdown first" }
// We don't need to do anything, they should accept our shutdown.
Pair(this@Normal, listOf())
}
!commitments.remoteIsQuiescent() -> {
logger.warning { "our peer sent stfu but is not quiescent" }
val actions = buildList {
add(ChannelAction.Message.Send(Warning(channelId, InvalidSpliceNotQuiescent(channelId).message)))
add(ChannelAction.Disconnect)
}
Pair(this@Normal.copy(spliceStatus = SpliceStatus.None), actions)
}
else -> when (spliceStatus) {
is SpliceStatus.None -> {
if (commitments.localIsQuiescent()) {
Pair(this@Normal.copy(spliceStatus = SpliceStatus.NonInitiatorQuiescent), listOf(ChannelAction.Message.Send(Stfu(channelId, initiator = false))))
Expand All @@ -386,9 +385,10 @@ data class Normal(
localOutputs = spliceStatus.command.spliceOutputs,
targetFeerate = spliceStatus.command.feerate
)
val commitTxFees = if (commitments.params.localParams.isInitiator) {
Transactions.commitTxFee(commitments.params.remoteParams.dustLimit, parentCommitment.remoteCommit.spec)
} else 0.sat
val commitTxFees = when {
commitments.params.localParams.isInitiator -> Transactions.commitTxFee(commitments.params.remoteParams.dustLimit, parentCommitment.remoteCommit.spec)
else -> 0.sat
}
if (parentCommitment.localCommit.spec.toLocal + fundingContribution.toMilliSatoshi() < parentCommitment.localChannelReserve(commitments.params).max(commitTxFees)) {
logger.warning { "cannot do splice: insufficient funds" }
spliceStatus.command.replyTo.complete(ChannelCommand.Commitment.Splice.Response.Failure.InsufficientFunds)
Expand Down Expand Up @@ -431,20 +431,13 @@ data class Normal(
spliceStatus.command.replyTo.complete(ChannelCommand.Commitment.Splice.Response.Failure.ConcurrentRemoteSplice)
Pair(this@Normal.copy(spliceStatus = SpliceStatus.NonInitiatorQuiescent), emptyList())
}
} else -> {
}
else -> {
logger.warning { "ignoring duplicate stfu" }
Pair(this@Normal, emptyList())
}
}
} else {
logger.warning { "our peer sent stfu but is not quiescent" }
val actions = buildList {
add(ChannelAction.Message.Send(Warning(channelId, InvalidSpliceNotQuiescent(channelId).message)))
add(ChannelAction.Disconnect)
}
Pair(this@Normal.copy(spliceStatus = SpliceStatus.None), actions)
}

is SpliceInit -> when (spliceStatus) {
is SpliceStatus.None -> {
logger.warning { "rejecting splice attempt: quiescence not negotiated" }
Expand Down
14 changes: 3 additions & 11 deletions src/commonMain/kotlin/fr/acinq/lightning/channel/states/Syncing.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import fr.acinq.lightning.ShortChannelId
import fr.acinq.lightning.blockchain.*
import fr.acinq.lightning.channel.*
import fr.acinq.lightning.crypto.KeyManager
import fr.acinq.lightning.transactions.outgoings
import fr.acinq.lightning.utils.Either
import fr.acinq.lightning.utils.toByteVector
import fr.acinq.lightning.wire.*
Expand Down Expand Up @@ -428,16 +427,9 @@ data class Syncing(val state: PersistedChannelState, val channelReestablishSent:
// When a channel is reestablished after a wallet restarts, we need to reprocess incoming HTLCs that may have been only partially processed
// (either because they didn't reach the payment handler, or because the payment handler response didn't reach the channel).
// Otherwise these HTLCs will stay in our commitment until they timeout and our peer closes the channel.
//
// We are interested in incoming HTLCs, that have been *cross-signed* (otherwise they wouldn't have been forwarded to the payment handler).
// They signed it first, so the HTLC will first appear in our commitment tx, and later on in their commitment when we subsequently sign it.
// That's why we need to look in *their* commitment with direction=OUT.
//
// We also need to filter out htlcs that we already settled and signed (the settlement messages are being retransmitted).
val alreadySettled = commitments1.changes.localChanges.signed.filterIsInstance<HtlcSettlementMessage>().map { it.id }.toSet()
val htlcsToReprocess = commitments1.latest.remoteCommit.spec.htlcs.outgoings().filter { !alreadySettled.contains(it.id) }
logger.debug { "re-processing signed IN: $htlcsToReprocess" }
sendQueue.addAll(htlcsToReprocess.map { ChannelAction.ProcessIncomingHtlc(it) })
val htlcsToReprocess = commitments1.reprocessIncomingHtlcs()
logger.debug { "re-processing signed IN: ${htlcsToReprocess.map { it.add.id }.joinToString()}" }
sendQueue.addAll(htlcsToReprocess)

return Pair(commitments1, sendQueue)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -856,13 +856,14 @@ data class ChannelReady(
data class Stfu(
override val channelId: ByteVector32,
val initiator: Boolean
) : SetupMessage, HasChannelId {
) : SetupMessage, HasChannelId {
override val type: Long get() = Stfu.type

override fun write(out: Output) {
LightningCodecs.writeBytes(channelId, out)
LightningCodecs.writeByte(if (initiator) 1 else 0, out)
}

companion object : LightningMessageReader<Stfu> {
const val type: Long = 2

Expand Down
10 changes: 5 additions & 5 deletions src/commonTest/kotlin/fr/acinq/lightning/channel/TestsHelper.kt
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ object TestsHelper {
/**
* Cross sign nodes where nodeA initiate the signature exchange
*/
fun <T : ChannelStateWithCommitments> crossSign(nodeA: LNChannel<T>, nodeB: LNChannel<T>, commitmentsCount: Int = 1): Triple<LNChannel<T>, LNChannel<T>, List<ChannelAction>> {
fun <T : ChannelStateWithCommitments> crossSign(nodeA: LNChannel<T>, nodeB: LNChannel<T>, commitmentsCount: Int = 1): Pair<LNChannel<T>, LNChannel<T>> {
val sCommitIndex = nodeA.state.commitments.localCommitIndex
val rCommitIndex = nodeB.state.commitments.localCommitIndex
val rHasChanges = nodeB.state.commitments.changes.localHasChanges()
Expand All @@ -497,7 +497,7 @@ object TestsHelper {

val (sender2, sActions2) = receiveCommitSigs(sender1, commitSigs1)
val revokeAndAck1 = sActions2.findOutgoingMessage<RevokeAndAck>()
val (receiver2, rActions2) = receiver1.process(ChannelCommand.MessageReceived(revokeAndAck1))
val (receiver2, _) = receiver1.process(ChannelCommand.MessageReceived(revokeAndAck1))
assertIs<LNChannel<T>>(receiver2)

if (rHasChanges) {
Expand All @@ -508,7 +508,7 @@ object TestsHelper {

val (receiver3, rActions3) = receiveCommitSigs(receiver2, commitSigs2)
val revokeAndAck2 = rActions3.findOutgoingMessage<RevokeAndAck>()
val (sender4, sActions4) = sender3.process(ChannelCommand.MessageReceived(revokeAndAck2))
val (sender4, _) = sender3.process(ChannelCommand.MessageReceived(revokeAndAck2))

assertIs<LNChannel<T>>(sender4)
assertIs<LNChannel<T>>(receiver3)
Expand All @@ -517,15 +517,15 @@ object TestsHelper {
assertEquals(rCommitIndex + 2, receiver3.commitments.localCommitIndex)
assertEquals(sCommitIndex + 1, receiver3.commitments.remoteCommitIndex)

return Triple(sender4, receiver3, sActions4 + rActions3)
return sender4 to receiver3
} else {
assertIs<LNChannel<T>>(sender2)
assertEquals(sCommitIndex + 1, sender2.commitments.localCommitIndex)
assertEquals(rCommitIndex + 1, sender2.commitments.remoteCommitIndex)
assertEquals(rCommitIndex + 1, receiver2.commitments.localCommitIndex)
assertEquals(sCommitIndex + 1, receiver2.commitments.remoteCommitIndex)

return Triple(sender2, receiver2, sActions2 + rActions2)
return sender2 to receiver2
}
}

Expand Down
Loading

0 comments on commit 9fcc63b

Please sign in to comment.