fn: add goroutine manager #9141
@@ -0,0 +1,77 @@
```go
package fn

import (
	"context"
	"errors"
	"sync"
)

// ErrStopping is returned when trying to add a new goroutine while stopping.
var ErrStopping = errors.New("can not add goroutine, stopping")

// GoroutineManager is used to launch goroutines until context expires or the
// manager is stopped. The Stop method blocks until all started goroutines stop.
type GoroutineManager struct {
	wg     sync.WaitGroup
	mu     sync.Mutex
	ctx    context.Context
	cancel func()
}

// NewGoroutineManager constructs and returns a new instance of
// GoroutineManager.
func NewGoroutineManager(ctx context.Context) *GoroutineManager {
	ctx, cancel := context.WithCancel(ctx)

	return &GoroutineManager{
		ctx:    ctx,
		cancel: cancel,
	}
}

// Go starts a new goroutine if the manager is not stopping.
func (g *GoroutineManager) Go(f func(ctx context.Context)) error {
	// Calling wg.Add(1) and wg.Wait() when wg's counter is 0 is a race
	// condition, since it is not clear whether Wait() should block or
	// not. This kind of race condition is detected by the Go runtime and
	// results in a crash if running with `-race`. To prevent this, the
	// whole Go method is protected with a mutex. The call to wg.Wait()
	// inside Stop() can still run in parallel with Go, but in that case
	// g.ctx is already expired, because cancel() was called in Stop, so
	// Go returns before the wg.Add(1) call.
	g.mu.Lock()
	defer g.mu.Unlock()
```
> **Comment:** Nit: add a comment that calling Add() while a Wait() is already active will cause the Go runtime to panic?

> **Comment:** Done. Also added a similar comment in the GoroutineManager.Stop method.
```go
	if g.ctx.Err() != nil {
		return ErrStopping
	}

	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		f(g.ctx)
	}()

	return nil
}

// Stop prevents new goroutines from being added and waits for all running
// goroutines to finish.
func (g *GoroutineManager) Stop() {
	g.mu.Lock()
	g.cancel()
	g.mu.Unlock()

	// Wait for all goroutines to finish. Note that this wg.Wait() call is
	// safe, since it can't run in parallel with the wg.Add(1) call in Go:
	// we just cancelled the context, and even if a Go call starts running
	// here after acquiring the mutex, it will see that the context has
	// expired and return ErrStopping instead of calling wg.Add(1).
	g.wg.Wait()
}

// Done returns a channel which is closed when either the context passed to
// NewGoroutineManager expires or when Stop is called.
func (g *GoroutineManager) Done() <-chan struct{} {
	return g.ctx.Done()
}
```
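For context, here is a minimal usage sketch of this API (not part of the diff). The `fetcher` type, its methods, and the work inside the callback are hypothetical, and the import path is assumed from the lnd repository layout:

```go
package main

import (
	"context"
	"time"

	"github.com/lightningnetwork/lnd/fn"
)

// fetcher is a hypothetical subsystem that launches background work on
// demand and must wait for all of it on shutdown.
type fetcher struct {
	gm *fn.GoroutineManager
}

func newFetcher() *fetcher {
	return &fetcher{gm: fn.NewGoroutineManager(context.Background())}
}

// FetchAsync launches a background task. Once Stop has begun, Go returns
// ErrStopping, so no goroutine can slip past the final wait.
func (f *fetcher) FetchAsync() error {
	return f.gm.Go(func(ctx context.Context) {
		// Long-running work should watch ctx, which expires when the
		// manager stops.
		select {
		case <-time.After(time.Second): // placeholder for real work
		case <-ctx.Done():
		}
	})
}

// Stop blocks until every goroutine started via Go has returned.
func (f *fetcher) Stop() {
	f.gm.Stop()
}

func main() {
	f := newFetcher()
	_ = f.FetchAsync()
	f.Stop()
}
```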
@@ -0,0 +1,121 @@
```go
package fn

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// TestGoroutineManager tests that the GoroutineManager starts goroutines until
// ctx expires. It also makes sure it fails to start new goroutines after the
// context expired and the GoroutineManager is in the process of waiting for
// already started goroutines in the Stop method.
func TestGoroutineManager(t *testing.T) {
	t.Parallel()

	m := NewGoroutineManager(context.Background())

	taskChan := make(chan struct{})

	require.NoError(t, m.Go(func(ctx context.Context) {
		<-taskChan
	}))

	t1 := time.Now()

	// Close taskChan in 1s, causing the goroutine to stop.
	time.AfterFunc(time.Second, func() {
		close(taskChan)
	})

	m.Stop()
	stopDelay := time.Since(t1)

	// Make sure Stop was waiting for the goroutine to stop.
	require.Greater(t, stopDelay, time.Second)

	// Make sure new goroutines do not start after Stop.
	require.ErrorIs(t, m.Go(func(ctx context.Context) {}), ErrStopping)

	// When Stop() is called, the internal context expires and m.Done() is
	// closed. Test this.
	select {
	case <-m.Done():
	default:
		t.Errorf("Done() channel must be closed at this point")
	}
}

// TestGoroutineManagerContextExpires tests the effect of context expiry.
func TestGoroutineManagerContextExpires(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())

	m := NewGoroutineManager(ctx)

	require.NoError(t, m.Go(func(ctx context.Context) {
		<-ctx.Done()
	}))

	// The Done channel of the manager should not be closed, so the
	// following call must block.
	select {
	case <-m.Done():
		t.Errorf("Done() channel must not be closed at this point")
	default:
	}

	cancel()

	// The Done channel of the manager should be closed, so the following
	// call must not block.
	select {
	case <-m.Done():
	default:
		t.Errorf("Done() channel must be closed at this point")
	}

	// Make sure new goroutines do not start after context expiry.
	require.ErrorIs(t, m.Go(func(ctx context.Context) {}), ErrStopping)

	// Stop will wait for all goroutines to stop.
	m.Stop()
}

// TestGoroutineManagerStress starts many goroutines while calling Stop. It
// is needed to make sure the GoroutineManager does not crash if this
// happens. If the mutex was not used, it would crash because of a race
// condition between wg.Add(1) and wg.Wait().
func TestGoroutineManagerStress(t *testing.T) {
	t.Parallel()

	m := NewGoroutineManager(context.Background())

	stopChan := make(chan struct{})

	time.AfterFunc(1*time.Millisecond, func() {
```
> **Comment:** So does the panic happen in the Go runtime only if the counter is actually 0 when we call Add(), or does it not matter and the panic happens as soon as an Add() is called after Wait() was called?

> **Comment:** From https://pkg.go.dev/sync#WaitGroup.Add: "Note that calls with a positive delta that start when the counter is zero must happen before a Wait." So if the counter is 0, Add must happen before Wait.

> **Comment:** Checked the code, and it even panics if you call Add() while Wait() is already waiting for all goroutines to finish. Then Wait() will panic; in your example above, Add() will panic if the counter is zero and we already have waiters registered.

> **Comment:** Even with a non-zero counter? If the counter is 0, Wait() should unblock immediately, not wait.

> **Comment:** Yes, true. What you are referring to is basically that the Wait() method returned: Wait() compares whether the state was changed before returning, and if that was the case it panics.
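To make the condition under discussion concrete, here is a small standalone sketch (not from the PR) of an Add that starts at a zero counter with no ordering against a concurrent Wait. Per the sync documentation this is misuse; depending on the interleaving it can show up as a data race under `-race`, and in interleavings where waiters are already registered the runtime reports a "WaitGroup misuse" panic:

```go
package main

import "sync"

func main() {
	var wg sync.WaitGroup

	done := make(chan struct{})
	go func() {
		// Wait may start while the counter is still zero.
		wg.Wait()
		close(done)
	}()

	// Add(1) from another goroutine with no happens-before relation to
	// the Wait above. The sync docs require Adds that start at a zero
	// counter to happen before Wait; this is the race the
	// GoroutineManager's mutex rules out.
	wg.Add(1)
	wg.Done()

	<-done
}
```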
```go
		m.Stop()
		close(stopChan)
	})

	// Start 100 goroutines sequentially. Sequential order is needed to
	// keep wg's counter low (0 or 1) to increase the probability of the
	// race condition being caught if it exists. If the mutex is removed
	// from the implementation, this test crashes under `-race`.
	for i := 0; i < 100; i++ {
		taskChan := make(chan struct{})
		err := m.Go(func(ctx context.Context) {
			close(taskChan)
		})

		// If the goroutine was started, wait for its completion.
		if err == nil {
			<-taskChan
		}
	}

	// Wait for Stop to complete.
	<-stopChan
}
```
> **Comment:** Regarding the GoroutineManager: it's good that we are introducing it, but I think we should disincentivize its use, because there is something wrong with the design if you need to use it. Maybe we should mark this in the comment.
> **Comment:** Fixed. Added the following note:
> **Comment:** OK, I have to chime in here. Why are we adding an abstraction that we are explicitly discouraging? Seems like this will be counterproductive in the long run.
> **Comment:** Hmm, ok, so you are actually fine with this way of using the WaitGroup? I thought it was a band-aid for the current situation, to avoid rewriting the whole htlc switch/payment code. But using the WaitGroup in that way is not good design IMO, because the WaitGroup is inherently thread-safe and generally does not need a mutex when it is ensured that Adds are called before Wait().
> **Comment:** My point is that building infrastructure to support bad code design is just weird. We should fix the design or patch over it. We don't want to make it easier to write bad code in the future.
> **Comment:** The common scenario is like this: a long-lived object launches goroutines on demand from methods that may be called from any goroutine, and its Stop method must wait for all of them to finish. (It is not only the htlc switch; it is also found in ChannelRouter.SendPaymentAsync, and maybe in other cases too.) Such a scenario is not thread-safe when using a WaitGroup (without a mutex) to track goroutines. But is this always a bad scenario which we should discourage in favor of a design using a fixed number of goroutines started in New() and stopped in Stop()? The latter scenario is thread-safe for an unprotected WaitGroup.
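As an illustration of the on-demand scenario described in that comment, a rough sketch (hypothetical names, not code from lnd):

```go
package sketch

import (
	"sync"
	"time"
)

// switchLike is a hypothetical long-lived subsystem whose methods can be
// called from any goroutine.
type switchLike struct {
	wg   sync.WaitGroup
	quit chan struct{}
}

// SendAsync spawns a worker on demand. Because it can run concurrently
// with Stop, this wg.Add(1) can race with the wg.Wait() below when the
// counter is at zero.
func (s *switchLike) SendAsync() {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		select {
		case <-time.After(time.Second): // placeholder for real work
		case <-s.quit:
		}
	}()
}

// Stop signals shutdown and waits for all workers.
func (s *switchLike) Stop() {
	close(s.quit)
	s.wg.Wait()
}
```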
> **Comment:** I don't think the issue is with a fixed vs. variable number of goroutines. The issue arises from having multiple threads responsible for managing the WaitGroup. Sending requests into some object's main event loop (which is responsible for managing the WaitGroup) and having it launch the new goroutine would handily solve the issue. However, right now we cannot assume that the methods of an object are called from the same thread, which is what causes the problem.
>
> This brings me to a new theorem of mine: WaitGroups that manage a runtime-determined number of threads should only exist in the stack memory of a goroutine, not in a field on some object. If the number of goroutines is statically known at compile time, then it may be in a field of the object.
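A rough sketch of that event-loop shape (hypothetical names, assuming a single goroutine owns the loop): all requests are funneled through one goroutine, and the WaitGroup tracking the dynamically started workers lives only on that goroutine's stack:

```go
package sketch

import (
	"context"
	"sync"
)

// loopServer funnels all requests through a single event loop, so only
// that goroutine ever touches the WaitGroup.
type loopServer struct {
	reqs chan func(context.Context)
	quit chan struct{}
}

// run is the event loop. The WaitGroup exists only on this goroutine's
// stack, so Add and Wait can never race with each other.
func (s *loopServer) run(ctx context.Context) {
	var wg sync.WaitGroup
	defer wg.Wait()

	for {
		select {
		case task := <-s.reqs:
			wg.Add(1)
			go func() {
				defer wg.Done()
				task(ctx)
			}()

		case <-s.quit:
			return

		case <-ctx.Done():
			return
		}
	}
}
```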
> **Comment:** I like the theorem! It is a good rule resulting in cleaner code. Also, I think it is possible to enforce it at compile time (a lint rule?).
> **Comment:** I don't know how to write lint rules right now, but if that's possible, that sounds great!
> **Comment:** I removed the note.