From 0eb05245d159f5c93362ef70a53e36cd8ba7f883 Mon Sep 17 00:00:00 2001 From: t-bast Date: Tue, 29 Aug 2023 12:16:38 +0200 Subject: [PATCH] Avoid herd effect watching local channels When we restart, we set watches on our funding transactions. But we don't actually need to watch them immediately, we just need enough time to react to our peer broadcasting their commitment. We use long `cltv_delta` delays to guarantee funds safety, so we can spread out the watches across several blocks to reduce the start-up load. It essentially is the same thing as receiving mempool transactions or blocks after a delay, which is something that our threat model already takes into account. --- eclair-core/src/main/resources/reference.conf | 1 + .../scala/fr/acinq/eclair/NodeParams.scala | 3 +- .../fr/acinq/eclair/channel/fsm/Channel.scala | 28 +++++++++++-------- .../channel/fsm/ChannelOpenDualFunded.scala | 10 +++---- .../channel/fsm/ChannelOpenSingleFunded.scala | 6 ++-- .../channel/fsm/CommonFundingHandlers.scala | 26 +++++++++++------ .../scala/fr/acinq/eclair/TestConstants.scala | 2 ++ 7 files changed, 47 insertions(+), 29 deletions(-) diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index c96c174efd..e471ba1f78 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -135,6 +135,7 @@ eclair { // expiry-delta-blocks. 
fulfill-safety-before-timeout-blocks = 24 min-final-expiry-delta-blocks = 30 // Bolt 11 invoice's min_final_cltv_expiry; must be strictly greater than fulfill-safety-before-timeout-blocks + max-restart-watch-delay = 60 seconds // we add a random delay before watching funding transactions after restart max-block-processing-delay = 30 seconds // we add a random delay before processing blocks, capped at this value, to prevent herd effect max-tx-publish-retry-delay = 60 seconds // we add a random delay before retrying failed transaction publication diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 9b7ce7535c..82c6f4a344 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -101,7 +101,7 @@ case class NodeParams(nodeKeyManager: NodeKeyManager, def currentFeerates: FeeratesPerKw = feerates.get() /** Only to be used in tests. */ - def setFeerates(value: FeeratesPerKw) = feerates.set(value) + def setFeerates(value: FeeratesPerKw): Unit = feerates.set(value) /** Returns the features that should be used in our init message with the given peer. 
*/ def initFeaturesFor(nodeId: PublicKey): Features[InitFeature] = overrideInitFeatures.getOrElse(nodeId, features).initFeatures() @@ -505,6 +505,7 @@ object NodeParams extends Logging { maxExpiryDelta = maxExpiryDelta, fulfillSafetyBeforeTimeout = fulfillSafetyBeforeTimeout, minFinalExpiryDelta = minFinalExpiryDelta, + maxRestartWatchDelay = FiniteDuration(config.getDuration("channel.max-restart-watch-delay").getSeconds, TimeUnit.SECONDS), maxBlockProcessingDelay = FiniteDuration(config.getDuration("channel.max-block-processing-delay").getSeconds, TimeUnit.SECONDS), maxTxPublishRetryDelay = FiniteDuration(config.getDuration("channel.max-tx-publish-retry-delay").getSeconds, TimeUnit.SECONDS), unhandledExceptionStrategy = unhandledExceptionStrategy, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala index 1b18972fbe..8e084eaee6 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala @@ -82,6 +82,7 @@ object Channel { maxExpiryDelta: CltvExpiryDelta, fulfillSafetyBeforeTimeout: CltvExpiryDelta, minFinalExpiryDelta: CltvExpiryDelta, + maxRestartWatchDelay: FiniteDuration, maxBlockProcessingDelay: FiniteDuration, maxTxPublishRetryDelay: FiniteDuration, unhandledExceptionStrategy: UnhandledExceptionStrategy, @@ -251,8 +252,13 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with context.system.eventStream.publish(ChannelRestored(self, data.channelId, peer, remoteNodeId, data)) txPublisher ! SetChannelId(remoteNodeId, data.channelId) - // we watch all unconfirmed funding txs, whatever our state is - // (there can be multiple funding txs due to rbf, and they can be unconfirmed in any state due to zero-conf) + // We watch all unconfirmed funding txs, whatever our state is. 
+ // There can be multiple funding txs due to rbf, and they can be unconfirmed in any state due to zero-conf. + // To avoid a herd effect on restart, we add a delay before watching funding txs. + val herdDelay_opt = nodeParams.channelConf.maxRestartWatchDelay.toSeconds match { + case maxRestartWatchDelay if maxRestartWatchDelay > 0 => Some((1 + Random.nextLong(maxRestartWatchDelay)).seconds) + case _ => None + } data match { case _: ChannelDataWithoutCommitments => () case data: ChannelDataWithCommitments => data.commitments.all.foreach { commitment => @@ -271,14 +277,14 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with // once, because at the next restore, the status of the funding tx will be "confirmed". () } - watchFundingConfirmed(commitment.fundingTxId, Some(singleFundingMinDepth(data))) + watchFundingConfirmed(commitment.fundingTxId, Some(singleFundingMinDepth(data)), herdDelay_opt) case fundingTx: LocalFundingStatus.DualFundedUnconfirmedFundingTx => publishFundingTx(fundingTx) val minDepth_opt = data.commitments.params.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, fundingTx.sharedTx.tx) - watchFundingConfirmed(fundingTx.sharedTx.txId, minDepth_opt) + watchFundingConfirmed(fundingTx.sharedTx.txId, minDepth_opt, herdDelay_opt) case fundingTx: LocalFundingStatus.ZeroconfPublishedFundingTx => // those are zero-conf channels, the min-depth isn't critical, we use the default - watchFundingConfirmed(fundingTx.tx.txid, Some(nodeParams.channelConf.minDepthBlocks.toLong)) + watchFundingConfirmed(fundingTx.tx.txid, Some(nodeParams.channelConf.minDepthBlocks.toLong), herdDelay_opt) case _: LocalFundingStatus.ConfirmedFundingTx => data match { case closing: DATA_CLOSING if Closing.nothingAtStake(closing) || Closing.isClosingTypeAlreadyKnown(closing).isDefined => @@ -286,9 +292,9 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with () case closing: DATA_CLOSING => // in all other cases we need to 
be ready for any type of closing - watchFundingSpent(commitment, closing.spendingTxs.map(_.txid).toSet) + watchFundingSpent(commitment, closing.spendingTxs.map(_.txid).toSet, herdDelay_opt) case _ => - watchFundingSpent(commitment) + watchFundingSpent(commitment, Set.empty, herdDelay_opt) } } } @@ -564,7 +570,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with // That's why we move on immediately to the next step, and will update our unsigned funding tx when we // receive their tx_sigs. val minDepth_opt = d.commitments.params.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession1.fundingTx.sharedTx.tx) - watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt) + watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt, delay_opt = None) val commitments1 = d.commitments.add(signingSession1.commitment) val d1 = d.copy(commitments = commitments1, spliceStatus = SpliceStatus.NoSplice) stay() using d1 storing() sending signingSession1.localSigs calling endQuiescence(d1) @@ -1078,7 +1084,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with stay() using d.copy(spliceStatus = SpliceStatus.SpliceAborted) sending TxAbort(d.channelId, f.getMessage) case Right(signingSession1) => val minDepth_opt = d.commitments.params.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession1.fundingTx.sharedTx.tx) - watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt) + watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt, delay_opt = None) val commitments1 = d.commitments.add(signingSession1.commitment) val d1 = d.copy(commitments = commitments1, spliceStatus = SpliceStatus.NoSplice) log.info("publishing funding tx for channelId={} fundingTxId={}", d.channelId, signingSession1.fundingTx.sharedTx.txId) @@ -1095,7 +1101,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with val fundingStatus = 
LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx) d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match { case Right((commitments1, _)) => - watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks)) + watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks), delay_opt = None) maybeEmitEventsPostSplice(d.shortIds, d.commitments, commitments1) stay() using d.copy(commitments = commitments1) storing() sending SpliceLocked(d.channelId, w.tx.hash) case Left(_) => stay() @@ -2163,7 +2169,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match { case Right((commitments1, _)) => log.info(s"zero-conf funding txid=${w.tx.txid} has been published") - watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks)) + watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks), delay_opt = None) val d1 = d match { // NB: we discard remote's stashed channel_ready, they will send it back at reconnection case d: DATA_WAIT_FOR_FUNDING_CONFIRMED => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenDualFunded.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenDualFunded.scala index 8cb81a6812..edd7427d1c 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenDualFunded.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenDualFunded.scala @@ -380,7 +380,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers { // That's why we move on immediately to the next step, and will update our unsigned funding tx when we // receive their tx_sigs. 
val minDepth_opt = d.channelParams.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession1.fundingTx.sharedTx.tx) - watchFundingConfirmed(d.signingSession.fundingTx.txId, minDepth_opt) + watchFundingConfirmed(d.signingSession.fundingTx.txId, minDepth_opt, delay_opt = None) val commitments = Commitments( params = d.channelParams, changes = CommitmentChanges.init(), @@ -403,7 +403,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers { goto(CLOSED) sending Error(d.channelId, f.getMessage) case Right(signingSession) => val minDepth_opt = d.channelParams.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession.fundingTx.sharedTx.tx) - watchFundingConfirmed(d.signingSession.fundingTx.txId, minDepth_opt) + watchFundingConfirmed(d.signingSession.fundingTx.txId, minDepth_opt, delay_opt = None) val commitments = Commitments( params = d.channelParams, changes = CommitmentChanges.init(), @@ -468,7 +468,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers { stay() using d.copy(rbfStatus = RbfStatus.RbfAborted) sending TxAbort(d.channelId, f.getMessage) case Right(signingSession1) => val minDepth_opt = d.commitments.params.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession1.fundingTx.sharedTx.tx) - watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt) + watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt, delay_opt = None) val commitments1 = d.commitments.add(signingSession1.commitment) val d1 = DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED(commitments1, d.localPushAmount, d.remotePushAmount, d.waitingSince, d.lastChecked, RbfStatus.NoRbf, d.deferred) stay() using d1 storing() sending signingSession1.localSigs calling publishFundingTx(signingSession1.fundingTx) @@ -615,7 +615,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers { stay() using d.copy(rbfStatus = RbfStatus.RbfWaitingForSigs(signingSession1)) case 
signingSession1: InteractiveTxSigningSession.SendingSigs => val minDepth_opt = d.commitments.params.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession1.fundingTx.sharedTx.tx) - watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt) + watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt, delay_opt = None) val commitments1 = d.commitments.add(signingSession1.commitment) val d1 = DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED(commitments1, d.localPushAmount, d.remotePushAmount, d.waitingSince, d.lastChecked, RbfStatus.NoRbf, d.deferred) stay() using d1 storing() sending signingSession1.localSigs @@ -677,7 +677,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers { d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match { case Right((commitments1, _)) => // we still watch the funding tx for confirmation even if we can use the zero-conf channel right away - watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks)) + watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks), delay_opt = None) val realScidStatus = RealScidStatus.Unknown val shortIds = createShortIds(d.channelId, realScidStatus) val channelReady = createChannelReady(shortIds, d.commitments.params) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala index 79f6a445ef..32387998d9 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala @@ -296,7 +296,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers { context.system.eventStream.publish(ChannelSignatureReceived(self, commitments)) // NB: we don't send a ChannelSignatureSent for the first commit log.info(s"waiting for them to publish the funding tx 
for channelId=$channelId fundingTxid=${commitment.fundingTxId}") - watchFundingConfirmed(commitment.fundingTxId, params.minDepthFundee(nodeParams.channelConf.minDepthBlocks, fundingAmount)) + watchFundingConfirmed(commitment.fundingTxId, params.minDepthFundee(nodeParams.channelConf.minDepthBlocks, fundingAmount), delay_opt = None) goto(WAIT_FOR_FUNDING_CONFIRMED) using DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments, nodeParams.currentBlockHeight, None, Right(fundingSigned)) storing() sending fundingSigned } } @@ -340,7 +340,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers { val blockHeight = nodeParams.currentBlockHeight context.system.eventStream.publish(ChannelSignatureReceived(self, commitments)) log.info(s"publishing funding tx fundingTxid=${commitment.fundingTxId}") - watchFundingConfirmed(commitment.fundingTxId, params.minDepthFunder) + watchFundingConfirmed(commitment.fundingTxId, params.minDepthFunder, delay_opt = None) // we will publish the funding tx only after the channel state has been written to disk because we want to // make sure we first persist the commitment that returns back the funds to us in case of problem goto(WAIT_FOR_FUNDING_CONFIRMED) using DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments, blockHeight, None, Left(fundingCreated)) storing() calling publishFundingTx(d.channelId, fundingTx, fundingTxFee, d.replyTo) @@ -394,7 +394,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers { case Right((commitments1, _)) => log.info("funding txid={} was successfully published for zero-conf channelId={}", w.tx.txid, d.channelId) // we still watch the funding tx for confirmation even if we can use the zero-conf channel right away - watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks)) + watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks), delay_opt = None) val realScidStatus = RealScidStatus.Unknown val shortIds = createShortIds(d.channelId, 
realScidStatus) val channelReady = createChannelReady(shortIds, d.commitments.params) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonFundingHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonFundingHandlers.scala index 1af7aaeca9..6630b8d2c7 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonFundingHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonFundingHandlers.scala @@ -16,7 +16,7 @@ package fr.acinq.eclair.channel.fsm -import akka.actor.typed.scaladsl.adapter.actorRefAdapter +import akka.actor.typed.scaladsl.adapter.{TypedActorRefOps, actorRefAdapter} import com.softwaremill.quicklens.ModifyPimp import fr.acinq.bitcoin.ScriptFlags import fr.acinq.bitcoin.scalacompat.{ByteVector32, Transaction} @@ -29,7 +29,7 @@ import fr.acinq.eclair.channel.fsm.Channel.{ANNOUNCEMENTS_MINCONF, BroadcastChan import fr.acinq.eclair.router.Announcements import fr.acinq.eclair.wire.protocol.{AnnouncementSignatures, ChannelReady, ChannelReadyTlv, TlvStream} -import scala.concurrent.duration.DurationInt +import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.util.{Failure, Success, Try} /** @@ -40,17 +40,25 @@ trait CommonFundingHandlers extends CommonHandlers { this: Channel => - def watchFundingSpent(commitment: Commitment, additionalKnownSpendingTxs: Set[ByteVector32] = Set.empty): Unit = { + def watchFundingSpent(commitment: Commitment, additionalKnownSpendingTxs: Set[ByteVector32], delay_opt: Option[FiniteDuration]): Unit = { val knownSpendingTxs = Set(commitment.localCommit.commitTxAndRemoteSig.commitTx.tx.txid, commitment.remoteCommit.txid) ++ commitment.nextRemoteCommit_opt.map(_.commit.txid).toSet ++ additionalKnownSpendingTxs - blockchain ! 
WatchFundingSpent(self, commitment.commitInput.outPoint.txid, commitment.commitInput.outPoint.index.toInt, knownSpendingTxs) + val watch = WatchFundingSpent(self, commitment.commitInput.outPoint.txid, commitment.commitInput.outPoint.index.toInt, knownSpendingTxs) + delay_opt match { + case Some(delay) => context.system.scheduler.scheduleOnce(delay, blockchain.toClassic, watch) + case None => blockchain ! watch + } } - def watchFundingConfirmed(fundingTxId: ByteVector32, minDepth_opt: Option[Long]): Unit = { - minDepth_opt match { - case Some(fundingMinDepth) => blockchain ! WatchFundingConfirmed(self, fundingTxId, fundingMinDepth) + def watchFundingConfirmed(fundingTxId: ByteVector32, minDepth_opt: Option[Long], delay_opt: Option[FiniteDuration]): Unit = { + val watch = minDepth_opt match { + case Some(fundingMinDepth) => WatchFundingConfirmed(self, fundingTxId, fundingMinDepth) // When using 0-conf, we make sure that the transaction was successfully published, otherwise there is a risk // of accidentally double-spending it later (e.g. restarting bitcoind would remove the utxo locks). - case None => blockchain ! WatchPublished(self, fundingTxId) + case None => WatchPublished(self, fundingTxId) + } + delay_opt match { + case Some(delay) => context.system.scheduler.scheduleOnce(delay, blockchain.toClassic, watch) + case None => blockchain ! 
watch } } @@ -73,7 +81,7 @@ trait CommonFundingHandlers extends CommonHandlers { d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus).map { case (commitments1, commitment) => // first of all, we watch the funding tx that is now confirmed - watchFundingSpent(commitment) + watchFundingSpent(commitment, Set.empty, None) // in the dual-funding case we can forget all other transactions, they have been double spent by the tx that just confirmed rollbackDualFundingTxs(d.commitments.active // note how we use the unpruned original commitments .filter(c => c.fundingTxIndex == commitment.fundingTxIndex && c.fundingTxId != commitment.fundingTxId) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 21bd98694c..c0ca409a1c 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -114,6 +114,7 @@ object TestConstants { maxExpiryDelta = CltvExpiryDelta(2016), fulfillSafetyBeforeTimeout = CltvExpiryDelta(6), minFinalExpiryDelta = CltvExpiryDelta(18), + maxRestartWatchDelay = 0 millis, maxBlockProcessingDelay = 10 millis, maxTxPublishRetryDelay = 10 millis, htlcMinimum = 0 msat, @@ -274,6 +275,7 @@ object TestConstants { maxExpiryDelta = CltvExpiryDelta(2016), fulfillSafetyBeforeTimeout = CltvExpiryDelta(6), minFinalExpiryDelta = CltvExpiryDelta(18), + maxRestartWatchDelay = 5 millis, maxBlockProcessingDelay = 10 millis, maxTxPublishRetryDelay = 10 millis, htlcMinimum = 1000 msat,