Skip to content

Commit

Permalink
feat: use FileSerde.writeAll and buffering for improved performances
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Aug 13, 2024
1 parent c141d74 commit 2172ecc
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 26 deletions.
20 changes: 8 additions & 12 deletions src/main/java/io/kestra/plugin/elasticsearch/Scroll.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@
import org.opensearch.client.opensearch.core.SearchResponse;
import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.slf4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.*;
import java.net.URI;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static io.kestra.core.utils.Rethrow.throwConsumer;

@SuperBuilder
@ToString
@EqualsAndHashCode
Expand Down Expand Up @@ -65,7 +62,7 @@ public Scroll.Output run(RunContext runContext) throws Exception {

try (
RestClientTransport transport = this.connection.client(runContext);
OutputStream output = new FileOutputStream(tempFile)
Writer output = new BufferedWriter(new FileWriter(tempFile), FileSerde.BUFFER_SIZE)
) {
OpenSearchClient client = new OpenSearchClient(transport);
// build request
Expand All @@ -90,11 +87,10 @@ public Scroll.Output run(RunContext runContext) throws Exception {
requestsDuration.addAndGet(searchResponse.took());
requestsCount.incrementAndGet();

searchResponse.hits().hits()
.forEach(throwConsumer(documentFields -> {
recordsCount.incrementAndGet();
FileSerde.write(output, documentFields.source());
}));
Flux<Map> hitFlux = Flux.fromIterable(searchResponse.hits().hits()).map(hit -> hit.source());
Mono<Long> longMono = FileSerde.writeAll(output, hitFlux);

recordsCount.addAndGet(longMono.block());

ScrollRequest searchScrollRequest = new ScrollRequest.Builder()
.scrollId(scrollId)
Expand Down
28 changes: 14 additions & 14 deletions src/main/java/io/kestra/plugin/elasticsearch/Search.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
import org.opensearch.client.opensearch.core.SearchResponse;
import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.slf4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.*;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -100,10 +100,10 @@ public Search.Output run(RunContext runContext) throws Exception {
break;

case STORE:
Pair<URI, Integer> store = this.store(runContext, searchResponse);
Pair<URI, Long> store = this.store(runContext, searchResponse);
outputBuilder
.uri(store.getLeft())
.size(store.getRight());
.size(store.getRight().intValue());
break;
}

Expand All @@ -120,18 +120,18 @@ public Search.Output run(RunContext runContext) throws Exception {
}


protected Pair<URI, Integer> store(RunContext runContext, SearchResponse<Map> searchResponse) throws IOException {
protected Pair<URI, Long> store(RunContext runContext, SearchResponse<Map> searchResponse) throws IOException {
File tempFile = runContext.workingDir().createTempFile(".ion").toFile();

try (var output = new FileOutputStream(tempFile)) {
searchResponse.hits().hits()
.forEach(throwConsumer(docs -> FileSerde.write(output, docs.source())));
}
try (var output = new BufferedWriter(new FileWriter(tempFile), FileSerde.BUFFER_SIZE)) {
Flux<Map> hitFlux = Flux.fromIterable(searchResponse.hits().hits()).map(hit -> hit.source());
Long count = FileSerde.writeAll(output, hitFlux).block();

return Pair.of(
runContext.storage().putFile(tempFile),
searchResponse.hits().hits().size()
);
return Pair.of(
runContext.storage().putFile(tempFile),
count
);
}
}

protected Pair<List<Map<String, Object>>, Integer> fetch(SearchResponse<Map> searchResponse) {
Expand Down

0 comments on commit 2172ecc

Please sign in to comment.