Skip to content

Commit

Permalink
Merge pull request #627 from uber/dev
Browse files Browse the repository at this point in the history
Version 1.6.0
  • Loading branch information
prashantv authored Jun 2, 2017
2 parents 0b7f160 + eab25af commit b99c1d7
Show file tree
Hide file tree
Showing 18 changed files with 604 additions and 68 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ lint.log
.idea
tchannel-go.iml
.vscode
.bin/
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
Changelog
=========

# v1.6.0

* Locks Apache Thrift to version 0.9.3 to maintain backward-compatibility.
* Add `OnPeerStatusChanged` channel option to receive a notification each time
the number of available connections changes for any given peer.
* Set DiffServ (QoS) bit on outbound connections.
* Improve resilience of the frame parser.

# v1.5.0

* Add `PeerList.Len` to expose the number of peers in the peer list.
Expand Down
38 changes: 19 additions & 19 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ SRCS := $(foreach pkg,$(PKGS),$(wildcard $(pkg)/*.go))

PLATFORM := $(shell uname -s | tr '[:upper:]' '[:lower:]')
ARCH := $(shell uname -m)
THRIFT_REL := ./scripts/travis/thrift-release/$(PLATFORM)-$(ARCH)

OLD_GOPATH := $(GOPATH)

export PATH := $(realpath $(THRIFT_REL)):$(PATH)
BIN := $(shell pwd)/.bin

# Cross language test args
TEST_HOST=127.0.0.1
Expand All @@ -37,6 +36,10 @@ TEST_PORT=0

all: test examples

$(BIN)/thrift:
mkdir -p $(BIN)
scripts/install-thrift.sh $(BIN)

packages_test:
go list -json ./... | jq -r '. | select ((.TestGoFiles | length) > 0) | .ImportPath'

Expand All @@ -46,9 +49,6 @@ setup:
mkdir -p $(THRIFT_GEN_RELEASE_LINUX)
mkdir -p $(THRIFT_GEN_RELEASE_DARWIN)

get_thrift:
scripts/travis/get-thrift.sh

# We want to remove `vendor` dir because thrift-gen tests don't work with it.
# However, glide install even with --cache-gopath option leaves GOPATH at HEAD,
# not at the desired versions from glide.lock, which are only applied to `vendor`
Expand All @@ -70,7 +70,7 @@ install_glide:
# but have to pin to 0.12.3 due to https://github.com/Masterminds/glide/issues/745
GOPATH=$(OLD_GOPATH) go get -u github.com/Masterminds/glide && cd $(OLD_GOPATH)/src/github.com/Masterminds/glide && git checkout v0.12.3 && go install

install_ci: install_glide install_lint get_thrift install
install_ci: $(BIN)/thrift install_glide install_lint install
GOPATH=$(OLD_GOPATH) go get -u github.com/mattn/goveralls
ifdef CROSSDOCK
$(MAKE) install_docker_ci
Expand Down Expand Up @@ -100,23 +100,23 @@ else
$(MAKE) test
endif

test: clean setup install_test check_no_test_deps
test: clean setup install_test check_no_test_deps $(BIN)/thrift
@echo Testing packages:
go test -parallel=4 $(TEST_ARG) $(ALL_PKGS)
PATH=$(BIN):$$PATH go test -parallel=4 $(TEST_ARG) $(ALL_PKGS)
@echo Running frame pool tests
go test -run TestFramesReleased -stressTest $(TEST_ARG)
PATH=$(BIN):$$PATH go test -run TestFramesReleased -stressTest $(TEST_ARG)

check_no_test_deps:
! go list -json $(PROD_PKGS) | jq -r .Deps[] | grep -e test -e mock

benchmark: clean setup
benchmark: clean setup $(BIN)/thrift
echo Running benchmarks:
go test $(ALL_PKGS) -bench=. -cpu=1 -benchmem -run NONE
PATH=$(BIN)::$$PATH go test $(ALL_PKGS) -bench=. -cpu=1 -benchmem -run NONE

cover_profile: clean setup
cover_profile: clean setup $(BIN)/thrift
@echo Testing packages:
mkdir -p $(BUILD)
go test ./ $(TEST_ARG) -coverprofile=$(BUILD)/coverage.out
PATH=$(BIN)::$$PATH go test ./ $(TEST_ARG) -coverprofile=$(BUILD)/coverage.out

cover: cover_profile
go tool cover -html=$(BUILD)/coverage.out
Expand Down Expand Up @@ -167,18 +167,18 @@ examples: clean setup thrift_example
go build -o $(BUILD)/examples/bench/runner ./examples/bench/runner.go
go build -o $(BUILD)/examples/test_server ./examples/test_server

thrift_gen:
thrift_gen: $(BIN)/thrift
go build -o $(BUILD)/thrift-gen ./thrift/thrift-gen
$(BUILD)/thrift-gen --generateThrift --inputFile thrift/test.thrift --outputDir thrift/gen-go/
$(BUILD)/thrift-gen --generateThrift --inputFile examples/keyvalue/keyvalue.thrift --outputDir examples/keyvalue/gen-go
$(BUILD)/thrift-gen --generateThrift --inputFile examples/thrift/example.thrift --outputDir examples/thrift/gen-go
$(BUILD)/thrift-gen --generateThrift --inputFile hyperbahn/hyperbahn.thrift --outputDir hyperbahn/gen-go
PATH=$(BIN):$$PATH $(BUILD)/thrift-gen --generateThrift --inputFile thrift/test.thrift --outputDir thrift/gen-go/
PATH=$(BIN):$$PATH $(BUILD)/thrift-gen --generateThrift --inputFile examples/keyvalue/keyvalue.thrift --outputDir examples/keyvalue/gen-go
PATH=$(BIN):$$PATH $(BUILD)/thrift-gen --generateThrift --inputFile examples/thrift/example.thrift --outputDir examples/thrift/gen-go
PATH=$(BIN):$$PATH $(BUILD)/thrift-gen --generateThrift --inputFile hyperbahn/hyperbahn.thrift --outputDir hyperbahn/gen-go

release_thrift_gen: clean setup
GOOS=linux GOARCH=amd64 go build -o $(THRIFT_GEN_RELEASE_LINUX)/thrift-gen ./thrift/thrift-gen
GOOS=darwin GOARCH=amd64 go build -o $(THRIFT_GEN_RELEASE_DARWIN)/thrift-gen ./thrift/thrift-gen
tar -czf thrift-gen-release.tar.gz $(THRIFT_GEN_RELEASE)
mv thrift-gen-release.tar.gz $(THRIFT_GEN_RELEASE)/

.PHONY: all help clean fmt format get_thrift install install_ci install_lint install_glide release_thrift_gen packages_test check_no_test_deps test test_ci lint
.PHONY: all help clean fmt format install install_ci install_lint install_glide release_thrift_gen packages_test check_no_test_deps test test_ci lint
.SILENT: all help clean fmt format test lint
10 changes: 8 additions & 2 deletions all_channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

package tchannel

import "sync"
import (
"fmt"
"sync"
)

// channelMap is used to ensure that applications don't create multiple channels with
// the same service name in a single process.
Expand All @@ -34,7 +37,10 @@ var channelMap = struct {
func registerNewChannel(ch *Channel) {
serviceName := ch.ServiceName()
ch.createdStack = string(getStacks(false /* all */))
ch.log.Debugf("NewChannel created at %s", ch.createdStack)
ch.log.WithFields(
LogField{"channelPtr", fmt.Sprintf("%p", ch)},
LogField{"createdStack", ch.createdStack},
).Info("Created new channel.")

channelMap.Lock()
defer channelMap.Unlock()
Expand Down
29 changes: 17 additions & 12 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ type ChannelOptions struct {
// The name of the process, for logging and reporting to peers
ProcessName string

// OnPeerStatusChanged
// OnPeerStatusChanged is an optional callback that receives a notification
// whenever the channel establishes a usable connection to a peer, or loses
// a connection to a peer.
OnPeerStatusChanged func(*Peer)

// The logger to use for this channel
Expand Down Expand Up @@ -123,14 +125,15 @@ const (
type Channel struct {
channelConnectionCommon

chID uint32
createdStack string
commonStatsTags map[string]string
connectionOptions ConnectionOptions
peers *PeerList
relayHost RelayHost
relayMaxTimeout time.Duration
handler Handler
chID uint32
createdStack string
commonStatsTags map[string]string
connectionOptions ConnectionOptions
peers *PeerList
relayHost RelayHost
relayMaxTimeout time.Duration
handler Handler
onPeerStatusChanged func(*Peer)

// mutable contains all the members of Channel which are mutable.
mutable struct {
Expand Down Expand Up @@ -220,7 +223,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
relayHost: opts.RelayHost,
relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger),
}
ch.peers = newRootPeerList(ch).newChild()
ch.peers = newRootPeerList(ch, opts.OnPeerStatusChanged).newChild()

if opts.Handler != nil {
ch.handler = opts.Handler
Expand Down Expand Up @@ -287,7 +290,9 @@ func (ch *Channel) Serve(l net.Listener) error {
ch.log = ch.log.WithFields(LogField{"hostPort", mutable.peerInfo.HostPort})

peerInfo := mutable.peerInfo
ch.log.Debugf("%v (%v) listening on %v", peerInfo.ProcessName, peerInfo.ServiceName, peerInfo.HostPort)
ch.log.WithFields(
LogField{"hostPort", peerInfo.HostPort},
).Info("Channel is listening.")
go ch.serve()
return nil
}
Expand Down Expand Up @@ -738,7 +743,7 @@ func (ch *Channel) State() ChannelState {
// Close starts a graceful Close for the channel. This does not happen immediately:
// 1. This call closes the Listener and starts closing connections.
// 2. When all incoming connections are drained, the connection blocks new outgoing calls.
// 3. When all connections are drainged, the channel's state is updated to Closed.
// 3. When all connections are drained, the channel's state is updated to Closed.
func (ch *Channel) Close() {
ch.Logger().Info("Channel.Close called.")
var connections []*Connection
Expand Down
40 changes: 30 additions & 10 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ import (
"sync"
"time"

"github.com/uber/tchannel-go/tos"

"github.com/uber-go/atomic"
"golang.org/x/net/context"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)

const (
Expand Down Expand Up @@ -129,6 +133,9 @@ type ConnectionOptions struct {

// The type of checksum to use when sending messages.
ChecksumType ChecksumType

// ToS class name marked on outbound packets.
TosPriority tos.ToS
}

// connectionEvents are the events that can be triggered by a connection.
Expand Down Expand Up @@ -176,10 +183,6 @@ type Connection struct {
stoppedExchanges atomic.Uint32
// pendingMethods is the number of methods running that may block closing of sendCh.
pendingMethods atomic.Int64
// ignoreRemotePeer is used to avoid a data race between setting the RemotePeerInfo
// and the connection failing, causing a read of the RemotePeerInfo at the same time.
ignoreRemotePeer bool

// remotePeerAddress is used as a cache for remote peer address parsed into individual
// components that can be used to set peer tags on OpenTracing Span.
remotePeerAddress peerAddressComponents
Expand Down Expand Up @@ -237,6 +240,23 @@ func (co ConnectionOptions) withDefaults() ConnectionOptions {
return co
}

func (ch *Channel) setConnectionTosPriority(tosPriority tos.ToS, c net.Conn) error {
tcpAddr, isTCP := c.RemoteAddr().(*net.TCPAddr)
if !isTCP {
return nil
}

// Handle dual stack listeners and set Traffic Class.
var err error
switch ip := tcpAddr.IP; {
case ip.To16() != nil && ip.To4() == nil:
err = ipv6.NewConn(c).SetTrafficClass(int(tosPriority))
case ip.To4() != nil:
err = ipv4.NewConn(c).SetTOS(int(tosPriority))
}
return err
}

func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP string, remotePeer PeerInfo, remotePeerAddress peerAddressComponents, events connectionEvents) *Connection {
opts := ch.connectionOptions.withDefaults()

Expand Down Expand Up @@ -279,6 +299,12 @@ func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP str
commonStatsTags: ch.commonStatsTags,
}

if tosPriority := opts.TosPriority; tosPriority > 0 {
if err := ch.setConnectionTosPriority(tosPriority, conn); err != nil {
log.WithFields(ErrField(err)).Error("Failed to set ToS priority.")
}
}

c.nextMessageID.Store(initialID)
c.log = log
c.inbound.onRemoved = c.checkExchanges
Expand Down Expand Up @@ -501,12 +527,6 @@ func (c *Connection) logConnectionError(site string, err error) error {

// connectionError handles a connection level error
func (c *Connection) connectionError(site string, err error) error {
// Avoid racing with setting the peer info.
c.withStateLock(func() error {
c.ignoreRemotePeer = true
return nil
})

var closeLogFields LogFields
if err == io.EOF {
closeLogFields = LogFields{{"reason", "network connection EOF"}}
Expand Down
Loading

0 comments on commit b99c1d7

Please sign in to comment.