Skip to content

Commit

Permalink
okhttp: Use failing "source" for read bytes when sending GOAWAY due t…
Browse files Browse the repository at this point in the history
…o insufficient thread pool size

Create `ClientFrameHandler` with failing source to be used in case of failed 2nd thread scheduling. Fixes NPE from #11503.
  • Loading branch information
kannanjgithub authored and ejona86 committed Nov 27, 2024
1 parent e225c23 commit 1dac376
Showing 1 changed file with 17 additions and 16 deletions.
33 changes: 17 additions & 16 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -515,21 +515,6 @@ public Runnable start(Listener listener) {
serializingExecutor.execute(new Runnable() {
@Override
public void run() {
// This is a hack to make sure the connection preface and initial settings to be sent out
// without blocking the start. By doing this essentially prevents potential deadlock when
// network is not available during startup while another thread holding lock to send the
// initial preface.
try {
latch.await();
barrier.await(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (TimeoutException | BrokenBarrierException e) {
startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE
.withDescription("Timed out waiting for second handshake thread. "
+ "The transport executor pool may have run out of threads"));
return;
}
// Use closed source on failure so that the reader immediately shuts down.
BufferedSource source = Okio.buffer(new Source() {
@Override
Expand All @@ -549,6 +534,22 @@ public void close() {
Socket sock;
SSLSession sslSession = null;
try {
// This is a hack to make sure the connection preface and initial settings to be sent out
// without blocking the start. By doing this essentially prevents potential deadlock when
// network is not available during startup while another thread holding lock to send the
// initial preface.
try {
latch.await();
barrier.await(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (TimeoutException | BrokenBarrierException e) {
startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE
.withDescription("Timed out waiting for second handshake thread. "
+ "The transport executor pool may have run out of threads"));
return;
}

if (proxiedAddr == null) {
sock = socketFactory.createSocket(address.getAddress(), address.getPort());
} else {
Expand Down Expand Up @@ -1459,4 +1460,4 @@ public void alternateService(int streamId, String origin, ByteString protocol, S
// TODO(madongfly): Deal with alternateService propagation
}
}
}
}

0 comments on commit 1dac376

Please sign in to comment.