Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

discovery+graph: track job set dependencies in vb #9241

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,9 @@ type AuthenticatedGossiper struct {
// AuthenticatedGossiper lock.
chanUpdateRateLimiter map[uint64][2]*rate.Limiter

// vb is used to enforce job dependency ordering of gossip messages.
vb *graph.ValidationBarrier

sync.Mutex
}

Expand All @@ -537,6 +540,8 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
banman: newBanman(),
}

gossiper.vb = graph.NewValidationBarrier(1000, gossiper.quit)

gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
ChainHash: cfg.ChainHash,
ChanSeries: cfg.ChanSeries,
Expand Down Expand Up @@ -1398,10 +1403,6 @@ func (d *AuthenticatedGossiper) networkHandler() {
log.Errorf("Unable to rebroadcast stale announcements: %v", err)
}

// We'll use this validation to ensure that we process jobs in their
// dependency order during parallel validation.
validationBarrier := graph.NewValidationBarrier(1000, d.quit)

for {
select {
// A new policy update has arrived. We'll commit it to the
Expand Down Expand Up @@ -1470,11 +1471,17 @@ func (d *AuthenticatedGossiper) networkHandler() {
// We'll set up any dependent, and wait until a free
// slot for this job opens up, this allow us to not
// have thousands of goroutines active.
validationBarrier.InitJobDependencies(announcement.msg)
annJobID, err := d.vb.InitJobDependencies(
announcement.msg,
)
if err != nil {
announcement.err <- err
continue
}

d.wg.Add(1)
go d.handleNetworkMessages(
announcement, &announcements, validationBarrier,
announcement, &announcements, annJobID,
)

// The trickle timer has ticked, which indicates we should
Expand Down Expand Up @@ -1525,18 +1532,18 @@ func (d *AuthenticatedGossiper) networkHandler() {
//
// NOTE: must be run as a goroutine.
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier) {
deDuped *deDupedAnnouncements, jobID graph.JobID) {

defer d.wg.Done()
defer vb.CompleteJob()
defer d.vb.CompleteJob()

// We should only broadcast this message forward if it originated from
// us or it wasn't received as part of our initial historical sync.
shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()

// If this message has an existing dependency, then we'll wait until
// that has been fully validated before we proceed.
err := vb.WaitForDependants(nMsg.msg)
err := d.vb.WaitForParents(jobID, nMsg.msg)
if err != nil {
log.Debugf("Validating network message %s got err: %v",
nMsg.msg.MsgType(), err)
Expand Down Expand Up @@ -1566,7 +1573,16 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,

// If this message had any dependencies, then we can now signal them to
// continue.
vb.SignalDependants(nMsg.msg, allow)
err = d.vb.SignalDependents(nMsg.msg, jobID)
if err != nil {
// Something is wrong if SignalDependents returns an error.
log.Errorf("SignalDependents returned error for msg=%v with "+
"JobID=%v", spew.Sdump(nMsg.msg), jobID)

nMsg.err <- err

return
}

// If the announcement was accepted, then add the emitted announcements
// to our announce batch to be broadcast once the trickle timer ticks
Expand Down
3 changes: 3 additions & 0 deletions docs/release-notes/release-notes-0.19.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@
added and they will be removed in a future release. The defaults values for
these options have also been increased from max 3 log files to 10 and from
max 10 MB to 20 MB.

* Refactored the `ValidationBarrier` to use
[set-based dependency tracking](https://github.com/lightningnetwork/lnd/pull/9241).

* [Deprecate `dust-threshold`
config option](https://github.com/lightningnetwork/lnd/pull/9182) and introduce
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/lightningnetwork/lightning-onion v1.2.1-0.20240712235311-98bd56499dfb
github.com/lightningnetwork/lnd/cert v1.2.2
github.com/lightningnetwork/lnd/clock v1.1.1
github.com/lightningnetwork/lnd/fn/v2 v2.0.2
github.com/lightningnetwork/lnd/fn/v2 v2.0.7
github.com/lightningnetwork/lnd/healthcheck v1.2.6
github.com/lightningnetwork/lnd/kvdb v1.4.12
github.com/lightningnetwork/lnd/queue v1.1.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,8 @@ github.com/lightningnetwork/lnd/cert v1.2.2 h1:71YK6hogeJtxSxw2teq3eGeuy4rHGKcFf
github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bqGVxViXhX6Cd7HXM6U=
github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0=
github.com/lightningnetwork/lnd/clock v1.1.1/go.mod h1:mGnAhPyjYZQJmebS7aevElXKTFDuO+uNFFfMXK1W8xQ=
github.com/lightningnetwork/lnd/fn/v2 v2.0.2 h1:M7o2lYrh/zCp+lntPB3WP/rWTu5U+4ssyHW+kqNJ0fs=
github.com/lightningnetwork/lnd/fn/v2 v2.0.2/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s=
github.com/lightningnetwork/lnd/fn/v2 v2.0.7 h1:2LkgcGk20vXcUJyrlYLWMptnEouOBnCixskMsQW+GxU=
github.com/lightningnetwork/lnd/fn/v2 v2.0.7/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s=
github.com/lightningnetwork/lnd/healthcheck v1.2.6 h1:1sWhqr93GdkWy4+6U7JxBfcyZIE78MhIHTJZfPx7qqI=
github.com/lightningnetwork/lnd/healthcheck v1.2.6/go.mod h1:Mu02um4CWY/zdTOvFje7WJgJcHyX2zq/FG3MhOAiGaQ=
github.com/lightningnetwork/lnd/kvdb v1.4.12 h1:Y0WY5Tbjyjn6eCYh068qkWur5oFtioJlfxc8w5SlJeQ=
Expand Down
40 changes: 5 additions & 35 deletions graph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,45 +675,20 @@ func (b *Builder) handleNetworkUpdate(vb *ValidationBarrier,
defer b.wg.Done()
defer vb.CompleteJob()
Crypt-iQ marked this conversation as resolved.
Show resolved Hide resolved

// If this message has an existing dependency, then we'll wait until
// that has been fully validated before we proceed.
err := vb.WaitForDependants(update.msg)
if err != nil {
switch {
case IsError(err, ErrVBarrierShuttingDown):
update.err <- err

case IsError(err, ErrParentValidationFailed):
update.err <- NewErrf(ErrIgnored, err.Error()) //nolint

default:
log.Warnf("unexpected error during validation "+
"barrier shutdown: %v", err)
update.err <- err
}

return
}

// Process the routing update to determine if this is either a new
// update from our PoV or an update to a prior vertex/edge we
// previously accepted.
err = b.processUpdate(update.msg, update.op...)
err := b.processUpdate(update.msg, update.op...)
update.err <- err

// If this message had any dependencies, then we can now signal them to
// continue.
allowDependents := err == nil || IsError(err, ErrIgnored, ErrOutdated)
vb.SignalDependants(update.msg, allowDependents)
notError := err == nil || IsError(err, ErrIgnored, ErrOutdated)

// If the error is not nil here, there's no need to send topology
// change.
if err != nil {
// We now decide to log an error or not. If allowDependents is
// false, it means there is an error and the error is neither
// ErrIgnored or ErrOutdated. In this case, we'll log an error.
// Otherwise, we'll add debug log only.
if allowDependents {
// Log as a debug message if this is not an error we need to be
// concerned about.
if notError {
log.Debugf("process network updates got: %v", err)
} else {
log.Errorf("process network updates got: %v", err)
Expand Down Expand Up @@ -789,11 +764,6 @@ func (b *Builder) networkHandler() {
// result we'll modify the channel graph accordingly depending
// on the exact type of the message.
case update := <-b.networkUpdates:
// We'll set up any dependants, and wait until a free
// slot for this job opens up, this allows us to not
// have thousands of goroutines active.
validationBarrier.InitJobDependencies(update.msg)

b.wg.Add(1)
go b.handleNetworkUpdate(validationBarrier, update)

Expand Down
Loading
Loading