Skip to content

Commit

Permalink
Merge pull request #585 from uber/dev
Browse files Browse the repository at this point in the history
Release version 1.4.0
  • Loading branch information
prashantv authored Mar 1, 2017
2 parents 7938782 + 85cab57 commit 2feb388
Show file tree
Hide file tree
Showing 25 changed files with 932 additions and 438 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go:
- 1.5
- 1.6
- 1.7
- tip
- 1.8

matrix:
include:
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Changelog
=========

# v1.4.0

* Add version information to the channel's LocalPeerInfo.
* Add peers package for peer management utilities such as
consistent peer selection.
* Fix SetScoreStrategy not rescoring existing peers. (#583).

# v1.3.0

* Exposes the channel's RootPeerList with `channel.RootPeers()`.
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ endif
PATH := $(GOPATH)/bin:$(PATH)
EXAMPLES=./examples/bench/server ./examples/bench/client ./examples/ping ./examples/thrift ./examples/hyperbahn/echo-server
ALL_PKGS := $(shell glide nv)
PROD_PKGS := . ./http ./hyperbahn ./json ./pprof ./raw ./relay ./stats ./thrift $(EXAMPLES)
PROD_PKGS := . ./http ./hyperbahn ./json ./peers ./pprof ./raw ./relay ./stats ./thrift $(EXAMPLES)
TEST_ARG ?= -race -v -timeout 5m
BUILD := ./build
THRIFT_GEN_RELEASE := ./thrift-gen-release
Expand Down
112 changes: 78 additions & 34 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"net"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -83,6 +85,10 @@ type ChannelOptions struct {
// Tracer is an OpenTracing Tracer used to manage distributed tracing spans.
// If not set, opentracing.GlobalTracer() is used.
Tracer opentracing.Tracer

// Handler is an alternate handler for all inbound requests, overriding the
// default handler that delegates to a subchannel.
Handler Handler
}

// ChannelState is the state of a channel.
Expand Down Expand Up @@ -124,6 +130,7 @@ type Channel struct {
peers *PeerList
relayHost RelayHost
relayMaxTimeout time.Duration
handler Handler

// mutable contains all the members of Channel which are mutable.
mutable struct {
Expand Down Expand Up @@ -209,25 +216,40 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
tracer: opts.Tracer,
},
chID: chID,
connectionOptions: opts.DefaultConnectionOptions,
connectionOptions: opts.DefaultConnectionOptions.withDefaults(),
relayHost: opts.RelayHost,
relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger),
}
ch.peers = newRootPeerList(ch).newChild()

if opts.Handler != nil {
ch.handler = opts.Handler
} else {
ch.handler = channelHandler{ch}
}

ch.mutable.peerInfo = LocalPeerInfo{
PeerInfo: PeerInfo{
ProcessName: processName,
HostPort: ephemeralHostPort,
IsEphemeral: true,
Version: PeerVersion{
Language: "go",
LanguageVersion: strings.TrimPrefix(runtime.Version(), "go"),
TChannelVersion: VersionInfo,
},
},
ServiceName: serviceName,
}
ch.mutable.state = ChannelClient
ch.mutable.conns = make(map[uint32]*Connection)
ch.createCommonStats()

ch.registerInternal()
// Register internal unless the root handler has been overridden, since
// Register will panic.
if opts.Handler == nil {
ch.registerInternal()
}

registerNewChannel(ch)

Expand Down Expand Up @@ -322,7 +344,13 @@ type Registrar interface {
// under that. You may also use SetHandler on a SubChannel to set up a
// catch-all Handler for that service. See the docs for SetHandler for more
// information.
//
// Register panics if the channel was constructed with an alternate root
// handler.
func (ch *Channel) Register(h Handler, methodName string) {
if _, ok := ch.handler.(channelHandler); !ok {
panic("can't register handler when channel configured with alternate root handler")
}
ch.GetSubChannel(ch.PeerInfo().ServiceName).Register(h, methodName)
}

Expand Down Expand Up @@ -417,18 +445,18 @@ func (ch *Channel) serve() {

acceptBackoff = 0

// Register the connection in the peer once the channel is set up.
events := connectionEvents{
OnActive: ch.inboundConnectionActive,
OnCloseStateChange: ch.connectionCloseStateChange,
OnExchangeUpdated: ch.exchangeUpdated,
}
if _, err := ch.newInboundConnection(netConn, events); err != nil {
// Server is getting overloaded - begin rejecting new connections
ch.log.WithFields(ErrField(err)).Error("Couldn't create new TChannelConnection for incoming conn.")
netConn.Close()
continue
}
// Perform the connection handshake in a background goroutine.
go func() {
// Register the connection in the peer once the channel is set up.
events := connectionEvents{
OnActive: ch.inboundConnectionActive,
OnCloseStateChange: ch.connectionCloseStateChange,
OnExchangeUpdated: ch.exchangeUpdated,
}
if _, err := ch.inboundHandshake(context.Background(), netConn, events); err != nil {
netConn.Close()
}
}()
}
}

Expand Down Expand Up @@ -496,28 +524,45 @@ func (ch *Channel) Connect(ctx context.Context, hostPort string) (*Connection, e
return nil, GetContextError(err)
}

c, err := ch.newOutboundConnection(ctx, hostPort, events)
timeout := getTimeout(ctx)
tcpConn, err := dialContext(ctx, hostPort)
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
ch.log.WithFields(
LogField{"remoteHostPort", hostPort},
LogField{"timeout", timeout},
).Info("Outbound net.Dial timed out.")
err = ErrTimeout
} else if ctx.Err() == context.Canceled {
ch.log.WithFields(
LogField{"remoteHostPort", hostPort},
).Info("Outbound net.Dial was cancelled.")
err = GetContextError(ErrRequestCancelled)
} else {
ch.log.WithFields(
ErrField(err),
LogField{"remoteHostPort", hostPort},
).Info("Outbound net.Dial failed.")
}
return nil, err
}

if err := c.sendInit(ctx); err != nil {
return nil, err
}

// It's possible that the connection we just created responds with a host:port
// that is not what we tried to connect to. E.g., we may have connected to
// 127.0.0.1:1234, but the returned host:port may be 10.0.0.1:1234.
// In this case, the connection won't be added to 127.0.0.1:1234 peer
// and so future calls to that peer may end up creating new connections. To
// avoid this issue, and to avoid clients being aware of any TCP relays, we
// add the connection to the intended peer.
if hostPort != c.remotePeerInfo.HostPort {
c.log.Debugf("Outbound connection host:port mismatch, adding to peer %v", c.remotePeerInfo.HostPort)
ch.addConnectionToPeer(hostPort, c, outbound)
conn, err := ch.outboundHandshake(ctx, tcpConn, hostPort, events)
if conn != nil {
// It's possible that the connection we just created responds with a host:port
// that is not what we tried to connect to. E.g., we may have connected to
// 127.0.0.1:1234, but the returned host:port may be 10.0.0.1:1234.
// In this case, the connection won't be added to 127.0.0.1:1234 peer
// and so future calls to that peer may end up creating new connections. To
// avoid this issue, and to avoid clients being aware of any TCP relays, we
// add the connection to the intended peer.
if hostPort != conn.remotePeerInfo.HostPort {
conn.log.Debugf("Outbound connection host:port mismatch, adding to peer %v", conn.remotePeerInfo.HostPort)
ch.addConnectionToPeer(hostPort, conn, outbound)
}
}

return c, nil
return conn, err
}

// exchangeUpdated updates the peer heap.
Expand All @@ -537,7 +582,7 @@ func (ch *Channel) exchangeUpdated(c *Connection) {

// updatePeer updates the score of the peer and update it's position in heap as well.
func (ch *Channel) updatePeer(p *Peer) {
ch.peers.updatePeer(p)
ch.peers.onPeerChange(p)
ch.subChannels.updatePeer(p)
p.callOnUpdateComplete()
}
Expand Down Expand Up @@ -569,8 +614,7 @@ func (ch *Channel) connectionActive(c *Connection, direction connectionDirection

if added := ch.addConnection(c, direction); !added {
// The channel isn't in a valid state to accept this connection, close the connection.
c.log.Debugf("Closing connection due to closing channel state")
c.Close()
c.close(LogField{"reason", "new active connection on closing channel"})
return
}

Expand Down Expand Up @@ -716,7 +760,7 @@ func (ch *Channel) Close() {
ch.mutable.Unlock()

for _, c := range connections {
c.Close()
c.close(LogField{"reason", "channel closing"})
}

if channelClosed {
Expand Down
23 changes: 23 additions & 0 deletions channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"io/ioutil"
"math"
"os"
"runtime"
"strings"
"testing"
"time"

Expand All @@ -41,6 +43,27 @@ func toMap(fields LogFields) map[string]interface{} {
return m
}

func TestNewChannel(t *testing.T) {
ch, err := NewChannel("svc", &ChannelOptions{
ProcessName: "pname",
})
require.NoError(t, err, "NewChannel failed")

assert.Equal(t, LocalPeerInfo{
ServiceName: "svc",
PeerInfo: PeerInfo{
ProcessName: "pname",
HostPort: ephemeralHostPort,
IsEphemeral: true,
Version: PeerVersion{
Language: "go",
LanguageVersion: strings.TrimPrefix(runtime.Version(), "go"),
TChannelVersion: VersionInfo,
},
},
}, ch.PeerInfo(), "Wrong local peer info")
}

func TestLoggers(t *testing.T) {
ch, err := NewChannel("svc", &ChannelOptions{
Logger: NewLogger(ioutil.Discard),
Expand Down
Loading

0 comments on commit 2feb388

Please sign in to comment.