diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala index 6cc8d21538..2c129917a4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala @@ -416,6 +416,11 @@ sealed trait LocalFundingStatus { def signedTx_opt: Option[Transaction] /** We store local signatures for the purpose of retransmitting if the funding/splicing flow is interrupted. */ def localSigs_opt: Option[TxSignatures] + /** + * Commitment index of the first remote commitment we signed that spends this funding transaction. + * Once the funding transaction confirms, our peer won't be able to publish revoked commitments with lower indices. + */ + def firstCommitIndex_opt: Option[Long] } object LocalFundingStatus { sealed trait NotLocked extends LocalFundingStatus @@ -430,15 +435,16 @@ object LocalFundingStatus { */ case class SingleFundedUnconfirmedFundingTx(signedTx_opt: Option[Transaction]) extends UnconfirmedFundingTx with NotLocked { override val localSigs_opt: Option[TxSignatures] = None + override val firstCommitIndex_opt: Option[Long] = None } - case class DualFundedUnconfirmedFundingTx(sharedTx: SignedSharedTransaction, createdAt: BlockHeight, fundingParams: InteractiveTxParams) extends UnconfirmedFundingTx with NotLocked { + case class DualFundedUnconfirmedFundingTx(sharedTx: SignedSharedTransaction, createdAt: BlockHeight, fundingParams: InteractiveTxParams, firstCommitIndex_opt: Option[Long]) extends UnconfirmedFundingTx with NotLocked { override val signedTx_opt: Option[Transaction] = sharedTx.signedTx_opt override val localSigs_opt: Option[TxSignatures] = Some(sharedTx.localSigs) } - case class ZeroconfPublishedFundingTx(tx: Transaction, localSigs_opt: Option[TxSignatures]) extends UnconfirmedFundingTx with Locked { + case class ZeroconfPublishedFundingTx(tx: Transaction, localSigs_opt: Option[TxSignatures], firstCommitIndex_opt: Option[Long]) extends UnconfirmedFundingTx with Locked { override val signedTx_opt: Option[Transaction] = Some(tx) } - case class ConfirmedFundingTx(tx: Transaction, localSigs_opt: Option[TxSignatures]) extends LocalFundingStatus with Locked { + case class ConfirmedFundingTx(tx: Transaction, localSigs_opt: Option[TxSignatures], firstCommitIndex_opt: Option[Long]) extends LocalFundingStatus with Locked { override val signedTx_opt: Option[Transaction] = Some(tx) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala index 101176f4c4..4c43677546 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala @@ -1150,6 +1150,11 @@ case class Commitments(params: ChannelParams, all.find(_.fundingTxId == fundingTxId).flatMap(_.localFundingStatus.localSigs_opt) } + /** Return the index of the first remote commitment we signed spending the given funding transaction. */ + def firstCommitIndex(fundingTxId: ByteVector32): Option[Long] = { + all.find(_.fundingTxId == fundingTxId).flatMap(_.localFundingStatus.firstCommitIndex_opt) + } + /** * Update the local/remote funding status * diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala index fa34d2f806..1a08bc194f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala @@ -1059,7 +1059,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with case Event(msg: TxSignatures, d: DATA_NORMAL) => d.commitments.latest.localFundingStatus match { - case dfu@LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx: PartiallySignedSharedTransaction, _, _) if fundingTx.txId == msg.txId => + case dfu@LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx: PartiallySignedSharedTransaction, _, _, _) if fundingTx.txId == msg.txId => // we already sent our tx_signatures InteractiveTxSigningSession.addRemoteSigs(keyManager, d.commitments.params, dfu.fundingParams, fundingTx, msg) match { case Left(cause) => @@ -1101,7 +1101,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with } case Event(w: WatchPublishedTriggered, d: DATA_NORMAL) => - val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid)) + val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid), d.commitments.firstCommitIndex(w.tx.txid)) d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match { case Right((commitments1, _)) => watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks), delay_opt = None) @@ -1806,7 +1806,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with case d: DATA_NORMAL => d.spliceStatus match { case SpliceStatus.SpliceWaitingForSigs(status) => Set(ChannelReestablishTlv.NextFundingTlv(status.fundingTx.txId.reverse)) case _ => d.commitments.latest.localFundingStatus match { - case LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx: PartiallySignedSharedTransaction, _, _) => Set(ChannelReestablishTlv.NextFundingTlv(fundingTx.txId.reverse)) + case LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx: PartiallySignedSharedTransaction, _, _, _) => Set(ChannelReestablishTlv.NextFundingTlv(fundingTx.txId.reverse)) case _ => Set.empty } } @@ -2190,11 +2190,11 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with // slightly before us. In that case, the WatchConfirmed may trigger first, and it would be inefficient to let the // WatchPublished override our funding status: it will make us set a new WatchConfirmed that will instantly // trigger and rewrite the funding status again. - val alreadyConfirmed = d.commitments.active.map(_.localFundingStatus).collect { case LocalFundingStatus.ConfirmedFundingTx(tx, _) => tx }.exists(_.txid == w.tx.txid) + val alreadyConfirmed = d.commitments.active.map(_.localFundingStatus).collect { case LocalFundingStatus.ConfirmedFundingTx(tx, _, _) => tx }.exists(_.txid == w.tx.txid) if (alreadyConfirmed) { stay() } else { - val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid)) + val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid), d.commitments.firstCommitIndex(w.tx.txid)) d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match { case Right((commitments1, _)) => log.info(s"zero-conf funding txid=${w.tx.txid} has been published") diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenDualFunded.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenDualFunded.scala index 2802b7aeac..fb69b60e0d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenDualFunded.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenDualFunded.scala @@ -673,7 +673,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers { case Event(w: WatchPublishedTriggered, d: DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED) => log.info("funding txid={} was successfully published for zero-conf channelId={}", w.tx.txid, d.channelId) - val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid)) + val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid), d.commitments.firstCommitIndex(w.tx.txid)) d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match { case Right((commitments1, _)) => // we still watch the funding tx for confirmation even if we can use the zero-conf channel right away diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala index f1b085e89e..646e903040 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala @@ -389,7 +389,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers { stay() using d.copy(deferred = Some(remoteChannelReady)) // no need to store, they will re-send if we get disconnected case Event(w: WatchPublishedTriggered, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) => - val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, None) + val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, None, None) d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match { case Right((commitments1, _)) => log.info("funding txid={} was successfully published for zero-conf channelId={}", w.tx.txid, d.channelId) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonFundingHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonFundingHandlers.scala index 7985d6eed0..cd75c694a4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonFundingHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonFundingHandlers.scala @@ -26,6 +26,7 @@ import fr.acinq.eclair.channel.Helpers.getRelayFees import fr.acinq.eclair.channel.LocalFundingStatus.{ConfirmedFundingTx, DualFundedUnconfirmedFundingTx, SingleFundedUnconfirmedFundingTx} import fr.acinq.eclair.channel._ import fr.acinq.eclair.channel.fsm.Channel.{ANNOUNCEMENTS_MINCONF, BroadcastChannelUpdate, PeriodicRefresh, REFRESH_CHANNEL_UPDATE_INTERVAL} +import fr.acinq.eclair.db.RevokedHtlcInfoCleaner import fr.acinq.eclair.router.Announcements import fr.acinq.eclair.wire.protocol.{AnnouncementSignatures, ChannelReady, ChannelReadyTlv, TlvStream} @@ -82,8 +83,14 @@ trait CommonFundingHandlers extends CommonHandlers { } case _ => () // in the dual-funding case, we have already verified the funding tx } - val fundingStatus = ConfirmedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid)) + val fundingStatus = ConfirmedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid), d.commitments.firstCommitIndex(w.tx.txid)) context.system.eventStream.publish(TransactionConfirmed(d.channelId, remoteNodeId, w.tx)) + // When a splice transaction confirms, it double-spends all the commitment transactions that only applied to the + // previous funding transaction. Our peer cannot publish the corresponding revoked commitments anymore, so we can + // clean-up the htlc data that we were storing for the matching penalty transactions. + fundingStatus.firstCommitIndex_opt.foreach { + commitIndex => context.system.eventStream.publish(RevokedHtlcInfoCleaner.ForgetHtlcInfos(d.channelId, commitIndex)) + } d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus).map { case (commitments1, commitment) => // First of all, we watch the funding tx that is now confirmed. diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala index 655291dfe6..cc65c057c5 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala @@ -942,7 +942,7 @@ object InteractiveTxSigningSession { val localPerCommitmentPoint = nodeParams.channelKeyManager.commitmentPoint(channelKeyPath, localCommitIndex) LocalCommit.fromCommitSig(nodeParams.channelKeyManager, channelParams, fundingTx.txId, fundingTxIndex, fundingParams.remoteFundingPubKey, commitInput, remoteCommitSig, localCommitIndex, unsignedLocalCommit.spec, localPerCommitmentPoint).map { signedLocalCommit => if (shouldSignFirst(channelParams, fundingTx.tx)) { - val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx, nodeParams.currentBlockHeight, fundingParams) + val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx, nodeParams.currentBlockHeight, fundingParams, Some(remoteCommit.index)) val commitment = Commitment(fundingTxIndex, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None) SendingSigs(fundingStatus, commitment, fundingTx.localSigs) } else { @@ -967,7 +967,7 @@ object InteractiveTxSigningSession { Left(f) case Right(fullySignedTx) => log.info("interactive-tx fully signed with {} local inputs, {} remote inputs, {} local outputs and {} remote outputs", fullySignedTx.tx.localInputs.length, fullySignedTx.tx.remoteInputs.length, fullySignedTx.tx.localOutputs.length, fullySignedTx.tx.remoteOutputs.length) - val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fullySignedTx, nodeParams.currentBlockHeight, fundingParams) + val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fullySignedTx, nodeParams.currentBlockHeight, fundingParams, Some(remoteCommit.index)) val commitment = Commitment(fundingTxIndex, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None) Right(SendingSigs(fundingStatus, commitment, fullySignedTx.localSigs)) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala index b978d240cc..d9d8e5514b 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala @@ -18,9 +18,9 @@ package fr.acinq.eclair.db import fr.acinq.bitcoin.scalacompat.ByteVector32 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampSecond} import fr.acinq.eclair.channel.PersistentChannelData import fr.acinq.eclair.db.DbEventHandler.ChannelEvent +import fr.acinq.eclair.{CltvExpiry, Paginated} trait ChannelsDb { @@ -30,8 +30,13 @@ trait ChannelsDb { def updateChannelMeta(channelId: ByteVector32, event: ChannelEvent.EventType): Unit + /** Mark a channel as closed, but keep it in the DB. */ def removeChannel(channelId: ByteVector32): Unit + /** Mark revoked HTLC information as obsolete. It will be removed from the DB once [[removeHtlcInfos]] is called. */ + def forgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long): Unit + + /** Remove up to batchSize obsolete revoked HTLC information. */ def removeHtlcInfos(batchSize: Int): Unit def listLocalChannels(): Seq[PersistentChannelData] diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala index 27c328dc57..3bd2f7aa4a 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala @@ -225,6 +225,11 @@ case class DualChannelsDb(primary: ChannelsDb, secondary: ChannelsDb) extends Ch primary.removeChannel(channelId) } + override def forgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long): Unit = { + runAsync(secondary.forgetHtlcInfos(channelId, beforeCommitIndex)) + primary.forgetHtlcInfos(channelId, beforeCommitIndex) + } + override def removeHtlcInfos(batchSize: Int): Unit = { runAsync(secondary.removeHtlcInfos(batchSize)) primary.removeHtlcInfos(batchSize) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleaner.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleaner.scala index 66919e9a4b..c0ad3b370b 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleaner.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleaner.scala @@ -17,29 +17,37 @@ package fr.acinq.eclair.db import akka.actor.typed.Behavior +import akka.actor.typed.eventstream.EventStream import akka.actor.typed.scaladsl.Behaviors +import fr.acinq.bitcoin.scalacompat.ByteVector32 import scala.concurrent.duration.FiniteDuration /** - * When a channel is closed, we can remove the information about old HTLCs that was stored in the DB to punish revoked commitments. - * We potentially have millions of rows to delete per channel, and there is no rush to remove them. - * We don't want this to negatively impact active channels, so this actor deletes that data in small batches, at regular intervals. + * When a channel is closed or a splice transaction confirms, we can remove the information about old HTLCs that was + * stored in the DB to punish revoked commitments. We potentially have millions of rows to delete per channel, and there + * is no rush to remove them. We don't want this to negatively impact active channels, so this actor deletes that data + * in small batches, at regular intervals. */ object RevokedHtlcInfoCleaner { // @formatter:off sealed trait Command + case class ForgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long) extends Command private case object DeleteBatch extends Command // @formatter:on case class Config(batchSize: Int, interval: FiniteDuration) def apply(db: ChannelsDb, config: Config): Behavior[Command] = { - Behaviors.setup { _ => + Behaviors.setup { context => + context.system.eventStream ! EventStream.Subscribe(context.self) Behaviors.withTimers { timers => timers.startTimerWithFixedDelay(DeleteBatch, config.interval) Behaviors.receiveMessage { + case ForgetHtlcInfos(channelId, beforeCommitIndex) => + db.forgetHtlcInfos(channelId, beforeCommitIndex) + Behaviors.same case DeleteBatch => db.removeHtlcInfos(config.batchSize) Behaviors.same diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala index 4b361f7540..3dce363aab 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala @@ -116,7 +116,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit } def migration89(statement: Statement): Unit = { - statement.executeUpdate("CREATE TABLE local.closed_channels_to_clean_up (channel_id TEXT NOT NULL PRIMARY KEY)") + statement.executeUpdate("CREATE TABLE local.htlc_infos_to_remove (channel_id TEXT NOT NULL PRIMARY KEY, before_commitment_number BIGINT NOT NULL)") } getVersion(statement, DB_NAME) match { @@ -125,7 +125,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate("CREATE TABLE local.channels (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, data BYTEA NOT NULL, json JSONB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp TIMESTAMP WITH TIME ZONE, last_payment_sent_timestamp TIMESTAMP WITH TIME ZONE, last_payment_received_timestamp TIMESTAMP WITH TIME ZONE, last_connected_timestamp TIMESTAMP WITH TIME ZONE, closed_timestamp TIMESTAMP WITH TIME ZONE)") statement.executeUpdate("CREATE TABLE local.htlc_infos (channel_id TEXT NOT NULL, commitment_number BIGINT NOT NULL, payment_hash TEXT NOT NULL, cltv_expiry BIGINT NOT NULL, FOREIGN KEY(channel_id) REFERENCES local.channels(channel_id))") - statement.executeUpdate("CREATE TABLE local.closed_channels_to_clean_up (channel_id TEXT NOT NULL PRIMARY KEY)") + statement.executeUpdate("CREATE TABLE local.htlc_infos_to_remove (channel_id TEXT NOT NULL PRIMARY KEY, before_commitment_number BIGINT NOT NULL)") statement.executeUpdate("CREATE INDEX local_channels_type_idx ON local.channels ((json->>'type'))") statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local.channels(remote_node_id)") @@ -235,8 +235,9 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit // The htlc_infos may contain millions of rows, which is very expensive to delete synchronously. // We instead run an asynchronous job to clean up that data in small batches. - using(pg.prepareStatement("INSERT INTO local.closed_channels_to_clean_up VALUES (?) ON CONFLICT DO NOTHING")) { statement => + using(pg.prepareStatement("INSERT INTO local.htlc_infos_to_remove (channel_id, before_commitment_number) VALUES(?, ?) ON CONFLICT (channel_id) DO UPDATE SET before_commitment_number = EXCLUDED.before_commitment_number")) { statement => statement.setString(1, channelId.toHex) + statement.setLong(2, Long.MaxValue) statement.executeUpdate() } @@ -248,27 +249,42 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit } } + override def forgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long): Unit = withMetrics("channels/forget-htlc-infos", DbBackends.Postgres) { + withLock { pg => + using(pg.prepareStatement("INSERT INTO local.htlc_infos_to_remove (channel_id, before_commitment_number) VALUES(?, ?) ON CONFLICT (channel_id) DO UPDATE SET before_commitment_number = EXCLUDED.before_commitment_number")) { statement => + statement.setString(1, channelId.toHex) + statement.setLong(2, beforeCommitIndex) + statement.executeUpdate() + } + } + } + override def removeHtlcInfos(batchSize: Int): Unit = withMetrics("channels/remove-htlc-infos", DbBackends.Postgres) { withLock { pg => // Check if there are channels that need to be cleaned up. - val channelId_opt = using(pg.prepareStatement("SELECT channel_id FROM local.closed_channels_to_clean_up LIMIT 1")) { statement => - statement.executeQuery().map(rs => ByteVector32(rs.getByteVector32FromHex("channel_id"))).lastOption + val channelToCleanUp_opt = using(pg.prepareStatement("SELECT channel_id, before_commitment_number FROM local.htlc_infos_to_remove LIMIT 1")) { statement => + statement.executeQuery().map(rs => { + val channelId = ByteVector32(rs.getByteVector32FromHex("channel_id")) + val beforeCommitmentNumber = rs.getLong("before_commitment_number") + (channelId, beforeCommitmentNumber) + }).lastOption } // Remove a batch of HTLC information for that channel. - channelId_opt.foreach(channelId => { - val deletedCount = using(pg.prepareStatement(s"DELETE FROM local.htlc_infos WHERE channel_id=? AND commitment_number IN (SELECT commitment_number FROM local.htlc_infos WHERE channel_id=? LIMIT $batchSize)")) { statement => + channelToCleanUp_opt.foreach { case (channelId, beforeCommitmentNumber) => + val deletedCount = using(pg.prepareStatement(s"DELETE FROM local.htlc_infos WHERE channel_id=? AND commitment_number IN (SELECT commitment_number FROM local.htlc_infos WHERE channel_id=? AND commitment_number statement.setString(1, channelId.toHex) statement.setString(2, channelId.toHex) + statement.setLong(3, beforeCommitmentNumber) statement.executeUpdate() } // If we've deleted all HTLC information for that channel, we can now remove it from the DB. if (deletedCount < batchSize) { - using(pg.prepareStatement("DELETE FROM local.closed_channels_to_clean_up WHERE channel_id=?")) { statement => + using(pg.prepareStatement("DELETE FROM local.htlc_infos_to_remove WHERE channel_id=?")) { statement => statement.setString(1, channelId.toHex) statement.executeUpdate() } } - }) + } } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala index f6bcfc6f26..4a37cb4dc9 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala @@ -80,14 +80,14 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { } def migration45(): Unit = { - statement.executeUpdate("CREATE TABLE closed_channels_to_clean_up (channel_id BLOB NOT NULL PRIMARY KEY)") + statement.executeUpdate("CREATE TABLE htlc_infos_to_remove (channel_id BLOB NOT NULL PRIMARY KEY, before_commitment_number INTEGER NOT NULL)") } getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE TABLE local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER, closed_timestamp INTEGER)") statement.executeUpdate("CREATE TABLE htlc_infos (channel_id BLOB NOT NULL, commitment_number INTEGER NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") - statement.executeUpdate("CREATE TABLE closed_channels_to_clean_up (channel_id BLOB NOT NULL PRIMARY KEY)") + statement.executeUpdate("CREATE TABLE htlc_infos_to_remove (channel_id BLOB NOT NULL PRIMARY KEY, before_commitment_number INTEGER NOT NULL)") statement.executeUpdate("CREATE INDEX htlc_infos_idx ON htlc_infos(channel_id, commitment_number)") case Some(v@(1 | 2 | 3)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") @@ -162,10 +162,7 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { // The htlc_infos may contain millions of rows, which is very expensive to delete synchronously. // We instead run an asynchronous job to clean up that data in small batches. - using(sqlite.prepareStatement("INSERT INTO closed_channels_to_clean_up VALUES (?)")) { statement => - statement.setBytes(1, channelId.toArray) - statement.executeUpdate() - } + forgetHtlcInfos(channelId, Long.MaxValue) using(sqlite.prepareStatement("UPDATE local_channels SET is_closed=1, closed_timestamp=? WHERE channel_id=?")) { statement => statement.setLong(1, TimestampMilli.now().toLong) @@ -174,26 +171,45 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { } } + override def forgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long): Unit = withMetrics("channels/forget-htlc-infos", DbBackends.Sqlite) { + using(sqlite.prepareStatement("UPDATE htlc_infos_to_remove SET before_commitment_number=? WHERE channel_id=?")) { update => + update.setLong(1, beforeCommitIndex) + update.setBytes(2, channelId.toArray) + if (update.executeUpdate() == 0) { + using(sqlite.prepareStatement("INSERT INTO htlc_infos_to_remove VALUES (?, ?)")) { statement => + statement.setBytes(1, channelId.toArray) + statement.setLong(2, beforeCommitIndex) + statement.executeUpdate() + } + } + } + } + override def removeHtlcInfos(batchSize: Int): Unit = withMetrics("channels/remove-htlc-infos", DbBackends.Sqlite) { // Check if there are channels that need to be cleaned up. - val channelId_opt = using(sqlite.prepareStatement("SELECT channel_id FROM closed_channels_to_clean_up LIMIT 1")) { statement => - statement.executeQuery().map(rs => ByteVector32(rs.getByteVector32("channel_id"))).lastOption + val channelToCleanUp_opt = using(sqlite.prepareStatement("SELECT channel_id, before_commitment_number FROM htlc_infos_to_remove LIMIT 1")) { statement => + statement.executeQuery().map(rs => { + val channelId = ByteVector32(rs.getByteVector32("channel_id")) + val beforeCommitmentNumber = rs.getLong("before_commitment_number") + (channelId, beforeCommitmentNumber) + }).lastOption } // Remove a batch of HTLC information for that channel. - channelId_opt.foreach(channelId => { - val deletedCount = using(sqlite.prepareStatement(s"DELETE FROM htlc_infos WHERE channel_id=? AND commitment_number IN (SELECT commitment_number FROM htlc_infos WHERE channel_id=? LIMIT $batchSize)")) { statement => + channelToCleanUp_opt.foreach { case (channelId, beforeCommitmentNumber) => + val deletedCount = using(sqlite.prepareStatement(s"DELETE FROM htlc_infos WHERE channel_id=? AND commitment_number IN (SELECT commitment_number FROM htlc_infos WHERE channel_id=? AND commitment_number statement.setBytes(1, channelId.toArray) statement.setBytes(2, channelId.toArray) + statement.setLong(3, beforeCommitmentNumber) statement.executeUpdate() } // If we've deleted all HTLC information for that channel, we can now remove it from the DB. if (deletedCount < batchSize) { - using(sqlite.prepareStatement("DELETE FROM closed_channels_to_clean_up WHERE channel_id=?")) { statement => + using(sqlite.prepareStatement("DELETE FROM htlc_infos_to_remove WHERE channel_id=?")) { statement => statement.setBytes(1, channelId.toArray) statement.executeUpdate() } } - }) + } } override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Sqlite) { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4.scala index fd301d0208..8a7714cd24 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4.scala @@ -9,13 +9,12 @@ import fr.acinq.eclair.channel._ import fr.acinq.eclair.channel.fund.InteractiveTxSigningSession.UnsignedLocalCommit import fr.acinq.eclair.channel.fund.{InteractiveTxBuilder, InteractiveTxSigningSession} import fr.acinq.eclair.crypto.ShaChain -import fr.acinq.eclair.MilliSatoshiLong import fr.acinq.eclair.transactions.Transactions._ import fr.acinq.eclair.transactions.{CommitmentSpec, DirectedHtlc, IncomingHtlc, OutgoingHtlc} import fr.acinq.eclair.wire.protocol.CommonCodecs._ import fr.acinq.eclair.wire.protocol.LightningMessageCodecs._ import fr.acinq.eclair.wire.protocol.{TxSignatures, UpdateAddHtlc, UpdateMessage} -import fr.acinq.eclair.{BlockHeight, FeatureSupport, Features, PermanentChannelFeature, channel} +import fr.acinq.eclair.{BlockHeight, FeatureSupport, Features, MilliSatoshiLong, PermanentChannelFeature, channel} import scodec.bits.{BitVector, ByteVector} import scodec.codecs._ import scodec.{Attempt, Codec} @@ -352,18 +351,28 @@ private[channel] object ChannelCodecs4 { .typecase(0x01, partiallySignedSharedTransactionCodec) .typecase(0x02, fullySignedSharedTransactionCodec) + private val dualFundedUnconfirmedFundingTxWithoutCommitIndexCodec: Codec[DualFundedUnconfirmedFundingTx] = ( + ("sharedTx" | signedSharedTransactionCodec) :: + ("createdAt" | blockHeight) :: + ("fundingParams" | fundingParamsCodec) :: + provide(Option.empty[Long])).as[DualFundedUnconfirmedFundingTx] + private val dualFundedUnconfirmedFundingTxCodec: Codec[DualFundedUnconfirmedFundingTx] = ( ("sharedTx" | signedSharedTransactionCodec) :: ("createdAt" | blockHeight) :: - ("fundingParams" | fundingParamsCodec)).as[DualFundedUnconfirmedFundingTx] + ("fundingParams" | fundingParamsCodec) :: + ("commitIndex" | optional(bool8, uint64overflow))).as[DualFundedUnconfirmedFundingTx] val fundingTxStatusCodec: Codec[LocalFundingStatus] = discriminated[LocalFundingStatus].by(uint8) .typecase(0x01, optional(bool8, txCodec).as[SingleFundedUnconfirmedFundingTx]) - .typecase(0x02, dualFundedUnconfirmedFundingTxCodec) - .typecase(0x05, (txCodec :: optional(bool8, lengthDelimited(txSignaturesCodec))).as[ZeroconfPublishedFundingTx]) - .typecase(0x06, (txCodec :: optional(bool8, lengthDelimited(txSignaturesCodec))).as[ConfirmedFundingTx]) - .typecase(0x03, (txCodec :: provide(Option.empty[TxSignatures])).as[ZeroconfPublishedFundingTx]) - .typecase(0x04, (txCodec :: provide(Option.empty[TxSignatures])).as[ConfirmedFundingTx]) + .typecase(0x07, dualFundedUnconfirmedFundingTxCodec) + .typecase(0x08, (txCodec :: optional(bool8, lengthDelimited(txSignaturesCodec)) :: optional(bool8, uint64overflow)).as[ZeroconfPublishedFundingTx]) + .typecase(0x09, (txCodec :: optional(bool8, lengthDelimited(txSignaturesCodec)) :: optional(bool8, uint64overflow)).as[ConfirmedFundingTx]) + .typecase(0x02, dualFundedUnconfirmedFundingTxWithoutCommitIndexCodec) + .typecase(0x05, (txCodec :: optional(bool8, lengthDelimited(txSignaturesCodec)) :: provide(Option.empty[Long])).as[ZeroconfPublishedFundingTx]) + .typecase(0x06, (txCodec :: optional(bool8, lengthDelimited(txSignaturesCodec)) :: provide(Option.empty[Long])).as[ConfirmedFundingTx]) + .typecase(0x03, (txCodec :: provide(Option.empty[TxSignatures]) :: provide(Option.empty[Long])).as[ZeroconfPublishedFundingTx]) + .typecase(0x04, (txCodec :: provide(Option.empty[TxSignatures]) :: provide(Option.empty[Long])).as[ConfirmedFundingTx]) val remoteFundingStatusCodec: Codec[RemoteFundingStatus] = discriminated[RemoteFundingStatus].by(uint8) .typecase(0x01, provide(RemoteFundingStatus.NotLocked)) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala index b0cff34d78..b497c7d108 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala @@ -34,6 +34,7 @@ import fr.acinq.eclair.channel.publish.TxPublisher.{PublishFinalTx, PublishRepla import fr.acinq.eclair.channel.states.ChannelStateTestsBase.{FakeTxPublisherFactory, PimpTestFSM} import fr.acinq.eclair.channel.states.ChannelStateTestsTags.{AnchorOutputsZeroFeeHtlcTxs, NoMaxHtlcValueInFlight, ZeroConf} import fr.acinq.eclair.channel.states.{ChannelStateTestsBase, ChannelStateTestsTags} +import fr.acinq.eclair.db.RevokedHtlcInfoCleaner.ForgetHtlcInfos import fr.acinq.eclair.testutils.PimpTestProbe.convert import fr.acinq.eclair.transactions.DirectedHtlc.{incoming, outgoing} import fr.acinq.eclair.transactions.Transactions @@ -690,9 +691,11 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik val aliceEvents = TestProbe() val bobEvents = TestProbe() + systemA.eventStream.subscribe(aliceEvents.ref, classOf[ForgetHtlcInfos]) systemA.eventStream.subscribe(aliceEvents.ref, classOf[AvailableBalanceChanged]) systemA.eventStream.subscribe(aliceEvents.ref, classOf[LocalChannelUpdate]) systemA.eventStream.subscribe(aliceEvents.ref, classOf[LocalChannelDown]) + systemB.eventStream.subscribe(bobEvents.ref, classOf[ForgetHtlcInfos]) systemB.eventStream.subscribe(bobEvents.ref, classOf[AvailableBalanceChanged]) systemB.eventStream.subscribe(bobEvents.ref, classOf[LocalChannelUpdate]) systemB.eventStream.subscribe(bobEvents.ref, classOf[LocalChannelDown]) @@ -706,6 +709,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik alice ! WatchFundingConfirmedTriggered(BlockHeight(400000), 42, fundingTx1) alice2bob.expectMsgType[SpliceLocked] alice2bob.forward(bob) + aliceEvents.expectMsg(ForgetHtlcInfos(initialState.channelId, initialState.commitments.remoteCommitIndex)) aliceEvents.expectNoMessage(100 millis) bobEvents.expectNoMessage(100 millis) @@ -713,6 +717,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik bob2alice.expectMsgType[SpliceLocked] bob2alice.forward(alice) aliceEvents.expectAvailableBalanceChanged(balance = 1_300_000_000.msat, capacity = 2_000_000.sat) + bobEvents.expectMsg(ForgetHtlcInfos(initialState.channelId, initialState.commitments.localCommitIndex)) bobEvents.expectAvailableBalanceChanged(balance = 700_000_000.msat, capacity = 2_000_000.sat) aliceEvents.expectNoMessage(100 millis) bobEvents.expectNoMessage(100 millis) @@ -721,11 +726,13 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik bob2alice.expectMsgType[SpliceLocked] bob2alice.forward(alice) aliceEvents.expectNoMessage(100 millis) + bobEvents.expectMsg(ForgetHtlcInfos(initialState.channelId, initialState.commitments.localCommitIndex)) bobEvents.expectNoMessage(100 millis) alice ! WatchFundingConfirmedTriggered(BlockHeight(400000), 42, fundingTx2) alice2bob.expectMsgType[SpliceLocked] alice2bob.forward(bob) + aliceEvents.expectMsg(ForgetHtlcInfos(initialState.channelId, initialState.commitments.remoteCommitIndex)) aliceEvents.expectAvailableBalanceChanged(balance = 1_800_000_000.msat, capacity = 2_500_000.sat) bobEvents.expectAvailableBalanceChanged(balance = 700_000_000.msat, capacity = 2_500_000.sat) aliceEvents.expectNoMessage(100 millis) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala index df48402160..5ee4f7b931 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala @@ -58,20 +58,18 @@ class ChannelsDbSpec extends AnyFunSuite { test("add/remove/list channels") { forAllDbs { dbs => val db = dbs.channels - dbs.pendingCommands // needed by db.removeChannel val channel1 = ChannelCodecsSpec.normal val channel2a = ChannelCodecsSpec.normal.modify(_.commitments.params.channelId).setTo(randomBytes32()) val channel2b = channel2a.modify(_.shortIds.real).setTo(RealScidStatus.Final(RealShortChannelId(189371))) - val commitNumber1 = 42 - val commitNumber2 = 43 + val commitNumber = 42 val paymentHash1 = ByteVector32.Zeroes val cltvExpiry1 = CltvExpiry(123) val paymentHash2 = ByteVector32(ByteVector.fill(32)(1)) val cltvExpiry2 = CltvExpiry(656) - intercept[SQLException](db.addHtlcInfo(channel1.channelId, commitNumber1, paymentHash1, cltvExpiry1)) // no related channel + intercept[SQLException](db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash1, cltvExpiry1)) // no related channel assert(db.listLocalChannels().isEmpty) db.addOrUpdateChannel(channel1) @@ -85,13 +83,11 @@ class ChannelsDbSpec extends AnyFunSuite { assert(db.listLocalChannels() == List(channel1, channel2b)) assert(db.getChannel(channel2b.channelId).contains(channel2b)) - assert(db.listHtlcInfos(channel1.channelId, commitNumber1).isEmpty) - db.addHtlcInfo(channel1.channelId, commitNumber1, paymentHash1, cltvExpiry1) - db.addHtlcInfo(channel1.channelId, commitNumber1, paymentHash2, cltvExpiry2) - db.addHtlcInfo(channel1.channelId, commitNumber2, paymentHash1, cltvExpiry1) - assert(db.listHtlcInfos(channel1.channelId, commitNumber1).toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) - assert(db.listHtlcInfos(channel1.channelId, commitNumber2).toSet == Set((paymentHash1, cltvExpiry1))) - assert(db.listHtlcInfos(channel1.channelId, 44).isEmpty) + assert(db.listHtlcInfos(channel1.channelId, commitNumber).isEmpty) + db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash1, cltvExpiry1) + db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash2, cltvExpiry2) + assert(db.listHtlcInfos(channel1.channelId, commitNumber).toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) + assert(db.listHtlcInfos(channel1.channelId, commitNumber + 1).isEmpty) assert(db.listClosedChannels(None, None).isEmpty) db.removeChannel(channel1.channelId) @@ -101,23 +97,70 @@ class ChannelsDbSpec extends AnyFunSuite { assert(db.listClosedChannels(Some(channel1.remoteNodeId), None) == List(channel1)) assert(db.listClosedChannels(Some(PrivateKey(randomBytes32()).publicKey), None).isEmpty) - // HTLC info is cleaned up asynchronously. - assert(db.listHtlcInfos(channel1.channelId, commitNumber1).toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) - assert(db.listHtlcInfos(channel1.channelId, commitNumber2).toSet == Set((paymentHash1, cltvExpiry1))) - db.removeHtlcInfos(1) // remove one of the commitment number (ordered not deterministic) - val remainingHtlcInfos = Seq(commitNumber1, commitNumber2).flatMap(commitNumber => db.listHtlcInfos(channel1.channelId, commitNumber)) - assert(remainingHtlcInfos.nonEmpty) - db.removeHtlcInfos(1) // remove the remaining commitment number - assert(db.listHtlcInfos(channel1.channelId, commitNumber1).isEmpty) - assert(db.listHtlcInfos(channel1.channelId, commitNumber2).isEmpty) - db.removeHtlcInfos(1) // noop - db.removeChannel(channel2b.channelId) assert(db.getChannel(channel2b.channelId).isEmpty) assert(db.listLocalChannels().isEmpty) } } + test("remove htlc infos") { + forAllDbs { dbs => + val db = dbs.channels + + val channel1 = ChannelCodecsSpec.normal + val channel2 = ChannelCodecsSpec.normal.modify(_.commitments.params.channelId).setTo(randomBytes32()) + db.addOrUpdateChannel(channel1) + db.addOrUpdateChannel(channel2) + + val commitNumberSplice1 = 50 + val commitNumberSplice2 = 75 + + // The first channel has one splice transaction and is then closed. + db.addHtlcInfo(channel1.channelId, 49, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel1.channelId, 50, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel1.channelId, 50, randomBytes32(), CltvExpiry(561)) + db.forgetHtlcInfos(channel1.channelId, commitNumberSplice1) + db.addHtlcInfo(channel1.channelId, 51, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel1.channelId, 52, randomBytes32(), CltvExpiry(561)) + db.removeChannel(channel1.channelId) + + // The second channel has two splice transactions. + db.addHtlcInfo(channel2.channelId, 48, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel2.channelId, 48, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel2.channelId, 49, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel2.channelId, 50, randomBytes32(), CltvExpiry(561)) + db.forgetHtlcInfos(channel2.channelId, commitNumberSplice1) + db.addHtlcInfo(channel2.channelId, 74, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel2.channelId, 75, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel2.channelId, 76, randomBytes32(), CltvExpiry(561)) + db.forgetHtlcInfos(channel2.channelId, commitNumberSplice2) + + // We asynchronously clean-up the HTLC data from the DB in small batches. + val obsoleteHtlcInfo = Seq( + (channel1.channelId, 49), + (channel1.channelId, 50), + (channel1.channelId, 51), + (channel1.channelId, 52), + (channel2.channelId, 48), + (channel2.channelId, 49), + (channel2.channelId, 50), + (channel2.channelId, 74), + ) + db.removeHtlcInfos(10) // This should remove all the data for one of the two channels in one batch + db.removeHtlcInfos(3) // This should remove only part of the data for the remaining channel + assert(obsoleteHtlcInfo.exists { case (channelId, commitNumber) => db.listHtlcInfos(channelId, commitNumber).nonEmpty }) + db.removeHtlcInfos(3) // This should remove the rest of the data for the remaining channel + obsoleteHtlcInfo.foreach { case (channelId, commitNumber) => db.listHtlcInfos(channelId, commitNumber).isEmpty } + + // The remaining HTLC data shouldn't be removed. + assert(db.listHtlcInfos(channel2.channelId, 75).nonEmpty) + assert(db.listHtlcInfos(channel2.channelId, 76).nonEmpty) + db.removeHtlcInfos(10) // no-op + assert(db.listHtlcInfos(channel2.channelId, 75).nonEmpty) + assert(db.listHtlcInfos(channel2.channelId, 76).nonEmpty) + } + } + test("concurrent channel updates") { forAllDbs { dbs => val db = dbs.channels diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleanerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleanerSpec.scala index 6a699cedec..2200e97534 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleanerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleanerSpec.scala @@ -17,9 +17,11 @@ package fr.acinq.eclair.db import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.eventstream.EventStream import com.softwaremill.quicklens.ModifyPimp import com.typesafe.config.ConfigFactory import fr.acinq.eclair.TestDatabases.TestSqliteDatabases +import fr.acinq.eclair.db.RevokedHtlcInfoCleaner.ForgetHtlcInfos import fr.acinq.eclair.wire.internal.channel.ChannelCodecsSpec import fr.acinq.eclair.{CltvExpiry, randomBytes32} import org.scalatest.funsuite.AnyFunSuiteLike @@ -28,7 +30,7 @@ import scala.concurrent.duration.DurationInt class RevokedHtlcInfoCleanerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike { - test("clean closed channels at regular intervals") { + test("remove htlc info from closed channels at regular intervals") { val channelsDb = TestSqliteDatabases().channels val channelId = randomBytes32() @@ -51,4 +53,32 @@ class RevokedHtlcInfoCleanerSpec extends ScalaTestWithActorTestKit(ConfigFactory } } + test("remove htlc info from spliced channels at regular intervals") { + val channelsDb = TestSqliteDatabases().channels + + val channelId = randomBytes32() + channelsDb.addOrUpdateChannel(ChannelCodecsSpec.normal.modify(_.commitments.params.channelId).setTo(channelId)) + channelsDb.addHtlcInfo(channelId, 1, randomBytes32(), CltvExpiry(561)) + channelsDb.addHtlcInfo(channelId, 2, randomBytes32(), CltvExpiry(1105)) + channelsDb.addHtlcInfo(channelId, 2, randomBytes32(), CltvExpiry(1105)) + channelsDb.addHtlcInfo(channelId, 3, randomBytes32(), CltvExpiry(1729)) + channelsDb.addHtlcInfo(channelId, 3, randomBytes32(), CltvExpiry(1729)) + channelsDb.addHtlcInfo(channelId, 4, randomBytes32(), CltvExpiry(2465)) + (1 to 4).foreach(i => assert(channelsDb.listHtlcInfos(channelId, i).nonEmpty)) + + val config = RevokedHtlcInfoCleaner.Config(batchSize = 2, interval = 10 millis) + val htlcCleaner = testKit.spawn(RevokedHtlcInfoCleaner(channelsDb, config)) + + htlcCleaner ! ForgetHtlcInfos(channelId, beforeCommitIndex = 3) + eventually { + (1 to 2).foreach(i => assert(channelsDb.listHtlcInfos(channelId, i).isEmpty)) + } + (3 to 4).foreach(i => assert(channelsDb.listHtlcInfos(channelId, i).nonEmpty)) + + testKit.system.eventStream ! EventStream.Publish(ForgetHtlcInfos(channelId, beforeCommitIndex = 5)) + eventually { + (3 to 4).foreach(i => assert(channelsDb.listHtlcInfos(channelId, i).isEmpty)) + } + } + }