Skip to content

Commit

Permalink
Implement timeseries index model to command-manager plugin (#153)
Browse files Browse the repository at this point in the history
* Update commands index template

Add @timestamp and delivery_timestamp fields

* Add timestamp and delivery_timestamp attributes to Command model

* Move timestamp and deliveryTimestamp from Command to Document

Add getter function for the Command attrbiture timeout

Use native System.currentTimeMillis() to get current timestamp

* Update command index template

Move delivery_timestamp to top-level

* Fix typo on field names

* Implement timestamp and delivery_timestamp constants attributes for Document model

Add missing 'this' on timeout getter

* Update timestamp and deliveryTimestamp to be of type ZonedDateTime

Implement OpenSearch DateUtils

* Implement OpensSearch date_time_no_millis date format pattern

* Fix errors

---------

Co-authored-by: Álex Ruiz <[email protected]>
  • Loading branch information
QU3B1M and AlexRuiz7 authored Dec 3, 2024
1 parent 8e54428 commit 7f24209
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ public Command(
this.status = Status.PENDING;
}

/**
* Retrieves the timeout value for this command.
*
* @return the timeout value in milliseconds.
*/
public Integer getTimeout() {
return this.timeout;
}

/**
* Parses the request's payload into the Command model.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,28 @@
package com.wazuh.commandmanager.model;

import org.opensearch.common.UUIDs;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.time.DateUtils;
import org.opensearch.common.time.FormatNames;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.List;

/** Command's target fields. */
public class Document implements ToXContentObject {
private static final String DATE_FORMAT = FormatNames.DATE_TIME_NO_MILLIS.getSnakeCaseName();
private static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern(DATE_FORMAT);
public static final String TIMESTAMP = "@timestamp";
public static final String DELIVERY_TIMESTAMP = "delivery_timestamp";
private final Agent agent;
private final Command command;
private final String id;
private final ZonedDateTime timestamp;
private final ZonedDateTime deliveryTimestamp;

/**
* Default constructor
Expand All @@ -32,6 +42,8 @@ public Document(Agent agent, Command command) {
this.agent = agent;
this.command = command;
this.id = UUIDs.base64UUID();
this.timestamp = DateUtils.nowWithMillisResolution();
this.deliveryTimestamp = timestamp.plusSeconds(command.getTimeout());
}

/**
Expand Down Expand Up @@ -68,11 +80,22 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject();
this.agent.toXContent(builder, ToXContentObject.EMPTY_PARAMS);
this.command.toXContent(builder, ToXContentObject.EMPTY_PARAMS);
builder.field(TIMESTAMP, DATE_FORMATTER.format(this.timestamp));
builder.field(DELIVERY_TIMESTAMP, DATE_FORMATTER.format(this.deliveryTimestamp));
return builder.endObject();
}

@Override
public String toString() {
return "Document{" + "agent=" + agent + ", command=" + command + '}';
return "Document{"
+ "@timestamp="
+ timestamp
+ ", delivery_timestamp="
+ deliveryTimestamp
+ ", agent="
+ agent
+ ", command="
+ command
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import java.io.IOException;
import java.util.ArrayList;

import com.wazuh.commandmanager.CommandManagerPlugin;

public class Documents implements ToXContentObject {
private ArrayList<Document> documents;

Expand Down Expand Up @@ -59,17 +57,22 @@ public void addDocument(Document document) {
this.documents.add(document);
}

/**
* Fit this object into a XContentBuilder parser, preparing it for the reply of POST /commands.
*
* @param builder XContentBuilder builder
* @param params ToXContent.EMPTY_PARAMS
* @return XContentBuilder builder with the representation of this object.
* @throws IOException parsing error.
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("_index", CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME);
builder.startArray("_documents");
for (Document document : this.documents) {
builder.startObject();
builder.field("_id", document.getId());
builder.endObject();
}
builder.endArray();
return builder;
return builder.endArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@
*/
package com.wazuh.commandmanager.rest;

import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
Expand All @@ -25,14 +23,14 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;

import com.wazuh.commandmanager.CommandManagerPlugin;
import com.wazuh.commandmanager.index.CommandIndex;
import com.wazuh.commandmanager.model.Agent;
import com.wazuh.commandmanager.model.Command;
import com.wazuh.commandmanager.model.Document;
import com.wazuh.commandmanager.model.Documents;
import com.wazuh.commandmanager.utils.httpclient.HttpRestClientDemo;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.rest.RestRequest.Method.POST;
Expand Down Expand Up @@ -97,7 +95,9 @@ private RestChannelConsumer handlePost(RestRequest request) throws IOException {
request.getRequestId(),
request.header("Host"));

// Get request details
/// Request validation
/// ==================
/// Fail fast.
if (!request.hasContent()) {
// Bad request if body doesn't exist
return channel -> {
Expand All @@ -106,49 +106,55 @@ private RestChannelConsumer handlePost(RestRequest request) throws IOException {
};
}

/// Request parsing
/// ===============
/// Retrieves and generates an array list of commands.
XContentParser parser = request.contentParser();
List<Command> commands = new ArrayList<>();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
// The array of commands is inside the "commands" JSON object.
// This line moves the parser pointer into this object.
// This line moves the parser pointer to this object.
parser.nextToken();
if (parser.nextToken() == XContentParser.Token.START_ARRAY) {
commands = Command.parseToArray(parser);
} else {
log.error("Token does not match {}", parser.currentToken());
}

/// Commands expansion
/// ==================
/// Transforms the array of commands to orders.
/// While commands can be targeted to groups of agents, orders are targeted to individual
// agents.
/// Given a group of agents A with N agents, a total of N orders are generated. One for each
// agent.
Documents documents = new Documents();
for (Command command : commands) {
Document document =
new Document(
new Agent(List.of("groups000")), // TODO read agent from .agents index
command);
documents.addDocument(document);

// Commands delivery to the Management API.
// Note: needs to be decoupled from the Rest handler (job scheduler task).
try {
String payload =
documents
.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)
.toString();
SimpleHttpResponse response =
HttpRestClientDemo.runWithResponse(payload, document.getId());
log.info("Received response to POST request with code [{}]", response.getCode());
log.info("Raw response:\n{}", response.getBodyText());
} catch (Exception e) {
log.error("Error reading response: {}", e.getMessage());
}
}

// Send response
/// Orders indexing
/// ==================
/// The orders are inserted into the index.
CompletableFuture<RestStatus> bulkRequestFuture =
this.commandIndex.asyncBulkCreate(documents.getDocuments());

/// Send response
/// ==================
/// Reply to the request.
return channel -> {
this.commandIndex
.asyncBulkCreate(documents.getDocuments())
bulkRequestFuture
.thenAccept(
restStatus -> {
try (XContentBuilder builder = channel.newBuilder()) {
builder.startObject();
builder.field(
"_index",
CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME);
documents.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.field("result", restStatus.name());
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
"date_detection": false,
"dynamic": "strict",
"properties": {
"@timestamp": {
"type": "date"
},
"agent": {
"properties": {
"groups": {
Expand Down Expand Up @@ -83,6 +86,9 @@
"type": "keyword"
}
}
},
"delivery_timestamp": {
"type": "date"
}
}
},
Expand Down
6 changes: 6 additions & 0 deletions plugins/setup/src/main/resources/index-template-commands.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
"date_detection": false,
"dynamic": "strict",
"properties": {
"@timestamp": {
"type": "date"
},
"agent": {
"properties": {
"groups": {
Expand Down Expand Up @@ -83,6 +86,9 @@
"type": "keyword"
}
}
},
"delivery_timestamp": {
"type": "date"
}
}
},
Expand Down

0 comments on commit 7f24209

Please sign in to comment.