From d26ab1178f83435d5f400e8c47799f86e082b7e8 Mon Sep 17 00:00:00 2001 From: quebim Date: Thu, 5 Dec 2024 11:27:47 -0300 Subject: [PATCH 1/3] Update handlePage to sort hits by its delivery timeout --- .../jobscheduler/SearchThread.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java index 6e268a9..57192e5 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java @@ -44,6 +44,7 @@ import com.wazuh.commandmanager.CommandManagerPlugin; import com.wazuh.commandmanager.model.Command; +import com.wazuh.commandmanager.model.Document; import com.wazuh.commandmanager.model.Status; import com.wazuh.commandmanager.settings.PluginSettings; import com.wazuh.commandmanager.utils.httpclient.AuthHttpRestClient; @@ -101,6 +102,9 @@ public static T getNestedObject(Map map, String key, Class orders = new ArrayList<>(); + // Get sorted hits by delivery_timestamp. + List hitsList = getSortedHits(searchHits); + log.info("Hits: {}", hitsList); for (SearchHit hit : searchHits) { // Create a JSON representation of each hit and add it to the orders array. @@ -133,6 +137,31 @@ public void handlePage(SearchResponse searchResponse) throws IllegalStateExcepti } } + /** + * Converts SearchHits to a list and sorts them by the delivery_timestamp field. + * + * @param searchHits The SearchHits object containing the hits to be processed. + * @return A List of SearchHit objects sorted by delivery_timestamp. + */ + private static List getSortedHits(SearchHits searchHits) { + List hitsList = new ArrayList<>(List.of(searchHits.getHits())); + hitsList.sort( + (hit1, hit2) -> { + String timeout1 = + getNestedObject( + hit1.getSourceAsMap(), + Document.DELIVERY_TIMESTAMP, + String.class); + String timeout2 = + getNestedObject( + hit2.getSourceAsMap(), + Document.DELIVERY_TIMESTAMP, + String.class); + return timeout1.compareTo(timeout2); + }); + return hitsList; + } + /** * Send the command order over HTTP * From fe26abb652648cb820ebd2438597e9b77cce1ebe Mon Sep 17 00:00:00 2001 From: quebim Date: Thu, 5 Dec 2024 12:44:24 -0300 Subject: [PATCH 2/3] Use OpenSearch qeury to sort commands by its delivery timestamp --- .../jobscheduler/SearchThread.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java index 57192e5..9042bad 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java @@ -59,6 +59,7 @@ public class SearchThread implements Runnable { public static final String COMMAND_ORDER_ID_FIELD = Command.COMMAND + "." + Command.ORDER_ID + ".keyword"; public static final String COMMAND_TIMEOUT_FIELD = Command.COMMAND + "." + Command.TIMEOUT; + public static final String DELIVERY_TIMESTAMP_FIELD = Document.DELIVERY_TIMESTAMP; private static final Logger log = LogManager.getLogger(SearchThread.class); public static final String ORDERS_OBJECT = "/orders"; private final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); @@ -102,10 +103,10 @@ public static T getNestedObject(Map map, String key, Class orders = new ArrayList<>(); - // Get sorted hits by delivery_timestamp. - List hitsList = getSortedHits(searchHits); - log.info("Hits: {}", hitsList); - + // // Get sorted hits by delivery_timestamp. + // List hitsList = getSortedHits(searchHits); + // log.info("Hits: {}", hitsList); + log.info("Hits: {}", searchHits); for (SearchHit hit : searchHits) { // Create a JSON representation of each hit and add it to the orders array. Map orderMap = @@ -121,7 +122,7 @@ public void handlePage(SearchResponse searchResponse) throws IllegalStateExcepti } catch (IOException e) { log.error("Error parsing hit contents: {}", e.getMessage()); } - + log.info("Payload: {}", payload); if (payload != null) { SimpleHttpResponse response = deliverOrders(payload); if (response == null) { @@ -230,8 +231,9 @@ public SearchResponse pitQuery(PointInTimeBuilder pointInTimeBuilder, Object[] s .pointInTimeBuilder(pointInTimeBuilder); if (this.searchSourceBuilder.sorts() == null) { this.searchSourceBuilder - .sort(SearchThread.COMMAND_ORDER_ID_FIELD, SortOrder.ASC) - .sort(SearchThread.COMMAND_TIMEOUT_FIELD, SortOrder.ASC); + // Agregar sort solo por delivery_timestamp + .sort(SearchThread.DELIVERY_TIMESTAMP_FIELD, SortOrder.DESC); + // .sort(SearchThread.COMMAND_TIMEOUT_FIELD, SortOrder.ASC); } if (searchAfter.length > 0) { this.searchSourceBuilder.searchAfter(searchAfter); From 7e2e984cd801f98378f23e52a111c685c3769bee Mon Sep 17 00:00:00 2001 From: quebim Date: Thu, 5 Dec 2024 13:15:58 -0300 Subject: [PATCH 3/3] Remove info logs --- .../jobscheduler/SearchThread.java | 37 ++----------------- 1 file changed, 3 insertions(+), 34 deletions(-) diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java index 9042bad..dca965e 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java @@ -103,10 +103,7 @@ public static T getNestedObject(Map map, String key, Class orders = new ArrayList<>(); - // // Get sorted hits by delivery_timestamp. - // List hitsList = getSortedHits(searchHits); - // log.info("Hits: {}", hitsList); - log.info("Hits: {}", searchHits); + for (SearchHit hit : searchHits) { // Create a JSON representation of each hit and add it to the orders array. Map orderMap = @@ -122,7 +119,7 @@ public void handlePage(SearchResponse searchResponse) throws IllegalStateExcepti } catch (IOException e) { log.error("Error parsing hit contents: {}", e.getMessage()); } - log.info("Payload: {}", payload); + if (payload != null) { SimpleHttpResponse response = deliverOrders(payload); if (response == null) { @@ -138,31 +135,6 @@ public void handlePage(SearchResponse searchResponse) throws IllegalStateExcepti } } - /** - * Converts SearchHits to a list and sorts them by the delivery_timestamp field. - * - * @param searchHits The SearchHits object containing the hits to be processed. - * @return A List of SearchHit objects sorted by delivery_timestamp. - */ - private static List getSortedHits(SearchHits searchHits) { - List hitsList = new ArrayList<>(List.of(searchHits.getHits())); - hitsList.sort( - (hit1, hit2) -> { - String timeout1 = - getNestedObject( - hit1.getSourceAsMap(), - Document.DELIVERY_TIMESTAMP, - String.class); - String timeout2 = - getNestedObject( - hit2.getSourceAsMap(), - Document.DELIVERY_TIMESTAMP, - String.class); - return timeout1.compareTo(timeout2); - }); - return hitsList; - } - /** * Send the command order over HTTP * @@ -230,10 +202,7 @@ public SearchResponse pitQuery(PointInTimeBuilder pointInTimeBuilder, Object[] s .timeout(timeout) .pointInTimeBuilder(pointInTimeBuilder); if (this.searchSourceBuilder.sorts() == null) { - this.searchSourceBuilder - // Agregar sort solo por delivery_timestamp - .sort(SearchThread.DELIVERY_TIMESTAMP_FIELD, SortOrder.DESC); - // .sort(SearchThread.COMMAND_TIMEOUT_FIELD, SortOrder.ASC); + this.searchSourceBuilder.sort(SearchThread.DELIVERY_TIMESTAMP_FIELD, SortOrder.ASC); } if (searchAfter.length > 0) { this.searchSourceBuilder.searchAfter(searchAfter);