Skip to content

Commit

Permalink
feat: add fetchType on Search
Browse files Browse the repository at this point in the history
close #3
  • Loading branch information
tchiotludo committed Dec 21, 2023
1 parent 76fc166 commit cf31a4c
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 17 deletions.
2 changes: 1 addition & 1 deletion docker-compose-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "3.6"

services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.0
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.3
environment:
discovery.type: single-node
ES_JAVA_OPTS: "-Xms256m -Xmx256m"
Expand Down
108 changes: 93 additions & 15 deletions src/main/java/io/kestra/plugin/elasticsearch/Search.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,32 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.common.FetchType;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.search.aggregations.Aggregation;
import org.slf4j.Logger;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static io.kestra.core.utils.Rethrow.throwConsumer;

Expand Down Expand Up @@ -53,6 +59,17 @@
}
)
public class Search extends AbstractSearch implements RunnableTask<Search.Output> {
@Schema(
title = "The way you want to store the data",
description = "FETCH_ONE output the first row, "
+ "FETCH output all the rows, "
+ "STORE store all rows in a file, "
+ "NONE do nothing."
)
@Builder.Default
@PluginProperty
private FetchType fetchType = FetchType.FETCH;

@Override
public Search.Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
Expand All @@ -64,46 +81,107 @@ public Search.Output run(RunContext runContext) throws Exception {

SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);

// fetch
ArrayList<Map<String, Object>> rows = new ArrayList<>();
AtomicLong recordsCount = new AtomicLong();
Output.OutputBuilder outputBuilder = Search.Output.builder();

switch (fetchType) {
case FETCH:
Pair<List<Map<String, Object>>, Integer> fetch = this.fetch(searchResponse);
outputBuilder
.rows(fetch.getLeft())
.size(fetch.getRight());
break;

Arrays.stream(searchResponse.getHits().getHits())
.forEach(throwConsumer(documentFields -> {
recordsCount.incrementAndGet();
rows.add(documentFields.getSourceAsMap());
}));
case FETCH_ONE:
var o = this.fetchOne(searchResponse);

outputBuilder
.row(o)
.size(o != null ? 1 : 0);
break;

case STORE:
Pair<URI, Integer> store = this.store(runContext, searchResponse);
outputBuilder
.uri(store.getLeft())
.size(store.getRight());
break;
}

// metrics
runContext.metric(Counter.of("requests.count", 1));
runContext.metric(Counter.of("records", searchResponse.getHits().getHits().length));
runContext.metric(Timer.of("requests.duration", Duration.ofNanos(searchResponse.getTook().nanos())));

// outputs
return Output.builder()
.size(recordsCount.get())
return outputBuilder
.total(searchResponse.getHits().getTotalHits().value)
.rows(rows)
.build();
}
}


protected Pair<URI, Integer> store(RunContext runContext, SearchResponse searchResponse) throws IOException {
File tempFile = runContext.tempFile(".ion").toFile();

try (var output = new FileOutputStream(tempFile)) {
Arrays
.stream(searchResponse.getHits().getHits())
.forEach(throwConsumer(docs -> FileSerde.write(output, docs.getSourceAsMap())));
}

return Pair.of(
runContext.putTempFile(tempFile),
searchResponse.getHits().getHits().length
);
}

protected Pair<List<Map<String, Object>>, Integer> fetch(SearchResponse searchResponse) {
List<Map<String, Object>> result = new ArrayList<>();

Arrays
.stream(searchResponse.getHits().getHits())
.forEach(throwConsumer(docs -> result.add(docs.getSourceAsMap())));

return Pair.of(result, searchResponse.getHits().getHits().length);
}

protected Map<String, Object> fetchOne(SearchResponse searchResponse) {
if (searchResponse.getHits().getHits().length == 0) {
return null;
}

return searchResponse.getHits().getHits()[0].getSourceAsMap();
}

@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "The size of the rows fetch"
)
private Long size;
private Integer size;

@Schema(
title = "The total of the rows fetch without pagination"
)
private Long total;

@Schema(
title = "The search result fetch"
title = "List containing the fetched data",
description = "Only populated if using `fetchType=FETCH`."
)
private List<Map<String, Object>> rows;

@Schema(
title = "Map containing the first row of fetched data",
description = "Only populated if using `fetchType=FETCH_ONE`."
)
private Map<String, Object> row;

@Schema(
title = "The uri of stored data",
description = "Only populated if using `fetchType=STORE`"
)
private URI uri;
}
}
64 changes: 63 additions & 1 deletion src/test/java/io/kestra/plugin/elasticsearch/SearchTest.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
package io.kestra.plugin.elasticsearch;

import io.kestra.core.models.tasks.common.FetchType;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.storages.StorageInterface;
import io.micronaut.context.annotation.Value;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

@MicronautTest
class SearchTest {
Expand All @@ -23,6 +31,9 @@ class SearchTest {
@Value("${elasticsearch-hosts}")
private List<String> hosts;

@Inject
private StorageInterface storageInterface;

@Test
void run() throws Exception {
RunContext runContext = runContextFactory.of();
Expand All @@ -38,7 +49,58 @@ void run() throws Exception {

Search.Output run = task.run(runContext);

assertThat(run.getSize(), is(1L));
assertThat(run.getSize(), is(1));
assertThat(run.getRows().get(0).get("genericName"), is("Larus"));
}

@Test
void runFetchOne() throws Exception {
RunContext runContext = runContextFactory.of();

SearchSourceBuilder query = new SearchSourceBuilder();
query.query(QueryBuilders.termQuery("publishingCountry.keyword", "BE"));
query.sort("key");

Search task = Search.builder()
.connection(ElasticsearchConnection.builder().hosts(hosts).build())
.indexes(Collections.singletonList("gbif"))
.request(query.toString())
.fetchType(FetchType.FETCH_ONE)
.build();

Search.Output run = task.run(runContext);

assertThat(run.getSize(), is(1));
assertThat(run.getTotal(), is(28L));
assertThat(run.getRow().get("key"), is(925277090));
}

@SuppressWarnings("unchecked")
@Test
void runStored() throws Exception {
RunContext runContext = runContextFactory.of();

SearchSourceBuilder query = new SearchSourceBuilder();
query.query(QueryBuilders.termQuery("publishingCountry.keyword", "BE"));
query.sort("key");

Search task = Search.builder()
.connection(ElasticsearchConnection.builder().hosts(hosts).build())
.indexes(Collections.singletonList("gbif"))
.request(query.toString())
.fetchType(FetchType.STORE)
.build();

Search.Output run = task.run(runContext);

assertThat(run.getSize(), is(10));
assertThat(run.getTotal(), is(28L));
assertThat(run.getUri(), notNullValue());

BufferedReader inputStream = new BufferedReader(new InputStreamReader(storageInterface.get(null, run.getUri())));
List<Map<String, Object>> result = new ArrayList<>();
FileSerde.reader(inputStream, r -> result.add((Map<String, Object>) r));

assertThat(result.get(8).get("key"), is(925311404));
}
}

0 comments on commit cf31a4c

Please sign in to comment.