diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java index 5c30e82..8dd04f4 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java @@ -8,14 +8,6 @@ */ package com.wazuh.commandmanager; - -import com.wazuh.commandmanager.index.CommandIndex; -import com.wazuh.commandmanager.jobscheduler.CommandManagerJobParameter; -import com.wazuh.commandmanager.jobscheduler.CommandManagerJobRunner; -import com.wazuh.commandmanager.jobscheduler.JobDocument; -import com.wazuh.commandmanager.rest.RestPostCommandAction; -import com.wazuh.commandmanager.utils.httpclient.HttpRestClient; -import com.wazuh.commandmanager.utils.httpclient.HttpRestClientDemo; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.index.IndexResponse; @@ -23,8 +15,8 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.*; import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.*; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Settings; @@ -49,8 +41,8 @@ import org.opensearch.watcher.ResourceWatcherService; import java.io.IOException; -import java.util.Arrays; import java.time.Instant; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -58,6 +50,9 @@ import java.util.function.Supplier; import com.wazuh.commandmanager.index.CommandIndex; +import com.wazuh.commandmanager.jobscheduler.CommandManagerJobParameter; +import com.wazuh.commandmanager.jobscheduler.CommandManagerJobRunner; +import com.wazuh.commandmanager.jobscheduler.JobDocument; import com.wazuh.commandmanager.rest.RestPostCommandAction; import com.wazuh.commandmanager.settings.PluginSettings; import com.wazuh.commandmanager.utils.httpclient.HttpRestClient; @@ -65,15 +60,14 @@ /** * The Command Manager plugin exposes an HTTP API with a single endpoint to receive raw commands * from the Wazuh Server. These commands are processed, indexed and sent back to the Server for its - * delivery to, in most cases, the Agents. - * The Command Manager plugin exposes an HTTP API with a single endpoint to - * receive raw commands from the Wazuh Server. These commands are processed, - * indexed and sent back to the Server for its delivery to, in most cases, the - * Agents. - *

- * The Command Manager plugin is also a JobScheduler extension plugin. + * delivery to, in most cases, the Agents. The Command Manager plugin exposes an HTTP API with a + * single endpoint to receive raw commands from the Wazuh Server. These commands are processed, + * indexed and sent back to the Server for its delivery to, in most cases, the Agents. + * + *

The Command Manager plugin is also a JobScheduler extension plugin. */ -public class CommandManagerPlugin extends Plugin implements ActionPlugin, ReloadablePlugin, JobSchedulerExtension { +public class CommandManagerPlugin extends Plugin + implements ActionPlugin, ReloadablePlugin, JobSchedulerExtension { public static final String COMMAND_MANAGER_BASE_URI = "/_plugins/_command_manager"; public static final String COMMANDS_URI = COMMAND_MANAGER_BASE_URI + "/commands"; public static final String COMMAND_MANAGER_INDEX_NAME = ".commands"; @@ -85,7 +79,7 @@ public class CommandManagerPlugin extends Plugin implements ActionPlugin, Reload private static final Logger log = LogManager.getLogger(CommandManagerPlugin.class); private CommandIndex commandIndex; - private CommandManagerSettings commandManagerSettings; + private PluginSettings commandManagerSettings; private JobDocument jobDocument; @Override @@ -113,25 +107,34 @@ public Collection createComponents( scheduleCommandJob(client, clusterService, threadPool); // HttpRestClient stuff - //String uri = "https://httpbin.org/post"; - //String payload = "{\"message\": \"Hello world!\"}"; - //HttpRestClientDemo.run(uri, payload); + // String uri = "https://httpbin.org/post"; + // String payload = "{\"message\": \"Hello world!\"}"; + // HttpRestClientDemo.run(uri, payload); return Collections.emptyList(); } - private void scheduleCommandJob(Client client, ClusterService clusterService, ThreadPool threadPool) { - clusterService.addListener(event -> { - if(event.localNodeClusterManager() && event.isNewCluster()) { - jobDocument = JobDocument.getInstance(); - CompletableFuture indexResponseCompletableFuture = jobDocument.create(client, threadPool, UUIDs.base64UUID(), getJobType(), JOB_PERIOD_MINUTES); - indexResponseCompletableFuture.thenAccept( - indexResponse -> { - log.info("Scheduled task successfully, response: {}", indexResponse.getResult().toString()); + private void scheduleCommandJob( + Client client, ClusterService clusterService, ThreadPool threadPool) { + clusterService.addListener( + event -> { + if (event.localNodeClusterManager() && event.isNewCluster()) { + jobDocument = JobDocument.getInstance(); + CompletableFuture indexResponseCompletableFuture = + jobDocument.create( + client, + threadPool, + UUIDs.base64UUID(), + getJobType(), + JOB_PERIOD_MINUTES); + indexResponseCompletableFuture.thenAccept( + indexResponse -> { + log.info( + "Scheduled task successfully, response: {}", + indexResponse.getResult().toString()); + }); } - ); - } - }); + }); } @Override @@ -186,10 +189,7 @@ public ScheduledJobParser getJobParser() { return (parser, id, jobDocVersion) -> { CommandManagerJobParameter jobParameter = new CommandManagerJobParameter(); XContentParserUtils.ensureExpectedToken( - XContentParser.Token.START_OBJECT, - parser.nextToken(), - parser - ); + XContentParser.Token.START_OBJECT, parser.nextToken(), parser); while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) { String fieldName = parser.currentName(); @@ -211,14 +211,14 @@ public ScheduledJobParser getJobParser() { jobParameter.setSchedule(ScheduleParser.parse(parser)); break; default: - XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); + XContentParserUtils.throwUnknownToken( + parser.currentToken(), parser.getTokenLocation()); } } return jobParameter; }; } - private Instant parseInstantValue(XContentParser parser) throws IOException { if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) { return null; diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobParameter.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobParameter.java index f8eadfd..f6de103 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobParameter.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobParameter.java @@ -1,4 +1,5 @@ /* + * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to @@ -29,7 +30,6 @@ public class CommandManagerJobParameter implements ScheduledJobParameter { private boolean isEnabled; private Schedule schedule; - public CommandManagerJobParameter() {} public CommandManagerJobParameter(String jobName, Schedule schedule) { @@ -42,25 +42,21 @@ public CommandManagerJobParameter(String jobName, Schedule schedule) { this.lastUpdateTime = now; } - @Override public String getName() { return this.jobName; } - @Override public Instant getLastUpdateTime() { return this.lastUpdateTime; } - @Override public Instant getEnabledTime() { return this.enabledTime; } - @Override public Schedule getSchedule() { return this.schedule; @@ -98,10 +94,16 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(ENABLED_FIELD, this.isEnabled); builder.field(SCHEDULE_FIELD, this.schedule); if (this.enabledTime != null) { - builder.timeField(ENABLED_TIME_FIELD, ENABLED_TIME_FIELD_READABLE, this.enabledTime.toEpochMilli()); + builder.timeField( + ENABLED_TIME_FIELD, + ENABLED_TIME_FIELD_READABLE, + this.enabledTime.toEpochMilli()); } if (this.lastUpdateTime != null) { - builder.timeField(LAST_UPDATE_TIME_FIELD, LAST_UPDATE_TIME_FIELD_READABLE, this.lastUpdateTime.toEpochMilli()); + builder.timeField( + LAST_UPDATE_TIME_FIELD, + LAST_UPDATE_TIME_FIELD_READABLE, + this.lastUpdateTime.toEpochMilli()); } builder.endObject(); diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobRunner.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobRunner.java index 20965d0..f84d7e6 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobRunner.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobRunner.java @@ -1,4 +1,5 @@ /* + * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to @@ -7,7 +8,6 @@ */ package com.wazuh.commandmanager.jobscheduler; -import com.wazuh.commandmanager.CommandManagerPlugin; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.Client; @@ -17,6 +17,8 @@ import org.opensearch.jobscheduler.spi.ScheduledJobRunner; import org.opensearch.threadpool.ThreadPool; +import com.wazuh.commandmanager.CommandManagerPlugin; + public class CommandManagerJobRunner implements ScheduledJobRunner { private static final Logger log = LogManager.getLogger(CommandManagerJobRunner.class); @@ -51,31 +53,35 @@ private boolean indexExists(String indexName) { @Override public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { - if ( ! indexExists(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME) ) { - log.info("{} index not yet created, not running command manager jobs", CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME); + if (!indexExists(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME)) { + log.info( + "{} index not yet created, not running command manager jobs", + CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME); return; } - Runnable runnable = () -> { - this.searchJob.setClient(client); - this.searchJob.setThreadPool(threadPool); - //this.searchJob.scrollSearchJob(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME, CommandManagerPlugin.COMMAND_BATCH_SIZE); - this.searchJob.search(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME, CommandManagerPlugin.COMMAND_BATCH_SIZE) - .thenAccept( - searchResponse -> { - try { - this.searchJob.handleSearchResponse(searchResponse); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - ) - .exceptionally( - e -> { - log.error(e.getMessage()); - return null; - } - ); - }; + Runnable runnable = + () -> { + this.searchJob.setClient(client); + this.searchJob.setThreadPool(threadPool); + // this.searchJob.scrollSearchJob(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME, CommandManagerPlugin.COMMAND_BATCH_SIZE); + this.searchJob + .search( + CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME, + CommandManagerPlugin.COMMAND_BATCH_SIZE) + .thenAccept( + searchResponse -> { + try { + this.searchJob.handleSearchResponse(searchResponse); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .exceptionally( + e -> { + log.error(e.getMessage()); + return null; + }); + }; threadPool.generic().submit(runnable); } diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/JobDocument.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/JobDocument.java index 02a1e82..ffe0191 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/JobDocument.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/JobDocument.java @@ -1,6 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ package com.wazuh.commandmanager.jobscheduler; -import com.wazuh.commandmanager.CommandManagerPlugin; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.index.IndexRequest; @@ -17,12 +24,13 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import com.wazuh.commandmanager.CommandManagerPlugin; + public class JobDocument { private static final Logger log = LogManager.getLogger(JobDocument.class); private static JobDocument INSTANCE; - private JobDocument() { - } + private JobDocument() {} public static JobDocument getInstance() { log.info("Getting JobDocument Instance"); @@ -38,29 +46,30 @@ public static JobDocument getInstance() { } } - public CompletableFuture create(Client client, ThreadPool threadPool, String id, String jobName, Integer interval) { + public CompletableFuture create( + Client client, ThreadPool threadPool, String id, String jobName, Integer interval) { CompletableFuture completableFuture = new CompletableFuture<>(); ExecutorService executorService = threadPool.executor(ThreadPool.Names.WRITE); - CommandManagerJobParameter jobParameter = new CommandManagerJobParameter( - jobName, - new IntervalSchedule(Instant.now(), interval, ChronoUnit.MINUTES) - ); + CommandManagerJobParameter jobParameter = + new CommandManagerJobParameter( + jobName, new IntervalSchedule(Instant.now(), interval, ChronoUnit.MINUTES)); try { - IndexRequest indexRequest = new IndexRequest() - .index(CommandManagerPlugin.JOB_INDEX_NAME) - .id(id) - .source(jobParameter.toXContent(JsonXContent.contentBuilder(), null)) - .create(true); + IndexRequest indexRequest = + new IndexRequest() + .index(CommandManagerPlugin.JOB_INDEX_NAME) + .id(id) + .source(jobParameter.toXContent(JsonXContent.contentBuilder(), null)) + .create(true); executorService.submit( - () -> { - try (ThreadContext.StoredContext ignored = threadPool.getThreadContext().stashContext()) { - IndexResponse indexResponse = client.index(indexRequest).actionGet(); - completableFuture.complete(indexResponse); - } catch (Exception e) { - completableFuture.completeExceptionally(e); - } - } - ); + () -> { + try (ThreadContext.StoredContext ignored = + threadPool.getThreadContext().stashContext()) { + IndexResponse indexResponse = client.index(indexRequest).actionGet(); + completableFuture.complete(indexResponse); + } catch (Exception e) { + completableFuture.completeExceptionally(e); + } + }); } catch (IOException e) { log.error("Failed to index command with ID {}: {}", id, e); } diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchJob.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchJob.java index 60c9b61..0e4a8e1 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchJob.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchJob.java @@ -1,8 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ package com.wazuh.commandmanager.jobscheduler; -import com.wazuh.commandmanager.CommandManagerPlugin; -import com.wazuh.commandmanager.model.Command; -import com.wazuh.commandmanager.utils.httpclient.HttpRestClientDemo; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.index.IndexRequest; @@ -31,6 +36,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import com.wazuh.commandmanager.CommandManagerPlugin; +import com.wazuh.commandmanager.model.Command; +import com.wazuh.commandmanager.utils.httpclient.HttpRestClientDemo; + public class SearchJob { private static final Logger log = LogManager.getLogger(SearchJob.class); private static SearchJob INSTANCE; @@ -42,8 +51,6 @@ public class SearchJob { private SearchResponse searchResponse; private SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - - public void setPitId(String pitId) { this.pitId = pitId; } @@ -52,7 +59,6 @@ public String getPitId() { return pitId; } - public static SearchJob getSearchJobInstance() { log.info("Getting Job Runner Instance"); if (INSTANCE != null) { @@ -69,45 +75,45 @@ public static SearchJob getSearchJobInstance() { public CompletableFuture search(String index, Integer resultsPerPage) { SearchRequest searchRequest = new SearchRequest(index); - TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("command.status.keyword","PENDING"); - getSearchSourceBuilder().query(termQueryBuilder) - .size(resultsPerPage); + TermQueryBuilder termQueryBuilder = + QueryBuilders.termQuery("command.status.keyword", "PENDING"); + getSearchSourceBuilder().query(termQueryBuilder).size(resultsPerPage); searchRequest.source(getSearchSourceBuilder()); CompletableFuture completableFuture = new CompletableFuture<>(); ExecutorService executorService = this.threadPool.executor(ThreadPool.Names.SEARCH); executorService.submit( - () -> { - try { - SearchResponse searchResponse = client.search(searchRequest).actionGet(); - completableFuture.complete(searchResponse); - } catch (Exception e) { - completableFuture.completeExceptionally(e); - } - } - ); + () -> { + try { + SearchResponse searchResponse = client.search(searchRequest).actionGet(); + completableFuture.complete(searchResponse); + } catch (Exception e) { + completableFuture.completeExceptionally(e); + } + }); return completableFuture; } - private CompletableFuture scrollSearch(SearchScrollRequest searchScrollRequest) { + private CompletableFuture scrollSearch( + SearchScrollRequest searchScrollRequest) { CompletableFuture completableFuture = new CompletableFuture<>(); ExecutorService executorService = this.threadPool.executor(ThreadPool.Names.SEARCH); executorService.submit( - () -> { - try { - SearchResponse searchResponse = client.searchScroll(searchScrollRequest).actionGet(); - completableFuture.complete(searchResponse); - } catch (Exception e) { - completableFuture.completeExceptionally(e); - } - } - ); + () -> { + try { + SearchResponse searchResponse = + client.searchScroll(searchScrollRequest).actionGet(); + completableFuture.complete(searchResponse); + } catch (Exception e) { + completableFuture.completeExceptionally(e); + } + }); return completableFuture; } Map checkAndTransform(Map inputMap) throws ClassCastException { Map result = new HashMap<>(); - for ( Map.Entry entry : inputMap.entrySet() ) { + for (Map.Entry entry : inputMap.entrySet()) { try { result.put(entry.getKey(), entry.getValue()); } catch (ClassCastException e) { @@ -123,33 +129,39 @@ public static T getNestedValue(Map map, String key, Class if (type.isInstance(value)) { return type.cast(value); } else { - throw new ClassCastException("Expected " + type + " but found " + (value != null ? value.getClass() : "null")); + throw new ClassCastException( + "Expected " + + type + + " but found " + + (value != null ? value.getClass() : "null")); } } public void handleSearchResponse(SearchResponse searchResponse) throws Exception { SearchHits searchHits = searchResponse.getHits(); - for (SearchHit hit: searchHits) { - Map commandMap = getNestedValue(hit.getSourceAsMap(), "command", Map.class); + for (SearchHit hit : searchHits) { + Map commandMap = + getNestedValue(hit.getSourceAsMap(), "command", Map.class); commandMap.put("status", "DONE"); hit.getSourceAsMap().put("command", commandMap); - IndexRequest indexRequest = new IndexRequest() - .index(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME) - .source(hit.getSourceAsMap()) - .id(hit.getId()); + IndexRequest indexRequest = + new IndexRequest() + .index(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME) + .source(hit.getSourceAsMap()) + .id(hit.getId()); client.index( - indexRequest, - new ActionListener() { - @Override - public void onResponse(IndexResponse indexResponse) { - log.debug("Updated command with document id: {}", hit.getId()); - } - @Override - public void onFailure(Exception e) { - logStackTrace(e); - } - } - ); + indexRequest, + new ActionListener() { + @Override + public void onResponse(IndexResponse indexResponse) { + log.debug("Updated command with document id: {}", hit.getId()); + } + + @Override + public void onFailure(Exception e) { + logStackTrace(e); + } + }); XContentBuilder xContentBuilder = XContentFactory.jsonBuilder(); hit.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); String uri = "https://httpbin.org/post"; @@ -158,42 +170,51 @@ public void onFailure(Exception e) { } public void pointInTimeSearch(String index, Integer resultsPerPage) { - CreatePitRequest createPitRequest = new CreatePitRequest(TimeValue.timeValueMinutes(1L), false, index); - client.createPit(createPitRequest, new ActionListener<>() { - @Override - public void onResponse(CreatePitResponse createPitResponse) { - setPitId(createPitResponse.getId()); - } - @Override - public void onFailure(Exception e) { - logStackTrace(e); - } - }); + CreatePitRequest createPitRequest = + new CreatePitRequest(TimeValue.timeValueMinutes(1L), false, index); + client.createPit( + createPitRequest, + new ActionListener<>() { + @Override + public void onResponse(CreatePitResponse createPitResponse) { + setPitId(createPitResponse.getId()); + } + + @Override + public void onFailure(Exception e) { + logStackTrace(e); + } + }); SearchRequest searchRequest = new SearchRequest(index); final PointInTimeBuilder pointInTimeBuilder = new PointInTimeBuilder(getPitId()); pointInTimeBuilder.setKeepAlive(TimeValue.timeValueMinutes(1L)); - TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("command.status.keyword","PENDING"); - getSearchSourceBuilder().query(termQueryBuilder) - .size(resultsPerPage) - .sort(Command.COMMAND + "." + Command.ORDER_ID, SortOrder.ASC) - .sort(Command.COMMAND + "." + Command.TIMEOUT, SortOrder.ASC) - .pointInTimeBuilder(pointInTimeBuilder); + TermQueryBuilder termQueryBuilder = + QueryBuilders.termQuery("command.status.keyword", "PENDING"); + getSearchSourceBuilder() + .query(termQueryBuilder) + .size(resultsPerPage) + .sort(Command.COMMAND + "." + Command.ORDER_ID, SortOrder.ASC) + .sort(Command.COMMAND + "." + Command.TIMEOUT, SortOrder.ASC) + .pointInTimeBuilder(pointInTimeBuilder); searchRequest.source(getSearchSourceBuilder()); - client.search(searchRequest, new ActionListener<>() { - @Override - public void onResponse(SearchResponse searchResponse) { - try { - setSearchResponse(searchResponse); - handleSearchResponse(searchResponse); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - @Override - public void onFailure(Exception e) { - logStackTrace(e); - } - }); + client.search( + searchRequest, + new ActionListener<>() { + @Override + public void onResponse(SearchResponse searchResponse) { + try { + setSearchResponse(searchResponse); + handleSearchResponse(searchResponse); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void onFailure(Exception e) { + logStackTrace(e); + } + }); SearchHit[] searchHits = getSearchResponse().getHits().getHits(); if (searchHits != null && searchHits.length > 0) { @@ -207,73 +228,80 @@ public void scrollSearchJob(String index, Integer resultsPerPage) { SearchRequest searchRequest = new SearchRequest(index); searchRequest.scroll(scroll); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("command.status.keyword","PENDING"); - searchSourceBuilder.query(termQueryBuilder) - .size(resultsPerPage) - .sort(Command.COMMAND + "." + Command.TIMEOUT, SortOrder.ASC); + TermQueryBuilder termQueryBuilder = + QueryBuilders.termQuery("command.status.keyword", "PENDING"); + searchSourceBuilder + .query(termQueryBuilder) + .size(resultsPerPage) + .sort(Command.COMMAND + "." + Command.TIMEOUT, SortOrder.ASC); searchRequest.source(searchSourceBuilder); - client.search(searchRequest, new ActionListener<>() { - @Override - public void onResponse(SearchResponse searchResponse) { - log.info("First search iteration completed successfully"); - try { - handleSearchResponse(searchResponse); - } catch (Exception e) { - throw new RuntimeException(e); - } - setScrollId(searchResponse); - setSearchResponse(searchResponse); - } + client.search( + searchRequest, + new ActionListener<>() { + @Override + public void onResponse(SearchResponse searchResponse) { + log.info("First search iteration completed successfully"); + try { + handleSearchResponse(searchResponse); + } catch (Exception e) { + throw new RuntimeException(e); + } + setScrollId(searchResponse); + setSearchResponse(searchResponse); + } - @Override - public void onFailure(Exception e) { - logStackTrace(e); - } - }); + @Override + public void onFailure(Exception e) { + logStackTrace(e); + } + }); SearchHit[] searchHits = searchResponse.getHits().getHits(); - while ( searchHits != null && searchHits.length > 0 ) { + while (searchHits != null && searchHits.length > 0) { SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); scrollRequest.scroll(scroll); - client.searchScroll(scrollRequest, new ActionListener<>() { - @Override - public void onResponse(SearchResponse searchResponse) { - log.info("Get next page of results"); - try { - handleSearchResponse(searchResponse); - } catch (Exception e) { - throw new RuntimeException(e); - } - setScrollId(searchResponse); - setSearchResponse(searchResponse); - } - @Override - public void onFailure(Exception e) { - logStackTrace(e); - } - }); + client.searchScroll( + scrollRequest, + new ActionListener<>() { + @Override + public void onResponse(SearchResponse searchResponse) { + log.info("Get next page of results"); + try { + handleSearchResponse(searchResponse); + } catch (Exception e) { + throw new RuntimeException(e); + } + setScrollId(searchResponse); + setSearchResponse(searchResponse); + } + + @Override + public void onFailure(Exception e) { + logStackTrace(e); + } + }); searchHits = searchResponse.getHits().getHits(); } ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); clearScrollRequest.addScrollId(scrollId); - client.clearScroll(clearScrollRequest, new ActionListener<>() { - @Override - public void onResponse(ClearScrollResponse clearScrollResponse) { - log.info("Scroll successfully cleaned"); - } + client.clearScroll( + clearScrollRequest, + new ActionListener<>() { + @Override + public void onResponse(ClearScrollResponse clearScrollResponse) { + log.info("Scroll successfully cleaned"); + } - @Override - public void onFailure(Exception e) { - logStackTrace(e); - } - }); + @Override + public void onFailure(Exception e) { + logStackTrace(e); + } + }); } - public void pointInTimeSearchJob(String index, Integer resultsPerPage) { - - } + public void pointInTimeSearchJob(String index, Integer resultsPerPage) {} public void setThreadPool(ThreadPool threadPool) { this.threadPool = threadPool;