Skip to content

Commit

Permalink
Implement on-the-fly funding
Browse files Browse the repository at this point in the history
Implement the on-the-fly funding protocol: when a payment cannot be
relayed because of a liquidity issue, we notify the `Peer` actor that
we'd like to trigger on-the-fly funding if available. If available, we
we send a funding proposal to our peer and keep track of its status.

Once a matching funding transaction is signed, we persist this funding
attempt and wait for the additional liquidity to be available (once the
channel is ready or the splice locked). We will then frequently try to
relay the payment to get paid our liquidity fees. If the payment keeps
getting rejected, or we cannot connect to our peer, we abandon the
payment when it reaches its CLTV expiry, which ensures that the upstream
channels are not at risk.

When using on-the-fly funding, we use a single channel with our peer.
If they try to open another channel while one is available, we reject
their request and expect a splice instead.
  • Loading branch information
t-bast committed Jul 5, 2024
1 parent d63084a commit e704f28
Show file tree
Hide file tree
Showing 47 changed files with 3,052 additions and 174 deletions.
8 changes: 8 additions & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,14 @@ eclair {
]
}

// On-the-fly funding leverages liquidity ads to fund channels with wallet peers based on their payment patterns.
on-the-fly-funding {
wake-up-timeout = 30 seconds
// If our peer doesn't respond to our funding proposal, we must fail the corresponding upstream HTLCs.
// Since MPP may be used, we should use a timeout greater than the MPP timeout.
proposal-timeout = 90 seconds
}

peer-connection {
auth-timeout = 15 seconds // will disconnect if connection authentication doesn't happen within that timeframe
init-timeout = 15 seconds // will disconnect if initialization doesn't happen within that timeframe
Expand Down
8 changes: 6 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import fr.acinq.eclair.db._
import fr.acinq.eclair.io.MessageRelay.{RelayAll, RelayChannelsOnly, RelayPolicy}
import fr.acinq.eclair.io.PeerConnection
import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig
import fr.acinq.eclair.payment.relay.OnTheFlyFunding
import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams}
import fr.acinq.eclair.router.Announcements.AddressException
import fr.acinq.eclair.router.Graph.{HeuristicsConstants, WeightRatios}
Expand Down Expand Up @@ -89,7 +90,7 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
purgeInvoicesInterval: Option[FiniteDuration],
revokedHtlcInfoCleanerConfig: RevokedHtlcInfoCleaner.Config,
willFundRates_opt: Option[LiquidityAds.WillFundRates],
wakeUpTimeout: FiniteDuration) {
onTheFlyFundingConfig: OnTheFlyFunding.Config) {
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey

val nodeId: PublicKey = nodeKeyManager.nodeId
Expand Down Expand Up @@ -649,7 +650,10 @@ object NodeParams extends Logging {
interval = FiniteDuration(config.getDuration("db.revoked-htlc-info-cleaner.interval").getSeconds, TimeUnit.SECONDS)
),
willFundRates_opt = willFundRates_opt,
wakeUpTimeout = 30 seconds,
onTheFlyFundingConfig = OnTheFlyFunding.Config(
wakeUpTimeout = FiniteDuration(config.getDuration("on-the-fly-funding.wake-up-timeout").getSeconds, TimeUnit.SECONDS),
proposalTimeout = FiniteDuration(config.getDuration("on-the-fly-funding.proposal-timeout").getSeconds, TimeUnit.SECONDS),
),
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ object Upstream {
val expiryIn: CltvExpiry = add.cltvExpiry
}
/** Our node is forwarding a payment based on a set of HTLCs from potentially multiple upstream channels. */
case class Trampoline(received: Seq[Channel]) extends Hot {
case class Trampoline(received: List[Channel]) extends Hot {
override val amountIn: MilliSatoshi = received.map(_.add.amountMsat).sum
// We must use the lowest expiry of the incoming HTLC set.
val expiryIn: CltvExpiry = received.map(_.add.cltvExpiry).min
Expand All @@ -165,6 +165,10 @@ object Upstream {

/** Our node is forwarding a single incoming HTLC. */
case class Channel(originChannelId: ByteVector32, originHtlcId: Long, amountIn: MilliSatoshi) extends Cold
object Channel {
def apply(add: UpdateAddHtlc): Channel = Channel(add.channelId, add.id, add.amountMsat)
}

/** Our node is forwarding a payment based on a set of HTLCs from potentially multiple upstream channels. */
case class Trampoline(originHtlcs: List[Channel]) extends Cold { override val amountIn: MilliSatoshi = originHtlcs.map(_.amountIn).sum }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ case class ChannelAborted(channel: ActorRef, remoteNodeId: PublicKey, channelId:
/** This event will be sent once a channel has been successfully opened and is ready to process payments. */
case class ChannelOpened(channel: ActorRef, remoteNodeId: PublicKey, channelId: ByteVector32) extends ChannelEvent

/** This event is sent once channel_ready or splice_locked have been exchanged. */
case class ChannelReadyForPayments(channel: ActorRef, remoteNodeId: PublicKey, channelId: ByteVector32, fundingTxIndex: Long) extends ChannelEvent

case class LocalChannelUpdate(channel: ActorRef, channelId: ByteVector32, shortIds: ShortIds, remoteNodeId: PublicKey, channelAnnouncement_opt: Option[ChannelAnnouncement], channelUpdate: ChannelUpdate, commitments: Commitments) extends ChannelEvent {
/**
* We always include the local alias because we must always be able to route based on it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent.EventType
import fr.acinq.eclair.db.PendingCommandsDb
import fr.acinq.eclair.io.Peer
import fr.acinq.eclair.io.Peer.LiquidityLeaseSigned
import fr.acinq.eclair.payment.relay.Relayer
import fr.acinq.eclair.payment.{Bolt11Invoice, PaymentSettlingOnChain}
import fr.acinq.eclair.router.Announcements
Expand Down Expand Up @@ -1094,10 +1095,13 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
log.info("ignoring outgoing interactive-tx message {} from previous session", msg.getClass.getSimpleName)
stay()
}
case InteractiveTxBuilder.Succeeded(signingSession, commitSig) =>
case InteractiveTxBuilder.Succeeded(signingSession, commitSig, lease_opt) =>
log.info(s"splice tx created with fundingTxIndex=${signingSession.fundingTxIndex} fundingTxId=${signingSession.fundingTx.txId}")
cmd_opt.foreach(cmd => cmd.replyTo ! RES_SPLICE(fundingTxIndex = signingSession.fundingTxIndex, signingSession.fundingTx.txId, signingSession.fundingParams.fundingAmount, signingSession.localCommit.fold(_.spec, _.spec).toLocal))
remoteCommitSig_opt.foreach(self ! _)
lease_opt.collect {
case lease if !signingSession.fundingParams.isInitiator => peer ! LiquidityLeaseSigned(d.channelId, signingSession.fundingTx.txId, signingSession.fundingTxIndex, d.commitments.params.remoteParams.htlcMinimum, lease)
}
val d1 = d.copy(spliceStatus = SpliceStatus.SpliceWaitingForSigs(signingSession))
stay() using d1 storing() sending commitSig
case f: InteractiveTxBuilder.Failed =>
Expand Down Expand Up @@ -2138,6 +2142,12 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
}
}

// We tell the peer that the channel is ready to process payments that may be queued.
if (!shutdownInProgress) {
val fundingTxIndex = commitments1.active.map(_.fundingTxIndex).min
peer ! ChannelReadyForPayments(self, remoteNodeId, d.channelId, fundingTxIndex)
}

goto(NORMAL) using d.copy(commitments = commitments1, spliceStatus = spliceStatus1) sending sendQueue
}

Expand Down Expand Up @@ -2709,6 +2719,12 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
if (oldCommitments.availableBalanceForSend != newCommitments.availableBalanceForSend || oldCommitments.availableBalanceForReceive != newCommitments.availableBalanceForReceive) {
context.system.eventStream.publish(AvailableBalanceChanged(self, newCommitments.channelId, shortIds, newCommitments))
}
if (oldCommitments.active.size != newCommitments.active.size) {
// Some commitments have been deactivated, which means our available balance changed, which may allow forwarding
// payments that couldn't be forwarded before.
val fundingTxIndex = newCommitments.active.map(_.fundingTxIndex).min
peer ! ChannelReadyForPayments(self, remoteNodeId, newCommitments.channelId, fundingTxIndex)
}
}

private def maybeUpdateMaxHtlcAmount(currentMaxHtlcAmount: MilliSatoshi, newCommitments: Commitments): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import fr.acinq.eclair.channel.fund.InteractiveTxBuilder.{FullySignedSharedTrans
import fr.acinq.eclair.channel.fund.{InteractiveTxBuilder, InteractiveTxSigningSession}
import fr.acinq.eclair.channel.publish.TxPublisher.SetChannelId
import fr.acinq.eclair.crypto.ShaChain
import fr.acinq.eclair.io.Peer.OpenChannelResponse
import fr.acinq.eclair.io.Peer.{LiquidityLeaseSigned, OpenChannelResponse}
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{MilliSatoshiLong, RealShortChannelId, ToMilliSatoshiConversion, UInt64, randomBytes32}

Expand Down Expand Up @@ -339,9 +339,12 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers {

case Event(msg: InteractiveTxBuilder.Response, d: DATA_WAIT_FOR_DUAL_FUNDING_CREATED) => msg match {
case InteractiveTxBuilder.SendMessage(_, msg) => stay() sending msg
case InteractiveTxBuilder.Succeeded(status, commitSig) =>
case InteractiveTxBuilder.Succeeded(status, commitSig, lease_opt) =>
d.deferred.foreach(self ! _)
d.replyTo_opt.foreach(_ ! OpenChannelResponse.Created(d.channelId, status.fundingTx.txId, status.fundingTx.tx.localFees.truncateToSatoshi))
lease_opt.collect {
case lease if !status.fundingParams.isInitiator => peer ! LiquidityLeaseSigned(d.channelId, status.fundingTx.txId, status.fundingTxIndex, d.channelParams.remoteParams.htlcMinimum, lease)
}
val d1 = DATA_WAIT_FOR_DUAL_FUNDING_SIGNED(d.channelParams, d.secondRemotePerCommitmentPoint, d.localPushAmount, d.remotePushAmount, status, None)
goto(WAIT_FOR_DUAL_FUNDING_SIGNED) using d1 storing() sending commitSig
case f: InteractiveTxBuilder.Failed =>
Expand Down Expand Up @@ -687,9 +690,12 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers {
case RbfStatus.RbfInProgress(cmd_opt, _, remoteCommitSig_opt) =>
msg match {
case InteractiveTxBuilder.SendMessage(_, msg) => stay() sending msg
case InteractiveTxBuilder.Succeeded(signingSession, commitSig) =>
case InteractiveTxBuilder.Succeeded(signingSession, commitSig, lease_opt) =>
cmd_opt.foreach(cmd => cmd.replyTo ! RES_BUMP_FUNDING_FEE(rbfIndex = d.previousFundingTxs.length, signingSession.fundingTx.txId, signingSession.fundingTx.tx.localFees.truncateToSatoshi))
remoteCommitSig_opt.foreach(self ! _)
lease_opt.collect {
case lease if !signingSession.fundingParams.isInitiator => peer ! LiquidityLeaseSigned(d.channelId, signingSession.fundingTx.txId, signingSession.fundingTxIndex, d.commitments.params.remoteParams.htlcMinimum, lease)
}
val d1 = d.copy(rbfStatus = RbfStatus.RbfWaitingForSigs(signingSession))
stay() using d1 storing() sending commitSig
case f: InteractiveTxBuilder.Failed =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ trait CommonFundingHandlers extends CommonHandlers {
// used to get the final shortChannelId, used in announcements (if minDepth >= ANNOUNCEMENTS_MINCONF this event will fire instantly)
blockchain ! WatchFundingDeeplyBuried(self, commitments.latest.fundingTxId, ANNOUNCEMENTS_MINCONF)
val commitments1 = commitments.modify(_.remoteNextCommitInfo).setTo(Right(channelReady.nextPerCommitmentPoint))
peer ! ChannelReadyForPayments(self, remoteNodeId, commitments.channelId, fundingTxIndex = 0)
DATA_NORMAL(commitments1, shortIds1, None, initialChannelUpdate, None, None, None, SpliceStatus.NoSplice)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ object InteractiveTxBuilder {

sealed trait Response
case class SendMessage(sessionId: ByteVector32, msg: LightningMessage) extends Response
case class Succeeded(signingSession: InteractiveTxSigningSession.WaitingForSigs, commitSig: CommitSig) extends Response
case class Succeeded(signingSession: InteractiveTxSigningSession.WaitingForSigs, commitSig: CommitSig, liquidityPurchase_opt: Option[LiquidityAds.Purchase]) extends Response
sealed trait Failed extends Response { def cause: ChannelException }
case class LocalFailure(cause: ChannelException) extends Failed
case class RemoteFailure(cause: ChannelException) extends Failed
Expand Down Expand Up @@ -370,12 +370,24 @@ object InteractiveTxBuilder {
// Note that pending HTLCs are ignored: splices only affect the main outputs.
val nextLocalBalance = purpose.previousLocalBalance + fundingParams.localContribution - localPushAmount + remotePushAmount - liquidityFee
val nextRemoteBalance = purpose.previousRemoteBalance + fundingParams.remoteContribution - remotePushAmount + localPushAmount + liquidityFee
val liquidityPaymentTypeOk = liquidityPurchase_opt match {
case Some(l) if !fundingParams.isInitiator => l.paymentDetails match {
case LiquidityAds.PaymentDetails.FromChannelBalance | _: LiquidityAds.PaymentDetails.FromChannelBalanceForFutureHtlc => true
// If our peer has enough balance to pay the liquidity fees, they shouldn't use future HTLCs which
// involves trust: they should directly pay from their channel balance.
case _: LiquidityAds.PaymentDetails.FromFutureHtlc | _: LiquidityAds.PaymentDetails.FromFutureHtlcWithPreimage => nextRemoteBalance < l.fees.total
}
case _ => true
}
if (fundingParams.fundingAmount < fundingParams.dustLimit) {
replyTo ! LocalFailure(FundingAmountTooLow(channelParams.channelId, fundingParams.fundingAmount, fundingParams.dustLimit))
Behaviors.stopped
} else if (nextLocalBalance < 0.msat || nextRemoteBalance < 0.msat) {
replyTo ! LocalFailure(InvalidFundingBalances(channelParams.channelId, fundingParams.fundingAmount, nextLocalBalance, nextRemoteBalance))
Behaviors.stopped
} else if (!liquidityPaymentTypeOk) {
replyTo ! LocalFailure(InvalidLiquidityAdsPaymentType(channelParams.channelId, liquidityPurchase_opt.get.paymentDetails.paymentType, Set(LiquidityAds.PaymentType.FromChannelBalance, LiquidityAds.PaymentType.FromChannelBalanceForFutureHtlc)))
Behaviors.stopped
} else {
val actor = new InteractiveTxBuilder(replyTo, sessionId, nodeParams, channelParams, fundingParams, purpose, localPushAmount, remotePushAmount, liquidityPurchase_opt, wallet, stash, context)
actor.start()
Expand Down Expand Up @@ -805,7 +817,7 @@ private class InteractiveTxBuilder(replyTo: ActorRef[InteractiveTxBuilder.Respon
Behaviors.receiveMessagePartial {
case SignTransactionResult(signedTx) =>
log.info(s"interactive-tx txid=${signedTx.txId} partially signed with {} local inputs, {} remote inputs, {} local outputs and {} remote outputs", signedTx.tx.localInputs.length, signedTx.tx.remoteInputs.length, signedTx.tx.localOutputs.length, signedTx.tx.remoteOutputs.length)
replyTo ! Succeeded(InteractiveTxSigningSession.WaitingForSigs(fundingParams, purpose.fundingTxIndex, signedTx, Left(localCommit), remoteCommit), commitSig)
replyTo ! Succeeded(InteractiveTxSigningSession.WaitingForSigs(fundingParams, purpose.fundingTxIndex, signedTx, Left(localCommit), remoteCommit), commitSig, liquidityPurchase_opt)
Behaviors.stopped
case WalletFailure(t) =>
log.error("could not sign funding transaction: ", t)
Expand Down
5 changes: 5 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ trait Databases {
def peers: PeersDb
def payments: PaymentsDb
def pendingCommands: PendingCommandsDb
def onTheFlyFunding: OnTheFlyFundingDb
//@formatter:on
}

Expand All @@ -65,6 +66,7 @@ object Databases extends Logging {
peers: SqlitePeersDb,
payments: SqlitePaymentsDb,
pendingCommands: SqlitePendingCommandsDb,
onTheFlyFunding: SqliteOnTheFlyFundingDb,
private val backupConnection: Connection) extends Databases with FileBackup {
override def backup(backupFile: File): Unit = SqliteUtils.using(backupConnection.createStatement()) {
statement => {
Expand All @@ -83,6 +85,7 @@ object Databases extends Logging {
peers = new SqlitePeersDb(eclairJdbc),
payments = new SqlitePaymentsDb(eclairJdbc),
pendingCommands = new SqlitePendingCommandsDb(eclairJdbc),
onTheFlyFunding = new SqliteOnTheFlyFundingDb(eclairJdbc),
backupConnection = eclairJdbc
)
}
Expand All @@ -94,6 +97,7 @@ object Databases extends Logging {
peers: PgPeersDb,
payments: PgPaymentsDb,
pendingCommands: PgPendingCommandsDb,
onTheFlyFunding: PgOnTheFlyFundingDb,
dataSource: HikariDataSource,
lock: PgLock) extends Databases with ExclusiveLock {
override def obtainExclusiveLock(): Unit = lock.obtainExclusiveLock(dataSource)
Expand Down Expand Up @@ -154,6 +158,7 @@ object Databases extends Logging {
peers = new PgPeersDb,
payments = new PgPaymentsDb,
pendingCommands = new PgPendingCommandsDb,
onTheFlyFunding = new PgOnTheFlyFundingDb,
dataSource = ds,
lock = lock)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL
case ChannelPaymentRelayed(_, _, _, fromChannelId, toChannelId, _, _) =>
channelsDb.updateChannelMeta(fromChannelId, ChannelEvent.EventType.PaymentReceived)
channelsDb.updateChannelMeta(toChannelId, ChannelEvent.EventType.PaymentSent)
case OnTheFlyFundingPaymentRelayed(_, incoming, outgoing) =>
incoming.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentReceived))
outgoing.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentSent))
}
auditDb.add(e)

Expand Down
Loading

0 comments on commit e704f28

Please sign in to comment.