fn: add goroutine manager #9141

Merged
merged 1 commit into from
Oct 5, 2024
77 changes: 77 additions & 0 deletions fn/goroutine_manager.go
@@ -0,0 +1,77 @@
package fn

import (
	"context"
	"errors"
	"sync"
)

// ErrStopping is returned when trying to add a new goroutine while stopping.
var ErrStopping = errors.New("can not add goroutine, stopping")

// GoroutineManager is used to launch goroutines until context expires or the
// manager is stopped. The Stop method blocks until all started goroutines stop.
type GoroutineManager struct {
	wg     sync.WaitGroup
	mu     sync.Mutex
	ctx    context.Context
	cancel func()
}

// NewGoroutineManager constructs and returns a new instance of
Collaborator

Regarding the GoroutineManager: it's good that we are introducing it, but I think we should disincentivize its use, because if you need it, there is something wrong with the design. Maybe we should note this in the comment.

Collaborator Author

Fixed. Added the following note:

// NOTE: while it is safe to use GoroutineManager, it is recommended to change
// the design of the code using it to avoid launching goroutines except at start
// time. Ideally, goroutine launches and Stop() call should not compete.

Collaborator

OK I have to chime in here. Why are we adding an abstraction that we are explicitly discouraging? Seems like this will be counterproductive in the long run.

Collaborator

Hmm, OK, so you are actually fine with using the WaitGroup this way? I thought it was a band-aid for the current situation, to avoid rewriting the whole htlc switch/payment code. But using the WaitGroup that way is not good design IMO: the WaitGroup is inherently thread-safe and generally does not need a mutex, as long as it is ensured that Add calls happen before Wait().

Collaborator

My point is that building infrastructure to support bad code design is just weird. We should fix the design or patch over it. We don't want to make it easier to write bad code in the future.

Collaborator Author

The common scenario is like this:

Some type
  - New() returns a new instance of the type
  - Some method starts an auxiliary goroutine which may keep working after the method completes
    (though this is not required for the goroutine problem to occur)
  - Method Stop() instructs all such background workers to stop and waits for them to
    actually stop, to avoid a goroutine leak

(It is not only the htlc switch; it is also found in ChannelRouter.SendPaymentAsync, and maybe in other places too.)

In such a scenario, using a WaitGroup (without a mutex) to track goroutines is not thread-safe.

But is this always a bad scenario that we should discourage in favor of a design using a fixed number of goroutines started in New() and stopped in Stop()? The latter scenario is thread-safe for an unprotected WaitGroup.
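
For readers following the thread, the scenario described above might be sketched roughly as follows. The `Service` type and its `DoAsync` method are hypothetical names chosen for illustration and are not taken from lnd. The sketch is only safe because every `DoAsync` call completes before `Stop` is called; if the two ran concurrently while the counter was 0, `wg.Add(1)` and `wg.Wait()` would race.

```go
package main

import (
	"fmt"
	"sync"
)

// Service is a hypothetical type that follows the scenario above: a method
// launches background goroutines and Stop waits for them to exit.
type Service struct {
	wg   sync.WaitGroup
	quit chan struct{}

	mu      sync.Mutex // Protects stopped only; it does NOT protect wg.
	stopped int
}

// NewService returns a new instance of the hypothetical Service.
func NewService() *Service {
	return &Service{quit: make(chan struct{})}
}

// DoAsync starts an auxiliary goroutine that keeps working after the method
// returns. Calling it concurrently with Stop is the unsafe case discussed
// in the thread: Add(1) would race with Wait() when the counter is 0.
func (s *Service) DoAsync() {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()

		<-s.quit // Work until told to stop.

		s.mu.Lock()
		s.stopped++
		s.mu.Unlock()
	}()
}

// Stop tells all workers to exit, waits for them, and reports how many
// workers were stopped. It must be called at most once.
func (s *Service) Stop() int {
	close(s.quit)
	s.wg.Wait()

	s.mu.Lock()
	defer s.mu.Unlock()

	return s.stopped
}

func main() {
	s := NewService()
	for i := 0; i < 3; i++ {
		// Safe only because all launches happen before Stop.
		s.DoAsync()
	}
	fmt.Println("workers stopped:", s.Stop())
}
```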

Collaborator

I don't think the issue is a fixed vs. variable number of goroutines. The issue arises from having multiple threads responsible for managing the WaitGroup. Sending requests into some object's main event loop (which is responsible for managing the WaitGroup) and having it launch the new goroutine would handily solve the issue. However, right now we cannot assume that the methods of an object are called from the same thread, which is what causes the problem.

This brings me to a new theorem of mine: WaitGroups that manage a runtime-determined number of threads should only exist in the stack memory of a goroutine, not in a field on some object. If the number of goroutines is statically known at compile time, then it may be a field of the object.
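
One possible reading of this suggestion is sketched below. It is an illustration only: the `Loop` type, `Submit`, and `runTasks` are hypothetical names, not code from this PR. The WaitGroup is a local variable owned by the event loop goroutine, so `Add` and `Wait` are always called from the same goroutine and cannot race with each other.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Loop is a hypothetical object whose event loop is the only goroutine that
// ever calls Add or Wait on the WaitGroup.
type Loop struct {
	tasks chan func()
	quit  chan struct{}
	done  chan struct{}
}

// NewLoop creates the object and starts its event loop.
func NewLoop() *Loop {
	l := &Loop{
		tasks: make(chan func()),
		quit:  make(chan struct{}),
		done:  make(chan struct{}),
	}
	go l.run()

	return l
}

// run is the main event loop. The WaitGroup is a local variable here, not a
// field of Loop, so no other goroutine can call Add or Wait on it.
func (l *Loop) run() {
	defer close(l.done)

	var wg sync.WaitGroup
	for {
		select {
		case f := <-l.tasks:
			wg.Add(1)
			go func() {
				defer wg.Done()
				f()
			}()

		case <-l.quit:
			wg.Wait()
			return
		}
	}
}

// Submit asks the event loop to launch f in a new goroutine. It returns
// false if the loop is stopping or has stopped.
func (l *Loop) Submit(f func()) bool {
	select {
	case l.tasks <- f:
		return true
	case <-l.quit:
		return false
	}
}

// Stop shuts the loop down and waits for all launched goroutines. It must be
// called at most once.
func (l *Loop) Stop() {
	close(l.quit)
	<-l.done
}

// runTasks submits n tasks, stops the loop, and reports how many tasks ran
// and whether a submission after Stop was accepted.
func runTasks(n int) (int, bool) {
	l := NewLoop()

	var ran atomic.Int32
	for i := 0; i < n; i++ {
		l.Submit(func() { ran.Add(1) })
	}
	l.Stop()

	return int(ran.Load()), l.Submit(func() {})
}

func main() {
	ran, accepted := runTasks(3)
	fmt.Println("tasks run:", ran, "accepted after stop:", accepted)
}
```

The trade-off in this sketch is that every launch goes through a channel send to the loop, which is the cost of not needing a mutex around the WaitGroup.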

Collaborator Author

I like the theorem! It is a good rule resulting in cleaner code. Also I think it is possible to enforce it at compile time (lint rule?).

Collaborator

I don't know how to write lint rules right now but if that's possible that sounds great!

Collaborator Author

I removed the note.

// GoroutineManager.
func NewGoroutineManager(ctx context.Context) *GoroutineManager {
	ctx, cancel := context.WithCancel(ctx)

	return &GoroutineManager{
		ctx:    ctx,
		cancel: cancel,
	}
}

// Go starts a new goroutine if the manager is not stopping.
func (g *GoroutineManager) Go(f func(ctx context.Context)) error {
	// Calling wg.Add(1) and wg.Wait() when wg's counter is 0 is a race
	// condition, since it is not clear whether Wait() should block or not.
	// This kind of race condition is detected by the Go runtime and results
	// in a crash if running with `-race`. To prevent this, the whole Go
	// method is protected with a mutex. The call to wg.Wait() inside Stop()
	// can still run in parallel with Go, but in that case g.ctx is in the
	// expired state, because cancel() was called in Stop, so Go returns
	// before the wg.Add(1) call.
	g.mu.Lock()
	defer g.mu.Unlock()

Collaborator

Nit: add a comment that calling Add() while a Wait() is already active will cause the Go runtime to panic?

Collaborator Author

Done. Also added a similar comment in GoroutineManager.Stop method.

	if g.ctx.Err() != nil {
		return ErrStopping
	}

	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		f(g.ctx)
	}()

	return nil
}

// Stop prevents new goroutines from being added and waits for all running
// goroutines to finish.
func (g *GoroutineManager) Stop() {
	g.mu.Lock()
	g.cancel()
	g.mu.Unlock()

	// Wait for all goroutines to finish. Note that this wg.Wait() call is
	// safe, since it can't run in parallel with wg.Add(1) call in Go, since
	// we just cancelled the context and even if Go call starts running here
	// after acquiring the mutex, it would see that the context has expired
	// and return ErrStopping instead of calling wg.Add(1).
	g.wg.Wait()
}

// Done returns a channel which is closed when either the context passed to
// NewGoroutineManager expires or when Stop is called.
func (g *GoroutineManager) Done() <-chan struct{} {
	return g.ctx.Done()
}
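
A minimal usage sketch of the type added in this file is shown below. So that it compiles on its own, it carries a condensed copy of `GoroutineManager` from this diff (without `Done` and most comments); the `demo` helper is a hypothetical name introduced only for this illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Condensed copy of the type from fn/goroutine_manager.go in this diff.
var ErrStopping = errors.New("can not add goroutine, stopping")

type GoroutineManager struct {
	wg     sync.WaitGroup
	mu     sync.Mutex
	ctx    context.Context
	cancel func()
}

func NewGoroutineManager(ctx context.Context) *GoroutineManager {
	ctx, cancel := context.WithCancel(ctx)

	return &GoroutineManager{ctx: ctx, cancel: cancel}
}

func (g *GoroutineManager) Go(f func(ctx context.Context)) error {
	g.mu.Lock()
	defer g.mu.Unlock()

	if g.ctx.Err() != nil {
		return ErrStopping
	}

	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		f(g.ctx)
	}()

	return nil
}

func (g *GoroutineManager) Stop() {
	g.mu.Lock()
	g.cancel()
	g.mu.Unlock()

	g.wg.Wait()
}

// demo (hypothetical helper) launches one worker that runs until the manager
// is stopped. It reports whether the worker had exited by the time Stop
// returned, and the error from a launch attempted after Stop.
func demo() (bool, error) {
	m := NewGoroutineManager(context.Background())

	exited := make(chan struct{})
	err := m.Go(func(ctx context.Context) {
		<-ctx.Done() // Runs until Stop cancels the context.
		close(exited)
	})
	if err != nil {
		return false, err
	}

	m.Stop()

	workerExited := false
	select {
	case <-exited:
		workerExited = true
	default:
	}

	return workerExited, m.Go(func(ctx context.Context) {})
}

func main() {
	workerExited, err := demo()
	fmt.Println(workerExited, errors.Is(err, ErrStopping)) // true true
}
```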
121 changes: 121 additions & 0 deletions fn/goroutine_manager_test.go
@@ -0,0 +1,121 @@
package fn

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// TestGoroutineManager tests that the GoroutineManager starts goroutines until
// ctx expires. It also makes sure it fails to start new goroutines after the
// context expired and the GoroutineManager is in the process of waiting for
// already started goroutines in the Stop method.
func TestGoroutineManager(t *testing.T) {
	t.Parallel()

	m := NewGoroutineManager(context.Background())

	taskChan := make(chan struct{})

	require.NoError(t, m.Go(func(ctx context.Context) {
		<-taskChan
	}))

	t1 := time.Now()

	// Close taskChan in 1s, causing the goroutine to stop.
	time.AfterFunc(time.Second, func() {
		close(taskChan)
	})

	m.Stop()
	stopDelay := time.Since(t1)

	// Make sure Stop was waiting for the goroutine to stop.
	require.Greater(t, stopDelay, time.Second)

	// Make sure new goroutines do not start after Stop.
	require.ErrorIs(t, m.Go(func(ctx context.Context) {}), ErrStopping)

	// When Stop() is called, the internal context expires and m.Done() is
	// closed. Test this.
	select {
	case <-m.Done():
	default:
		t.Errorf("Done() channel must be closed at this point")
	}
}

// TestGoroutineManagerContextExpires tests the effect of context expiry.
func TestGoroutineManagerContextExpires(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())

	m := NewGoroutineManager(ctx)

	require.NoError(t, m.Go(func(ctx context.Context) {
		<-ctx.Done()
	}))

	// The Done channel of the manager should not be closed, so the
	// following call must block.
	select {
	case <-m.Done():
		t.Errorf("Done() channel must not be closed at this point")
	default:
	}

	cancel()

	// The Done channel of the manager should be closed, so the following
	// call must not block.
	select {
	case <-m.Done():
	default:
		t.Errorf("Done() channel must be closed at this point")
	}

	// Make sure new goroutines do not start after context expiry.
	require.ErrorIs(t, m.Go(func(ctx context.Context) {}), ErrStopping)

	// Stop will wait for all goroutines to stop.
	m.Stop()
}

// TestGoroutineManagerStress starts many goroutines while calling Stop. It
// is needed to make sure the GoroutineManager does not crash if this happens.
// If the mutex was not used, it would crash because of a race condition between
// wg.Add(1) and wg.Wait().
func TestGoroutineManagerStress(t *testing.T) {
	t.Parallel()

	m := NewGoroutineManager(context.Background())

	stopChan := make(chan struct{})

	time.AfterFunc(1*time.Millisecond, func() {
Collaborator

So does the panic happen in the Go runtime only if the counter is actually 0 and we call Add()? Or does that not matter, and the panic happens as soon as Add() is called after Wait() was called?

Collaborator Author

From https://pkg.go.dev/sync#WaitGroup.Add

Note that calls with a positive delta that occur when the counter is zero must happen before a Wait. Calls with a negative delta, or calls with a positive delta that start when the counter is greater than zero, may happen at any time.

So if the counter is 0, Add(1) and Wait() must not be called in parallel.
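
As a small illustration of the documented rule, the sketch below (with a hypothetical `sumSquares` helper) calls every `Add(1)` from the same goroutine that later calls `Wait()`, so each `Add` is guaranteed to happen before `Wait` and no mutex around the WaitGroup is needed.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares computes 1*1 + 2*2 + ... + n*n using one goroutine per term.
// All wg.Add(1) calls are made by the goroutine that later calls wg.Wait(),
// which satisfies the ordering rule quoted from the sync documentation.
func sumSquares(n int) int {
	var (
		wg  sync.WaitGroup
		mu  sync.Mutex // Protects sum, not the WaitGroup.
		sum int
	)

	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()

			mu.Lock()
			sum += i * i
			mu.Unlock()
		}(i)
	}

	// Every Add above has already returned, so Wait cannot race with it.
	wg.Wait()

	return sum
}

func main() {
	fmt.Println(sumSquares(4)) // 30
}
```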

Collaborator

Checked the code, and it even panics if you call Add() while Wait() is already waiting for all goroutines to finish; in that case Wait() will panic. In your example above, Add() will panic if the counter is zero and we already have waiters registered.

Collaborator Author

it even panics if you call Add() while Wait is already waiting for all goroutine to finish

Even with a non-zero counter?

If the counter is 0, Wait() should unblock immediately, not wait.

Collaborator

Yes, true. What you are referring to is basically the case where Wait() has returned: before returning, Wait() checks whether the state was changed, and if it was, it panics.

		m.Stop()
		close(stopChan)
	})

	// Starts 100 goroutines sequentially. Sequential order is needed to
	// keep wg.counter low (0 or 1) to increase probability of race
	// condition to be caught if it exists. If mutex is removed in the
	// implementation, this test crashes under `-race`.
	for i := 0; i < 100; i++ {
		taskChan := make(chan struct{})
		err := m.Go(func(ctx context.Context) {
			close(taskChan)
		})
		// If goroutine was started, wait for its completion.
		if err == nil {
			<-taskChan
		}
	}

	// Wait for Stop to complete.
	<-stopChan
}