diff --git a/channel.go b/channel.go index 752925fe..69dcf315 100644 --- a/channel.go +++ b/channel.go @@ -29,6 +29,7 @@ import ( "sync" "time" + "github.com/uber/tchannel-go/timers" "github.com/uber/tchannel-go/tnet" "github.com/opentracing/opentracing-go" @@ -69,6 +70,10 @@ type ChannelOptions struct { // clamped to this value). Passing zero uses the default of 2m. RelayMaxTimeout time.Duration + // RelayTimeoutTick is the granularity of the timer wheel used to manage + // timeouts. + RelayTimeoutTick time.Duration + // The reporter to use for reporting stats for this channel. StatsReporter StatsReporter @@ -119,6 +124,8 @@ type Channel struct { peers *PeerList relayHost RelayHost relayMaxTimeout time.Duration + relayTimeoutTick time.Duration + relayTimeoutWheel *timers.Wheel // mutable contains all the members of Channel which are mutable. mutable struct { @@ -200,6 +207,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) { connectionOptions: opts.DefaultConnectionOptions, relayHost: opts.RelayHost, relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger), + relayTimeoutTick: validateRelayTimeoutTick(opts.RelayTimeoutTick, logger), } ch.peers = newRootPeerList(ch).newChild() @@ -221,6 +229,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) { if opts.RelayHost != nil { opts.RelayHost.SetChannel(ch) + ch.startWheel() } return ch, nil } @@ -693,6 +702,11 @@ func (ch *Channel) Close() { for _, c := range connections { c.Close() } + + if ch.relayTimeoutWheel != nil { + ch.relayTimeoutWheel.Stop() + } + removeClosedChannel(ch) } @@ -701,6 +715,10 @@ func (ch *Channel) RelayHost() RelayHost { return ch.relayHost } +func (ch *Channel) startWheel() { + ch.relayTimeoutWheel = timers.NewWheel(ch.relayTimeoutTick, ch.relayMaxTimeout) +} + func toStringSet(ss []string) map[string]struct{} { set := make(map[string]struct{}, len(ss)) for _, s := range ss { diff --git a/introspection.go b/introspection.go index 68377df3..c953b9af 100644 --- a/introspection.go +++ b/introspection.go @@ -153,6 +153,7 @@ type RelayerRuntimeState struct { InboundItems RelayItemSetState `json:"inboundItems"` OutboundItems RelayItemSetState `json:"outboundItems"` MaxTimeout time.Duration `json:"maxTimeout"` + TimeoutTick time.Duration `json:"timeoutTick"` } // ExchangeSetRuntimeState is the runtime state for a message exchange set. @@ -355,6 +356,7 @@ func (r *Relayer) IntrospectState(opts *IntrospectionOptions) RelayerRuntimeStat InboundItems: r.inbound.IntrospectState(opts, "inbound"), OutboundItems: r.outbound.IntrospectState(opts, "outbound"), MaxTimeout: r.maxTimeout, + TimeoutTick: r.timeoutTick, } } diff --git a/relay.go b/relay.go index c366c2ea..8cb3eecd 100644 --- a/relay.go +++ b/relay.go @@ -28,6 +28,7 @@ import ( "time" "github.com/uber/tchannel-go/relay" + "github.com/uber/tchannel-go/timers" "github.com/uber-go/atomic" ) @@ -40,6 +41,9 @@ const ( _relayTombTTL = 3 * time.Second // _defaultRelayMaxTimeout is the default max TTL for relayed calls. _defaultRelayMaxTimeout = 2 * time.Minute + // _defaultRelayTimeoutTick is the default tick duration for processing + // relay timeouts. + _defaultRelayTimeoutTick = 5 * time.Millisecond ) var ( @@ -53,7 +57,7 @@ var ( type relayConn Connection type relayItem struct { - *time.Timer + *timers.Timer remapID uint32 tomb bool @@ -67,14 +71,16 @@ type relayItems struct { sync.RWMutex logger Logger + wheel *timers.Wheel tombs uint64 items map[uint32]relayItem } -func newRelayItems(logger Logger) *relayItems { +func newRelayItems(wheel *timers.Wheel, logger Logger) *relayItems { return &relayItems{ items: make(map[uint32]relayItem), logger: logger, + wheel: wheel, } } @@ -152,7 +158,7 @@ func (r *relayItems) Entomb(id uint32, deleteAfter time.Duration) (relayItem, bo // TODO: We should be clearing these out in batches, rather than creating // individual timers for each item. - time.AfterFunc(deleteAfter, func() { r.Delete(id) }) + r.wheel.AfterFunc(deleteAfter, func() { r.Delete(id) }) return item, true } @@ -165,8 +171,10 @@ const ( // A Relayer forwards frames. type Relayer struct { - relayHost RelayHost - maxTimeout time.Duration + relayHost RelayHost + maxTimeout time.Duration + timeoutTick time.Duration + wheel *timers.Wheel // localHandlers is the set of service names that are handled by the local // channel. @@ -190,12 +198,15 @@ type Relayer struct { // NewRelayer constructs a Relayer. func NewRelayer(ch *Channel, conn *Connection) *Relayer { + wheel := ch.relayTimeoutWheel return &Relayer{ relayHost: ch.RelayHost(), maxTimeout: ch.relayMaxTimeout, + timeoutTick: ch.relayTimeoutTick, + wheel: wheel, localHandler: ch.relayLocal, - outbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "outbound"})), - inbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "inbound"})), + outbound: newRelayItems(wheel, ch.Logger().WithFields(LogField{"relay", "outbound"})), + inbound: newRelayItems(wheel, ch.Logger().WithFields(LogField{"relay", "inbound"})), peers: ch.rootPeers(), conn: conn, logger: conn.log, @@ -455,7 +466,7 @@ func (r *Relayer) addRelayItem(isOriginator bool, id, remapID uint32, destinatio if isOriginator { items = r.outbound } - item.Timer = time.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) }) + item.Timer = r.wheel.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) }) items.Add(id, item) return item } @@ -600,3 +611,17 @@ func validateRelayMaxTimeout(d time.Duration, logger Logger) time.Duration { ).Warn("Configured RelayMaxTimeout is invalid, using default instead.") return _defaultRelayMaxTimeout } + +func validateRelayTimeoutTick(d time.Duration, logger Logger) time.Duration { + if d > 0 { + return d + } + if d == 0 { + return _defaultRelayTimeoutTick + } + logger.WithFields( + LogField{"configuredTimeoutTick", d}, + LogField{"defaultTimeoutTick", _defaultRelayTimeoutTick}, + ).Warn("Configured RelayTimeoutTick is invalid, using default instead.") + return _defaultRelayTimeoutTick +} diff --git a/testutils/channel_opts.go b/testutils/channel_opts.go index 9a2ab3cd..d5a2b356 100644 --- a/testutils/channel_opts.go +++ b/testutils/channel_opts.go @@ -199,6 +199,12 @@ func (o *ChannelOpts) SetRelayMaxTimeout(d time.Duration) *ChannelOpts { return o } +// SetRelayTimeoutTick sets the coarseness of relay timeouts. +func (o *ChannelOpts) SetRelayTimeoutTick(d time.Duration) *ChannelOpts { + o.ChannelOptions.RelayTimeoutTick = d + return o +} + func defaultString(v string, defaultValue string) string { if v == "" { return defaultValue