Skip to content

Commit

Permalink
Integrate timer wheel with relay
Browse files Browse the repository at this point in the history
  • Loading branch information
Akshay Shah committed Dec 19, 2016
1 parent 7ec1217 commit 9d8f788
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 14 deletions.
10 changes: 9 additions & 1 deletion all_channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package tchannel
package tchannel_test

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

. "github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/testutils"
)

func TestAllChannelsRegistered(t *testing.T) {
Expand All @@ -42,6 +46,8 @@ func TestAllChannelsRegistered(t *testing.T) {
assert.Equal(t, 1, len(state.OtherChannels["ch2"]))

ch1_2.Close()
// TODO: replace this sleep with a callback hook.
time.Sleep(testutils.Timeout(10 * time.Millisecond))

state = ch1_1.IntrospectState(introspectOpts)
assert.Equal(t, 0, len(state.OtherChannels["ch1"]))
Expand All @@ -57,6 +63,8 @@ func TestAllChannelsRegistered(t *testing.T) {
ch1_1.Close()
ch2_1.Close()
ch2_2.Close()
// TODO: replace this sleep with a callback hook.
time.Sleep(testutils.Timeout(10 * time.Millisecond))

state = ch1_1.IntrospectState(introspectOpts)
assert.Equal(t, 0, len(state.OtherChannels["ch1"]))
Expand Down
23 changes: 20 additions & 3 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sync"
"time"

"github.com/uber/tchannel-go/timers"
"github.com/uber/tchannel-go/tnet"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -70,6 +71,10 @@ type ChannelOptions struct {
// clamped to this value). Passing zero uses the default of 2m.
RelayMaxTimeout time.Duration

// RelayTimeoutTick is the granularity of the timer wheel used to manage
// timeouts.
RelayTimeoutTick time.Duration

// The reporter to use for reporting stats for this channel.
StatsReporter StatsReporter

Expand Down Expand Up @@ -121,6 +126,8 @@ type Channel struct {
peers *PeerList
relayHost RelayHost
relayMaxTimeout time.Duration
relayTimeoutTick time.Duration
relayTimeoutWheel *timers.Wheel

// mutable contains all the members of Channel which are mutable.
mutable struct {
Expand Down Expand Up @@ -209,6 +216,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
connectionOptions: opts.DefaultConnectionOptions,
relayHost: opts.RelayHost,
relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger),
relayTimeoutTick: validateRelayTimeoutTick(opts.RelayTimeoutTick, logger),
}
ch.peers = newRootPeerList(ch).newChild()

Expand All @@ -230,6 +238,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {

if opts.RelayHost != nil {
opts.RelayHost.SetChannel(ch)
ch.startWheel()
}
return ch, nil
}
Expand Down Expand Up @@ -665,13 +674,17 @@ func (ch *Channel) connectionCloseStateChange(c *Connection) {
chState, minState)

if updatedToState == ChannelClosed {
ch.onClosed()
go ch.onClosed()
}
}

func (ch *Channel) onClosed() {
removeClosedChannel(ch)
ch.log.Infof("Channel closed.")
if ch.relayTimeoutWheel != nil {
ch.relayTimeoutWheel.Stop()
ch.log.Info("Timer wheel stopped.")
}
ch.log.Info("Channel closed.")
}

// Closed returns whether this channel has been closed with .Close()
Expand Down Expand Up @@ -717,7 +730,7 @@ func (ch *Channel) Close() {
}

if channelClosed {
ch.onClosed()
go ch.onClosed()
}
}

Expand All @@ -726,6 +739,10 @@ func (ch *Channel) RelayHost() RelayHost {
return ch.relayHost
}

func (ch *Channel) startWheel() {
ch.relayTimeoutWheel = timers.NewWheel(ch.relayTimeoutTick, ch.relayMaxTimeout)
}

func toStringSet(ss []string) map[string]struct{} {
set := make(map[string]struct{}, len(ss))
for _, s := range ss {
Expand Down
2 changes: 2 additions & 0 deletions introspection.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ type RelayerRuntimeState struct {
InboundItems RelayItemSetState `json:"inboundItems"`
OutboundItems RelayItemSetState `json:"outboundItems"`
MaxTimeout time.Duration `json:"maxTimeout"`
TimeoutTick time.Duration `json:"timeoutTick"`
}

// ExchangeSetRuntimeState is the runtime state for a message exchange set.
Expand Down Expand Up @@ -360,6 +361,7 @@ func (r *Relayer) IntrospectState(opts *IntrospectionOptions) RelayerRuntimeStat
InboundItems: r.inbound.IntrospectState(opts, "inbound"),
OutboundItems: r.outbound.IntrospectState(opts, "outbound"),
MaxTimeout: r.maxTimeout,
TimeoutTick: r.timeoutTick,
}
}

Expand Down
43 changes: 33 additions & 10 deletions relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/uber/tchannel-go/relay"
"github.com/uber/tchannel-go/timers"

"github.com/uber-go/atomic"
)
Expand All @@ -40,6 +41,9 @@ const (
_relayTombTTL = 3 * time.Second
// _defaultRelayMaxTimeout is the default max TTL for relayed calls.
_defaultRelayMaxTimeout = 2 * time.Minute
// _defaultRelayTimeoutTick is the default tick duration for processing
// relay timeouts.
_defaultRelayTimeoutTick = 5 * time.Millisecond
)

var (
Expand All @@ -53,7 +57,7 @@ var (
type relayConn Connection

type relayItem struct {
*time.Timer
*timers.Timer

remapID uint32
tomb bool
Expand All @@ -67,14 +71,16 @@ type relayItems struct {
sync.RWMutex

logger Logger
wheel *timers.Wheel
tombs uint64
items map[uint32]relayItem
}

func newRelayItems(logger Logger) *relayItems {
func newRelayItems(logger Logger, wheel *timers.Wheel) *relayItems {
return &relayItems{
items: make(map[uint32]relayItem),
logger: logger,
wheel: wheel,
}
}

Expand Down Expand Up @@ -150,9 +156,7 @@ func (r *relayItems) Entomb(id uint32, deleteAfter time.Duration) (relayItem, bo
r.items[id] = item
r.Unlock()

// TODO: We should be clearing these out in batches, rather than creating
// individual timers for each item.
time.AfterFunc(deleteAfter, func() { r.Delete(id) })
r.wheel.AfterFunc(deleteAfter, func() { r.Delete(id) })
return item, true
}

Expand All @@ -165,8 +169,10 @@ const (

// A Relayer forwards frames.
type Relayer struct {
relayHost RelayHost
maxTimeout time.Duration
relayHost RelayHost
maxTimeout time.Duration
timeoutTick time.Duration
wheel *timers.Wheel

// localHandlers is the set of service names that are handled by the local
// channel.
Expand All @@ -190,12 +196,15 @@ type Relayer struct {

// NewRelayer constructs a Relayer.
func NewRelayer(ch *Channel, conn *Connection) *Relayer {
wheel := ch.relayTimeoutWheel
return &Relayer{
relayHost: ch.RelayHost(),
maxTimeout: ch.relayMaxTimeout,
timeoutTick: ch.relayTimeoutTick,
wheel: wheel,
localHandler: ch.relayLocal,
outbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "outbound"})),
inbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "inbound"})),
outbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "outbound"}), wheel),
inbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "inbound"}), wheel),
peers: ch.rootPeers(),
conn: conn,
logger: conn.log,
Expand Down Expand Up @@ -455,7 +464,7 @@ func (r *Relayer) addRelayItem(isOriginator bool, id, remapID uint32, destinatio
if isOriginator {
items = r.outbound
}
item.Timer = time.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) })
item.Timer = r.wheel.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) })
items.Add(id, item)
return item
}
Expand Down Expand Up @@ -596,3 +605,17 @@ func validateRelayMaxTimeout(d time.Duration, logger Logger) time.Duration {
).Warn("Configured RelayMaxTimeout is invalid, using default instead.")
return _defaultRelayMaxTimeout
}

func validateRelayTimeoutTick(d time.Duration, logger Logger) time.Duration {
if d > 0 {
return d
}
if d == 0 {
return _defaultRelayTimeoutTick
}
logger.WithFields(
LogField{"configuredTimeoutTick", d},
LogField{"defaultTimeoutTick", _defaultRelayTimeoutTick},
).Warn("Configured RelayTimeoutTick is invalid, using default instead.")
return _defaultRelayTimeoutTick
}
6 changes: 6 additions & 0 deletions testutils/channel_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,12 @@ func (o *ChannelOpts) SetRelayMaxTimeout(d time.Duration) *ChannelOpts {
return o
}

// SetRelayTimeoutTick sets the coarseness of relay timeouts.
func (o *ChannelOpts) SetRelayTimeoutTick(d time.Duration) *ChannelOpts {
o.ChannelOptions.RelayTimeoutTick = d
return o
}

func defaultString(v string, defaultValue string) string {
if v == "" {
return defaultValue
Expand Down
14 changes: 14 additions & 0 deletions testutils/test_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/json"
"fmt"
"runtime"
"runtime/debug"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -121,6 +122,7 @@ func WithTestServer(t testing.TB, chanOpts *ChannelOpts, f func(*TestServer)) {
withServer(t, chanOpts.Copy(), f)
}
}
forceReleaseUnusedMemory()
}

// SetVerifyOpts specifies the options we'll use during teardown to verify that
Expand Down Expand Up @@ -446,3 +448,15 @@ func withServer(t testing.TB, chanOpts *ChannelOpts, f func(*TestServer)) {
ts.Server().Logger().Debugf("TEST: Test function complete")
ts.CloseAndVerify()
}

// The timer wheels we allocate on relay channels are large enough that the Go
// runtime doesn't release their memory back to the OS immediately. Since we
// create and destroy them so quickly during tests, we quickly exceed the
// memory Travis allows each container to consume, which leads to mysterious
// test errors.
//
// Work around this issue by forcing a GC and free.
func forceReleaseUnusedMemory() {
runtime.GC()
debug.FreeOSMemory()
}

0 comments on commit 9d8f788

Please sign in to comment.