From d096b5a16b1ee118ef87c1049dd824d459a60405 Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Wed, 31 May 2017 11:34:21 -0700 Subject: [PATCH] Add hook for peer connection status changes (#624) --- channel.go | 25 ++++++---- connection_test.go | 102 ++++++++++++++++++++++++++++++++++++++ peer.go | 22 ++++++-- root_peer_list.go | 14 +++--- testutils/channel_opts.go | 7 +++ 5 files changed, 148 insertions(+), 22 deletions(-) diff --git a/channel.go b/channel.go index 66822470..6c2746ed 100644 --- a/channel.go +++ b/channel.go @@ -57,7 +57,9 @@ type ChannelOptions struct { // The name of the process, for logging and reporting to peers ProcessName string - // OnPeerStatusChanged + // OnPeerStatusChanged is an optional callback that receives a notification + // whenever the channel establishes a usable connection to a peer, or loses + // a connection to a peer. OnPeerStatusChanged func(*Peer) // The logger to use for this channel @@ -123,14 +125,15 @@ const ( type Channel struct { channelConnectionCommon - chID uint32 - createdStack string - commonStatsTags map[string]string - connectionOptions ConnectionOptions - peers *PeerList - relayHost RelayHost - relayMaxTimeout time.Duration - handler Handler + chID uint32 + createdStack string + commonStatsTags map[string]string + connectionOptions ConnectionOptions + peers *PeerList + relayHost RelayHost + relayMaxTimeout time.Duration + handler Handler + onPeerStatusChanged func(*Peer) // mutable contains all the members of Channel which are mutable. mutable struct { @@ -220,7 +223,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) { relayHost: opts.RelayHost, relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger), } - ch.peers = newRootPeerList(ch).newChild() + ch.peers = newRootPeerList(ch, opts.OnPeerStatusChanged).newChild() if opts.Handler != nil { ch.handler = opts.Handler @@ -740,7 +743,7 @@ func (ch *Channel) State() ChannelState { // Close starts a graceful Close for the channel. This does not happen immediately: // 1. This call closes the Listener and starts closing connections. // 2. When all incoming connections are drained, the connection blocks new outgoing calls. -// 3. When all connections are drainged, the channel's state is updated to Closed. +// 3. When all connections are drained, the channel's state is updated to Closed. func (ch *Channel) Close() { ch.Logger().Info("Channel.Close called.") var connections []*Connection diff --git a/connection_test.go b/connection_test.go index ef69d873..d6957a1a 100644 --- a/connection_test.go +++ b/connection_test.go @@ -868,3 +868,105 @@ func TestTosPriority(t *testing.T) { require.NoError(t, err, "Failed to write to outbound conn") }) } + +func TestPeerStatusChangeClientReduction(t *testing.T) { + sopts := testutils.NewOpts().NoRelay() + testutils.WithTestServer(t, sopts, func(ts *testutils.TestServer) { + server := ts.Server() + testutils.RegisterEcho(server, nil) + changes := make(chan int, 2) + + copts := testutils.NewOpts().SetOnPeerStatusChanged(func(p *Peer) { + i, o := p.NumConnections() + assert.Equal(t, 0, i, "no inbound connections to client") + changes <- o + }) + + // Induce the creation of a connection from client to server. + client := ts.NewClient(copts) + require.NoError(t, testutils.CallEcho(client, ts.HostPort(), ts.ServiceName(), nil)) + assert.Equal(t, 1, <-changes, "event for first connection") + + // Re-use + testutils.AssertEcho(t, client, ts.HostPort(), ts.ServiceName()) + + // Induce the destruction of a connection from the server to the client. + server.Close() + assert.Equal(t, 0, <-changes, "event for second disconnection") + + client.Close() + assert.Len(t, changes, 0, "unexpected peer status changes") + }) +} + +func TestPeerStatusChangeClient(t *testing.T) { + sopts := testutils.NewOpts().NoRelay() + testutils.WithTestServer(t, sopts, func(ts *testutils.TestServer) { + server := ts.Server() + testutils.RegisterEcho(server, nil) + changes := make(chan int, 2) + + copts := testutils.NewOpts().SetOnPeerStatusChanged(func(p *Peer) { + i, o := p.NumConnections() + assert.Equal(t, 0, i, "no inbound connections to client") + changes <- o + }) + + // Induce the creation of a connection from client to server. + client := ts.NewClient(copts) + require.NoError(t, testutils.CallEcho(client, ts.HostPort(), ts.ServiceName(), nil)) + assert.Equal(t, 1, <-changes, "event for first connection") + + // Re-use + testutils.AssertEcho(t, client, ts.HostPort(), ts.ServiceName()) + + // Induce the creation of a second connection from client to server. + pl := client.RootPeers() + p := pl.GetOrAdd(ts.HostPort()) + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, testutils.Timeout(100*time.Millisecond)) + defer cancel() + _, err := p.Connect(ctx) + require.NoError(t, err) + assert.Equal(t, 2, <-changes, "event for second connection") + + // Induce the destruction of a connection from the server to the client. + server.Close() + <-changes // May be 1 or 0 depending on timing. + assert.Equal(t, 0, <-changes, "event for second disconnection") + + client.Close() + assert.Len(t, changes, 0, "unexpected peer status changes") + }) +} + +func TestPeerStatusChangeServer(t *testing.T) { + changes := make(chan int, 10) + sopts := testutils.NewOpts().NoRelay().SetOnPeerStatusChanged(func(p *Peer) { + i, o := p.NumConnections() + assert.Equal(t, 0, o, "no outbound connections from server") + changes <- i + }) + testutils.WithTestServer(t, sopts, func(ts *testutils.TestServer) { + server := ts.Server() + testutils.RegisterEcho(server, nil) + + copts := testutils.NewOpts() + for i := 0; i < 5; i++ { + client := ts.NewClient(copts) + + // Open + testutils.AssertEcho(t, client, ts.HostPort(), ts.ServiceName()) + assert.Equal(t, 1, <-changes, "one event on new connection") + + // Re-use + testutils.AssertEcho(t, client, ts.HostPort(), ts.ServiceName()) + assert.Len(t, changes, 0, "no new events on re-used connection") + + // Close + client.Close() + assert.Equal(t, 0, <-changes, "one event on lost connection") + } + }) + assert.Len(t, changes, 0, "unexpected peer status changes") +} diff --git a/peer.go b/peer.go index 28178286..47ef6735 100644 --- a/peer.go +++ b/peer.go @@ -320,6 +320,7 @@ type Peer struct { channel Connectable hostPort string + onStatusChanged func(*Peer) onClosedConnRemoved func(*Peer) // scCount is the number of subchannels that this peer is added to. @@ -335,13 +336,17 @@ type Peer struct { onUpdate func(*Peer) } -func newPeer(channel Connectable, hostPort string, onClosedConnRemoved func(*Peer)) *Peer { +func newPeer(channel Connectable, hostPort string, onStatusChanged func(*Peer), onClosedConnRemoved func(*Peer)) *Peer { if hostPort == "" { panic("Cannot create peer with blank hostPort") } + if onStatusChanged == nil { + onStatusChanged = noopOnStatusChanged + } return &Peer{ channel: channel, hostPort: hostPort, + onStatusChanged: onStatusChanged, onClosedConnRemoved: onClosedConnRemoved, } } @@ -461,18 +466,21 @@ func (p *Peer) canRemove() bool { } // addConnection adds an active connection to the peer's connection list. -// If a connection is not active, ErrInvalidConnectionState is returned. +// If a connection is not active, returns ErrInvalidConnectionState. func (p *Peer) addConnection(c *Connection, direction connectionDirection) error { conns := p.connectionsFor(direction) - p.Lock() - defer p.Unlock() - if c.readState() != connectionActive { return ErrInvalidConnectionState } + p.Lock() *conns = append(*conns, c) + p.Unlock() + + // Inform third parties that a peer gained a connection. + p.onStatusChanged(p) + return nil } @@ -517,6 +525,8 @@ func (p *Peer) connectionCloseStateChange(changed *Connection) { if found { p.onClosedConnRemoved(p) + // Inform third parties that a peer lost a connection. + p.onStatusChanged(p) } } @@ -596,6 +606,8 @@ func (p *Peer) callOnUpdateComplete() { } } +func noopOnStatusChanged(*Peer) {} + // isEphemeralHostPort returns if hostPort is the default ephemeral hostPort. func isEphemeralHostPort(hostPort string) bool { return hostPort == "" || hostPort == ephemeralHostPort || strings.HasSuffix(hostPort, ":0") diff --git a/root_peer_list.go b/root_peer_list.go index ba62a6d9..127160fd 100644 --- a/root_peer_list.go +++ b/root_peer_list.go @@ -27,14 +27,16 @@ import "sync" type RootPeerList struct { sync.RWMutex - channel Connectable - peersByHostPort map[string]*Peer + channel Connectable + onPeerStatusChanged func(*Peer) + peersByHostPort map[string]*Peer } -func newRootPeerList(ch Connectable) *RootPeerList { +func newRootPeerList(ch Connectable, onPeerStatusChanged func(*Peer)) *RootPeerList { return &RootPeerList{ - channel: ch, - peersByHostPort: make(map[string]*Peer), + channel: ch, + onPeerStatusChanged: onPeerStatusChanged, + peersByHostPort: make(map[string]*Peer), } } @@ -65,7 +67,7 @@ func (l *RootPeerList) Add(hostPort string) *Peer { var p *Peer // To avoid duplicate connections, only the root list should create new // peers. All other lists should keep refs to the root list's peers. - p = newPeer(l.channel, hostPort, l.onClosedConnRemoved) + p = newPeer(l.channel, hostPort, l.onPeerStatusChanged, l.onClosedConnRemoved) l.peersByHostPort[hostPort] = p return p } diff --git a/testutils/channel_opts.go b/testutils/channel_opts.go index 4dd83f8c..b40874f9 100644 --- a/testutils/channel_opts.go +++ b/testutils/channel_opts.go @@ -200,6 +200,13 @@ func (o *ChannelOpts) SetRelayMaxTimeout(d time.Duration) *ChannelOpts { return o } +// SetOnPeerStatusChanged sets the callback for channel status change +// noficiations. +func (o *ChannelOpts) SetOnPeerStatusChanged(f func(*tchannel.Peer)) *ChannelOpts { + o.ChannelOptions.OnPeerStatusChanged = f + return o +} + func defaultString(v string, defaultValue string) string { if v == "" { return defaultValue