Skip to content

Commit

Permalink
Clean up Channel lifecycle, add ID per channel (#548)
Browse files Browse the repository at this point in the history
Add a unique channel ID, and have an onClosed method for when a channel
is actually closed. This is useful for:
 - Stopping related objects that are no longer needed on shutdown
 - Raising an event when a channel is closed
  • Loading branch information
prashantv authored Dec 10, 2016
1 parent 36b3d10 commit 271fb02
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 6 deletions.
31 changes: 28 additions & 3 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/uber/tchannel-go/tnet"

"github.com/opentracing/opentracing-go"
"github.com/uber-go/atomic"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -113,6 +114,7 @@ const (
type Channel struct {
channelConnectionCommon

chID uint32
createdStack string
commonStatsTags map[string]string
connectionOptions ConnectionOptions
Expand Down Expand Up @@ -141,6 +143,9 @@ type channelConnectionCommon struct {
timeNow func() time.Time
}

// _nextChID is used to allocate unique IDs to every channel for debugging purposes.
var _nextChID atomic.Uint32

// Tracer returns the OpenTracing Tracer for this channel. If no tracer was provided
// in the configuration, returns opentracing.GlobalTracer(). Note that this approach
// allows opentracing.GlobalTracer() to be initialized _after_ the channel is created.
Expand Down Expand Up @@ -187,16 +192,20 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
timeNow = time.Now
}

chID := _nextChID.Inc()
ch := &Channel{
channelConnectionCommon: channelConnectionCommon{
log: logger,
log: logger.WithFields(
LogField{"chID", chID},
LogField{"service", serviceName},
LogField{"process", processName}),
relayLocal: toStringSet(opts.RelayLocalHandlers),
statsReporter: statsReporter,
subChannels: &subChannelMap{},
timeNow: timeNow,
tracer: opts.Tracer,
},

chID: chID,
connectionOptions: opts.DefaultConnectionOptions,
relayHost: opts.RelayHost,
relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger),
Expand Down Expand Up @@ -639,19 +648,30 @@ func (ch *Channel) connectionCloseStateChange(c *Connection) {
updateTo = ChannelInboundClosed
}

var updatedToState ChannelState
if updateTo > 0 {
ch.mutable.Lock()
// Recheck the state as it's possible another goroutine changed the state
// from what we expected, and so we might make a stale change.
if ch.mutable.state == chState {
ch.mutable.state = updateTo
updatedToState = updateTo
}
ch.mutable.Unlock()
chState = updateTo
}

c.log.Debugf("ConnectionCloseStateChange channel state = %v connection minState = %v",
chState, minState)

if updatedToState == ChannelClosed {
ch.onClosed()
}
}

func (ch *Channel) onClosed() {
removeClosedChannel(ch)
ch.log.Infof("Channel closed.")
}

// Closed returns whether this channel has been closed with .Close()
Expand All @@ -675,6 +695,7 @@ func (ch *Channel) State() ChannelState {
func (ch *Channel) Close() {
ch.Logger().Info("Channel.Close called.")
var connections []*Connection
var channelClosed bool
ch.mutable.Lock()

if ch.mutable.l != nil {
Expand All @@ -684,6 +705,7 @@ func (ch *Channel) Close() {
ch.mutable.state = ChannelStartClose
if len(ch.mutable.conns) == 0 {
ch.mutable.state = ChannelClosed
channelClosed = true
}
for _, c := range ch.mutable.conns {
connections = append(connections, c)
Expand All @@ -693,7 +715,10 @@ func (ch *Channel) Close() {
for _, c := range connections {
c.Close()
}
removeClosedChannel(ch)

if channelClosed {
ch.onClosed()
}
}

// RelayHost returns the channel's RelayHost, if any.
Expand Down
6 changes: 3 additions & 3 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ type Connection struct {
}
}

// nextConnID gives an ID for each connection for debugging purposes.
var nextConnID atomic.Uint32
// _nextConnID is used to allocate unique IDs to every connection for debugging purposes.
var _nextConnID atomic.Uint32

type connectionState int

Expand Down Expand Up @@ -290,7 +290,7 @@ func (ch *Channel) newConnection(conn net.Conn, outboundHP string, initialState
framePool = DefaultFramePool
}

connID := nextConnID.Inc()
connID := _nextConnID.Inc()
log := ch.log.WithFields(LogFields{
{"connID", connID},
{"localAddr", conn.LocalAddr()},
Expand Down
5 changes: 5 additions & 0 deletions introspection.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type RuntimeVersion struct {

// RuntimeState is a snapshot of the runtime state for a channel.
type RuntimeState struct {
ID uint32 `json:"id"`

// CreatedStack is the stack for how this channel was created.
CreatedStack string `json:"createdStack"`

Expand Down Expand Up @@ -91,6 +93,7 @@ type GoRuntimeStateOptions struct {

// ChannelInfo is the state of other channels in the same process.
type ChannelInfo struct {
ID uint32 `json:"id"`
CreatedStack string `json:"createdStack"`
LocalPeer LocalPeerInfo `json:"localPeer"`
}
Expand Down Expand Up @@ -209,6 +212,7 @@ func (ch *Channel) IntrospectState(opts *IntrospectionOptions) *RuntimeState {
ch.mutable.RUnlock()

return &RuntimeState{
ID: ch.chID,
CreatedStack: ch.createdStack,
LocalPeer: ch.PeerInfo(),
SubChannels: ch.subChannels.IntrospectState(opts),
Expand Down Expand Up @@ -248,6 +252,7 @@ func (ch *Channel) IntrospectOthers(opts *IntrospectionOptions) map[string][]Cha
// ReportInfo returns ChannelInfo for a channel.
func (ch *Channel) ReportInfo(opts *IntrospectionOptions) ChannelInfo {
return ChannelInfo{
ID: ch.chID,
CreatedStack: ch.createdStack,
LocalPeer: ch.PeerInfo(),
}
Expand Down

0 comments on commit 271fb02

Please sign in to comment.