Skip to content

Commit

Permalink
Merge pull request #570 from uber/dev
Browse files Browse the repository at this point in the history
Release version 1.2.3
  • Loading branch information
prashantv authored Jan 20, 2017
2 parents 2caa315 + dd39c35 commit aeda483
Show file tree
Hide file tree
Showing 39 changed files with 1,279 additions and 175 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
Changelog
=========

# v1.2.3

* Improve error messages when an argument reader is closed without
reading the EOF. (#567)
* thrift: Fix an issue where we return `nil` if we expected a Thrift exception
but none was found (e.g., exception is from the future). (#566)
* Fix ListenIP selecting docker interfaces over physical networks. (#565)
* Fix for error when a Thrift payload has completed decoding and attempts
to close the argument reader without waiting till EOF. (#564)
* thrift-gen: Fix "namespace go" being ignored even though the Apache thrift
generated code was respecting it. (#559)

# v1.2.2

* Add a unique channel ID for introspection (#548)
Expand Down
6 changes: 0 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@ ARCH := $(shell uname -m)
THRIFT_REL := ./scripts/travis/thrift-release/$(PLATFORM)-$(ARCH)

OLD_GOPATH := $(GOPATH)
VENDOR_PATH := $(PWD)/.tmp/vendor

export PATH := $(realpath $(THRIFT_REL)):$(PATH)
export GOPATH := $(VENDOR_PATH):$(GOPATH)

# Cross language test args
TEST_HOST=127.0.0.1
Expand Down Expand Up @@ -58,9 +56,6 @@ get_thrift:
# Note that glide itself is still executed against the original GOPATH.
install:
GOPATH=$(OLD_GOPATH) glide --debug install --cache --cache-gopath
rm -rf $(VENDOR_PATH)
mkdir -p $(VENDOR_PATH)
mv vendor $(VENDOR_PATH)/src

install_lint:
ifdef SHOULD_LINT
Expand Down Expand Up @@ -176,7 +171,6 @@ thrift_gen:
$(BUILD)/thrift-gen --generateThrift --inputFile examples/keyvalue/keyvalue.thrift --outputDir examples/keyvalue/gen-go
$(BUILD)/thrift-gen --generateThrift --inputFile examples/thrift/example.thrift --outputDir examples/thrift/gen-go
$(BUILD)/thrift-gen --generateThrift --inputFile hyperbahn/hyperbahn.thrift --outputDir hyperbahn/gen-go
rm -rf trace/thrift/gen-go/tcollector && $(BUILD)/thrift-gen --generateThrift --inputFile trace/tcollector.thrift --outputDir trace/thrift/gen-go/

release_thrift_gen: clean setup
GOOS=linux GOARCH=amd64 go build -o $(THRIFT_GEN_RELEASE_LINUX)/thrift-gen ./thrift/thrift-gen
Expand Down
5 changes: 5 additions & 0 deletions arguments.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"encoding/json"
"io"
"io/ioutil"

"github.com/uber/tchannel-go/internal/argreader"
)

// ArgReader is the interface for the arg2 and arg3 streams on an
Expand Down Expand Up @@ -74,6 +76,9 @@ func (r ArgReadHelper) read(f func() error) error {
if err := f(); err != nil {
return err
}
if err := argreader.EnsureEmpty(r.reader, "read arg"); err != nil {
return err
}
return r.reader.Close()
}

Expand Down
11 changes: 11 additions & 0 deletions arguments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ package tchannel
import (
"bytes"
"io"
"io/ioutil"
"strings"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -72,6 +74,15 @@ func TestJSONInputOutput(t *testing.T) {
assert.Equal(t, 20756, outObj.Value)
}

func TestReadNotEmpty(t *testing.T) {
// Note: The contents need to be larger than the default buffer size of bufio.NewReader.
r := bytes.NewReader([]byte("{}" + strings.Repeat("{}\n", 10000)))

var data map[string]interface{}
reader := NewArgReader(ioutil.NopCloser(r), nil)
require.Error(t, reader.ReadJSON(&data), "Read should fail due to extra bytes")
}

func BenchmarkArgReaderWriter(b *testing.B) {
obj := testObject{Name: "Foo", Value: 20756}
outObj := testObject{}
Expand Down
137 changes: 75 additions & 62 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,28 @@ import (
"golang.org/x/net/context"
)

const (
// CurrentProtocolVersion is the current version of the TChannel protocol
// supported by this stack
CurrentProtocolVersion = 0x02

// DefaultConnectTimeout is the default timeout used by net.Dial, if no timeout
// is specified in the context.
DefaultConnectTimeout = 5 * time.Second

// defaultConnectionBufferSize is the default size for the connection's
// read and write channels.
defaultConnectionBufferSize = 512
)

// PeerVersion contains version related information for a specific peer.
// These values are extracted from the init headers.
type PeerVersion struct {
Language string `json:"language"`
LanguageVersion string `json:"languageVersion"`
TChannelVersion string `json:"tchannelVersion"`
}

// PeerInfo contains information about a TChannel peer
type PeerInfo struct {
// The host and port that can be used to contact the peer, as encoded by net.JoinHostPort
Expand All @@ -49,6 +71,9 @@ type PeerInfo struct {

// IsEphemeral returns whether the remote host:port is ephemeral (e.g. not listening).
IsEphemeral bool `json:"isEphemeral"`

// Version returns the version information for the remote peer.
Version PeerVersion `json:"version"`
}

func (p PeerInfo) String() string {
Expand All @@ -72,14 +97,6 @@ func (p LocalPeerInfo) String() string {
return fmt.Sprintf("%v: %v", p.ServiceName, p.PeerInfo)
}

// CurrentProtocolVersion is the current version of the TChannel protocol
// supported by this stack
const CurrentProtocolVersion = 0x02

// DefaultConnectTimeout is the default timeout used by net.Dial, if no timeout
// is specified in the context.
const DefaultConnectTimeout = 5 * time.Second

var (
// ErrConnectionClosed is returned when a caller performs an method
// on a closed connection
Expand Down Expand Up @@ -115,7 +132,7 @@ type ConnectionOptions struct {
// The frame pool, allowing better management of frame buffers. Defaults to using raw heap.
FramePool FramePool

// The size of receive channel buffers. Defaults to 512.
// NOTE: This is deprecated and not used for anything.
RecvBufferSize int

// The size of send channel buffers. Defaults to 512.
Expand All @@ -142,8 +159,7 @@ type Connection struct {
channelConnectionCommon

connID uint32
checksumType ChecksumType
framePool FramePool
opts ConnectionOptions
conn net.Conn
localPeerInfo LocalPeerInfo
remotePeerInfo PeerInfo
Expand Down Expand Up @@ -227,6 +243,19 @@ func getTimeout(ctx context.Context) time.Duration {
return deadline.Sub(time.Now())
}

func (co ConnectionOptions) withDefaults() ConnectionOptions {
if co.ChecksumType == ChecksumTypeNone {
co.ChecksumType = ChecksumTypeCrc32
}
if co.FramePool == nil {
co.FramePool = DefaultFramePool
}
if co.SendBufferSize <= 0 {
co.SendBufferSize = defaultConnectionBufferSize
}
return co
}

// Creates a new Connection around an outbound connection initiated to a peer
func (ch *Channel) newOutboundConnection(ctx context.Context, hostPort string, events connectionEvents) (*Connection, error) {
timeout := getTimeout(ctx)
Expand Down Expand Up @@ -262,27 +291,7 @@ func (ch *Channel) newInboundConnection(conn net.Conn, events connectionEvents)

// Creates a new connection in a given initial state
func (ch *Channel) newConnection(conn net.Conn, outboundHP string, initialState connectionState, events connectionEvents) *Connection {
opts := &ch.connectionOptions

checksumType := opts.ChecksumType
if checksumType == ChecksumTypeNone {
checksumType = ChecksumTypeCrc32C
}

sendBufferSize := opts.SendBufferSize
if sendBufferSize <= 0 {
sendBufferSize = 512
}

recvBufferSize := opts.RecvBufferSize
if recvBufferSize <= 0 {
recvBufferSize = 512
}

framePool := opts.FramePool
if framePool == nil {
framePool = DefaultFramePool
}
opts := ch.connectionOptions.withDefaults()

connID := _nextConnID.Inc()
log := ch.log.WithFields(LogFields{
Expand All @@ -298,13 +307,12 @@ func (ch *Channel) newConnection(conn net.Conn, outboundHP string, initialState

connID: connID,
conn: conn,
framePool: framePool,
opts: opts,
state: initialState,
sendCh: make(chan *Frame, sendBufferSize),
sendCh: make(chan *Frame, opts.SendBufferSize),
stopCh: make(chan struct{}),
localPeerInfo: peerInfo,
outboundHP: outboundHP,
checksumType: checksumType,
inbound: newMessageExchangeSet(log, messageExchangeSetInbound),
outbound: newMessageExchangeSet(log, messageExchangeSetOutbound),
handler: channelHandler{ch},
Expand Down Expand Up @@ -404,7 +412,7 @@ func (c *Connection) sendInit(ctx context.Context) error {
}
defer c.pendingExchangeMethodDone()

mex, err := c.outbound.newExchange(ctx, c.framePool, req.messageType(), req.ID(), 1)
mex, err := c.outbound.newExchange(ctx, c.opts.FramePool, req.messageType(), req.ID(), 1)
if err != nil {
return c.connectionError("create init req", err)
}
Expand Down Expand Up @@ -441,18 +449,11 @@ func (c *Connection) handleInitReq(frame *Frame) {
return
}

var ok bool
if c.remotePeerInfo.HostPort, ok = req.initParams[InitParamHostPort]; !ok {
c.protocolError(id, fmt.Errorf("header %v is required", InitParamHostPort))
return
}
if c.remotePeerInfo.ProcessName, ok = req.initParams[InitParamProcessName]; !ok {
c.protocolError(id, fmt.Errorf("header %v is required", InitParamProcessName))
if err := c.parseRemotePeer(req.initParams); err != nil {
c.protocolError(id, err)
return
}

c.parseRemotePeerAddress()

res := initRes{initMessage{id: frame.Header.ID}}
res.initParams = c.getInitParams()
res.Version = CurrentProtocolVersion
Expand All @@ -469,7 +470,6 @@ func (c *Connection) handleInitReq(frame *Frame) {

return nil
})

c.callOnActive()
}

Expand All @@ -482,7 +482,7 @@ func (c *Connection) ping(ctx context.Context) error {
defer c.pendingExchangeMethodDone()

req := &pingReq{id: c.NextMessageID()}
mex, err := c.outbound.newExchange(ctx, c.framePool, req.messageType(), req.ID(), 1)
mex, err := c.outbound.newExchange(ctx, c.opts.FramePool, req.messageType(), req.ID(), 1)
if err != nil {
return c.connectionError("create ping exchange", err)
}
Expand Down Expand Up @@ -563,12 +563,9 @@ func (c *Connection) handleInitRes(frame *Frame) bool {
c.protocolError(frame.Header.ID, fmt.Errorf("unsupported protocol version %d from peer", res.Version))
return true
}
if _, ok := res.initParams[InitParamHostPort]; !ok {
c.protocolError(frame.Header.ID, fmt.Errorf("header %v is required", InitParamHostPort))
return true
}
if _, ok := res.initParams[InitParamProcessName]; !ok {
c.protocolError(frame.Header.ID, fmt.Errorf("header %v is required", InitParamProcessName))

if err := c.parseRemotePeer(res.initParams); err != nil {
c.protocolError(frame.Header.ID, err)
return true
}

Expand Down Expand Up @@ -607,9 +604,9 @@ func (c *Connection) handleInitRes(frame *Frame) bool {

// sendMessage sends a standalone message (typically a control message)
func (c *Connection) sendMessage(msg message) error {
frame := c.framePool.Get()
frame := c.opts.FramePool.Get()
if err := frame.write(msg); err != nil {
c.framePool.Release(frame)
c.opts.FramePool.Release(frame)
return err
}

Expand All @@ -633,7 +630,7 @@ func (c *Connection) recvMessage(ctx context.Context, msg message, mex *messageE
}

err = frame.read(msg)
c.framePool.Release(frame)
c.opts.FramePool.Release(frame)
return err
}

Expand All @@ -649,7 +646,7 @@ func (c *Connection) NextMessageID() uint32 {

// SendSystemError sends an error frame for the given system error.
func (c *Connection) SendSystemError(id uint32, span Span, err error) error {
frame := c.framePool.Get()
frame := c.opts.FramePool.Get()

if err := frame.write(&errorMessage{
id: id,
Expand Down Expand Up @@ -706,7 +703,7 @@ func (c *Connection) logConnectionError(site string, err error) error {
errCode = se.Code()
logger.Error("Connection error.")
} else {
logger.Warn("Connection error.")
logger.Info("Connection error.")
}
}
return NewWrappedSystemError(errCode, err)
Expand Down Expand Up @@ -778,14 +775,14 @@ func (c *Connection) readState() connectionState {
// since we cannot process new frames until the initialization is complete.
func (c *Connection) readFrames(_ uint32) {
for {
frame := c.framePool.Get()
frame := c.opts.FramePool.Get()
if err := frame.ReadIn(c.conn); err != nil {
if c.closeNetworkCalled.Load() == 0 {
c.connectionError("read frames", err)
} else {
c.log.Debugf("Ignoring error after connection was closed: %v", err)
}
c.framePool.Release(frame)
c.opts.FramePool.Release(frame)
return
}

Expand All @@ -796,7 +793,7 @@ func (c *Connection) readFrames(_ uint32) {
releaseFrame = c.handleFrameRelay(frame)
}
if releaseFrame {
c.framePool.Release(frame)
c.opts.FramePool.Release(frame)
}
}
}
Expand Down Expand Up @@ -862,7 +859,7 @@ func (c *Connection) writeFrames(_ uint32) {
}

err := f.WriteOut(c.conn)
c.framePool.Release(f)
c.opts.FramePool.Release(f)
if err != nil {
c.connectionError("write frames", err)
return
Expand Down Expand Up @@ -1010,6 +1007,22 @@ func (c *Connection) closeNetwork() {
}
}

func (c *Connection) parseRemotePeer(p initParams) error {
var ok bool
if c.remotePeerInfo.HostPort, ok = p[InitParamHostPort]; !ok {
return fmt.Errorf("header %v is required", InitParamHostPort)
}
if c.remotePeerInfo.ProcessName, ok = p[InitParamProcessName]; !ok {
return fmt.Errorf("header %v is required", InitParamProcessName)
}

c.parseRemotePeerAddress()
c.remotePeerInfo.Version.Language = p[InitParamTChannelLanguage]
c.remotePeerInfo.Version.LanguageVersion = p[InitParamTChannelLanguageVersion]
c.remotePeerInfo.Version.TChannelVersion = p[InitParamTChannelVersion]
return nil
}

// parseRemotePeerAddress parses remote peer info into individual components and
// caches them on the Connection to be used to set peer tags on OpenTracing Span.
func (c *Connection) parseRemotePeerAddress() {
Expand Down
Loading

0 comments on commit aeda483

Please sign in to comment.