Skip to content

Commit

Permalink
Asynchronously clean up obsolete HTLC info from DB (#2705)
Browse files Browse the repository at this point in the history
When a channel is closed, we can forget the data from historical HTLCs
sent and received through that channel (which is otherwise required to
punish cheating attempts by our peer).

We previously synchronously removed that data from the DB when the closing
transaction confirmed. However, this could create performance issues as
the `htlc_infos` table can be very large for busy nodes and many concurrent
writes may be happening at the same time.

We don't need to get rid of this data immediately: we only want to remove
it to avoid degrading the performance of active channels that read and
write to the `htlc_infos` table. We now mark channels as closed in a
dedicated table, and run a background actor that deletes batches of
obsolete htlc data at regular intervals. This ensures that the table is
eventually cleaned up, without impacting the performance of active
channels.

When a splice transaction confirms, all the revoked commitment transactions
that only applied to the previous funding transaction cannot be published
anymore, because the previous funding output has already been spent.

We can thus forget all the historical HTLCs that were included in those
commitments, because we will never need to generate the corresponding
penalty transactions.

This ensures that the growth of our DB is bounded, and will shrink every
time a splice transaction is confirmed.

Fixes #2610, #2702 and #2740
  • Loading branch information
t-bast authored Feb 14, 2024
1 parent 599f9af commit f41bd22
Show file tree
Hide file tree
Showing 24 changed files with 578 additions and 111 deletions.
10 changes: 10 additions & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,16 @@ eclair {
migrate-on-restart = false // migrate sqlite -> postgres on restart (only applies if sqlite is primary)
compare-on-restart = false // compare sqlite and postgres dbs on restart (only applies if sqlite is primary)
}
// During normal channel operation, we need to store information about past HTLCs to be able to punish our peer if
// they publish a revoked commitment. Once a channel closes or a splice transaction confirms, we can clean up past
// data (which reduces the size of our DB). Since there may be millions of rows to delete and we don't want to slow
// down the node, we delete those rows in batches at regular intervals.
revoked-htlc-info-cleaner {
// Number of rows to delete per batch: a higher value will clean up the DB faster, but may have a higher impact on performance.
batch-size = 50000
// Frequency at which batches of rows are deleted: a lower value will clean up the DB faster, but may have a higher impact on performance.
interval = 15 minutes
}
}

file-backup {
Expand Down
9 changes: 7 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
blockchainWatchdogThreshold: Int,
blockchainWatchdogSources: Seq[String],
onionMessageConfig: OnionMessageConfig,
purgeInvoicesInterval: Option[FiniteDuration]) {
purgeInvoicesInterval: Option[FiniteDuration],
revokedHtlcInfoCleanerConfig: RevokedHtlcInfoCleaner.Config) {
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey

val nodeId: PublicKey = nodeKeyManager.nodeId
Expand Down Expand Up @@ -605,7 +606,11 @@ object NodeParams extends Logging {
timeout = FiniteDuration(config.getDuration("onion-messages.reply-timeout").getSeconds, TimeUnit.SECONDS),
maxAttempts = config.getInt("onion-messages.max-attempts"),
),
purgeInvoicesInterval = purgeInvoicesInterval
purgeInvoicesInterval = purgeInvoicesInterval,
revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(
batchSize = config.getInt("db.revoked-htlc-info-cleaner.batch-size"),
interval = FiniteDuration(config.getDuration("db.revoked-htlc-info-cleaner.interval").getSeconds, TimeUnit.SECONDS)
)
)
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package fr.acinq.eclair.channel

import akka.event.LoggingAdapter
import com.softwaremill.quicklens.ModifyPimp
import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.scalacompat.{ByteVector32, ByteVector64, Crypto, Satoshi, SatoshiLong, Script, Transaction, TxId}
import fr.acinq.eclair.blockchain.fee.{FeeratePerByte, FeeratePerKw, FeeratesPerKw, OnChainFeeConf}
Expand Down Expand Up @@ -266,12 +265,16 @@ case class NextRemoteCommit(sig: CommitSig, commit: RemoteCommit)
/**
* A minimal commitment for a given funding tx.
*
* @param fundingTxIndex index of the funding tx in the life of the channel:
* - initial funding tx has index 0
* - splice txs have index 1, 2, ...
* - commitments that share the same index are rbfed
* @param fundingTxIndex index of the funding tx in the life of the channel:
* - initial funding tx has index 0
* - splice txs have index 1, 2, ...
* - commitments that share the same index are rbfed
* @param firstRemoteCommitIndex index of the first remote commitment we signed that spends the funding transaction.
* Once the funding transaction confirms, our peer won't be able to publish revoked
* commitments with lower commitment indices.
*/
case class Commitment(fundingTxIndex: Long,
firstRemoteCommitIndex: Long,
remoteFundingPubKey: PublicKey,
localFundingStatus: LocalFundingStatus, remoteFundingStatus: RemoteFundingStatus,
localCommit: LocalCommit, remoteCommit: RemoteCommit, nextRemoteCommit_opt: Option[NextRemoteCommit]) {
Expand Down Expand Up @@ -730,6 +733,7 @@ object Commitment {
/** Subset of Commitments when we want to work with a single, specific commitment. */
case class FullCommitment(params: ChannelParams, changes: CommitmentChanges,
fundingTxIndex: Long,
firstRemoteCommitIndex: Long,
remoteFundingPubKey: PublicKey,
localFundingStatus: LocalFundingStatus, remoteFundingStatus: RemoteFundingStatus,
localCommit: LocalCommit, remoteCommit: RemoteCommit, nextRemoteCommit_opt: Option[NextRemoteCommit]) {
Expand All @@ -739,7 +743,7 @@ case class FullCommitment(params: ChannelParams, changes: CommitmentChanges,
val commitInput = localCommit.commitTxAndRemoteSig.commitTx.input
val fundingTxId = commitInput.outPoint.txid
val capacity = commitInput.txOut.amount
val commitment = Commitment(fundingTxIndex, remoteFundingPubKey, localFundingStatus, remoteFundingStatus, localCommit, remoteCommit, nextRemoteCommit_opt)
val commitment = Commitment(fundingTxIndex, firstRemoteCommitIndex, remoteFundingPubKey, localFundingStatus, remoteFundingStatus, localCommit, remoteCommit, nextRemoteCommit_opt)

def localChannelReserve: Satoshi = commitment.localChannelReserve(params)

Expand Down Expand Up @@ -803,7 +807,7 @@ case class Commitments(params: ChannelParams,
lazy val availableBalanceForReceive: MilliSatoshi = active.map(_.availableBalanceForReceive(params, changes)).min

// We always use the last commitment that was created, to make sure we never go back in time.
val latest = FullCommitment(params, changes, active.head.fundingTxIndex, active.head.remoteFundingPubKey, active.head.localFundingStatus, active.head.remoteFundingStatus, active.head.localCommit, active.head.remoteCommit, active.head.nextRemoteCommit_opt)
val latest = FullCommitment(params, changes, active.head.fundingTxIndex, active.head.firstRemoteCommitIndex, active.head.remoteFundingPubKey, active.head.localFundingStatus, active.head.remoteFundingStatus, active.head.localCommit, active.head.remoteCommit, active.head.nextRemoteCommit_opt)

val all: Seq[Commitment] = active ++ inactive

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package fr.acinq.eclair.channel.fsm
import akka.actor.Status
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.pattern.pipe
import fr.acinq.bitcoin.scalacompat.{SatoshiLong, Script, TxHash}
import fr.acinq.bitcoin.scalacompat.{SatoshiLong, Script}
import fr.acinq.eclair.blockchain.OnChainWallet.MakeFundingTxResponse
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._
import fr.acinq.eclair.channel.Helpers.Funding
Expand Down Expand Up @@ -277,6 +277,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers {
)
val commitment = Commitment(
fundingTxIndex = 0,
firstRemoteCommitIndex = 0,
remoteFundingPubKey = remoteFundingPubKey,
localFundingStatus = SingleFundedUnconfirmedFundingTx(None),
remoteFundingStatus = RemoteFundingStatus.NotLocked,
Expand Down Expand Up @@ -323,6 +324,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers {
case Success(_) =>
val commitment = Commitment(
fundingTxIndex = 0,
firstRemoteCommitIndex = 0,
remoteFundingPubKey = remoteFundingPubKey,
localFundingStatus = SingleFundedUnconfirmedFundingTx(Some(fundingTx)),
remoteFundingStatus = RemoteFundingStatus.NotLocked,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import fr.acinq.eclair.channel.Helpers.getRelayFees
import fr.acinq.eclair.channel.LocalFundingStatus.{ConfirmedFundingTx, DualFundedUnconfirmedFundingTx, SingleFundedUnconfirmedFundingTx}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.fsm.Channel.{ANNOUNCEMENTS_MINCONF, BroadcastChannelUpdate, PeriodicRefresh, REFRESH_CHANNEL_UPDATE_INTERVAL}
import fr.acinq.eclair.db.RevokedHtlcInfoCleaner
import fr.acinq.eclair.wire.protocol.{AnnouncementSignatures, ChannelReady, ChannelReadyTlv, TlvStream}

import scala.concurrent.duration.{DurationInt, FiniteDuration}
Expand Down Expand Up @@ -83,6 +84,12 @@ trait CommonFundingHandlers extends CommonHandlers {
}
val fundingStatus = ConfirmedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid))
context.system.eventStream.publish(TransactionConfirmed(d.channelId, remoteNodeId, w.tx))
// When a splice transaction confirms, it double-spends all the commitment transactions that only applied to the
// previous funding transaction. Our peer cannot publish the corresponding revoked commitments anymore, so we can
// clean-up the htlc data that we were storing for the matching penalty transactions.
d.commitments.all.find(_.fundingTxId == w.tx.txid).map(_.firstRemoteCommitIndex).foreach {
commitIndex => context.system.eventStream.publish(RevokedHtlcInfoCleaner.ForgetHtlcInfos(d.channelId, beforeCommitIndex = commitIndex))
}
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus).map {
case (commitments1, commitment) =>
// First of all, we watch the funding tx that is now confirmed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ object InteractiveTxSigningSession {
LocalCommit.fromCommitSig(nodeParams.channelKeyManager, channelParams, fundingTx.txId, fundingTxIndex, fundingParams.remoteFundingPubKey, commitInput, remoteCommitSig, localCommitIndex, unsignedLocalCommit.spec, localPerCommitmentPoint).map { signedLocalCommit =>
if (shouldSignFirst(fundingParams.isInitiator, channelParams, fundingTx.tx)) {
val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx, nodeParams.currentBlockHeight, fundingParams)
val commitment = Commitment(fundingTxIndex, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None)
val commitment = Commitment(fundingTxIndex, remoteCommit.index, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None)
SendingSigs(fundingStatus, commitment, fundingTx.localSigs)
} else {
this.copy(localCommit = Right(signedLocalCommit))
Expand All @@ -989,7 +989,7 @@ object InteractiveTxSigningSession {
case Right(fullySignedTx) =>
log.info("interactive-tx fully signed with {} local inputs, {} remote inputs, {} local outputs and {} remote outputs", fullySignedTx.tx.localInputs.length, fullySignedTx.tx.remoteInputs.length, fullySignedTx.tx.localOutputs.length, fullySignedTx.tx.remoteOutputs.length)
val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fullySignedTx, nodeParams.currentBlockHeight, fundingParams)
val commitment = Commitment(fundingTxIndex, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None)
val commitment = Commitment(fundingTxIndex, remoteCommit.index, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None)
Right(SendingSigs(fundingStatus, commitment, fullySignedTx.localSigs))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ package fr.acinq.eclair.db

import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampSecond}
import fr.acinq.eclair.channel.PersistentChannelData
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.{CltvExpiry, Paginated}

trait ChannelsDb {

Expand All @@ -30,8 +30,15 @@ trait ChannelsDb {

def updateChannelMeta(channelId: ByteVector32, event: ChannelEvent.EventType): Unit

/** Mark a channel as closed, but keep it in the DB. */
def removeChannel(channelId: ByteVector32): Unit

/** Mark revoked HTLC information as obsolete. It will be removed from the DB once [[removeHtlcInfos]] is called. */
def markHtlcInfosForRemoval(channelId: ByteVector32, beforeCommitIndex: Long): Unit

/** Remove up to batchSize obsolete revoked HTLC information. */
def removeHtlcInfos(batchSize: Int): Unit

def listLocalChannels(): Seq[PersistentChannelData]

def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package fr.acinq.eclair.db

import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.ClassicActorContextOps
import akka.actor.{Actor, DiagnosticActorLogging, Props}
import akka.event.Logging.MDC
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
Expand All @@ -33,8 +36,10 @@ import fr.acinq.eclair.{Logs, NodeParams}
*/
class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorLogging {

val auditDb: AuditDb = nodeParams.db.audit
val channelsDb: ChannelsDb = nodeParams.db.channels
private val auditDb: AuditDb = nodeParams.db.audit
private val channelsDb: ChannelsDb = nodeParams.db.channels

context.spawn(Behaviors.supervise(RevokedHtlcInfoCleaner(channelsDb, nodeParams.revokedHtlcInfoCleanerConfig)).onFailure(SupervisorStrategy.restart), name = "revoked-htlc-info-cleaner")

context.system.eventStream.subscribe(self, classOf[PaymentSent])
context.system.eventStream.subscribe(self, classOf[PaymentFailed])
Expand Down
10 changes: 10 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,16 @@ case class DualChannelsDb(primary: ChannelsDb, secondary: ChannelsDb) extends Ch
primary.removeChannel(channelId)
}

override def markHtlcInfosForRemoval(channelId: ByteVector32, beforeCommitIndex: Long): Unit = {
runAsync(secondary.markHtlcInfosForRemoval(channelId, beforeCommitIndex))
primary.markHtlcInfosForRemoval(channelId, beforeCommitIndex)
}

override def removeHtlcInfos(batchSize: Int): Unit = {
runAsync(secondary.removeHtlcInfos(batchSize))
primary.removeHtlcInfos(batchSize)
}

override def listLocalChannels(): Seq[PersistentChannelData] = {
runAsync(secondary.listLocalChannels())
primary.listLocalChannels()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2023 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.db

import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.Behaviors
import fr.acinq.bitcoin.scalacompat.ByteVector32

import scala.concurrent.duration.FiniteDuration

/**
* When a channel is closed or a splice transaction confirms, we can remove the information about old HTLCs that was
* stored in the DB to punish revoked commitments. We potentially have millions of rows to delete per channel, and there
* is no rush to remove them. We don't want this to negatively impact active channels, so this actor deletes that data
* in small batches, at regular intervals.
*/
object RevokedHtlcInfoCleaner {

// @formatter:off
sealed trait Command
case class ForgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long) extends Command
private case object DeleteBatch extends Command
// @formatter:on

case class Config(batchSize: Int, interval: FiniteDuration)

def apply(db: ChannelsDb, config: Config): Behavior[Command] = {
Behaviors.setup { context =>
context.system.eventStream ! EventStream.Subscribe(context.self)
Behaviors.withTimers { timers =>
timers.startTimerWithFixedDelay(DeleteBatch, config.interval)
Behaviors.receiveMessage {
case ForgetHtlcInfos(channelId, beforeCommitIndex) =>
db.markHtlcInfosForRemoval(channelId, beforeCommitIndex)
Behaviors.same
case DeleteBatch =>
db.removeHtlcInfos(config.batchSize)
Behaviors.same
}
}
}
}

}
Loading

0 comments on commit f41bd22

Please sign in to comment.