Skip to content

Commit

Permalink
relay: Don't attempt concurrent connections to the same peer (#542)
Browse files Browse the repository at this point in the history
We avoid concurrnet connections on peer.GetConnection, but the relay
path was missing the same checks.

* Use atomic bool for delayed
  • Loading branch information
prashantv authored Nov 30, 2016
1 parent e13b2fc commit 90a6599
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 2 deletions.
13 changes: 11 additions & 2 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,22 @@ func (p *Peer) GetConnection(ctx context.Context) (*Connection, error) {
return p.Connect(ctx)
}

// getConnectionRelay gets a connection, and uses the given timeout if a new
// connection is required.
// getConnectionRelay gets a connection, and uses the given timeout to lazily
// create a context if a new connection is required.
func (p *Peer) getConnectionRelay(timeout time.Duration) (*Connection, error) {
if conn, ok := p.getActiveConn(); ok {
return conn, nil
}

// Lock here to restrict new connection creation attempts to one goroutine
p.newConnLock.Lock()
defer p.newConnLock.Unlock()

// Check active connections again in case someone else got ahead of us.
if activeConn, ok := p.getActiveConn(); ok {
return activeConn, nil
}

// When the relay creates outbound connections, we don't want those services
// to ever connect back to us and send us traffic. We hide the host:port
// so that service instances on remote machines don't try to connect back
Expand Down
45 changes: 45 additions & 0 deletions relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,3 +719,48 @@ func TestRelayThroughSeparateRelay(t *testing.T) {
assert.Equal(t, 1, numConns(introspected.RootPeers[serverHP]), "Expected 1 connection from relay2 to server")
})
}

func TestRelayConcurrentNewConnectionAttempts(t *testing.T) {
opts := testutils.NewOpts().SetRelayOnly()
testutils.WithTestServer(t, opts, func(ts *testutils.TestServer) {
// Create a server that is slow to accept connections by using
// a frame relay to slow down the initial message.
slowServer := testutils.NewServer(t, serviceNameOpts("slow-server"))
defer slowServer.Close()
testutils.RegisterEcho(slowServer, nil)

var delayed atomic.Bool
relayFunc := func(outgoing bool, f *Frame) *Frame {
if !delayed.Load() {
time.Sleep(testutils.Timeout(50 * time.Millisecond))
delayed.Store(true)
}
return f
}

slowHP, close := testutils.FrameRelay(t, slowServer.PeerInfo().HostPort, relayFunc)
defer close()
ts.RelayHost().Add("slow-server", slowHP)

// Make concurrent calls to trigger concurrent getConnectionRelay calls.
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
// Create client and get dest host:port in the main goroutine to avoid races.
client := ts.NewClient(nil)
relayHostPort := ts.HostPort()
go func() {
defer wg.Done()
testutils.AssertEcho(t, client, relayHostPort, "slow-server")
}()
}
wg.Wait()

// Verify that the slow server only received a single connection.
inboundConns := 0
for _, state := range slowServer.IntrospectState(nil).RootPeers {
inboundConns += len(state.InboundConnections)
}
assert.Equal(t, 1, inboundConns, "Expected a single inbound connection to the server")
})
}

0 comments on commit 90a6599

Please sign in to comment.