diff --git a/.gitignore b/.gitignore index 6c4c09bf..0ba2f093 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ lint.log .idea tchannel-go.iml .vscode +.bin/ diff --git a/CHANGELOG.md b/CHANGELOG.md index ba06703a..22af7ce2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,14 @@ Changelog ========= +# v1.6.0 + +* Locks Apache Thrift to version 0.9.3 to maintain backward-compatibility. +* Add `OnPeerStatusChanged` channel option to receive a notification each time + the number of available connections changes for any given peer. +* Set DiffServ (QoS) bit on outbound connections. +* Improve resilience of the frame parser. + # v1.5.0 * Add `PeerList.Len` to expose the number of peers in the peer list. diff --git a/Makefile b/Makefile index ca6fb7b9..d3dace9d 100644 --- a/Makefile +++ b/Makefile @@ -23,11 +23,10 @@ SRCS := $(foreach pkg,$(PKGS),$(wildcard $(pkg)/*.go)) PLATFORM := $(shell uname -s | tr '[:upper:]' '[:lower:]') ARCH := $(shell uname -m) -THRIFT_REL := ./scripts/travis/thrift-release/$(PLATFORM)-$(ARCH) OLD_GOPATH := $(GOPATH) -export PATH := $(realpath $(THRIFT_REL)):$(PATH) +BIN := $(shell pwd)/.bin # Cross language test args TEST_HOST=127.0.0.1 @@ -37,6 +36,10 @@ TEST_PORT=0 all: test examples +$(BIN)/thrift: + mkdir -p $(BIN) + scripts/install-thrift.sh $(BIN) + packages_test: go list -json ./... | jq -r '. | select ((.TestGoFiles | length) > 0) | .ImportPath' @@ -46,9 +49,6 @@ setup: mkdir -p $(THRIFT_GEN_RELEASE_LINUX) mkdir -p $(THRIFT_GEN_RELEASE_DARWIN) -get_thrift: - scripts/travis/get-thrift.sh - # We want to remove `vendor` dir because thrift-gen tests don't work with it. # However, glide install even with --cache-gopath option leaves GOPATH at HEAD, # not at the desired versions from glide.lock, which are only applied to `vendor` @@ -70,7 +70,7 @@ install_glide: # but have to pin to 0.12.3 due to https://github.com/Masterminds/glide/issues/745 GOPATH=$(OLD_GOPATH) go get -u github.com/Masterminds/glide && cd $(OLD_GOPATH)/src/github.com/Masterminds/glide && git checkout v0.12.3 && go install -install_ci: install_glide install_lint get_thrift install +install_ci: $(BIN)/thrift install_glide install_lint install GOPATH=$(OLD_GOPATH) go get -u github.com/mattn/goveralls ifdef CROSSDOCK $(MAKE) install_docker_ci @@ -100,23 +100,23 @@ else $(MAKE) test endif -test: clean setup install_test check_no_test_deps +test: clean setup install_test check_no_test_deps $(BIN)/thrift @echo Testing packages: - go test -parallel=4 $(TEST_ARG) $(ALL_PKGS) + PATH=$(BIN):$$PATH go test -parallel=4 $(TEST_ARG) $(ALL_PKGS) @echo Running frame pool tests - go test -run TestFramesReleased -stressTest $(TEST_ARG) + PATH=$(BIN):$$PATH go test -run TestFramesReleased -stressTest $(TEST_ARG) check_no_test_deps: ! go list -json $(PROD_PKGS) | jq -r .Deps[] | grep -e test -e mock -benchmark: clean setup +benchmark: clean setup $(BIN)/thrift echo Running benchmarks: - go test $(ALL_PKGS) -bench=. -cpu=1 -benchmem -run NONE + PATH=$(BIN)::$$PATH go test $(ALL_PKGS) -bench=. -cpu=1 -benchmem -run NONE -cover_profile: clean setup +cover_profile: clean setup $(BIN)/thrift @echo Testing packages: mkdir -p $(BUILD) - go test ./ $(TEST_ARG) -coverprofile=$(BUILD)/coverage.out + PATH=$(BIN)::$$PATH go test ./ $(TEST_ARG) -coverprofile=$(BUILD)/coverage.out cover: cover_profile go tool cover -html=$(BUILD)/coverage.out @@ -167,12 +167,12 @@ examples: clean setup thrift_example go build -o $(BUILD)/examples/bench/runner ./examples/bench/runner.go go build -o $(BUILD)/examples/test_server ./examples/test_server -thrift_gen: +thrift_gen: $(BIN)/thrift go build -o $(BUILD)/thrift-gen ./thrift/thrift-gen - $(BUILD)/thrift-gen --generateThrift --inputFile thrift/test.thrift --outputDir thrift/gen-go/ - $(BUILD)/thrift-gen --generateThrift --inputFile examples/keyvalue/keyvalue.thrift --outputDir examples/keyvalue/gen-go - $(BUILD)/thrift-gen --generateThrift --inputFile examples/thrift/example.thrift --outputDir examples/thrift/gen-go - $(BUILD)/thrift-gen --generateThrift --inputFile hyperbahn/hyperbahn.thrift --outputDir hyperbahn/gen-go + PATH=$(BIN):$$PATH $(BUILD)/thrift-gen --generateThrift --inputFile thrift/test.thrift --outputDir thrift/gen-go/ + PATH=$(BIN):$$PATH $(BUILD)/thrift-gen --generateThrift --inputFile examples/keyvalue/keyvalue.thrift --outputDir examples/keyvalue/gen-go + PATH=$(BIN):$$PATH $(BUILD)/thrift-gen --generateThrift --inputFile examples/thrift/example.thrift --outputDir examples/thrift/gen-go + PATH=$(BIN):$$PATH $(BUILD)/thrift-gen --generateThrift --inputFile hyperbahn/hyperbahn.thrift --outputDir hyperbahn/gen-go release_thrift_gen: clean setup GOOS=linux GOARCH=amd64 go build -o $(THRIFT_GEN_RELEASE_LINUX)/thrift-gen ./thrift/thrift-gen @@ -180,5 +180,5 @@ release_thrift_gen: clean setup tar -czf thrift-gen-release.tar.gz $(THRIFT_GEN_RELEASE) mv thrift-gen-release.tar.gz $(THRIFT_GEN_RELEASE)/ -.PHONY: all help clean fmt format get_thrift install install_ci install_lint install_glide release_thrift_gen packages_test check_no_test_deps test test_ci lint +.PHONY: all help clean fmt format install install_ci install_lint install_glide release_thrift_gen packages_test check_no_test_deps test test_ci lint .SILENT: all help clean fmt format test lint diff --git a/all_channels.go b/all_channels.go index 3c4e4f2f..01d81403 100644 --- a/all_channels.go +++ b/all_channels.go @@ -20,7 +20,10 @@ package tchannel -import "sync" +import ( + "fmt" + "sync" +) // channelMap is used to ensure that applications don't create multiple channels with // the same service name in a single process. @@ -34,7 +37,10 @@ var channelMap = struct { func registerNewChannel(ch *Channel) { serviceName := ch.ServiceName() ch.createdStack = string(getStacks(false /* all */)) - ch.log.Debugf("NewChannel created at %s", ch.createdStack) + ch.log.WithFields( + LogField{"channelPtr", fmt.Sprintf("%p", ch)}, + LogField{"createdStack", ch.createdStack}, + ).Info("Created new channel.") channelMap.Lock() defer channelMap.Unlock() diff --git a/channel.go b/channel.go index b410cdc3..6c2746ed 100644 --- a/channel.go +++ b/channel.go @@ -57,7 +57,9 @@ type ChannelOptions struct { // The name of the process, for logging and reporting to peers ProcessName string - // OnPeerStatusChanged + // OnPeerStatusChanged is an optional callback that receives a notification + // whenever the channel establishes a usable connection to a peer, or loses + // a connection to a peer. OnPeerStatusChanged func(*Peer) // The logger to use for this channel @@ -123,14 +125,15 @@ const ( type Channel struct { channelConnectionCommon - chID uint32 - createdStack string - commonStatsTags map[string]string - connectionOptions ConnectionOptions - peers *PeerList - relayHost RelayHost - relayMaxTimeout time.Duration - handler Handler + chID uint32 + createdStack string + commonStatsTags map[string]string + connectionOptions ConnectionOptions + peers *PeerList + relayHost RelayHost + relayMaxTimeout time.Duration + handler Handler + onPeerStatusChanged func(*Peer) // mutable contains all the members of Channel which are mutable. mutable struct { @@ -220,7 +223,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) { relayHost: opts.RelayHost, relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger), } - ch.peers = newRootPeerList(ch).newChild() + ch.peers = newRootPeerList(ch, opts.OnPeerStatusChanged).newChild() if opts.Handler != nil { ch.handler = opts.Handler @@ -287,7 +290,9 @@ func (ch *Channel) Serve(l net.Listener) error { ch.log = ch.log.WithFields(LogField{"hostPort", mutable.peerInfo.HostPort}) peerInfo := mutable.peerInfo - ch.log.Debugf("%v (%v) listening on %v", peerInfo.ProcessName, peerInfo.ServiceName, peerInfo.HostPort) + ch.log.WithFields( + LogField{"hostPort", peerInfo.HostPort}, + ).Info("Channel is listening.") go ch.serve() return nil } @@ -738,7 +743,7 @@ func (ch *Channel) State() ChannelState { // Close starts a graceful Close for the channel. This does not happen immediately: // 1. This call closes the Listener and starts closing connections. // 2. When all incoming connections are drained, the connection blocks new outgoing calls. -// 3. When all connections are drainged, the channel's state is updated to Closed. +// 3. When all connections are drained, the channel's state is updated to Closed. func (ch *Channel) Close() { ch.Logger().Info("Channel.Close called.") var connections []*Connection diff --git a/connection.go b/connection.go index 581b21b0..b8c0cd65 100644 --- a/connection.go +++ b/connection.go @@ -29,8 +29,12 @@ import ( "sync" "time" + "github.com/uber/tchannel-go/tos" + "github.com/uber-go/atomic" "golang.org/x/net/context" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" ) const ( @@ -129,6 +133,9 @@ type ConnectionOptions struct { // The type of checksum to use when sending messages. ChecksumType ChecksumType + + // ToS class name marked on outbound packets. + TosPriority tos.ToS } // connectionEvents are the events that can be triggered by a connection. @@ -176,10 +183,6 @@ type Connection struct { stoppedExchanges atomic.Uint32 // pendingMethods is the number of methods running that may block closing of sendCh. pendingMethods atomic.Int64 - // ignoreRemotePeer is used to avoid a data race between setting the RemotePeerInfo - // and the connection failing, causing a read of the RemotePeerInfo at the same time. - ignoreRemotePeer bool - // remotePeerAddress is used as a cache for remote peer address parsed into individual // components that can be used to set peer tags on OpenTracing Span. remotePeerAddress peerAddressComponents @@ -237,6 +240,23 @@ func (co ConnectionOptions) withDefaults() ConnectionOptions { return co } +func (ch *Channel) setConnectionTosPriority(tosPriority tos.ToS, c net.Conn) error { + tcpAddr, isTCP := c.RemoteAddr().(*net.TCPAddr) + if !isTCP { + return nil + } + + // Handle dual stack listeners and set Traffic Class. + var err error + switch ip := tcpAddr.IP; { + case ip.To16() != nil && ip.To4() == nil: + err = ipv6.NewConn(c).SetTrafficClass(int(tosPriority)) + case ip.To4() != nil: + err = ipv4.NewConn(c).SetTOS(int(tosPriority)) + } + return err +} + func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP string, remotePeer PeerInfo, remotePeerAddress peerAddressComponents, events connectionEvents) *Connection { opts := ch.connectionOptions.withDefaults() @@ -279,6 +299,12 @@ func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP str commonStatsTags: ch.commonStatsTags, } + if tosPriority := opts.TosPriority; tosPriority > 0 { + if err := ch.setConnectionTosPriority(tosPriority, conn); err != nil { + log.WithFields(ErrField(err)).Error("Failed to set ToS priority.") + } + } + c.nextMessageID.Store(initialID) c.log = log c.inbound.onRemoved = c.checkExchanges @@ -501,12 +527,6 @@ func (c *Connection) logConnectionError(site string, err error) error { // connectionError handles a connection level error func (c *Connection) connectionError(site string, err error) error { - // Avoid racing with setting the peer info. - c.withStateLock(func() error { - c.ignoreRemotePeer = true - return nil - }) - var closeLogFields LogFields if err == io.EOF { closeLogFields = LogFields{{"reason", "network connection EOF"}} diff --git a/connection_test.go b/connection_test.go index 2286d719..0060fdc3 100644 --- a/connection_test.go +++ b/connection_test.go @@ -36,10 +36,13 @@ import ( "github.com/uber/tchannel-go/relay/relaytest" "github.com/uber/tchannel-go/testutils" "github.com/uber/tchannel-go/testutils/testreader" + "github.com/uber/tchannel-go/tos" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/net/context" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" ) // Values used in tests @@ -106,6 +109,20 @@ func writeFlushStr(w ArgWriter, d string) error { return w.Flush() } +func isTosPriority(c net.Conn, tosPriority tos.ToS) (bool, error) { + var connTosPriority int + var err error + + switch ip := c.RemoteAddr().(*net.TCPAddr).IP; { + case ip.To16() != nil && ip.To4() == nil: + connTosPriority, err = ipv6.NewConn(c).TrafficClass() + case ip.To4() != nil: + connTosPriority, err = ipv4.NewConn(c).TOS() + } + + return connTosPriority == int(tosPriority), err +} + func TestRoundTrip(t *testing.T) { testutils.WithTestServer(t, nil, func(ts *testutils.TestServer) { handler := newTestHandler(t) @@ -546,15 +563,15 @@ func TestWriteArg3AfterTimeout(t *testing.T) { } ts.Register(HandlerFunc(handler), "call") - ctx, cancel := NewContext(testutils.Timeout(50 * time.Millisecond)) + ctx, cancel := NewContext(testutils.Timeout(100 * time.Millisecond)) defer cancel() _, _, _, err := raw.Call(ctx, ts.Server(), ts.HostPort(), ts.ServiceName(), "call", nil, nil) assert.Equal(t, err, ErrTimeout, "Call should timeout") - // Wait for the write to complete, make sure there's no errors. + // Wait for the write to complete, make sure there are no errors. select { - case <-time.After(testutils.Timeout(30 * time.Millisecond)): + case <-time.After(testutils.Timeout(60 * time.Millisecond)): t.Errorf("Handler should have failed due to timeout") case <-timedOut: } @@ -831,3 +848,125 @@ func TestConnectionIDs(t *testing.T) { assert.Equal(t, []uint32{1}, inbound, "Unexpected outbound IDs") }) } + +func TestTosPriority(t *testing.T) { + ctx, cancel := NewContext(time.Second) + defer cancel() + + opts := testutils.NewOpts().SetServiceName("s1").SetTosPriority(tos.Lowdelay) + testutils.WithTestServer(t, opts, func(ts *testutils.TestServer) { + ts.Register(raw.Wrap(newTestHandler(t)), "echo") + + outbound, err := ts.Server().BeginCall(ctx, ts.HostPort(), "s1", "echo", nil) + require.NoError(t, err, "BeginCall failed") + + _, outboundNetConn := OutboundConnection(outbound) + connTosPriority, err := isTosPriority(outboundNetConn, tos.Lowdelay) + require.NoError(t, err, "Checking TOS priority failed") + assert.Equal(t, connTosPriority, true) + _, _, _, err = raw.WriteArgs(outbound, []byte("arg2"), []byte("arg3")) + require.NoError(t, err, "Failed to write to outbound conn") + }) +} + +func TestPeerStatusChangeClientReduction(t *testing.T) { + sopts := testutils.NewOpts().NoRelay() + testutils.WithTestServer(t, sopts, func(ts *testutils.TestServer) { + server := ts.Server() + testutils.RegisterEcho(server, nil) + changes := make(chan int, 2) + + copts := testutils.NewOpts().SetOnPeerStatusChanged(func(p *Peer) { + i, o := p.NumConnections() + assert.Equal(t, 0, i, "no inbound connections to client") + changes <- o + }) + + // Induce the creation of a connection from client to server. + client := ts.NewClient(copts) + require.NoError(t, testutils.CallEcho(client, ts.HostPort(), ts.ServiceName(), nil)) + assert.Equal(t, 1, <-changes, "event for first connection") + + // Re-use + testutils.AssertEcho(t, client, ts.HostPort(), ts.ServiceName()) + + // Induce the destruction of a connection from the server to the client. + server.Close() + assert.Equal(t, 0, <-changes, "event for second disconnection") + + client.Close() + assert.Len(t, changes, 0, "unexpected peer status changes") + }) +} + +func TestPeerStatusChangeClient(t *testing.T) { + sopts := testutils.NewOpts().NoRelay() + testutils.WithTestServer(t, sopts, func(ts *testutils.TestServer) { + server := ts.Server() + testutils.RegisterEcho(server, nil) + changes := make(chan int, 2) + + copts := testutils.NewOpts().SetOnPeerStatusChanged(func(p *Peer) { + i, o := p.NumConnections() + assert.Equal(t, 0, i, "no inbound connections to client") + changes <- o + }) + + // Induce the creation of a connection from client to server. + client := ts.NewClient(copts) + require.NoError(t, testutils.CallEcho(client, ts.HostPort(), ts.ServiceName(), nil)) + assert.Equal(t, 1, <-changes, "event for first connection") + + // Re-use + testutils.AssertEcho(t, client, ts.HostPort(), ts.ServiceName()) + + // Induce the creation of a second connection from client to server. + pl := client.RootPeers() + p := pl.GetOrAdd(ts.HostPort()) + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, testutils.Timeout(100*time.Millisecond)) + defer cancel() + _, err := p.Connect(ctx) + require.NoError(t, err) + assert.Equal(t, 2, <-changes, "event for second connection") + + // Induce the destruction of a connection from the server to the client. + server.Close() + <-changes // May be 1 or 0 depending on timing. + assert.Equal(t, 0, <-changes, "event for second disconnection") + + client.Close() + assert.Len(t, changes, 0, "unexpected peer status changes") + }) +} + +func TestPeerStatusChangeServer(t *testing.T) { + changes := make(chan int, 10) + sopts := testutils.NewOpts().NoRelay().SetOnPeerStatusChanged(func(p *Peer) { + i, o := p.NumConnections() + assert.Equal(t, 0, o, "no outbound connections from server") + changes <- i + }) + testutils.WithTestServer(t, sopts, func(ts *testutils.TestServer) { + server := ts.Server() + testutils.RegisterEcho(server, nil) + + copts := testutils.NewOpts() + for i := 0; i < 5; i++ { + client := ts.NewClient(copts) + + // Open + testutils.AssertEcho(t, client, ts.HostPort(), ts.ServiceName()) + assert.Equal(t, 1, <-changes, "one event on new connection") + + // Re-use + testutils.AssertEcho(t, client, ts.HostPort(), ts.ServiceName()) + assert.Len(t, changes, 0, "no new events on re-used connection") + + // Close + client.Close() + assert.Equal(t, 0, <-changes, "one event on lost connection") + } + }) + assert.Len(t, changes, 0, "unexpected peer status changes") +} diff --git a/frame.go b/frame.go index 85ace7c1..646cac1d 100644 --- a/frame.go +++ b/frame.go @@ -136,7 +136,10 @@ func (f *Frame) ReadIn(r io.Reader) error { if err := f.Header.read(&rbuf); err != nil { return err } - if f.Header.PayloadSize() > 0 { + switch payloadSize := f.Header.PayloadSize(); { + case payloadSize > MaxFramePayloadSize: + return fmt.Errorf("invalid frame size %v", f.Header.size) + case payloadSize > 0: if _, err := io.ReadFull(r, f.SizedPayload()); err != nil { return err } diff --git a/frame_test.go b/frame_test.go index 949ab307..56e177e0 100644 --- a/frame_test.go +++ b/frame_test.go @@ -22,10 +22,13 @@ package tchannel import ( "bytes" + "encoding/binary" "encoding/json" "io" + "math" "testing" "testing/iotest" + "testing/quick" "github.com/uber/tchannel-go/testutils/testreader" "github.com/uber/tchannel-go/typed" @@ -142,3 +145,139 @@ func TestMessageType(t *testing.T) { require.NoError(t, err, "Error writing message to frame.") assert.Equal(t, messageTypeCallReq, frame.messageType(), "Failed to read message type from frame.") } + +func TestFrameReadIn(t *testing.T) { + maxPayload := bytes.Repeat([]byte{1}, MaxFramePayloadSize) + tests := []struct { + msg string + bs []byte + wantFrameHeader FrameHeader + wantFramePayload []byte + wantErr string + }{ + { + msg: "frame with no payload", + bs: []byte{ + 0, 16 /* size */, 1 /* type */, 2 /* reserved */, 0, 0, 0, 3, /* id */ + 9, 8, 7, 6, 5, 4, 3, 2, // reserved + }, + wantFrameHeader: FrameHeader{ + size: 16, + messageType: 1, + reserved1: 2, + ID: 3, + // reserved: [8]byte{9, 8, 7, 6, 5, 4, 3, 2}, // currently ignored. + }, + wantFramePayload: []byte{}, + }, + { + msg: "frame with small payload", + bs: []byte{ + 0, 18 /* size */, 1 /* type */, 2 /* reserved */, 0, 0, 0, 3, /* id */ + 9, 8, 7, 6, 5, 4, 3, 2, // reserved + 100, 200, // payload + }, + wantFrameHeader: FrameHeader{ + size: 18, + messageType: 1, + reserved1: 2, + ID: 3, + // reserved: [8]byte{9, 8, 7, 6, 5, 4, 3, 2}, // currently ignored. + }, + wantFramePayload: []byte{100, 200}, + }, + { + msg: "frame with max size", + bs: append([]byte{ + math.MaxUint8, math.MaxUint8 /* size */, 1 /* type */, 2 /* reserved */, 0, 0, 0, 3, /* id */ + 9, 8, 7, 6, 5, 4, 3, 2, // reserved + }, maxPayload...), + wantFrameHeader: FrameHeader{ + size: math.MaxUint16, + messageType: 1, + reserved1: 2, + ID: 3, + // currently ignored. + // reserved: [8]byte{9, 8, 7, 6, 5, 4, 3, 2}, + }, + wantFramePayload: maxPayload, + }, + { + msg: "frame with 0 size", + bs: []byte{ + 0, 0 /* size */, 1 /* type */, 2 /* reserved */, 0, 0, 0, 3, /* id */ + 9, 8, 7, 6, 5, 4, 3, 2, // reserved + }, + wantErr: "invalid frame size 0", + }, + { + msg: "frame with size < HeaderSize", + bs: []byte{ + 0, 15 /* size */, 1 /* type */, 2 /* reserved */, 0, 0, 0, 3, /* id */ + 9, 8, 7, 6, 5, 4, 3, 2, // reserved + }, + wantErr: "invalid frame size 15", + }, + { + msg: "frame with partial header", + bs: []byte{ + 0, 16 /* size */, 1 /* type */, 2 /* reserved */, 0, 0, 0, 3, /* id */ + // missing reserved bytes + }, + wantErr: "unexpected EOF", + }, + { + msg: "frame with partial payload", + bs: []byte{ + 0, 24 /* size */, 1 /* type */, 2 /* reserved */, 0, 0, 0, 3, /* id */ + 9, 8, 7, 6, 5, 4, 3, 2, // reserved + 1, 2, // partial payload + }, + wantErr: "unexpected EOF", + }, + } + + for _, tt := range tests { + f := DefaultFramePool.Get() + r := bytes.NewReader(tt.bs) + err := f.ReadIn(r) + if tt.wantErr != "" { + require.Error(t, err, tt.msg) + assert.Contains(t, err.Error(), tt.wantErr, tt.msg) + continue + } + + require.NoError(t, err, tt.msg) + assert.Equal(t, tt.wantFrameHeader, f.Header, "%v: header mismatch", tt.msg) + assert.Equal(t, tt.wantFramePayload, f.SizedPayload(), "%v: unexpected payload") + } +} + +func frameReadIn(bs []byte) (decoded bool) { + frame := DefaultFramePool.Get() + defer DefaultFramePool.Release(frame) + + defer func() { + if r := recover(); r != nil { + decoded = false + } + }() + frame.ReadIn(bytes.NewReader(bs)) + return true +} + +func TestQuickFrameReadIn(t *testing.T) { + // Try to read any set of bytes as a frame. + err := quick.Check(frameReadIn, &quick.Config{MaxCount: 10000}) + require.NoError(t, err, "Failed to fuzz test ReadIn") + + // Limit the search space to just headers. + err = quick.Check(func(size uint16, t byte, id uint32) bool { + bs := make([]byte, FrameHeaderSize) + binary.BigEndian.PutUint16(bs[0:2], size) + bs[2] = t + binary.BigEndian.PutUint32(bs[4:8], id) + return frameReadIn(bs) + }, &quick.Config{MaxCount: 10000}) + require.NoError(t, err, "Failed to fuzz test ReadIn") +} diff --git a/peer.go b/peer.go index 28178286..47ef6735 100644 --- a/peer.go +++ b/peer.go @@ -320,6 +320,7 @@ type Peer struct { channel Connectable hostPort string + onStatusChanged func(*Peer) onClosedConnRemoved func(*Peer) // scCount is the number of subchannels that this peer is added to. @@ -335,13 +336,17 @@ type Peer struct { onUpdate func(*Peer) } -func newPeer(channel Connectable, hostPort string, onClosedConnRemoved func(*Peer)) *Peer { +func newPeer(channel Connectable, hostPort string, onStatusChanged func(*Peer), onClosedConnRemoved func(*Peer)) *Peer { if hostPort == "" { panic("Cannot create peer with blank hostPort") } + if onStatusChanged == nil { + onStatusChanged = noopOnStatusChanged + } return &Peer{ channel: channel, hostPort: hostPort, + onStatusChanged: onStatusChanged, onClosedConnRemoved: onClosedConnRemoved, } } @@ -461,18 +466,21 @@ func (p *Peer) canRemove() bool { } // addConnection adds an active connection to the peer's connection list. -// If a connection is not active, ErrInvalidConnectionState is returned. +// If a connection is not active, returns ErrInvalidConnectionState. func (p *Peer) addConnection(c *Connection, direction connectionDirection) error { conns := p.connectionsFor(direction) - p.Lock() - defer p.Unlock() - if c.readState() != connectionActive { return ErrInvalidConnectionState } + p.Lock() *conns = append(*conns, c) + p.Unlock() + + // Inform third parties that a peer gained a connection. + p.onStatusChanged(p) + return nil } @@ -517,6 +525,8 @@ func (p *Peer) connectionCloseStateChange(changed *Connection) { if found { p.onClosedConnRemoved(p) + // Inform third parties that a peer lost a connection. + p.onStatusChanged(p) } } @@ -596,6 +606,8 @@ func (p *Peer) callOnUpdateComplete() { } } +func noopOnStatusChanged(*Peer) {} + // isEphemeralHostPort returns if hostPort is the default ephemeral hostPort. func isEphemeralHostPort(hostPort string) bool { return hostPort == "" || hostPort == ephemeralHostPort || strings.HasSuffix(hostPort, ":0") diff --git a/root_peer_list.go b/root_peer_list.go index ba62a6d9..127160fd 100644 --- a/root_peer_list.go +++ b/root_peer_list.go @@ -27,14 +27,16 @@ import "sync" type RootPeerList struct { sync.RWMutex - channel Connectable - peersByHostPort map[string]*Peer + channel Connectable + onPeerStatusChanged func(*Peer) + peersByHostPort map[string]*Peer } -func newRootPeerList(ch Connectable) *RootPeerList { +func newRootPeerList(ch Connectable, onPeerStatusChanged func(*Peer)) *RootPeerList { return &RootPeerList{ - channel: ch, - peersByHostPort: make(map[string]*Peer), + channel: ch, + onPeerStatusChanged: onPeerStatusChanged, + peersByHostPort: make(map[string]*Peer), } } @@ -65,7 +67,7 @@ func (l *RootPeerList) Add(hostPort string) *Peer { var p *Peer // To avoid duplicate connections, only the root list should create new // peers. All other lists should keep refs to the root list's peers. - p = newPeer(l.channel, hostPort, l.onClosedConnRemoved) + p = newPeer(l.channel, hostPort, l.onPeerStatusChanged, l.onClosedConnRemoved) l.peersByHostPort[hostPort] = p return p } diff --git a/scripts/install-thrift.sh b/scripts/install-thrift.sh new file mode 100755 index 00000000..a29d753f --- /dev/null +++ b/scripts/install-thrift.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +set -euo pipefail + +if [ -z "${1}" ]; then + echo "usage: ${0} installDirPath" >&2 + exit 1 +fi + +BIN_FILE="thrift-1" +TAR_FILE="${BIN_FILE}-$(uname -s | tr '[:upper:]' '[:lower:]')-$(uname -m).tar.gz" +TAR_LOCATION="https://github.com/uber/tchannel-go/releases/download/thrift-v1.0.0-dev/${TAR_FILE}" + +mkdir -p "${1}" +cd "${1}" +wget "${TAR_LOCATION}" +tar xzf "${TAR_FILE}" +rm -f "${TAR_FILE}" +mv "${BIN_FILE}" "thrift" diff --git a/scripts/travis/get-thrift.sh b/scripts/travis/get-thrift.sh deleted file mode 100755 index 4f0d2e32..00000000 --- a/scripts/travis/get-thrift.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash - -set -e - -cd "$(dirname "$0")" -rm -rf thrift-release.zip -wget https://github.com/prashantv/thrift/releases/download/p0.0.1/thrift-release.zip -unzip thrift-release.zip - diff --git a/testutils/channel_opts.go b/testutils/channel_opts.go index cff8478e..b40874f9 100644 --- a/testutils/channel_opts.go +++ b/testutils/channel_opts.go @@ -26,6 +26,7 @@ import ( "time" "github.com/uber/tchannel-go" + "github.com/uber/tchannel-go/tos" "github.com/uber-go/atomic" ) @@ -124,6 +125,12 @@ func (o *ChannelOpts) SetSendBufferSize(bufSize int) *ChannelOpts { return o } +// SetTosPriority set TosPriority in DefaultConnectionOptions. +func (o *ChannelOpts) SetTosPriority(tosPriority tos.ToS) *ChannelOpts { + o.DefaultConnectionOptions.TosPriority = tosPriority + return o +} + // SetTimeNow sets TimeNow in ChannelOptions. func (o *ChannelOpts) SetTimeNow(timeNow func() time.Time) *ChannelOpts { o.TimeNow = timeNow @@ -193,6 +200,13 @@ func (o *ChannelOpts) SetRelayMaxTimeout(d time.Duration) *ChannelOpts { return o } +// SetOnPeerStatusChanged sets the callback for channel status change +// noficiations. +func (o *ChannelOpts) SetOnPeerStatusChanged(f func(*tchannel.Peer)) *ChannelOpts { + o.ChannelOptions.OnPeerStatusChanged = f + return o +} + func defaultString(v string, defaultValue string) string { if v == "" { return defaultValue diff --git a/tos/tos.go b/tos/tos.go new file mode 100644 index 00000000..0d6389c5 --- /dev/null +++ b/tos/tos.go @@ -0,0 +1,77 @@ +// Copyright (c) 2015 Uber Technologies, Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package tos + +// ToS represents a const value DF, CS3 etc +// Assured Forwarding (x=class, y=drop precedence) (RFC2597) +// Class Selector (RFC 2474) +// IP Precedence (Linux Socket Compat RFC 791 +type ToS uint8 + +// Assured Forwarding (x=class, y=drop precedence) (RFC2597) +// Class Selector (RFC 2474) + +const ( + // CS3 Class Selector 3 + CS3 ToS = 0x18 + // CS4 Class Selector 4 + CS4 ToS = 0x20 + // CS5 Class Selector 5 + CS5 ToS = 0x28 + // CS6 Class Selector 6 + CS6 ToS = 0x30 + // CS7 Class Selector 7 + CS7 ToS = 0x38 + // AF11 Assured Forward 11 + AF11 ToS = 0x0a + // AF12 Assured Forward 11 + AF12 ToS = 0x0c + // AF13 Assured Forward 12 + AF13 ToS = 0x0e + // AF21 Assured Forward 13 + AF21 ToS = 0x12 + // AF22 Assured Forward 21 + AF22 ToS = 0x14 + // AF23 Assured Forward 22 + AF23 ToS = 0x16 + // AF31 Assured Forward 23 + AF31 ToS = 0x1a + // AF32 Assured Forward 31 + AF32 ToS = 0x1c + // AF33 Assured Forward 32 + AF33 ToS = 0x1e + // AF41 Assured Forward 33 + AF41 ToS = 0x22 + // AF42 Assured Forward 41 + AF42 ToS = 0x24 + // AF43 Assured Forward 42 + AF43 ToS = 0x26 + // EF Expedited Forwarding (RFC 3246) + EF ToS = 0x2e + // Lowdelay 10 + Lowdelay ToS = 0x10 + // Throughput 8 + Throughput ToS = 0x08 + // Reliability 4 + Reliability ToS = 0x04 + // Lowcost 2 + Lowcost ToS = 0x02 +) diff --git a/tos/tos_string.go b/tos/tos_string.go new file mode 100644 index 00000000..13e2d4cf --- /dev/null +++ b/tos/tos_string.go @@ -0,0 +1,53 @@ +package tos + +import "fmt" + +var ( + _tosNameToValue map[string]ToS + _tosValueToName = map[ToS]string{ + CS3: "CS3", + CS4: "CS4", + CS5: "CS5", + CS6: "CS6", + CS7: "CS7", + AF11: "AF11", + AF12: "AF12", + AF13: "AF13", + AF21: "AF21", + AF22: "AF22", + AF23: "AF23", + AF31: "AF31", + AF32: "AF32", + AF33: "AF33", + AF41: "AF41", + AF42: "AF42", + AF43: "AF43", + EF: "EF", + Lowdelay: "Lowdelay", + Throughput: "Throughput", + Reliability: "Reliability", + Lowcost: "Lowcost", + } +) + +func init() { + _tosNameToValue = make(map[string]ToS, len(_tosValueToName)) + for tos, tosString := range _tosValueToName { + _tosNameToValue[tosString] = tos + } +} + +// MarshalText implements TextMarshaler from encoding +func (r ToS) MarshalText() ([]byte, error) { + return []byte(_tosValueToName[r]), nil +} + +// UnmarshalText implements TextUnMarshaler from encoding +func (r *ToS) UnmarshalText(data []byte) error { + if v, ok := _tosNameToValue[string(data)]; ok { + *r = v + return nil + } + + return fmt.Errorf("invalid ToS %q", string(data)) +} diff --git a/tos/tos_test.go b/tos/tos_test.go new file mode 100644 index 00000000..2ae32528 --- /dev/null +++ b/tos/tos_test.go @@ -0,0 +1,47 @@ +// Copyright (c) 2015 Uber Technologies, Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package tos + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMarshal(t *testing.T) { + for tos, wantMarshalled := range _tosValueToName { + marshalled, err := tos.MarshalText() + require.NoError(t, err, "Failed to marshal %v", tos) + assert.Equal(t, wantMarshalled, string(marshalled)) + + var got ToS + err = got.UnmarshalText(marshalled) + require.NoError(t, err, "Failed to unmarshal %v", string(marshalled)) + assert.Equal(t, tos, got) + } +} + +func TestUnmarshalUnknown(t *testing.T) { + var tos ToS + err := tos.UnmarshalText([]byte("unknown")) + require.Error(t, err, "Should fail to unmarshal unknown value") +} diff --git a/version.go b/version.go index c5f15152..697b0e96 100644 --- a/version.go +++ b/version.go @@ -23,4 +23,4 @@ package tchannel // VersionInfo identifies the version of the TChannel library. // Due to lack of proper package management, this version string will // be maintained manually. -const VersionInfo = "1.5.0" +const VersionInfo = "1.6.0"