From 1dac376256eff29092fb9a634c1e106e44eca5b6 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Thu, 31 Oct 2024 11:51:40 +0530 Subject: [PATCH] okhttp: Use failing "source" for read bytes when sending GOAWAY due to insufficient thread pool size Create `ClientFrameHandler` with failing source to be used in case of failed 2nd thread scheduling. Fixes NPE from https://github.com/grpc/grpc-java/pull/11503. --- .../io/grpc/okhttp/OkHttpClientTransport.java | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 2f6b836dc3a..59f824b1a3c 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -515,21 +515,6 @@ public Runnable start(Listener listener) { serializingExecutor.execute(new Runnable() { @Override public void run() { - // This is a hack to make sure the connection preface and initial settings to be sent out - // without blocking the start. By doing this essentially prevents potential deadlock when - // network is not available during startup while another thread holding lock to send the - // initial preface. - try { - latch.await(); - barrier.await(1000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (TimeoutException | BrokenBarrierException e) { - startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE - .withDescription("Timed out waiting for second handshake thread. " - + "The transport executor pool may have run out of threads")); - return; - } // Use closed source on failure so that the reader immediately shuts down. BufferedSource source = Okio.buffer(new Source() { @Override @@ -549,6 +534,22 @@ public void close() { Socket sock; SSLSession sslSession = null; try { + // This is a hack to make sure the connection preface and initial settings to be sent out + // without blocking the start. By doing this essentially prevents potential deadlock when + // network is not available during startup while another thread holding lock to send the + // initial preface. + try { + latch.await(); + barrier.await(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (TimeoutException | BrokenBarrierException e) { + startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE + .withDescription("Timed out waiting for second handshake thread. " + + "The transport executor pool may have run out of threads")); + return; + } + if (proxiedAddr == null) { sock = socketFactory.createSocket(address.getAddress(), address.getPort()); } else { @@ -1459,4 +1460,4 @@ public void alternateService(int streamId, String origin, ByteString protocol, S // TODO(madongfly): Deal with alternateService propagation } } -} +} \ No newline at end of file