Skip to content

Commit

Permalink
Add hook for peer connection status changes (#624)
Browse files Browse the repository at this point in the history
  • Loading branch information
kriskowal authored and prashantv committed May 31, 2017
1 parent cf1a624 commit d096b5a
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 22 deletions.
25 changes: 14 additions & 11 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ type ChannelOptions struct {
// The name of the process, for logging and reporting to peers
ProcessName string

// OnPeerStatusChanged
// OnPeerStatusChanged is an optional callback that receives a notification
// whenever the channel establishes a usable connection to a peer, or loses
// a connection to a peer.
OnPeerStatusChanged func(*Peer)

// The logger to use for this channel
Expand Down Expand Up @@ -123,14 +125,15 @@ const (
type Channel struct {
channelConnectionCommon

chID uint32
createdStack string
commonStatsTags map[string]string
connectionOptions ConnectionOptions
peers *PeerList
relayHost RelayHost
relayMaxTimeout time.Duration
handler Handler
chID uint32
createdStack string
commonStatsTags map[string]string
connectionOptions ConnectionOptions
peers *PeerList
relayHost RelayHost
relayMaxTimeout time.Duration
handler Handler
onPeerStatusChanged func(*Peer)

// mutable contains all the members of Channel which are mutable.
mutable struct {
Expand Down Expand Up @@ -220,7 +223,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
relayHost: opts.RelayHost,
relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger),
}
ch.peers = newRootPeerList(ch).newChild()
ch.peers = newRootPeerList(ch, opts.OnPeerStatusChanged).newChild()

if opts.Handler != nil {
ch.handler = opts.Handler
Expand Down Expand Up @@ -740,7 +743,7 @@ func (ch *Channel) State() ChannelState {
// Close starts a graceful Close for the channel. This does not happen immediately:
// 1. This call closes the Listener and starts closing connections.
// 2. When all incoming connections are drained, the connection blocks new outgoing calls.
// 3. When all connections are drainged, the channel's state is updated to Closed.
// 3. When all connections are drained, the channel's state is updated to Closed.
func (ch *Channel) Close() {
ch.Logger().Info("Channel.Close called.")
var connections []*Connection
Expand Down
102 changes: 102 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,3 +868,105 @@ func TestTosPriority(t *testing.T) {
require.NoError(t, err, "Failed to write to outbound conn")
})
}

func TestPeerStatusChangeClientReduction(t *testing.T) {
sopts := testutils.NewOpts().NoRelay()
testutils.WithTestServer(t, sopts, func(ts *testutils.TestServer) {
server := ts.Server()
testutils.RegisterEcho(server, nil)
changes := make(chan int, 2)

copts := testutils.NewOpts().SetOnPeerStatusChanged(func(p *Peer) {
i, o := p.NumConnections()
assert.Equal(t, 0, i, "no inbound connections to client")
changes <- o
})

// Induce the creation of a connection from client to server.
client := ts.NewClient(copts)
require.NoError(t, testutils.CallEcho(client, ts.HostPort(), ts.ServiceName(), nil))
assert.Equal(t, 1, <-changes, "event for first connection")

// Re-use
testutils.AssertEcho(t, client, ts.HostPort(), ts.ServiceName())

// Induce the destruction of a connection from the server to the client.
server.Close()
assert.Equal(t, 0, <-changes, "event for second disconnection")

client.Close()
assert.Len(t, changes, 0, "unexpected peer status changes")
})
}

func TestPeerStatusChangeClient(t *testing.T) {
sopts := testutils.NewOpts().NoRelay()
testutils.WithTestServer(t, sopts, func(ts *testutils.TestServer) {
server := ts.Server()
testutils.RegisterEcho(server, nil)
changes := make(chan int, 2)

copts := testutils.NewOpts().SetOnPeerStatusChanged(func(p *Peer) {
i, o := p.NumConnections()
assert.Equal(t, 0, i, "no inbound connections to client")
changes <- o
})

// Induce the creation of a connection from client to server.
client := ts.NewClient(copts)
require.NoError(t, testutils.CallEcho(client, ts.HostPort(), ts.ServiceName(), nil))
assert.Equal(t, 1, <-changes, "event for first connection")

// Re-use
testutils.AssertEcho(t, client, ts.HostPort(), ts.ServiceName())

// Induce the creation of a second connection from client to server.
pl := client.RootPeers()
p := pl.GetOrAdd(ts.HostPort())
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, testutils.Timeout(100*time.Millisecond))
defer cancel()
_, err := p.Connect(ctx)
require.NoError(t, err)
assert.Equal(t, 2, <-changes, "event for second connection")

// Induce the destruction of a connection from the server to the client.
server.Close()
<-changes // May be 1 or 0 depending on timing.
assert.Equal(t, 0, <-changes, "event for second disconnection")

client.Close()
assert.Len(t, changes, 0, "unexpected peer status changes")
})
}

func TestPeerStatusChangeServer(t *testing.T) {
changes := make(chan int, 10)
sopts := testutils.NewOpts().NoRelay().SetOnPeerStatusChanged(func(p *Peer) {
i, o := p.NumConnections()
assert.Equal(t, 0, o, "no outbound connections from server")
changes <- i
})
testutils.WithTestServer(t, sopts, func(ts *testutils.TestServer) {
server := ts.Server()
testutils.RegisterEcho(server, nil)

copts := testutils.NewOpts()
for i := 0; i < 5; i++ {
client := ts.NewClient(copts)

// Open
testutils.AssertEcho(t, client, ts.HostPort(), ts.ServiceName())
assert.Equal(t, 1, <-changes, "one event on new connection")

// Re-use
testutils.AssertEcho(t, client, ts.HostPort(), ts.ServiceName())
assert.Len(t, changes, 0, "no new events on re-used connection")

// Close
client.Close()
assert.Equal(t, 0, <-changes, "one event on lost connection")
}
})
assert.Len(t, changes, 0, "unexpected peer status changes")
}
22 changes: 17 additions & 5 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ type Peer struct {

channel Connectable
hostPort string
onStatusChanged func(*Peer)
onClosedConnRemoved func(*Peer)

// scCount is the number of subchannels that this peer is added to.
Expand All @@ -335,13 +336,17 @@ type Peer struct {
onUpdate func(*Peer)
}

func newPeer(channel Connectable, hostPort string, onClosedConnRemoved func(*Peer)) *Peer {
func newPeer(channel Connectable, hostPort string, onStatusChanged func(*Peer), onClosedConnRemoved func(*Peer)) *Peer {
if hostPort == "" {
panic("Cannot create peer with blank hostPort")
}
if onStatusChanged == nil {
onStatusChanged = noopOnStatusChanged
}
return &Peer{
channel: channel,
hostPort: hostPort,
onStatusChanged: onStatusChanged,
onClosedConnRemoved: onClosedConnRemoved,
}
}
Expand Down Expand Up @@ -461,18 +466,21 @@ func (p *Peer) canRemove() bool {
}

// addConnection adds an active connection to the peer's connection list.
// If a connection is not active, ErrInvalidConnectionState is returned.
// If a connection is not active, returns ErrInvalidConnectionState.
func (p *Peer) addConnection(c *Connection, direction connectionDirection) error {
conns := p.connectionsFor(direction)

p.Lock()
defer p.Unlock()

if c.readState() != connectionActive {
return ErrInvalidConnectionState
}

p.Lock()
*conns = append(*conns, c)
p.Unlock()

// Inform third parties that a peer gained a connection.
p.onStatusChanged(p)

return nil
}

Expand Down Expand Up @@ -517,6 +525,8 @@ func (p *Peer) connectionCloseStateChange(changed *Connection) {

if found {
p.onClosedConnRemoved(p)
// Inform third parties that a peer lost a connection.
p.onStatusChanged(p)
}
}

Expand Down Expand Up @@ -596,6 +606,8 @@ func (p *Peer) callOnUpdateComplete() {
}
}

func noopOnStatusChanged(*Peer) {}

// isEphemeralHostPort returns if hostPort is the default ephemeral hostPort.
func isEphemeralHostPort(hostPort string) bool {
return hostPort == "" || hostPort == ephemeralHostPort || strings.HasSuffix(hostPort, ":0")
Expand Down
14 changes: 8 additions & 6 deletions root_peer_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ import "sync"
type RootPeerList struct {
sync.RWMutex

channel Connectable
peersByHostPort map[string]*Peer
channel Connectable
onPeerStatusChanged func(*Peer)
peersByHostPort map[string]*Peer
}

func newRootPeerList(ch Connectable) *RootPeerList {
func newRootPeerList(ch Connectable, onPeerStatusChanged func(*Peer)) *RootPeerList {
return &RootPeerList{
channel: ch,
peersByHostPort: make(map[string]*Peer),
channel: ch,
onPeerStatusChanged: onPeerStatusChanged,
peersByHostPort: make(map[string]*Peer),
}
}

Expand Down Expand Up @@ -65,7 +67,7 @@ func (l *RootPeerList) Add(hostPort string) *Peer {
var p *Peer
// To avoid duplicate connections, only the root list should create new
// peers. All other lists should keep refs to the root list's peers.
p = newPeer(l.channel, hostPort, l.onClosedConnRemoved)
p = newPeer(l.channel, hostPort, l.onPeerStatusChanged, l.onClosedConnRemoved)
l.peersByHostPort[hostPort] = p
return p
}
Expand Down
7 changes: 7 additions & 0 deletions testutils/channel_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,13 @@ func (o *ChannelOpts) SetRelayMaxTimeout(d time.Duration) *ChannelOpts {
return o
}

// SetOnPeerStatusChanged sets the callback for channel status change
// noficiations.
func (o *ChannelOpts) SetOnPeerStatusChanged(f func(*tchannel.Peer)) *ChannelOpts {
o.ChannelOptions.OnPeerStatusChanged = f
return o
}

func defaultString(v string, defaultValue string) string {
if v == "" {
return defaultValue
Expand Down

0 comments on commit d096b5a

Please sign in to comment.