Skip to content

Commit

Permalink
Fixed hang from not getting interrupted.
Browse files Browse the repository at this point in the history
Added some 'final' to class definitions.
  • Loading branch information
larry-safran committed Sep 19, 2024
1 parent 75f956d commit 90caf39
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
16 changes: 12 additions & 4 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
* @throws StatusException if the write to the server failed
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10918")
public static <ReqT, RespT> BlockingClientCall<?, RespT> blockingV2ServerStreamingCall(
public static <ReqT, RespT> BlockingClientCall<ReqT, RespT> blockingV2ServerStreamingCall(
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req)
throws InterruptedException, StatusException {
BlockingClientCall<ReqT, RespT> call =
Expand Down Expand Up @@ -436,7 +436,7 @@ private abstract static class StartableListener<T> extends ClientCall.Listener<T
abstract void onStart();
}

private static class CallToStreamObserverAdapter<ReqT>
private static final class CallToStreamObserverAdapter<ReqT>
extends ClientCallStreamObserver<ReqT> {
private boolean frozen;
private final ClientCall<ReqT, ?> call;
Expand Down Expand Up @@ -787,7 +787,7 @@ void onStart() {
}

@SuppressWarnings("serial")
static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runnable>
private static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runnable>
implements Executor {
private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName());

Expand All @@ -804,12 +804,14 @@ static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runnable>
* Must only be called by one thread at a time.
*/
public void waitAndDrain() throws InterruptedException {
throwIfInterrupted();
Runnable runnable = poll();
if (runnable == null) {
waiter = Thread.currentThread();
try {
while ((runnable = poll()) == null) {
LockSupport.park(this);
throwIfInterrupted();
}
} finally {
waiter = null;
Expand All @@ -820,6 +822,12 @@ public void waitAndDrain() throws InterruptedException {
} while ((runnable = poll()) != null);
}

private static void throwIfInterrupted() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
}

/**
* Called after final call to {@link #waitAndDrain()}, from same thread.
*/
Expand Down Expand Up @@ -857,7 +865,7 @@ static final class ThreadSafeThreadlessExecutor extends ConcurrentLinkedQueue<Ru
private static final Logger log =
Logger.getLogger(ThreadSafeThreadlessExecutor.class.getName());

private Lock waiterLock = new ReentrantLock();
private final Lock waiterLock = new ReentrantLock();
private final Condition waiterCondition = waiterLock.newCondition();

// Non private to avoid synthetic class
Expand Down
2 changes: 1 addition & 1 deletion stub/src/test/java/io/grpc/stub/ClientCallsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -971,8 +971,8 @@ public <ReqT,RespT> ClientCall<ReqT, RespT> interceptCall(
}

@Override public void halfClose() {
Thread.currentThread().interrupt();
super.halfClose();
Thread.currentThread().interrupt();
}
};
}
Expand Down

0 comments on commit 90caf39

Please sign in to comment.