Skip to content

Commit

Permalink
Integrate timer wheel into TChannel package
Browse files Browse the repository at this point in the history
  • Loading branch information
Akshay Shah committed Dec 15, 2016
1 parent 72a464d commit eb30dec
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 21 deletions.
18 changes: 12 additions & 6 deletions all_channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,41 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package tchannel
package tchannel_test

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/testutils"
)

func TestAllChannelsRegistered(t *testing.T) {
introspectOpts := &IntrospectionOptions{IncludeOtherChannels: true}
introspectOpts := &tchannel.IntrospectionOptions{IncludeOtherChannels: true}

ch1_1, err := NewChannel("ch1", nil)
ch1_1, err := tchannel.NewChannel("ch1", nil)
require.NoError(t, err, "Channel create failed")
ch1_2, err := NewChannel("ch1", nil)
ch1_2, err := tchannel.NewChannel("ch1", nil)
require.NoError(t, err, "Channel create failed")
ch2_1, err := NewChannel("ch2", nil)
ch2_1, err := tchannel.NewChannel("ch2", nil)
require.NoError(t, err, "Channel create failed")

state := ch1_1.IntrospectState(introspectOpts)
assert.Equal(t, 1, len(state.OtherChannels["ch1"]))
assert.Equal(t, 1, len(state.OtherChannels["ch2"]))

ch1_2.Close()
time.Sleep(testutils.Timeout(10 * time.Millisecond))

state = ch1_1.IntrospectState(introspectOpts)
assert.Equal(t, 0, len(state.OtherChannels["ch1"]))
assert.Equal(t, 1, len(state.OtherChannels["ch2"]))

ch2_2, err := NewChannel("ch2", nil)
ch2_2, err := tchannel.NewChannel("ch2", nil)

state = ch1_1.IntrospectState(introspectOpts)
require.NoError(t, err, "Channel create failed")
Expand All @@ -57,6 +62,7 @@ func TestAllChannelsRegistered(t *testing.T) {
ch1_1.Close()
ch2_1.Close()
ch2_2.Close()
time.Sleep(testutils.Timeout(10 * time.Millisecond))

state = ch1_1.IntrospectState(introspectOpts)
assert.Equal(t, 0, len(state.OtherChannels["ch1"]))
Expand Down
22 changes: 19 additions & 3 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sync"
"time"

"github.com/uber/tchannel-go/timers"
"github.com/uber/tchannel-go/tnet"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -70,6 +71,10 @@ type ChannelOptions struct {
// clamped to this value). Passing zero uses the default of 2m.
RelayMaxTimeout time.Duration

// RelayTimeoutTick is the granularity of the timer wheel used to manage
// timeouts.
RelayTimeoutTick time.Duration

// The reporter to use for reporting stats for this channel.
StatsReporter StatsReporter

Expand Down Expand Up @@ -121,6 +126,8 @@ type Channel struct {
peers *PeerList
relayHost RelayHost
relayMaxTimeout time.Duration
relayTimeoutTick time.Duration
relayTimeoutWheel *timers.Wheel

// mutable contains all the members of Channel which are mutable.
mutable struct {
Expand Down Expand Up @@ -209,6 +216,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
connectionOptions: opts.DefaultConnectionOptions,
relayHost: opts.RelayHost,
relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger),
relayTimeoutTick: validateRelayTimeoutTick(opts.RelayTimeoutTick, logger),
}
ch.peers = newRootPeerList(ch).newChild()

Expand All @@ -230,6 +238,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {

if opts.RelayHost != nil {
opts.RelayHost.SetChannel(ch)
ch.startWheel()
}
return ch, nil
}
Expand Down Expand Up @@ -665,13 +674,16 @@ func (ch *Channel) connectionCloseStateChange(c *Connection) {
chState, minState)

if updatedToState == ChannelClosed {
ch.onClosed()
go ch.onClosed()
}
}

func (ch *Channel) onClosed() {
removeClosedChannel(ch)
ch.log.Infof("Channel closed.")
if ch.relayTimeoutWheel != nil {
ch.relayTimeoutWheel.Stop()
}
ch.log.Info("Channel closed.")
}

// Closed returns whether this channel has been closed with .Close()
Expand Down Expand Up @@ -717,7 +729,7 @@ func (ch *Channel) Close() {
}

if channelClosed {
ch.onClosed()
go ch.onClosed()
}
}

Expand All @@ -726,6 +738,10 @@ func (ch *Channel) RelayHost() RelayHost {
return ch.relayHost
}

func (ch *Channel) startWheel() {
ch.relayTimeoutWheel = timers.NewWheel(ch.relayTimeoutTick, ch.relayMaxTimeout)
}

func toStringSet(ss []string) map[string]struct{} {
set := make(map[string]struct{}, len(ss))
for _, s := range ss {
Expand Down
4 changes: 2 additions & 2 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func TestTimeout(t *testing.T) {
testHandler := onErrorTestHandler{newTestHandler(t), onError}
ts.Register(raw.Wrap(testHandler), "block")

ctx, cancel := NewContext(testutils.Timeout(15 * time.Millisecond))
ctx, cancel := NewContext(testutils.Timeout(25 * time.Millisecond))
defer cancel()

_, _, _, err := raw.Call(ctx, ts.Server(), ts.HostPort(), ts.ServiceName(), "block", []byte("Arg2"), []byte("Arg3"))
Expand Down Expand Up @@ -662,7 +662,7 @@ func TestReadTimeout(t *testing.T) {
func TestWriteTimeout(t *testing.T) {
testutils.WithTestServer(t, nil, func(ts *testutils.TestServer) {
ch := ts.Server()
ctx, cancel := NewContext(testutils.Timeout(15 * time.Millisecond))
ctx, cancel := NewContext(testutils.Timeout(25 * time.Millisecond))
defer cancel()

call, err := ch.BeginCall(ctx, ts.HostPort(), ch.ServiceName(), "call", nil)
Expand Down
2 changes: 2 additions & 0 deletions introspection.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ type RelayerRuntimeState struct {
InboundItems RelayItemSetState `json:"inboundItems"`
OutboundItems RelayItemSetState `json:"outboundItems"`
MaxTimeout time.Duration `json:"maxTimeout"`
TimeoutTick time.Duration `json:"timeoutTick"`
}

// ExchangeSetRuntimeState is the runtime state for a message exchange set.
Expand Down Expand Up @@ -360,6 +361,7 @@ func (r *Relayer) IntrospectState(opts *IntrospectionOptions) RelayerRuntimeStat
InboundItems: r.inbound.IntrospectState(opts, "inbound"),
OutboundItems: r.outbound.IntrospectState(opts, "outbound"),
MaxTimeout: r.maxTimeout,
TimeoutTick: r.timeoutTick,
}
}

Expand Down
43 changes: 33 additions & 10 deletions relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/uber/tchannel-go/relay"
"github.com/uber/tchannel-go/timers"

"github.com/uber-go/atomic"
)
Expand All @@ -40,6 +41,9 @@ const (
_relayTombTTL = 3 * time.Second
// _defaultRelayMaxTimeout is the default max TTL for relayed calls.
_defaultRelayMaxTimeout = 2 * time.Minute
// _defaultRelayTimeoutTick is the default tick duration for processing
// relay timeouts.
_defaultRelayTimeoutTick = 5 * time.Millisecond
)

var (
Expand All @@ -53,7 +57,7 @@ var (
type relayConn Connection

type relayItem struct {
*time.Timer
*timers.Timer

remapID uint32
tomb bool
Expand All @@ -67,14 +71,16 @@ type relayItems struct {
sync.RWMutex

logger Logger
wheel *timers.Wheel
tombs uint64
items map[uint32]relayItem
}

func newRelayItems(logger Logger) *relayItems {
func newRelayItems(wheel *timers.Wheel, logger Logger) *relayItems {
return &relayItems{
items: make(map[uint32]relayItem),
logger: logger,
wheel: wheel,
}
}

Expand Down Expand Up @@ -150,9 +156,7 @@ func (r *relayItems) Entomb(id uint32, deleteAfter time.Duration) (relayItem, bo
r.items[id] = item
r.Unlock()

// TODO: We should be clearing these out in batches, rather than creating
// individual timers for each item.
time.AfterFunc(deleteAfter, func() { r.Delete(id) })
r.wheel.AfterFunc(deleteAfter, func() { r.Delete(id) })
return item, true
}

Expand All @@ -165,8 +169,10 @@ const (

// A Relayer forwards frames.
type Relayer struct {
relayHost RelayHost
maxTimeout time.Duration
relayHost RelayHost
maxTimeout time.Duration
timeoutTick time.Duration
wheel *timers.Wheel

// localHandlers is the set of service names that are handled by the local
// channel.
Expand All @@ -190,12 +196,15 @@ type Relayer struct {

// NewRelayer constructs a Relayer.
func NewRelayer(ch *Channel, conn *Connection) *Relayer {
wheel := ch.relayTimeoutWheel
return &Relayer{
relayHost: ch.RelayHost(),
maxTimeout: ch.relayMaxTimeout,
timeoutTick: ch.relayTimeoutTick,
wheel: wheel,
localHandler: ch.relayLocal,
outbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "outbound"})),
inbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "inbound"})),
outbound: newRelayItems(wheel, ch.Logger().WithFields(LogField{"relay", "outbound"})),
inbound: newRelayItems(wheel, ch.Logger().WithFields(LogField{"relay", "inbound"})),
peers: ch.rootPeers(),
conn: conn,
logger: conn.log,
Expand Down Expand Up @@ -455,7 +464,7 @@ func (r *Relayer) addRelayItem(isOriginator bool, id, remapID uint32, destinatio
if isOriginator {
items = r.outbound
}
item.Timer = time.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) })
item.Timer = r.wheel.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) })
items.Add(id, item)
return item
}
Expand Down Expand Up @@ -600,3 +609,17 @@ func validateRelayMaxTimeout(d time.Duration, logger Logger) time.Duration {
).Warn("Configured RelayMaxTimeout is invalid, using default instead.")
return _defaultRelayMaxTimeout
}

func validateRelayTimeoutTick(d time.Duration, logger Logger) time.Duration {
if d > 0 {
return d
}
if d == 0 {
return _defaultRelayTimeoutTick
}
logger.WithFields(
LogField{"configuredTimeoutTick", d},
LogField{"defaultTimeoutTick", _defaultRelayTimeoutTick},
).Warn("Configured RelayTimeoutTick is invalid, using default instead.")
return _defaultRelayTimeoutTick
}
6 changes: 6 additions & 0 deletions relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,12 @@ func TestRelayRejectsDuringClose(t *testing.T) {
require.Error(t, err, "Expect call to fail after relay is shutdown")
assert.Contains(t, err.Error(), "incoming connection is not active")
close(block)
wg.Wait()
// FIXME:
// We fire all pending timers when we shut down the relay, which
// includes the timeout for the blocked handler. That unblocks the
// handler before we get to the call above, which obviates the point of
// this test.

// We have a successful call that ran in the goroutine
// and a failed call that we just checked the error on.
Expand Down
6 changes: 6 additions & 0 deletions testutils/channel_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ func (o *ChannelOpts) SetRelayMaxTimeout(d time.Duration) *ChannelOpts {
return o
}

// SetRelayTimeoutTick sets the coarseness of relay timeouts.
func (o *ChannelOpts) SetRelayTimeoutTick(d time.Duration) *ChannelOpts {
o.ChannelOptions.RelayTimeoutTick = d
return o
}

func defaultString(v string, defaultValue string) string {
if v == "" {
return defaultValue
Expand Down

0 comments on commit eb30dec

Please sign in to comment.