diff --git a/all_channels_test.go b/all_channels_test.go index 5f506b58..18ec478f 100644 --- a/all_channels_test.go +++ b/all_channels_test.go @@ -18,23 +18,27 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package tchannel +package tchannel_test import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/uber/tchannel-go" + "github.com/uber/tchannel-go/testutils" ) func TestAllChannelsRegistered(t *testing.T) { - introspectOpts := &IntrospectionOptions{IncludeOtherChannels: true} + introspectOpts := &tchannel.IntrospectionOptions{IncludeOtherChannels: true} - ch1_1, err := NewChannel("ch1", nil) + ch1_1, err := tchannel.NewChannel("ch1", nil) require.NoError(t, err, "Channel create failed") - ch1_2, err := NewChannel("ch1", nil) + ch1_2, err := tchannel.NewChannel("ch1", nil) require.NoError(t, err, "Channel create failed") - ch2_1, err := NewChannel("ch2", nil) + ch2_1, err := tchannel.NewChannel("ch2", nil) require.NoError(t, err, "Channel create failed") state := ch1_1.IntrospectState(introspectOpts) @@ -42,12 +46,13 @@ func TestAllChannelsRegistered(t *testing.T) { assert.Equal(t, 1, len(state.OtherChannels["ch2"])) ch1_2.Close() + time.Sleep(testutils.Timeout(10 * time.Millisecond)) state = ch1_1.IntrospectState(introspectOpts) assert.Equal(t, 0, len(state.OtherChannels["ch1"])) assert.Equal(t, 1, len(state.OtherChannels["ch2"])) - ch2_2, err := NewChannel("ch2", nil) + ch2_2, err := tchannel.NewChannel("ch2", nil) state = ch1_1.IntrospectState(introspectOpts) require.NoError(t, err, "Channel create failed") @@ -57,6 +62,7 @@ func TestAllChannelsRegistered(t *testing.T) { ch1_1.Close() ch2_1.Close() ch2_2.Close() + time.Sleep(testutils.Timeout(10 * time.Millisecond)) state = ch1_1.IntrospectState(introspectOpts) assert.Equal(t, 0, len(state.OtherChannels["ch1"])) diff --git a/channel.go b/channel.go index e62e3dfa..cb6801ff 100644 --- a/channel.go +++ b/channel.go @@ -29,6 +29,7 @@ import ( "sync" "time" + "github.com/uber/tchannel-go/timers" "github.com/uber/tchannel-go/tnet" "github.com/opentracing/opentracing-go" @@ -70,6 +71,10 @@ type ChannelOptions struct { // clamped to this value). Passing zero uses the default of 2m. RelayMaxTimeout time.Duration + // RelayTimeoutTick is the granularity of the timer wheel used to manage + // timeouts. + RelayTimeoutTick time.Duration + // The reporter to use for reporting stats for this channel. StatsReporter StatsReporter @@ -121,6 +126,8 @@ type Channel struct { peers *PeerList relayHost RelayHost relayMaxTimeout time.Duration + relayTimeoutTick time.Duration + relayTimeoutWheel *timers.Wheel // mutable contains all the members of Channel which are mutable. mutable struct { @@ -209,6 +216,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) { connectionOptions: opts.DefaultConnectionOptions, relayHost: opts.RelayHost, relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger), + relayTimeoutTick: validateRelayTimeoutTick(opts.RelayTimeoutTick, logger), } ch.peers = newRootPeerList(ch).newChild() @@ -230,6 +238,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) { if opts.RelayHost != nil { opts.RelayHost.SetChannel(ch) + ch.startWheel() } return ch, nil } @@ -665,13 +674,16 @@ func (ch *Channel) connectionCloseStateChange(c *Connection) { chState, minState) if updatedToState == ChannelClosed { - ch.onClosed() + go ch.onClosed() } } func (ch *Channel) onClosed() { removeClosedChannel(ch) - ch.log.Infof("Channel closed.") + if ch.relayTimeoutWheel != nil { + ch.relayTimeoutWheel.Stop() + } + ch.log.Info("Channel closed.") } // Closed returns whether this channel has been closed with .Close() @@ -717,7 +729,7 @@ func (ch *Channel) Close() { } if channelClosed { - ch.onClosed() + go ch.onClosed() } } @@ -726,6 +738,10 @@ func (ch *Channel) RelayHost() RelayHost { return ch.relayHost } +func (ch *Channel) startWheel() { + ch.relayTimeoutWheel = timers.NewWheel(ch.relayTimeoutTick, ch.relayMaxTimeout) +} + func toStringSet(ss []string) map[string]struct{} { set := make(map[string]struct{}, len(ss)) for _, s := range ss { diff --git a/connection_test.go b/connection_test.go index de2e1721..9780458c 100644 --- a/connection_test.go +++ b/connection_test.go @@ -398,7 +398,7 @@ func TestTimeout(t *testing.T) { testHandler := onErrorTestHandler{newTestHandler(t), onError} ts.Register(raw.Wrap(testHandler), "block") - ctx, cancel := NewContext(testutils.Timeout(15 * time.Millisecond)) + ctx, cancel := NewContext(testutils.Timeout(25 * time.Millisecond)) defer cancel() _, _, _, err := raw.Call(ctx, ts.Server(), ts.HostPort(), ts.ServiceName(), "block", []byte("Arg2"), []byte("Arg3")) @@ -662,7 +662,7 @@ func TestReadTimeout(t *testing.T) { func TestWriteTimeout(t *testing.T) { testutils.WithTestServer(t, nil, func(ts *testutils.TestServer) { ch := ts.Server() - ctx, cancel := NewContext(testutils.Timeout(15 * time.Millisecond)) + ctx, cancel := NewContext(testutils.Timeout(25 * time.Millisecond)) defer cancel() call, err := ch.BeginCall(ctx, ts.HostPort(), ch.ServiceName(), "call", nil) diff --git a/introspection.go b/introspection.go index 8539d224..362de6f2 100644 --- a/introspection.go +++ b/introspection.go @@ -156,6 +156,7 @@ type RelayerRuntimeState struct { InboundItems RelayItemSetState `json:"inboundItems"` OutboundItems RelayItemSetState `json:"outboundItems"` MaxTimeout time.Duration `json:"maxTimeout"` + TimeoutTick time.Duration `json:"timeoutTick"` } // ExchangeSetRuntimeState is the runtime state for a message exchange set. @@ -360,6 +361,7 @@ func (r *Relayer) IntrospectState(opts *IntrospectionOptions) RelayerRuntimeStat InboundItems: r.inbound.IntrospectState(opts, "inbound"), OutboundItems: r.outbound.IntrospectState(opts, "outbound"), MaxTimeout: r.maxTimeout, + TimeoutTick: r.timeoutTick, } } diff --git a/relay.go b/relay.go index c366c2ea..4fd5b7bc 100644 --- a/relay.go +++ b/relay.go @@ -28,6 +28,7 @@ import ( "time" "github.com/uber/tchannel-go/relay" + "github.com/uber/tchannel-go/timers" "github.com/uber-go/atomic" ) @@ -40,6 +41,9 @@ const ( _relayTombTTL = 3 * time.Second // _defaultRelayMaxTimeout is the default max TTL for relayed calls. _defaultRelayMaxTimeout = 2 * time.Minute + // _defaultRelayTimeoutTick is the default tick duration for processing + // relay timeouts. + _defaultRelayTimeoutTick = 5 * time.Millisecond ) var ( @@ -53,7 +57,7 @@ var ( type relayConn Connection type relayItem struct { - *time.Timer + *timers.Timer remapID uint32 tomb bool @@ -67,14 +71,16 @@ type relayItems struct { sync.RWMutex logger Logger + wheel *timers.Wheel tombs uint64 items map[uint32]relayItem } -func newRelayItems(logger Logger) *relayItems { +func newRelayItems(wheel *timers.Wheel, logger Logger) *relayItems { return &relayItems{ items: make(map[uint32]relayItem), logger: logger, + wheel: wheel, } } @@ -150,9 +156,7 @@ func (r *relayItems) Entomb(id uint32, deleteAfter time.Duration) (relayItem, bo r.items[id] = item r.Unlock() - // TODO: We should be clearing these out in batches, rather than creating - // individual timers for each item. - time.AfterFunc(deleteAfter, func() { r.Delete(id) }) + r.wheel.AfterFunc(deleteAfter, func() { r.Delete(id) }) return item, true } @@ -165,8 +169,10 @@ const ( // A Relayer forwards frames. type Relayer struct { - relayHost RelayHost - maxTimeout time.Duration + relayHost RelayHost + maxTimeout time.Duration + timeoutTick time.Duration + wheel *timers.Wheel // localHandlers is the set of service names that are handled by the local // channel. @@ -190,12 +196,15 @@ type Relayer struct { // NewRelayer constructs a Relayer. func NewRelayer(ch *Channel, conn *Connection) *Relayer { + wheel := ch.relayTimeoutWheel return &Relayer{ relayHost: ch.RelayHost(), maxTimeout: ch.relayMaxTimeout, + timeoutTick: ch.relayTimeoutTick, + wheel: wheel, localHandler: ch.relayLocal, - outbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "outbound"})), - inbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "inbound"})), + outbound: newRelayItems(wheel, ch.Logger().WithFields(LogField{"relay", "outbound"})), + inbound: newRelayItems(wheel, ch.Logger().WithFields(LogField{"relay", "inbound"})), peers: ch.rootPeers(), conn: conn, logger: conn.log, @@ -455,7 +464,7 @@ func (r *Relayer) addRelayItem(isOriginator bool, id, remapID uint32, destinatio if isOriginator { items = r.outbound } - item.Timer = time.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) }) + item.Timer = r.wheel.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) }) items.Add(id, item) return item } @@ -600,3 +609,17 @@ func validateRelayMaxTimeout(d time.Duration, logger Logger) time.Duration { ).Warn("Configured RelayMaxTimeout is invalid, using default instead.") return _defaultRelayMaxTimeout } + +func validateRelayTimeoutTick(d time.Duration, logger Logger) time.Duration { + if d > 0 { + return d + } + if d == 0 { + return _defaultRelayTimeoutTick + } + logger.WithFields( + LogField{"configuredTimeoutTick", d}, + LogField{"defaultTimeoutTick", _defaultRelayTimeoutTick}, + ).Warn("Configured RelayTimeoutTick is invalid, using default instead.") + return _defaultRelayTimeoutTick +} diff --git a/relay_test.go b/relay_test.go index 0d48db9b..602c982f 100644 --- a/relay_test.go +++ b/relay_test.go @@ -565,6 +565,12 @@ func TestRelayRejectsDuringClose(t *testing.T) { require.Error(t, err, "Expect call to fail after relay is shutdown") assert.Contains(t, err.Error(), "incoming connection is not active") close(block) + wg.Wait() + // FIXME: + // We fire all pending timers when we shut down the relay, which + // includes the timeout for the blocked handler. That unblocks the + // handler before we get to the call above, which obviates the point of + // this test. // We have a successful call that ran in the goroutine // and a failed call that we just checked the error on. diff --git a/testutils/channel_opts.go b/testutils/channel_opts.go index 9a2ab3cd..d5a2b356 100644 --- a/testutils/channel_opts.go +++ b/testutils/channel_opts.go @@ -199,6 +199,12 @@ func (o *ChannelOpts) SetRelayMaxTimeout(d time.Duration) *ChannelOpts { return o } +// SetRelayTimeoutTick sets the coarseness of relay timeouts. +func (o *ChannelOpts) SetRelayTimeoutTick(d time.Duration) *ChannelOpts { + o.ChannelOptions.RelayTimeoutTick = d + return o +} + func defaultString(v string, defaultValue string) string { if v == "" { return defaultValue