Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spliterator refactoring #37

Merged
merged 9 commits into from
Mar 4, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
private static final Logger log = LoggerFactory.getLogger(YdbRepositoryTransaction.class);

private final List<YdbRepository.Query<?>> pendingWrites = new ArrayList<>();
private final List<Stream<?>> openedStreams = new ArrayList<>();
private final List<YdbSpliterator<?>> spliterators = new ArrayList<>();

@Getter
private final TxOptions options;
Expand All @@ -101,10 +101,10 @@ public YdbRepositoryTransaction(REPO repo, @NonNull TxOptions options) {
this.cache = options.isFirstLevelCache() ? new RepositoryCacheImpl() : RepositoryCache.empty();
}

private <V> Stream<V> makeStream(YdbSpliterator<V> spliterator) {
Stream<V> stream = spliterator.makeStream();
openedStreams.add(stream);
return stream;
private <V> YdbSpliterator<V> createSpliterator(String request, boolean isOrdered) {
YdbSpliterator<V> spliterator = new YdbSpliterator<>(request, isOrdered);
spliterators.add(spliterator);
return spliterator;
}

@Override
Expand Down Expand Up @@ -152,9 +152,9 @@ private void doCommit() {

private void closeStreams() {
Exception summaryException = null;
for (Stream<?> stream : openedStreams) {
for (YdbSpliterator<?> spliterator : spliterators) {
try {
stream.close();
spliterator.close();
} catch (Exception e) {
if (summaryException == null) {
summaryException = e;
Expand Down Expand Up @@ -381,15 +381,15 @@ public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT
String yql = getYql(statement);
Params sdkParams = getSdkParams(statement, params);

YdbSpliterator<RESULT> spliterator = new YdbSpliterator<>("scanQuery: " + yql, false);
YdbSpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);

initSession();
session.executeScanQuery(
yql, sdkParams, settings,
rs -> new ResultSetConverter(rs).stream(statement::readResult).forEach(spliterator::onNext)
).whenComplete(spliterator::onSupplierThreadComplete);

return makeStream(spliterator);
return spliterator.createStream();
}

@Override
Expand Down Expand Up @@ -482,15 +482,15 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
}

if (params.isUseNewSpliterator()) {
YdbSpliterator<RESULT> spliterator = new YdbSpliterator<>("readTable: " + tableName, params.isOrdered());
YdbSpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());

initSession();
session.readTable(
tableName, settings.build(),
resultSet -> new ResultSetConverter(resultSet).stream(mapper::mapResult).forEach(spliterator::onNext)
).whenComplete(spliterator::onSupplierThreadComplete);

return makeStream(spliterator);
return spliterator.createStream();
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.yandex.ydb.core.Status;
import lombok.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.yoj.repository.db.exception.DeadlineExceededException;
Expand All @@ -25,6 +24,8 @@
/**
* {@code YdbSpliterator} is used for read data from YDB streams.
* It's possible to supply values from different threads, but supplier threads mustn't call onNext concurrently.
* <p>
* Should be closed by close() method for finish work in session.
*/
class YdbSpliterator<V> implements Spliterator<V> {
private static final Logger log = LoggerFactory.getLogger(YdbSpliterator.class);
Expand All @@ -38,7 +39,7 @@ class YdbSpliterator<V> implements Spliterator<V> {
private final BlockingQueue<QueueValue<V>> queue = new ArrayBlockingQueue<>(1);
private final BiConsumer<Status, Throwable> validateResponse;

private volatile boolean streamClosed = false;
private volatile boolean closed = false;

private boolean endData = false;

Expand All @@ -63,15 +64,15 @@ private long calculateTimeout() {
}

// Correct way to create stream with YdbSpliterator. onClose call is important for avoid supplier thread leak.
public Stream<V> makeStream() {
return StreamSupport.stream(this, false).onClose(this::onStreamClose);
public Stream<V> createStream() {
return StreamSupport.stream(this, false).onClose(this::close);
}

// (supplier thread) Send data to stream thread.
public void onNext(V value) {
if (streamClosed) {
if (closed) {
// Need to abort supplier thread if stream is closed. onSupplierThreadComplete will exit immediately.
// ConsumerDoneException isn't handled because onSupplierThreadComplete will exit by streamClosed.
// ConsumerDoneException isn't handled because onSupplierThreadComplete will exit by this.closed.
throw ConsumerDoneException.INSTANCE;
}

Expand All @@ -89,7 +90,7 @@ public void onNext(V value) {
// (supplier thread) Send knowledge to stream when data is over.
public void onSupplierThreadComplete(Status status, Throwable ex) {
ex = unwrapException(ex);
if (ex instanceof OfferDeadlineExceededException || streamClosed) {
if (ex instanceof OfferDeadlineExceededException || closed) {
// If deadline exceeded happen, need to do nothing. Stream thread will exit at deadline by themself.
return;
}
Expand Down Expand Up @@ -128,21 +129,25 @@ public boolean tryAdvance(Consumer<? super V> action) {
throw new DeadlineExceededException("Stream deadline exceeded on poll");
}

if (value.isEndData()) {
if (value.endData()) {
endData = true;
validateResponse.accept(value.getStatus(), value.getError());
validateResponse.accept(value.status(), value.error());
return false;
}

action.accept(value.getValue());
action.accept(value.value());
return true;
}

// (stream thread) callback on stream.close()
@VisibleForTesting
protected void onStreamClose() {
streamClosed = true;
// Abort offer in supplier thread. onNext() will look at streamClosed and exit immediately.
// (stream thread) close spliterator and abort supplier thread
public void close() {
// close() can be called twice by stream.close() and in the end of transaction
if (closed) {
return;
}

closed = true;
// Abort offer in supplier thread. onNext() will look at this.closed and exit immediately.
// onSupplierThreadComplete() just will exit.
queue.clear();
}
Expand All @@ -167,13 +172,12 @@ public int characteristics() {
return flags;
}

@Value
private static class QueueValue<V> {
V value;
Status status;
Throwable error;
boolean endData;

private record QueueValue<V>(
V value,
Status status,
Throwable error,
boolean endData
) {
public static <V> QueueValue<V> of(V value) {
return new QueueValue<>(value, null, null, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void run() {
}

public Stream<Integer> stream() {
return spliterator.makeStream();
return spliterator.createStream();
}

public void interrupt() {
Expand Down Expand Up @@ -205,7 +205,7 @@ public void endStreamWhenSupplerOfferValue() {
spliterator.onNext(1);

// wait for block on onNext(2) and close stream
var thread = new TestingThread(() -> doAfter(100, spliterator::onStreamClose));
var thread = new TestingThread(() -> doAfter(100, spliterator::close));
thread.start();

spliterator.onNext(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
private static final Logger log = LoggerFactory.getLogger(YdbRepositoryTransaction.class);

private final List<YdbRepository.Query<?>> pendingWrites = new ArrayList<>();
private final List<Stream<?>> openedStreams = new ArrayList<>();
private final List<YdbSpliterator<?>> spliterators = new ArrayList<>();

@Getter
private final TxOptions options;
Expand All @@ -102,10 +102,10 @@ public YdbRepositoryTransaction(REPO repo, @NonNull TxOptions options) {
this.cache = options.isFirstLevelCache() ? new RepositoryCacheImpl() : RepositoryCache.empty();
}

private <V> Stream<V> makeStream(YdbSpliterator<V> spliterator) {
Stream<V> stream = spliterator.makeStream();
openedStreams.add(stream);
return stream;
private <V> YdbSpliterator<V> createSpliterator(String request, boolean isOrdered) {
YdbSpliterator<V> spliterator = new YdbSpliterator<>(request, isOrdered);
spliterators.add(spliterator);
return spliterator;
}

@Override
Expand Down Expand Up @@ -153,9 +153,9 @@ private void doCommit() {

private void closeStreams() {
Exception summaryException = null;
for (Stream<?> stream : openedStreams) {
for (YdbSpliterator<?> spliterator : spliterators) {
try {
stream.close();
spliterator.close();
} catch (Exception e) {
if (summaryException == null) {
summaryException = e;
Expand Down Expand Up @@ -381,15 +381,15 @@ public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT
String yql = getYql(statement);
Params sdkParams = getSdkParams(statement, params);

YdbSpliterator<RESULT> spliterator = new YdbSpliterator<>("scanQuery: " + yql, false);
YdbSpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);

initSession();
session.executeScanQuery(
yql, sdkParams, settings,
rs -> new ResultSetConverter(rs).stream(statement::readResult).forEach(spliterator::onNext)
).whenComplete(spliterator::onSupplierThreadComplete);

return makeStream(spliterator);
return spliterator.createStream();
}

@Override
Expand Down Expand Up @@ -483,15 +483,15 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
}

if (params.isUseNewSpliterator()) {
YdbSpliterator<RESULT> spliterator = new YdbSpliterator<>("readTable: " + tableName, params.isOrdered());
YdbSpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());

initSession();
session.readTable(
tableName, settings.build(),
resultSet -> new ResultSetConverter(resultSet).stream(mapper::mapResult).forEach(spliterator::onNext)
).whenComplete(spliterator::onSupplierThreadComplete);

return makeStream(spliterator);
return spliterator.createStream();
}

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package tech.ydb.yoj.repository.ydb;

import com.google.common.annotations.VisibleForTesting;
import lombok.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Status;
Expand All @@ -26,6 +25,8 @@
/**
* {@code YdbSpliterator} is used for read data from YDB streams.
* It's possible to supply values from different threads, but supplier threads mustn't call onNext concurrently.
* <p>
* Should be closed by close() method for finish work in session.
*/
public class YdbSpliterator<V> implements Spliterator<V> {
private static final Logger log = LoggerFactory.getLogger(YdbSpliterator.class);
Expand All @@ -39,7 +40,7 @@ public class YdbSpliterator<V> implements Spliterator<V> {
private final BlockingQueue<QueueValue<V>> queue = new ArrayBlockingQueue<>(1);
private final BiConsumer<Status, Throwable> validateResponse;

private volatile boolean streamClosed = false;
private volatile boolean closed = false;

private boolean endData = false;

Expand All @@ -64,15 +65,15 @@ private long calculateTimeout() {
}

// Correct way to create stream with YdbSpliterator. onClose call is important for avoid supplier thread leak.
public Stream<V> makeStream() {
return StreamSupport.stream(this, false).onClose(this::onStreamClose);
public Stream<V> createStream() {
return StreamSupport.stream(this, false).onClose(this::close);
}

// (supplier thread) Send data to stream thread.
public void onNext(V value) {
if (streamClosed) {
if (closed) {
// Need to abort supplier thread if stream is closed. onSupplierThreadComplete will exit immediately.
// ConsumerDoneException isn't handled because onSupplierThreadComplete will exit by streamClosed.
// ConsumerDoneException isn't handled because onSupplierThreadComplete will exit by this.closed.
throw ConsumerDoneException.INSTANCE;
}

Expand All @@ -90,7 +91,7 @@ public void onNext(V value) {
// (supplier thread) Send knowledge to stream when data is over.
public void onSupplierThreadComplete(Status status, Throwable ex) {
ex = unwrapException(ex);
if (ex instanceof OfferDeadlineExceededException || streamClosed) {
if (ex instanceof OfferDeadlineExceededException || closed) {
// If deadline exceeded happen, need to do nothing. Stream thread will exit at deadline by themself.
return;
}
Expand Down Expand Up @@ -129,21 +130,25 @@ public boolean tryAdvance(Consumer<? super V> action) {
throw new DeadlineExceededException("Stream deadline exceeded on poll");
}

if (value.isEndData()) {
if (value.endData()) {
endData = true;
validateResponse.accept(value.getStatus(), value.getError());
validateResponse.accept(value.status(), value.error());
return false;
}

action.accept(value.getValue());
action.accept(value.value());
return true;
}

// (stream thread) callback on stream.close()
@VisibleForTesting
protected void onStreamClose() {
streamClosed = true;
// Abort offer in supplier thread. onNext() will look at streamClosed and exit immediately.
// (stream thread) close spliterator and abort supplier thread
public void close() {
// close() can be called twice by stream.close() and in the end of transaction
if (closed) {
return;
}

closed = true;
// Abort offer in supplier thread. onNext() will look at this.closed and exit immediately.
// onSupplierThreadComplete() just will exit.
queue.clear();
}
Expand All @@ -168,13 +173,12 @@ public int characteristics() {
return flags;
}

@Value
private static class QueueValue<V> {
V value;
Status status;
Throwable error;
boolean endData;

private record QueueValue<V>(
V value,
Status status,
Throwable error,
boolean endData
) {
public static <V> QueueValue<V> of(V value) {
return new QueueValue<>(value, null, null, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void run() {
}

public Stream<Integer> stream() {
return spliterator.makeStream();
return spliterator.createStream();
}

public void interrupt() {
Expand Down Expand Up @@ -205,7 +205,7 @@ public void endStreamWhenSupplerOfferValue() {
spliterator.onNext(1);

// wait for block on onNext(2) and close stream
var thread = new TestingThread(() -> doAfter(100, spliterator::onStreamClose));
var thread = new TestingThread(() -> doAfter(100, spliterator::close));
thread.start();

spliterator.onNext(2);
Expand Down
Loading