Skip to content

Commit

Permalink
Don't delete response collectors in a transaction (#369)
Browse files Browse the repository at this point in the history
## What is the goal of this PR?

We no longer delete response collectors in a transaction after receiving a response to a "single" request, or receiving a "DONE" message in a stream. This fixes a possible error when loading 50+ answers in one query and then performing a second query.

## What are the changes implemented in this PR?

See typedb/typedb-driver-python#250, which this PR is a copy of.
  • Loading branch information
alexjpwalker authored Jan 27, 2022
1 parent 63df336 commit 7216628
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 31 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.6.1
2.6.2
2 changes: 1 addition & 1 deletion common/exception/ErrorMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static class Client extends ErrorMessage {
public static final Client MISSING_RESPONSE =
new Client(9, "Unexpected empty response for request ID '%s'.");
public static final Client UNKNOWN_REQUEST_ID =
new Client(10, "Received a response with unknown request id '%s'.");
new Client(10, "Received a response with unknown request id '%s':\n%s");
public static final Client CLUSTER_NO_PRIMARY_REPLICA_YET =
new Client(11, "No replica has been marked as the primary replica for latest known term '%d'.");
public static final Client CLUSTER_UNABLE_TO_CONNECT =
Expand Down
26 changes: 5 additions & 21 deletions stream/BidirectionalStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import io.grpc.stub.StreamObserver;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -71,7 +69,7 @@ public Single<Res> single(Req.Builder request, boolean batch) {
ResponseCollector.Queue<Res> queue = resCollector.queue(requestID);
if (batch) dispatcher.dispatch(req);
else dispatcher.dispatchNow(req);
return new Single<>(requestID, queue, this);
return new Single<>(queue);
}

public Stream<ResPart> stream(Req.Builder request) {
Expand All @@ -90,14 +88,14 @@ private void collect(Res res) {
UUID requestID = byteStringAsUUID(res.getReqId());
ResponseCollector.Queue<Res> collector = resCollector.get(requestID);
if (collector != null) collector.put(res);
else throw new TypeDBClientException(UNKNOWN_REQUEST_ID, requestID);
else throw new TypeDBClientException(UNKNOWN_REQUEST_ID, requestID, res);
}

private void collect(ResPart resPart) {
UUID requestID = byteStringAsUUID(resPart.getReqId());
ResponseCollector.Queue<ResPart> collector = resPartCollector.get(requestID);
if (collector != null) collector.put(resPart);
else throw new TypeDBClientException(UNKNOWN_REQUEST_ID, requestID);
else throw new TypeDBClientException(UNKNOWN_REQUEST_ID, requestID, resPart);
}

private static UUID byteStringAsUUID(ByteString byteString) {
Expand All @@ -122,14 +120,6 @@ private void close(@Nullable StatusRuntimeException error) {
}
}

void singleDone(UUID requestID) {
resCollector.remove(requestID);
}

void iteratorDone(UUID requestID) {
resPartCollector.remove(requestID);
}

public Optional<StatusRuntimeException> getError() {
return Optional.ofNullable(error);
}
Expand All @@ -140,20 +130,14 @@ RequestTransmitter.Dispatcher dispatcher() {

public static class Single<T> {

private final UUID requestID;
private final BidirectionalStream stream;
private final ResponseCollector.Queue<T> queue;

public Single(UUID requestID, ResponseCollector.Queue<T> queue, BidirectionalStream stream) {
this.requestID = requestID;
public Single(ResponseCollector.Queue<T> queue) {
this.queue = queue;
this.stream = stream;
}

public T get() {
T value = queue.take();
stream.singleDone(requestID);
return value;
return queue.take();
}
}

Expand Down
7 changes: 0 additions & 7 deletions stream/ResponseCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
import io.grpc.StatusRuntimeException;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -57,10 +54,6 @@ Queue<R> get(UUID requestId) {
return collectors.get(requestId);
}

void remove(UUID requestID) {
this.collectors.remove(requestID);
}

synchronized void close(@Nullable StatusRuntimeException error) {
collectors.values().forEach(collector -> collector.close(error));
}
Expand Down
1 change: 0 additions & 1 deletion stream/ResponsePartIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ private boolean fetchAndCheck() {
case STREAM_RES_PART:
switch (resPart.getStreamResPart().getState()) {
case DONE:
stream.iteratorDone(requestID);
state = State.DONE;
return false;
case CONTINUE:
Expand Down
20 changes: 20 additions & 0 deletions test/integration/ClientQueryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.vaticle.typedb.client.api.TypeDBSession;
import com.vaticle.typedb.client.api.TypeDBTransaction;
import com.vaticle.typedb.client.api.answer.ConceptMap;
import com.vaticle.typedb.client.api.concept.type.AttributeType;
import com.vaticle.typedb.client.api.concept.type.EntityType;
import com.vaticle.typedb.client.api.logic.Explanation;
import com.vaticle.typedb.common.test.server.TypeDBCoreRunner;
import com.vaticle.typeql.lang.TypeQL;
Expand All @@ -42,6 +44,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -518,6 +521,23 @@ public void testSimpleExplanation() {
}, READ, TypeDBOptions.core().infer(true).explain(true));
}

@Test
public void testStreaming() {
localhostTypeDBTX(tx -> {
for (int i = 0; i < 51; i++) {
tx.query().define(String.format("define person sub entity, owns name%d; name%d sub attribute, value string;", i, i));
}
tx.commit();
}, TypeDBSession.Type.SCHEMA);
localhostTypeDBTX(tx -> {
for (int i = 0; i < 50; i++) {
EntityType.Remote concept = tx.concepts().getEntityType("person").asRemote(tx);
List<? extends AttributeType> attributeTypes = concept.getOwns(false).collect(toList());
Optional<ConceptMap> conceptMap = tx.query().match("match $x sub thing; limit 1;").findFirst();
}
}, READ, TypeDBOptions.core().prefetch(true).prefetchSize(50));
}

private String[] lionNames() {
return new String[]{"male-partner", "female-partner", "young-lion"};
}
Expand Down

0 comments on commit 7216628

Please sign in to comment.