Skip to content

Commit

Permalink
Reorder functions in NodeRelay.scala
Browse files Browse the repository at this point in the history
This commit doesn't contain any logical change, we just move code to
align with the FSM flow. It makes it easier to follow the progress of
the state machine to always scroll down when advancing states.
  • Loading branch information
t-bast committed Jun 13, 2024
1 parent 3277e6d commit f954ad7
Showing 1 changed file with 69 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,60 +262,6 @@ class NodeRelay private(nodeParams: NodeParams,
relay(upstream, nextPayload, nextPacket_opt)
}

/**
* Once the payment is forwarded, we're waiting for fail/fulfill responses from downstream nodes.
*
* @param upstream complete HTLC set received.
* @param nextPayload relay instructions.
* @param fulfilledUpstream true if we already fulfilled the payment upstream.
*/
private def sending(upstream: Upstream.Trampoline, nextPayload: IntermediatePayload.NodeRelay, startedAt: TimestampMilli, fulfilledUpstream: Boolean): Behavior[Command] =
Behaviors.receiveMessagePartial {
rejectExtraHtlcPartialFunction orElse {
// this is the fulfill that arrives from downstream channels
case WrappedPreimageReceived(PreimageReceived(_, paymentPreimage)) =>
if (!fulfilledUpstream) {
// We want to fulfill upstream as soon as we receive the preimage (even if not all HTLCs have fulfilled downstream).
context.log.debug("got preimage from downstream")
fulfillPayment(upstream, paymentPreimage)
sending(upstream, nextPayload, startedAt, fulfilledUpstream = true)
} else {
// we don't want to fulfill multiple times
Behaviors.same
}
case WrappedPaymentSent(paymentSent) =>
context.log.debug("trampoline payment fully resolved downstream")
success(upstream, fulfilledUpstream, paymentSent)
recordRelayDuration(startedAt, isSuccess = true)
stopping()
case WrappedPaymentFailed(PaymentFailed(_, _, failures, _)) =>
context.log.debug(s"trampoline payment failed downstream")
if (!fulfilledUpstream) {
rejectPayment(upstream, translateError(nodeParams, failures, upstream, nextPayload))
}
recordRelayDuration(startedAt, isSuccess = fulfilledUpstream)
stopping()
}
}

/**
* Once the downstream payment is settled (fulfilled or failed), we reject new upstream payments while we wait for our parent to stop us.
*/
private def stopping(): Behavior[Command] = {
parent ! NodeRelayer.RelayComplete(context.self, paymentHash, paymentSecret)
Behaviors.receiveMessagePartial {
rejectExtraHtlcPartialFunction orElse {
case Stop => Behaviors.stopped
}
}
}

private val payFsmAdapters = {
context.messageAdapter[PreimageReceived](WrappedPreimageReceived)
context.messageAdapter[PaymentSent](WrappedPaymentSent)
context.messageAdapter[PaymentFailed](WrappedPaymentFailed)
}.toClassic

private def relay(upstream: Upstream.Trampoline, payloadOut: IntermediatePayload.NodeRelay, packetOut_opt: Option[OnionRoutingPacket]): Behavior[Command] = {
val displayNodeId = payloadOut match {
case payloadOut: IntermediatePayload.NodeRelay.Standard => payloadOut.outgoingNodeId
Expand Down Expand Up @@ -345,23 +291,6 @@ class NodeRelay private(nodeParams: NodeParams,
}
}

private def relayToRecipient(upstream: Upstream.Trampoline,
payloadOut: IntermediatePayload.NodeRelay,
recipient: Recipient,
paymentCfg: SendPaymentConfig,
routeParams: RouteParams,
useMultiPart: Boolean): Behavior[Command] = {
val payment =
if (useMultiPart) {
SendMultiPartPayment(payFsmAdapters, recipient, nodeParams.maxPaymentAttempts, routeParams)
} else {
SendPaymentToNode(payFsmAdapters, recipient, nodeParams.maxPaymentAttempts, routeParams)
}
val payFSM = outgoingPaymentFactory.spawnOutgoingPayFSM(context, paymentCfg, useMultiPart)
payFSM ! payment
sending(upstream, payloadOut, TimestampMilli.now(), fulfilledUpstream = false)
}

/**
* Blinded paths in Bolt 12 invoices may encode the introduction node with an scid and a direction: we need to resolve
* that to a nodeId in order to reach that introduction node and use the blinded path.
Expand All @@ -384,6 +313,75 @@ class NodeRelay private(nodeParams: NodeParams,
relayToRecipient(upstream, payloadOut, recipient, paymentCfg, routeParams, features.hasFeature(Features.BasicMultiPartPayment))
}

private def relayToRecipient(upstream: Upstream.Trampoline,
payloadOut: IntermediatePayload.NodeRelay,
recipient: Recipient,
paymentCfg: SendPaymentConfig,
routeParams: RouteParams,
useMultiPart: Boolean): Behavior[Command] = {
val payFsmAdapters = {
context.messageAdapter[PreimageReceived](WrappedPreimageReceived)
context.messageAdapter[PaymentSent](WrappedPaymentSent)
context.messageAdapter[PaymentFailed](WrappedPaymentFailed)
}.toClassic
val payment = if (useMultiPart) {
SendMultiPartPayment(payFsmAdapters, recipient, nodeParams.maxPaymentAttempts, routeParams)
} else {
SendPaymentToNode(payFsmAdapters, recipient, nodeParams.maxPaymentAttempts, routeParams)
}
val payFSM = outgoingPaymentFactory.spawnOutgoingPayFSM(context, paymentCfg, useMultiPart)
payFSM ! payment
sending(upstream, payloadOut, TimestampMilli.now(), fulfilledUpstream = false)
}

/**
* Once the payment is forwarded, we're waiting for fail/fulfill responses from downstream nodes.
*
* @param upstream complete HTLC set received.
* @param nextPayload relay instructions.
* @param fulfilledUpstream true if we already fulfilled the payment upstream.
*/
private def sending(upstream: Upstream.Trampoline, nextPayload: IntermediatePayload.NodeRelay, startedAt: TimestampMilli, fulfilledUpstream: Boolean): Behavior[Command] =
Behaviors.receiveMessagePartial {
rejectExtraHtlcPartialFunction orElse {
// this is the fulfill that arrives from downstream channels
case WrappedPreimageReceived(PreimageReceived(_, paymentPreimage)) =>
if (!fulfilledUpstream) {
// We want to fulfill upstream as soon as we receive the preimage (even if not all HTLCs have fulfilled downstream).
context.log.debug("got preimage from downstream")
fulfillPayment(upstream, paymentPreimage)
sending(upstream, nextPayload, startedAt, fulfilledUpstream = true)
} else {
// we don't want to fulfill multiple times
Behaviors.same
}
case WrappedPaymentSent(paymentSent) =>
context.log.debug("trampoline payment fully resolved downstream")
success(upstream, fulfilledUpstream, paymentSent)
recordRelayDuration(startedAt, isSuccess = true)
stopping()
case WrappedPaymentFailed(PaymentFailed(_, _, failures, _)) =>
context.log.debug(s"trampoline payment failed downstream")
if (!fulfilledUpstream) {
rejectPayment(upstream, translateError(nodeParams, failures, upstream, nextPayload))
}
recordRelayDuration(startedAt, isSuccess = fulfilledUpstream)
stopping()
}
}

/**
* Once the downstream payment is settled (fulfilled or failed), we reject new upstream payments while we wait for our parent to stop us.
*/
private def stopping(): Behavior[Command] = {
parent ! NodeRelayer.RelayComplete(context.self, paymentHash, paymentSecret)
Behaviors.receiveMessagePartial {
rejectExtraHtlcPartialFunction orElse {
case Stop => Behaviors.stopped
}
}
}

private def rejectExtraHtlcPartialFunction: PartialFunction[Command, Behavior[Command]] = {
case Relay(nodeRelayPacket) =>
rejectExtraHtlc(nodeRelayPacket.add)
Expand Down

0 comments on commit f954ad7

Please sign in to comment.