From 90a65999799767a4bd9683d2649dc551e0092ead Mon Sep 17 00:00:00 2001 From: Prashant Varanasi Date: Wed, 30 Nov 2016 11:30:21 -0800 Subject: [PATCH] relay: Don't attempt concurrent connections to the same peer (#542) We avoid concurrnet connections on peer.GetConnection, but the relay path was missing the same checks. * Use atomic bool for delayed --- peer.go | 13 +++++++++++-- relay_test.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/peer.go b/peer.go index 14ac3b4a..10b891f1 100644 --- a/peer.go +++ b/peer.go @@ -360,13 +360,22 @@ func (p *Peer) GetConnection(ctx context.Context) (*Connection, error) { return p.Connect(ctx) } -// getConnectionRelay gets a connection, and uses the given timeout if a new -// connection is required. +// getConnectionRelay gets a connection, and uses the given timeout to lazily +// create a context if a new connection is required. func (p *Peer) getConnectionRelay(timeout time.Duration) (*Connection, error) { if conn, ok := p.getActiveConn(); ok { return conn, nil } + // Lock here to restrict new connection creation attempts to one goroutine + p.newConnLock.Lock() + defer p.newConnLock.Unlock() + + // Check active connections again in case someone else got ahead of us. + if activeConn, ok := p.getActiveConn(); ok { + return activeConn, nil + } + // When the relay creates outbound connections, we don't want those services // to ever connect back to us and send us traffic. We hide the host:port // so that service instances on remote machines don't try to connect back diff --git a/relay_test.go b/relay_test.go index 1ebdf6d7..0d48db9b 100644 --- a/relay_test.go +++ b/relay_test.go @@ -719,3 +719,48 @@ func TestRelayThroughSeparateRelay(t *testing.T) { assert.Equal(t, 1, numConns(introspected.RootPeers[serverHP]), "Expected 1 connection from relay2 to server") }) } + +func TestRelayConcurrentNewConnectionAttempts(t *testing.T) { + opts := testutils.NewOpts().SetRelayOnly() + testutils.WithTestServer(t, opts, func(ts *testutils.TestServer) { + // Create a server that is slow to accept connections by using + // a frame relay to slow down the initial message. + slowServer := testutils.NewServer(t, serviceNameOpts("slow-server")) + defer slowServer.Close() + testutils.RegisterEcho(slowServer, nil) + + var delayed atomic.Bool + relayFunc := func(outgoing bool, f *Frame) *Frame { + if !delayed.Load() { + time.Sleep(testutils.Timeout(50 * time.Millisecond)) + delayed.Store(true) + } + return f + } + + slowHP, close := testutils.FrameRelay(t, slowServer.PeerInfo().HostPort, relayFunc) + defer close() + ts.RelayHost().Add("slow-server", slowHP) + + // Make concurrent calls to trigger concurrent getConnectionRelay calls. + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + // Create client and get dest host:port in the main goroutine to avoid races. + client := ts.NewClient(nil) + relayHostPort := ts.HostPort() + go func() { + defer wg.Done() + testutils.AssertEcho(t, client, relayHostPort, "slow-server") + }() + } + wg.Wait() + + // Verify that the slow server only received a single connection. + inboundConns := 0 + for _, state := range slowServer.IntrospectState(nil).RootPeers { + inboundConns += len(state.InboundConnections) + } + assert.Equal(t, 1, inboundConns, "Expected a single inbound connection to the server") + }) +}