From df2b5f33bbb60b82490a7d05f29045312c9302f6 Mon Sep 17 00:00:00 2001 From: f-galland Date: Fri, 1 Nov 2024 14:10:30 -0300 Subject: [PATCH] Switch to synchronous code --- .../jobscheduler/CommandManagerJobRunner.java | 50 +-- .../jobscheduler/PointInTime.java | 107 ++++++ .../jobscheduler/SearchJob.java | 314 +++++------------- 3 files changed, 223 insertions(+), 248 deletions(-) create mode 100644 plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/PointInTime.java diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobRunner.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobRunner.java index f84d7e6..9f68bdd 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobRunner.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobRunner.java @@ -27,7 +27,7 @@ public class CommandManagerJobRunner implements ScheduledJobRunner { private ClusterService clusterService; private Client client; - private final SearchJob searchJob = SearchJob.getSearchJobInstance(); + private final SearchJob searchJob = SearchJob.getInstance(); private CommandManagerJobRunner() { // Singleton class, use getJobRunner method instead of constructor @@ -55,33 +55,33 @@ private boolean indexExists(String indexName) { public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { if (!indexExists(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME)) { log.info( - "{} index not yet created, not running command manager jobs", - CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME); + "{} index not yet created, not running command manager jobs", + CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME); return; } Runnable runnable = - () -> { - this.searchJob.setClient(client); - this.searchJob.setThreadPool(threadPool); - // this.searchJob.scrollSearchJob(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME, CommandManagerPlugin.COMMAND_BATCH_SIZE); - this.searchJob - .search( - CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME, - CommandManagerPlugin.COMMAND_BATCH_SIZE) - .thenAccept( - searchResponse -> { - try { - this.searchJob.handleSearchResponse(searchResponse); - } catch (Exception e) { - throw new RuntimeException(e); - } - }) - .exceptionally( - e -> { - log.error(e.getMessage()); - return null; - }); - }; + () -> { + this.searchJob.setClient(client); + this.searchJob.setThreadPool(threadPool); + // this.searchJob.scrollSearchJob(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME, CommandManagerPlugin.COMMAND_BATCH_SIZE); + this.searchJob + .simpleSearch( + CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME, + CommandManagerPlugin.COMMAND_BATCH_SIZE) + .thenAccept( + searchResponse -> { + try { + this.searchJob.handleFirstPage(searchResponse); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .exceptionally( + e -> { + log.error(e.getMessage()); + return null; + }); + }; threadPool.generic().submit(runnable); } diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/PointInTime.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/PointInTime.java new file mode 100644 index 0000000..5c222bf --- /dev/null +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/PointInTime.java @@ -0,0 +1,107 @@ +package com.wazuh.commandmanager.jobscheduler; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.client.Client; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.search.builder.PointInTimeBuilder; + +import javax.swing.*; +import java.sql.Time; + +public class PointInTime { + private static final Logger log = LogManager.getLogger(PointInTime.class); + private static PointInTime INSTANCE; + private String id; + private CreatePitRequest createPitRequest; + private PointInTimeBuilder pointInTimeBuilder; + private CreatePitResponse createPitResponse; + private TimeValue keepAlive = TimeValue.timeValueSeconds(60L); + + public PointInTimeBuilder createPit(Client client, String index) { + Boolean allowPartialPitCreation = false; + setCreatePitRequest( + new CreatePitRequest(getKeepAlive(), allowPartialPitCreation, index) + ); + client.createPit( + getCreatePitRequest(), + new ActionListener<>() { + @Override + public void onResponse(CreatePitResponse createPitResponse) { + setCreatePitResponse(createPitResponse); + setId(createPitResponse.getId()); + setPointInTimeBuilder( + new PointInTimeBuilder(createPitResponse.getId()) + ); + getPointInTimeBuilder().setKeepAlive(getKeepAlive()); + } + + @Override + public void onFailure(Exception e) { + log.error(e); + } + }); + return getPointInTimeBuilder(); + } + + + public CreatePitResponse getCreatePitResponse() { + return createPitResponse; + } + + public void setCreatePitResponse(CreatePitResponse createPitResponse) { + this.createPitResponse = createPitResponse; + } + + public PointInTimeBuilder getPointInTimeBuilder() { + return pointInTimeBuilder; + } + + public void setPointInTimeBuilder(PointInTimeBuilder pointInTimeBuilder) { + this.pointInTimeBuilder = pointInTimeBuilder; + } + + public PointInTime() { + } + + public CreatePitRequest getCreatePitRequest() { + return createPitRequest; + } + + public void setCreatePitRequest(CreatePitRequest createPitRequest) { + this.createPitRequest = createPitRequest; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public TimeValue getKeepAlive() { + return keepAlive; + } + + public void setKeepAlive(TimeValue keepAlive) { + this.keepAlive = keepAlive; + } + + public static PointInTime getInstance() { + log.info("Getting Job Runner Instance"); + if (INSTANCE != null) { + return INSTANCE; + } + synchronized (SearchJob.class) { + if (INSTANCE != null) { + return INSTANCE; + } + INSTANCE = new PointInTime(); + return INSTANCE; + } + } +} diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchJob.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchJob.java index 0e4a8e1..d73d3ee 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchJob.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchJob.java @@ -8,33 +8,28 @@ */ package com.wazuh.commandmanager.jobscheduler; +import com.wazuh.commandmanager.model.Status; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.*; import org.opensearch.client.Client; -import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.index.query.TermQueryBuilder; -import org.opensearch.search.Scroll; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; -import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.sort.SortOrder; import org.opensearch.threadpool.ThreadPool; -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.util.HashMap; +import java.io.IOException; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import com.wazuh.commandmanager.CommandManagerPlugin; import com.wazuh.commandmanager.model.Command; @@ -45,11 +40,11 @@ public class SearchJob { private static SearchJob INSTANCE; private ThreadPool threadPool; private Client client; - private String scrollId; private String pitId; private Object[] searchAfter; - private SearchResponse searchResponse; private SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + private final PointInTime pointInTime = PointInTime.getInstance(); + private SearchResponse searchResponse; public void setPitId(String pitId) { this.pitId = pitId; @@ -59,7 +54,7 @@ public String getPitId() { return pitId; } - public static SearchJob getSearchJobInstance() { + public static SearchJob getInstance() { log.info("Getting Job Runner Instance"); if (INSTANCE != null) { return INSTANCE; @@ -73,54 +68,19 @@ public static SearchJob getSearchJobInstance() { } } - public CompletableFuture search(String index, Integer resultsPerPage) { - SearchRequest searchRequest = new SearchRequest(index); - TermQueryBuilder termQueryBuilder = - QueryBuilders.termQuery("command.status.keyword", "PENDING"); - getSearchSourceBuilder().query(termQueryBuilder).size(resultsPerPage); - searchRequest.source(getSearchSourceBuilder()); - - CompletableFuture completableFuture = new CompletableFuture<>(); - ExecutorService executorService = this.threadPool.executor(ThreadPool.Names.SEARCH); - executorService.submit( - () -> { - try { - SearchResponse searchResponse = client.search(searchRequest).actionGet(); - completableFuture.complete(searchResponse); - } catch (Exception e) { - completableFuture.completeExceptionally(e); - } - }); - return completableFuture; - } - - private CompletableFuture scrollSearch( - SearchScrollRequest searchScrollRequest) { - CompletableFuture completableFuture = new CompletableFuture<>(); - ExecutorService executorService = this.threadPool.executor(ThreadPool.Names.SEARCH); - executorService.submit( - () -> { - try { - SearchResponse searchResponse = - client.searchScroll(searchScrollRequest).actionGet(); - completableFuture.complete(searchResponse); - } catch (Exception e) { - completableFuture.completeExceptionally(e); - } - }); - return completableFuture; - } - - Map checkAndTransform(Map inputMap) throws ClassCastException { - Map result = new HashMap<>(); - for (Map.Entry entry : inputMap.entrySet()) { - try { - result.put(entry.getKey(), entry.getValue()); - } catch (ClassCastException e) { - logStackTrace(e); - } - } - return result; + public CompletableFuture simpleSearch(String index, Integer resultsPerPage) { + setSearchRequest(index, Command.COMMAND + "." + Command.STATUS, Status.PENDING.toString(), resultsPerPage); + //ExecutorService executorService = this.threadPool.executor(ThreadPool.Names.SEARCH); + //executorService.submit( + // () -> { + // try { + // SearchResponse searchResponse = client.search(searchRequest).actionGet(); + // completableFuture.complete(searchResponse); + // } catch (Exception e) { + // completableFuture.completeExceptionally(e); + // } + // }); + return CompletableFuture.completedFuture(client.search(searchRequest).actionGet()); } @SuppressWarnings("Unchecked") @@ -130,178 +90,96 @@ public static T getNestedValue(Map map, String key, Class return type.cast(value); } else { throw new ClassCastException( - "Expected " - + type - + " but found " - + (value != null ? value.getClass() : "null")); + "Expected " + + type + + " but found " + + (value != null ? value.getClass() : "null")); } } - public void handleSearchResponse(SearchResponse searchResponse) throws Exception { + public Object[] getLastItemSortValues(SearchResponse searchResponse) { + return searchResponse + .getHits() + .getHits()[searchResponse.getHits().getHits().length - 1] + .getSortValues(); + } + + public Object[] handleFirstPage(SearchResponse searchResponse) throws Exception { SearchHits searchHits = searchResponse.getHits(); for (SearchHit hit : searchHits) { - Map commandMap = - getNestedValue(hit.getSourceAsMap(), "command", Map.class); - commandMap.put("status", "DONE"); - hit.getSourceAsMap().put("command", commandMap); - IndexRequest indexRequest = - new IndexRequest() - .index(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME) - .source(hit.getSourceAsMap()) - .id(hit.getId()); - client.index( - indexRequest, - new ActionListener() { - @Override - public void onResponse(IndexResponse indexResponse) { - log.debug("Updated command with document id: {}", hit.getId()); - } - - @Override - public void onFailure(Exception e) { - logStackTrace(e); - } - }); - XContentBuilder xContentBuilder = XContentFactory.jsonBuilder(); - hit.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); - String uri = "https://httpbin.org/post"; - HttpRestClientDemo.run(uri, xContentBuilder.toString()); + updateStatusField(hit, Status.SENT); + commandHttpRequest(hit); } + return getLastItemSortValues(searchResponse); } - public void pointInTimeSearch(String index, Integer resultsPerPage) { - CreatePitRequest createPitRequest = - new CreatePitRequest(TimeValue.timeValueMinutes(1L), false, index); - client.createPit( - createPitRequest, - new ActionListener<>() { - @Override - public void onResponse(CreatePitResponse createPitResponse) { - setPitId(createPitResponse.getId()); - } - - @Override - public void onFailure(Exception e) { - logStackTrace(e); - } - }); - SearchRequest searchRequest = new SearchRequest(index); - final PointInTimeBuilder pointInTimeBuilder = new PointInTimeBuilder(getPitId()); - pointInTimeBuilder.setKeepAlive(TimeValue.timeValueMinutes(1L)); - TermQueryBuilder termQueryBuilder = - QueryBuilders.termQuery("command.status.keyword", "PENDING"); - getSearchSourceBuilder() - .query(termQueryBuilder) - .size(resultsPerPage) - .sort(Command.COMMAND + "." + Command.ORDER_ID, SortOrder.ASC) - .sort(Command.COMMAND + "." + Command.TIMEOUT, SortOrder.ASC) - .pointInTimeBuilder(pointInTimeBuilder); - searchRequest.source(getSearchSourceBuilder()); - client.search( - searchRequest, - new ActionListener<>() { - @Override - public void onResponse(SearchResponse searchResponse) { - try { - setSearchResponse(searchResponse); - handleSearchResponse(searchResponse); - } catch (Exception e) { - throw new RuntimeException(e); - } - } + public void loopThroughPages() throws Exception { + setSearchResponse( + preparePitSearch( + CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME, + CommandManagerPlugin.COMMAND_BATCH_SIZE + ) + ); - @Override - public void onFailure(Exception e) { - logStackTrace(e); - } - }); + Object[] lastSortValues = handleFirstPage(getSearchResponse()); + boolean hasNext = true; + while (hasNext) { - SearchHit[] searchHits = getSearchResponse().getHits().getHits(); - if (searchHits != null && searchHits.length > 0) { - searchAfter = searchHits[searchHits.length - 1].getSortValues(); - getSearchSourceBuilder().searchAfter(searchAfter); } - } - public void scrollSearchJob(String index, Integer resultsPerPage) { - final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L)); - SearchRequest searchRequest = new SearchRequest(index); - searchRequest.scroll(scroll); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - TermQueryBuilder termQueryBuilder = - QueryBuilders.termQuery("command.status.keyword", "PENDING"); - searchSourceBuilder - .query(termQueryBuilder) - .size(resultsPerPage) - .sort(Command.COMMAND + "." + Command.TIMEOUT, SortOrder.ASC); - searchRequest.source(searchSourceBuilder); - client.search( - searchRequest, - new ActionListener<>() { - @Override - public void onResponse(SearchResponse searchResponse) { - log.info("First search iteration completed successfully"); - try { - handleSearchResponse(searchResponse); - } catch (Exception e) { - throw new RuntimeException(e); - } - setScrollId(searchResponse); - setSearchResponse(searchResponse); - } + } - @Override - public void onFailure(Exception e) { - logStackTrace(e); - } - }); + private static void commandHttpRequest(SearchHit hit) throws IOException { + XContentBuilder xContentBuilder = XContentFactory.jsonBuilder(); + hit.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); + HttpRestClientDemo.run("https://httpbin.org/post", xContentBuilder.toString()); + } - SearchHit[] searchHits = searchResponse.getHits().getHits(); + private void updateStatusField(SearchHit hit, Status status ) { + Map commandMap = + getNestedValue(hit.getSourceAsMap(), "command", Map.class); + commandMap.put(Command.STATUS, status); + hit.getSourceAsMap().put("command", commandMap); + IndexRequest indexRequest = + new IndexRequest() + .index(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME) + .source(hit.getSourceAsMap()) + .id(hit.getId()); + this.client.index( + indexRequest, + new ActionListener<>() { + @Override + public void onResponse(IndexResponse indexResponse) { + log.debug("Updated command with document id: {}", hit.getId()); + } - while (searchHits != null && searchHits.length > 0) { - SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); - scrollRequest.scroll(scroll); - client.searchScroll( - scrollRequest, - new ActionListener<>() { - @Override - public void onResponse(SearchResponse searchResponse) { - log.info("Get next page of results"); - try { - handleSearchResponse(searchResponse); - } catch (Exception e) { - throw new RuntimeException(e); - } - setScrollId(searchResponse); - setSearchResponse(searchResponse); - } + @Override + public void onFailure(Exception e) { + log.error(e); + } + }); + } - @Override - public void onFailure(Exception e) { - logStackTrace(e); - } - }); - searchHits = searchResponse.getHits().getHits(); - } + private void search(String index, String query, Integer size) { - ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(scrollId); - client.clearScroll( - clearScrollRequest, - new ActionListener<>() { - @Override - public void onResponse(ClearScrollResponse clearScrollResponse) { - log.info("Scroll successfully cleaned"); - } - @Override - public void onFailure(Exception e) { - logStackTrace(e); - } - }); } - public void pointInTimeSearchJob(String index, Integer resultsPerPage) {} + public SearchResponse preparePitSearch(String index, Integer resultsPerPage) { + SearchRequest searchRequest = new SearchRequest(index); + TermQueryBuilder termQueryBuilder = + QueryBuilders.termQuery(Command.STATUS + ".keyword", Status.PENDING); + getSearchSourceBuilder() + .query(termQueryBuilder) + .size(resultsPerPage) + .sort(Command.COMMAND + "." + Command.ORDER_ID, SortOrder.ASC) + .sort(Command.COMMAND + "." + Command.TIMEOUT, SortOrder.ASC) + .pointInTimeBuilder( + this.pointInTime.createPit(this.client, index) + ); + searchRequest.source(getSearchSourceBuilder()); + return this.client.search(searchRequest).actionGet(); + } public void setThreadPool(ThreadPool threadPool) { this.threadPool = threadPool; @@ -311,10 +189,6 @@ public void setClient(Client client) { this.client = client; } - private void setScrollId(SearchResponse searchResponse) { - this.scrollId = searchResponse.getScrollId(); - } - public SearchSourceBuilder getSearchSourceBuilder() { return searchSourceBuilder; } @@ -323,18 +197,12 @@ public void setSearchSourceBuilder(SearchSourceBuilder searchSourceBuilder) { this.searchSourceBuilder = searchSourceBuilder; } - public void setSearchResponse(SearchResponse searchResponse) { - this.searchResponse = searchResponse; - } - public SearchResponse getSearchResponse() { return searchResponse; } - private static void logStackTrace(Throwable e) { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - PrintStream ps = new PrintStream(byteArrayOutputStream); - e.printStackTrace(ps); - log.error(byteArrayOutputStream.toString()); + public void setSearchResponse(SearchResponse searchResponse) { + this.searchResponse = searchResponse; } + }