Skip to content

Commit

Permalink
Merge pull request #612 from uber/dev
Browse files Browse the repository at this point in the history
Release version 1.5.0
  • Loading branch information
prashantv authored Mar 21, 2017
2 parents 2feb388 + 2a1bef0 commit 0b7f160
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 32 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
Changelog
=========

# v1.5.0

* Add `PeerList.Len` to expose the number of peers in the peer list.
* Add `PeerList.GetNew` to only return previously unselected peers.

# v1.4.0

* Add version information to the channel's LocalPeerInfo.
Expand Down
2 changes: 1 addition & 1 deletion benchmark/real_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type fixedHosts struct {
pickI atomic.Int32
}

func (fh *fixedHosts) Get(cf relay.CallFrame, _ relay.Conn) (string, error) {
func (fh *fixedHosts) Get(cf relay.CallFrame, _ *tchannel.Connection) (string, error) {
peers := fh.hosts[string(cf.Service())]
if len(peers) == 0 {
return "", errors.New("no peers")
Expand Down
35 changes: 31 additions & 4 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ var (
// ErrPeerNotFound indicates that the specified peer was not found.
ErrPeerNotFound = errors.New("peer not found")

// ErrNoNewPeers indicates that no previously unselected peer is available.
ErrNoNewPeers = errors.New("no new peer available")

peerRng = trand.NewSeeded()
)

Expand Down Expand Up @@ -116,11 +119,12 @@ func (l *PeerList) Add(hostPort string) *Peer {
return p
}

// Get returns a peer from the peer list, or nil if none can be found.
func (l *PeerList) Get(prevSelected map[string]struct{}) (*Peer, error) {
// GetNew returns a new, previously unselected peer from the peer list, or nil,
// if no new unselected peer can be found.
func (l *PeerList) GetNew(prevSelected map[string]struct{}) (*Peer, error) {
l.Lock()
defer l.Unlock()
if l.peerHeap.Len() == 0 {
l.Unlock()
return nil, ErrNoPeers
}

Expand All @@ -131,9 +135,25 @@ func (l *PeerList) Get(prevSelected map[string]struct{}) (*Peer, error) {
peer = l.choosePeer(prevSelected, false /* avoidHost */)
}
if peer == nil {
return nil, ErrNoNewPeers
}
return peer, nil
}

// Get returns a peer from the peer list, or nil if none can be found,
// will avoid previously selected peers if possible.
func (l *PeerList) Get(prevSelected map[string]struct{}) (*Peer, error) {
peer, err := l.GetNew(prevSelected)
if err == ErrNoNewPeers {
l.Lock()
peer = l.choosePeer(nil, false /* avoidHost */)
l.Unlock()
} else if err != nil {
return nil, err
}
if peer == nil {
return nil, ErrNoPeers
}
l.Unlock()
return peer, nil
}

Expand Down Expand Up @@ -214,6 +234,13 @@ func (l *PeerList) Copy() map[string]*Peer {
return listCopy
}

// Len returns the length of the PeerList.
func (l *PeerList) Len() int {
l.RLock()
defer l.RUnlock()
return l.peerHeap.Len()
}

// exists checks if a hostport exists in the peer list.
func (l *PeerList) exists(hostPort string) (*peerScore, bool) {
l.RLock()
Expand Down
27 changes: 27 additions & 0 deletions peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ func TestGetPeerSinglePeer(t *testing.T) {
assert.Equal(t, "1.1.1.1:1234", peer.HostPort(), "returned peer mismatch")
}

func TestPeerUpdatesLen(t *testing.T) {
ch := testutils.NewClient(t, nil)
defer ch.Close()
assert.Zero(t, ch.Peers().Len())
for i := 1; i < 5; i++ {
ch.Peers().Add(fmt.Sprintf("1.1.1.1:%d", i))
assert.Equal(t, ch.Peers().Len(), i)
}
for i := 4; i > 0; i-- {
assert.Equal(t, ch.Peers().Len(), i)
ch.Peers().Remove(fmt.Sprintf("1.1.1.1:%d", i))
}
assert.Zero(t, ch.Peers().Len())
}

func TestGetPeerAvoidPrevSelected(t *testing.T) {
const (
peer1 = "1.1.1.1:1"
Expand Down Expand Up @@ -153,6 +168,18 @@ func TestGetPeerAvoidPrevSelected(t *testing.T) {
continue
}

newPeer, err := peers.GetNew(rs.PrevSelectedPeers())
if len(tt.peers) == len(tt.prevSelected) {
if newPeer != nil || err != ErrNoNewPeers {
t.Errorf("%s: newPeer should not have been found %v: %v\n", tt.msg, newPeer, err)
}
} else {
if gotPeer != newPeer || err != nil {
t.Errorf("%s: expected equal peers, got %v new %v: %v\n",
tt.msg, gotPeer, newPeer, err)
}
}

got := gotPeer.HostPort()
if _, ok := tt.expected[got]; !ok {
t.Errorf("%s: got unexpected peer, expected one of %v got %v\n Peers = %v PrevSelected = %v",
Expand Down
14 changes: 6 additions & 8 deletions relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ var (
errUnknownID = errors.New("non-callReq for inactive ID")
)

// relayConn implements the relay.Connection interface.
type relayConn Connection

type relayItem struct {
*time.Timer

Expand Down Expand Up @@ -334,7 +331,7 @@ func (r *Relayer) handleCallReq(f lazyCallReq) error {
return nil
}

call, err := r.relayHost.Start(f, (*relayConn)(r.conn))
call, err := r.relayHost.Start(f, r.conn)
if err != nil {
// If we have a RateLimitDropError we record the statistic, but
// we *don't* send an error frame back to the client.
Expand All @@ -353,6 +350,11 @@ func (r *Relayer) handleCallReq(f lazyCallReq) error {
call.End()
}
r.conn.SendSystemError(f.Header.ID, f.Span(), err)

// If the RelayHost returns a protocol error, close the connection.
if GetSystemErrorCode(err) == ErrCodeProtocol {
return r.conn.close(LogField{"reason", "RelayHost returned protocol error"})
}
return nil
}

Expand Down Expand Up @@ -552,10 +554,6 @@ func (r *Relayer) handleLocalCallReq(cr lazyCallReq) bool {
return true
}

func (r *relayConn) RemoteHostPort() string {
return (*Connection)(r).RemotePeerInfo().HostPort
}

func frameTypeFor(f *Frame) frameType {
switch t := f.Header.messageType; t {
case messageTypeCallRes, messageTypeCallResContinue, messageTypeError, messageTypePingRes:
Expand Down
7 changes: 0 additions & 7 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,6 @@
// backwards-compatibility guarantee.
package relay

// Conn is an interface that exposes a bit of information about the underlying
// connection.
type Conn interface {
// RemoteHostPort returns the host:port of the remote peer.
RemoteHostPort() string
}

// CallFrame is an interface that abstracts access to the call req frame.
type CallFrame interface {
// Caller is the name of the originating service.
Expand Down
8 changes: 4 additions & 4 deletions relay/relaytest/func_host.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package relaytest

import (
tchannel "github.com/uber/tchannel-go"
"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/relay"
)

type hostFunc struct {
ch *tchannel.Channel
stats *MockStats
fn func(cf relay.CallFrame, conn relay.Conn) (string, error)
fn func(relay.CallFrame, *tchannel.Connection) (string, error)
}

type hostFuncPeer struct {
Expand All @@ -18,15 +18,15 @@ type hostFuncPeer struct {
}

// HostFunc wraps a given function to implement tchannel.RelayHost.
func HostFunc(fn func(cf relay.CallFrame, conn relay.Conn) (string, error)) tchannel.RelayHost {
func HostFunc(fn func(relay.CallFrame, *tchannel.Connection) (string, error)) tchannel.RelayHost {
return &hostFunc{nil, NewMockStats(), fn}
}

func (hf *hostFunc) SetChannel(ch *tchannel.Channel) {
hf.ch = ch
}

func (hf *hostFunc) Start(cf relay.CallFrame, conn relay.Conn) (tchannel.RelayCall, error) {
func (hf *hostFunc) Start(cf relay.CallFrame, conn *tchannel.Connection) (tchannel.RelayCall, error) {
var peer *tchannel.Peer

peerHP, err := hf.fn(cf, conn)
Expand Down
2 changes: 1 addition & 1 deletion relay/relaytest/stub_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (rh *StubRelayHost) SetChannel(ch *tchannel.Channel) {
}

// Start starts a new RelayCall for the given call on a specific connection.
func (rh *StubRelayHost) Start(cf relay.CallFrame, conn relay.Conn) (tchannel.RelayCall, error) {
func (rh *StubRelayHost) Start(cf relay.CallFrame, _ *tchannel.Connection) (tchannel.RelayCall, error) {
// Get a peer from the subchannel.
peer, err := rh.ch.GetSubChannel(string(cf.Service())).Peers().Get(nil)
return &stubCall{rh.stats.Begin(cf), peer}, err
Expand Down
2 changes: 1 addition & 1 deletion relay_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type RelayHost interface {
// Start starts a new RelayCall given the call frame and connection.
// It may return a call and an error, in which case the caller will
// call Failed/End on the RelayCall.
Start(relay.CallFrame, relay.Conn) (RelayCall, error)
Start(relay.CallFrame, *Connection) (RelayCall, error)
}

// RelayCall abstracts away peer selection, stats, and any other business
Expand Down
39 changes: 34 additions & 5 deletions relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func TestRelayErrorsOnGetPeer(t *testing.T) {
}

for _, tt := range tests {
f := func(relay.CallFrame, relay.Conn) (string, error) {
f := func(relay.CallFrame, *Connection) (string, error) {
return tt.returnPeer, tt.returnErr
}

Expand Down Expand Up @@ -502,8 +502,8 @@ func TestRelayMakeOutgoingCall(t *testing.T) {
func TestRelayConnection(t *testing.T) {
var errTest = errors.New("test")
var wantHostPort string
getHost := func(call relay.CallFrame, conn relay.Conn) (string, error) {
assert.Equal(t, wantHostPort, conn.RemoteHostPort(), "Unexpected RemoteHostPort")
getHost := func(call relay.CallFrame, conn *Connection) (string, error) {
assert.Equal(t, wantHostPort, conn.RemotePeerInfo().HostPort, "Unexpected RemoteHostPort")
return "", errTest
}

Expand All @@ -519,6 +519,35 @@ func TestRelayConnection(t *testing.T) {
err := testutils.CallEcho(client, ts.HostPort(), ts.ServiceName(), nil)
require.Error(t, err, "Expected CallEcho to fail")
assert.Contains(t, err.Error(), errTest.Error(), "Unexpected error")

// Verify that the relay has not closed any connections.
assert.Equal(t, 1, ts.Relay().IntrospectNumConnections(), "Relay should maintain client connection")
})
}

func TestRelayConnectionClosed(t *testing.T) {
protocolErr := NewSystemError(ErrCodeProtocol, "invalid service name")
getHost := func(call relay.CallFrame, conn *Connection) (string, error) {
return "", protocolErr
}

opts := testutils.NewOpts().
SetRelayOnly().
SetRelayHost(relaytest.HostFunc(getHost))
testutils.WithTestServer(t, opts, func(ts *testutils.TestServer) {
// The client receives a protocol error which causes the following logs.
opts := testutils.NewOpts().
AddLogFilter("Peer reported protocol error", 1).
AddLogFilter("Connection error", 1)
client := ts.NewClient(opts)

err := testutils.CallEcho(client, ts.HostPort(), ts.ServiceName(), nil)
assert.Equal(t, protocolErr, err, "Unexpected error on call")

closedAll := testutils.WaitFor(time.Second, func() bool {
return ts.Relay().IntrospectNumConnections() == 0
})
assert.True(t, closedAll, "Relay should close client connection")
})
}

Expand Down Expand Up @@ -577,7 +606,7 @@ func TestRelayRejectsDuringClose(t *testing.T) {
}

func TestRelayRateLimitDrop(t *testing.T) {
getHost := func(call relay.CallFrame, conn relay.Conn) (string, error) {
getHost := func(call relay.CallFrame, _ *Connection) (string, error) {
return "", relay.RateLimitDropError{}
}

Expand Down Expand Up @@ -691,7 +720,7 @@ func TestRelayThroughSeparateRelay(t *testing.T) {
SetRelayOnly()
testutils.WithTestServer(t, opts, func(ts *testutils.TestServer) {
serverHP := ts.Server().PeerInfo().HostPort
dummyFactory := func(relay.CallFrame, relay.Conn) (string, error) {
dummyFactory := func(relay.CallFrame, *Connection) (string, error) {
panic("should not get invoked")
}
relay2Opts := testutils.NewOpts().SetRelayHost(relaytest.HostFunc(dummyFactory))
Expand Down
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ package tchannel
// VersionInfo identifies the version of the TChannel library.
// Due to lack of proper package management, this version string will
// be maintained manually.
const VersionInfo = "1.4.0"
const VersionInfo = "1.5.0"

0 comments on commit 0b7f160

Please sign in to comment.