From 7216628a7cdaf04a1d4e054a6555268a09c869fc Mon Sep 17 00:00:00 2001 From: Alex Walker Date: Thu, 27 Jan 2022 12:26:10 +0000 Subject: [PATCH] Don't delete response collectors in a transaction (#369) ## What is the goal of this PR? We no longer delete response collectors in a transaction after receiving a response to a "single" request, or receiving a "DONE" message in a stream. This fixes a possible error when loading 50+ answers in one query and then performing a second query. ## What are the changes implemented in this PR? See https://github.com/vaticle/typedb-client-python/pull/250, which this PR is a copy of. --- VERSION | 2 +- common/exception/ErrorMessage.java | 2 +- stream/BidirectionalStream.java | 26 +++++--------------------- stream/ResponseCollector.java | 7 ------- stream/ResponsePartIterator.java | 1 - test/integration/ClientQueryTest.java | 20 ++++++++++++++++++++ 6 files changed, 27 insertions(+), 31 deletions(-) diff --git a/VERSION b/VERSION index 6a6a3d8e35..097a15a2af 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.6.1 +2.6.2 diff --git a/common/exception/ErrorMessage.java b/common/exception/ErrorMessage.java index 86d3157f2d..2f26206929 100644 --- a/common/exception/ErrorMessage.java +++ b/common/exception/ErrorMessage.java @@ -47,7 +47,7 @@ public static class Client extends ErrorMessage { public static final Client MISSING_RESPONSE = new Client(9, "Unexpected empty response for request ID '%s'."); public static final Client UNKNOWN_REQUEST_ID = - new Client(10, "Received a response with unknown request id '%s'."); + new Client(10, "Received a response with unknown request id '%s':\n%s"); public static final Client CLUSTER_NO_PRIMARY_REPLICA_YET = new Client(11, "No replica has been marked as the primary replica for latest known term '%d'."); public static final Client CLUSTER_UNABLE_TO_CONNECT = diff --git a/stream/BidirectionalStream.java b/stream/BidirectionalStream.java index 611aa4dc83..ede0a2530e 100644 --- a/stream/BidirectionalStream.java +++ b/stream/BidirectionalStream.java @@ -32,8 +32,6 @@ import io.grpc.stub.StreamObserver; import javax.annotation.Nullable; -import java.util.ArrayList; -import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -71,7 +69,7 @@ public Single single(Req.Builder request, boolean batch) { ResponseCollector.Queue queue = resCollector.queue(requestID); if (batch) dispatcher.dispatch(req); else dispatcher.dispatchNow(req); - return new Single<>(requestID, queue, this); + return new Single<>(queue); } public Stream stream(Req.Builder request) { @@ -90,14 +88,14 @@ private void collect(Res res) { UUID requestID = byteStringAsUUID(res.getReqId()); ResponseCollector.Queue collector = resCollector.get(requestID); if (collector != null) collector.put(res); - else throw new TypeDBClientException(UNKNOWN_REQUEST_ID, requestID); + else throw new TypeDBClientException(UNKNOWN_REQUEST_ID, requestID, res); } private void collect(ResPart resPart) { UUID requestID = byteStringAsUUID(resPart.getReqId()); ResponseCollector.Queue collector = resPartCollector.get(requestID); if (collector != null) collector.put(resPart); - else throw new TypeDBClientException(UNKNOWN_REQUEST_ID, requestID); + else throw new TypeDBClientException(UNKNOWN_REQUEST_ID, requestID, resPart); } private static UUID byteStringAsUUID(ByteString byteString) { @@ -122,14 +120,6 @@ private void close(@Nullable StatusRuntimeException error) { } } - void singleDone(UUID requestID) { - resCollector.remove(requestID); - } - - void iteratorDone(UUID requestID) { - resPartCollector.remove(requestID); - } - public Optional getError() { return Optional.ofNullable(error); } @@ -140,20 +130,14 @@ RequestTransmitter.Dispatcher dispatcher() { public static class Single { - private final UUID requestID; - private final BidirectionalStream stream; private final ResponseCollector.Queue queue; - public Single(UUID requestID, ResponseCollector.Queue queue, BidirectionalStream stream) { - this.requestID = requestID; + public Single(ResponseCollector.Queue queue) { this.queue = queue; - this.stream = stream; } public T get() { - T value = queue.take(); - stream.singleDone(requestID); - return value; + return queue.take(); } } diff --git a/stream/ResponseCollector.java b/stream/ResponseCollector.java index 440aec3ee1..88384e4105 100644 --- a/stream/ResponseCollector.java +++ b/stream/ResponseCollector.java @@ -26,9 +26,6 @@ import io.grpc.StatusRuntimeException; import javax.annotation.Nullable; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -57,10 +54,6 @@ Queue get(UUID requestId) { return collectors.get(requestId); } - void remove(UUID requestID) { - this.collectors.remove(requestID); - } - synchronized void close(@Nullable StatusRuntimeException error) { collectors.values().forEach(collector -> collector.close(error)); } diff --git a/stream/ResponsePartIterator.java b/stream/ResponsePartIterator.java index aeb860bb46..af71d9b4a0 100644 --- a/stream/ResponsePartIterator.java +++ b/stream/ResponsePartIterator.java @@ -60,7 +60,6 @@ private boolean fetchAndCheck() { case STREAM_RES_PART: switch (resPart.getStreamResPart().getState()) { case DONE: - stream.iteratorDone(requestID); state = State.DONE; return false; case CONTINUE: diff --git a/test/integration/ClientQueryTest.java b/test/integration/ClientQueryTest.java index 077a1155b3..90e9aeb7bc 100644 --- a/test/integration/ClientQueryTest.java +++ b/test/integration/ClientQueryTest.java @@ -27,6 +27,8 @@ import com.vaticle.typedb.client.api.TypeDBSession; import com.vaticle.typedb.client.api.TypeDBTransaction; import com.vaticle.typedb.client.api.answer.ConceptMap; +import com.vaticle.typedb.client.api.concept.type.AttributeType; +import com.vaticle.typedb.client.api.concept.type.EntityType; import com.vaticle.typedb.client.api.logic.Explanation; import com.vaticle.typedb.common.test.server.TypeDBCoreRunner; import com.vaticle.typeql.lang.TypeQL; @@ -42,6 +44,7 @@ import java.io.IOException; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -518,6 +521,23 @@ public void testSimpleExplanation() { }, READ, TypeDBOptions.core().infer(true).explain(true)); } + @Test + public void testStreaming() { + localhostTypeDBTX(tx -> { + for (int i = 0; i < 51; i++) { + tx.query().define(String.format("define person sub entity, owns name%d; name%d sub attribute, value string;", i, i)); + } + tx.commit(); + }, TypeDBSession.Type.SCHEMA); + localhostTypeDBTX(tx -> { + for (int i = 0; i < 50; i++) { + EntityType.Remote concept = tx.concepts().getEntityType("person").asRemote(tx); + List attributeTypes = concept.getOwns(false).collect(toList()); + Optional conceptMap = tx.query().match("match $x sub thing; limit 1;").findFirst(); + } + }, READ, TypeDBOptions.core().prefetch(true).prefetchSize(50)); + } + private String[] lionNames() { return new String[]{"male-partner", "female-partner", "young-lion"}; }