From 7f2420978fbbf638065921ea5896fa6571b1b3c9 Mon Sep 17 00:00:00 2001 From: Kevin Ledesma Date: Tue, 3 Dec 2024 17:23:00 -0300 Subject: [PATCH] Implement timeseries index model to command-manager plugin (#153) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Update commands index template Add @timestamp and delivery_timestamp fields * Add timestamp and delivery_timestamp attributes to Command model * Move timestamp and deliveryTimestamp from Command to Document Add getter function for the Command attrbiture timeout Use native System.currentTimeMillis() to get current timestamp * Update command index template Move delivery_timestamp to top-level * Fix typo on field names * Implement timestamp and delivery_timestamp constants attributes for Document model Add missing 'this' on timeout getter * Update timestamp and deliveryTimestamp to be of type ZonedDateTime Implement OpenSearch DateUtils * Implement OpensSearch date_time_no_millis date format pattern * Fix errors --------- Co-authored-by: Álex Ruiz --- .../wazuh/commandmanager/model/Command.java | 9 ++++ .../wazuh/commandmanager/model/Document.java | 25 ++++++++- .../wazuh/commandmanager/model/Documents.java | 15 +++--- .../rest/RestPostCommandAction.java | 52 +++++++++++-------- .../resources/index-template-commands.json | 6 +++ .../resources/index-template-commands.json | 6 +++ 6 files changed, 83 insertions(+), 30 deletions(-) diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Command.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Command.java index 798d1182..c5221248 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Command.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Command.java @@ -62,6 +62,15 @@ public Command( this.status = Status.PENDING; } + /** + * Retrieves the timeout value for this command. + * + * @return the timeout value in milliseconds. + */ + public Integer getTimeout() { + return this.timeout; + } + /** * Parses the request's payload into the Command model. * diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Document.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Document.java index b9ec3850..6a9eef76 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Document.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Document.java @@ -9,18 +9,28 @@ package com.wazuh.commandmanager.model; import org.opensearch.common.UUIDs; +import org.opensearch.common.time.DateFormatter; +import org.opensearch.common.time.DateUtils; +import org.opensearch.common.time.FormatNames; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import java.io.IOException; +import java.time.ZonedDateTime; import java.util.List; /** Command's target fields. */ public class Document implements ToXContentObject { + private static final String DATE_FORMAT = FormatNames.DATE_TIME_NO_MILLIS.getSnakeCaseName(); + private static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern(DATE_FORMAT); + public static final String TIMESTAMP = "@timestamp"; + public static final String DELIVERY_TIMESTAMP = "delivery_timestamp"; private final Agent agent; private final Command command; private final String id; + private final ZonedDateTime timestamp; + private final ZonedDateTime deliveryTimestamp; /** * Default constructor @@ -32,6 +42,8 @@ public Document(Agent agent, Command command) { this.agent = agent; this.command = command; this.id = UUIDs.base64UUID(); + this.timestamp = DateUtils.nowWithMillisResolution(); + this.deliveryTimestamp = timestamp.plusSeconds(command.getTimeout()); } /** @@ -68,11 +80,22 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); this.agent.toXContent(builder, ToXContentObject.EMPTY_PARAMS); this.command.toXContent(builder, ToXContentObject.EMPTY_PARAMS); + builder.field(TIMESTAMP, DATE_FORMATTER.format(this.timestamp)); + builder.field(DELIVERY_TIMESTAMP, DATE_FORMATTER.format(this.deliveryTimestamp)); return builder.endObject(); } @Override public String toString() { - return "Document{" + "agent=" + agent + ", command=" + command + '}'; + return "Document{" + + "@timestamp=" + + timestamp + + ", delivery_timestamp=" + + deliveryTimestamp + + ", agent=" + + agent + + ", command=" + + command + + '}'; } } diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Documents.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Documents.java index b41fc802..36ecd068 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Documents.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Documents.java @@ -14,8 +14,6 @@ import java.io.IOException; import java.util.ArrayList; -import com.wazuh.commandmanager.CommandManagerPlugin; - public class Documents implements ToXContentObject { private ArrayList documents; @@ -59,17 +57,22 @@ public void addDocument(Document document) { this.documents.add(document); } + /** + * Fit this object into a XContentBuilder parser, preparing it for the reply of POST /commands. + * + * @param builder XContentBuilder builder + * @param params ToXContent.EMPTY_PARAMS + * @return XContentBuilder builder with the representation of this object. + * @throws IOException parsing error. + */ @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field("_index", CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME); builder.startArray("_documents"); for (Document document : this.documents) { builder.startObject(); builder.field("_id", document.getId()); builder.endObject(); } - builder.endArray(); - return builder; + return builder.endArray(); } } diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/rest/RestPostCommandAction.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/rest/RestPostCommandAction.java index 92f6b765..fe786d67 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/rest/RestPostCommandAction.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/rest/RestPostCommandAction.java @@ -8,11 +8,9 @@ */ package com.wazuh.commandmanager.rest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.node.NodeClient; -import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; @@ -25,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Locale; +import java.util.concurrent.CompletableFuture; import com.wazuh.commandmanager.CommandManagerPlugin; import com.wazuh.commandmanager.index.CommandIndex; @@ -32,7 +31,6 @@ import com.wazuh.commandmanager.model.Command; import com.wazuh.commandmanager.model.Document; import com.wazuh.commandmanager.model.Documents; -import com.wazuh.commandmanager.utils.httpclient.HttpRestClientDemo; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.rest.RestRequest.Method.POST; @@ -97,7 +95,9 @@ private RestChannelConsumer handlePost(RestRequest request) throws IOException { request.getRequestId(), request.header("Host")); - // Get request details + /// Request validation + /// ================== + /// Fail fast. if (!request.hasContent()) { // Bad request if body doesn't exist return channel -> { @@ -106,11 +106,14 @@ private RestChannelConsumer handlePost(RestRequest request) throws IOException { }; } + /// Request parsing + /// =============== + /// Retrieves and generates an array list of commands. XContentParser parser = request.contentParser(); List commands = new ArrayList<>(); ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); // The array of commands is inside the "commands" JSON object. - // This line moves the parser pointer into this object. + // This line moves the parser pointer to this object. parser.nextToken(); if (parser.nextToken() == XContentParser.Token.START_ARRAY) { commands = Command.parseToArray(parser); @@ -118,6 +121,13 @@ private RestChannelConsumer handlePost(RestRequest request) throws IOException { log.error("Token does not match {}", parser.currentToken()); } + /// Commands expansion + /// ================== + /// Transforms the array of commands to orders. + /// While commands can be targeted to groups of agents, orders are targeted to individual + // agents. + /// Given a group of agents A with N agents, a total of N orders are generated. One for each + // agent. Documents documents = new Documents(); for (Command command : commands) { Document document = @@ -125,30 +135,26 @@ private RestChannelConsumer handlePost(RestRequest request) throws IOException { new Agent(List.of("groups000")), // TODO read agent from .agents index command); documents.addDocument(document); - - // Commands delivery to the Management API. - // Note: needs to be decoupled from the Rest handler (job scheduler task). - try { - String payload = - documents - .toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS) - .toString(); - SimpleHttpResponse response = - HttpRestClientDemo.runWithResponse(payload, document.getId()); - log.info("Received response to POST request with code [{}]", response.getCode()); - log.info("Raw response:\n{}", response.getBodyText()); - } catch (Exception e) { - log.error("Error reading response: {}", e.getMessage()); - } } - // Send response + /// Orders indexing + /// ================== + /// The orders are inserted into the index. + CompletableFuture bulkRequestFuture = + this.commandIndex.asyncBulkCreate(documents.getDocuments()); + + /// Send response + /// ================== + /// Reply to the request. return channel -> { - this.commandIndex - .asyncBulkCreate(documents.getDocuments()) + bulkRequestFuture .thenAccept( restStatus -> { try (XContentBuilder builder = channel.newBuilder()) { + builder.startObject(); + builder.field( + "_index", + CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME); documents.toXContent(builder, ToXContent.EMPTY_PARAMS); builder.field("result", restStatus.name()); builder.endObject(); diff --git a/plugins/command-manager/src/main/resources/index-template-commands.json b/plugins/command-manager/src/main/resources/index-template-commands.json index 3614c17b..6c834803 100644 --- a/plugins/command-manager/src/main/resources/index-template-commands.json +++ b/plugins/command-manager/src/main/resources/index-template-commands.json @@ -6,6 +6,9 @@ "date_detection": false, "dynamic": "strict", "properties": { + "@timestamp": { + "type": "date" + }, "agent": { "properties": { "groups": { @@ -83,6 +86,9 @@ "type": "keyword" } } + }, + "delivery_timestamp": { + "type": "date" } } }, diff --git a/plugins/setup/src/main/resources/index-template-commands.json b/plugins/setup/src/main/resources/index-template-commands.json index 3614c17b..6c834803 100644 --- a/plugins/setup/src/main/resources/index-template-commands.json +++ b/plugins/setup/src/main/resources/index-template-commands.json @@ -6,6 +6,9 @@ "date_detection": false, "dynamic": "strict", "properties": { + "@timestamp": { + "type": "date" + }, "agent": { "properties": { "groups": { @@ -83,6 +86,9 @@ "type": "keyword" } } + }, + "delivery_timestamp": { + "type": "date" } } },