Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create index-template-commands on POST request #83

Merged
merged 2 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,18 @@
import java.util.List;
import java.util.function.Supplier;


/**
* The Command Manager plugin exposes an HTTP API with a single endpoint to
* receive raw commands from the Wazuh Server. These commands are processed,
* indexed and sent back to the Server for its delivery to, in most cases, the
* Agents.
*/
public class CommandManagerPlugin extends Plugin implements ActionPlugin {
public static final String COMMAND_MANAGER_BASE_URI = "/_plugins/_commandmanager";
public static final String COMMAND_MANAGER_INDEX_NAME = "command-manager";
public static final String COMMAND_MANAGER_INDEX_NAME = ".commands";
public static final String COMMAND_MANAGER_INDEX_TEMPLATE_NAME = "index-template-commands";

private CommandIndex commandIndex;
private ThreadPool threadPool;

@Override
public Collection<Object> createComponents(
Expand All @@ -57,8 +62,7 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.commandIndex = new CommandIndex(client);
this.threadPool = threadPool;
this.commandIndex = new CommandIndex(client, clusterService, threadPool);
return Collections.emptyList();
}

Expand All @@ -71,6 +75,6 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return Collections.singletonList(new RestPostCommandAction(this.commandIndex, this.threadPool));
return Collections.singletonList(new RestPostCommandAction(this.commandIndex));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,103 +9,133 @@

import com.wazuh.commandmanager.CommandManagerPlugin;
import com.wazuh.commandmanager.model.Command;
import com.wazuh.commandmanager.utils.IndexTemplateUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

public class CommandIndex implements IndexingOperationListener {

private static final Logger logger = LogManager.getLogger(CommandIndex.class);

private final Client client;
private final ClusterService clusterService;
private final ThreadPool threadPool;

/**
* @param client
* Default constructor
*
* @param client OpenSearch client.
* @param clusterService OpenSearch cluster service.
* @param threadPool An OpenSearch ThreadPool.
*/
public CommandIndex(Client client) {
public CommandIndex(Client client, ClusterService clusterService, ThreadPool threadPool) {
this.client = client;
this.clusterService = clusterService;
this.threadPool = threadPool;
}

/**
* @param command a Command class command
* @return Indexing operation RestStatus response
* @throws ExecutionException
* @param command: A Command model object
* @return A CompletableFuture with the RestStatus response from the operation
*/
public RestStatus create(Command command) throws ExecutionException, InterruptedException {
CompletableFuture<IndexResponse> inProgressFuture = new CompletableFuture<>();
public CompletableFuture<RestStatus> asyncCreate(Command command) {
CompletableFuture<RestStatus> future = new CompletableFuture<>();
ExecutorService executor = this.threadPool.executor(ThreadPool.Names.WRITE);

// Create index template if it does not exist.
if (!indexTemplateExists(CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME)) {
putIndexTemplate(CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME);
} else {
logger.info(
"Index template {} already exists. Skipping creation.",
CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME
);
}

logger.debug("Indexing command {}", command);
try {
logger.info("Creating request for command: {}", command.getId());
IndexRequest request = new IndexRequest()
.index(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME)
.source(command.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(command.getId())
.create(true);

client.index(
request,
new ActionListener<>() {
@Override
public void onResponse(IndexResponse indexResponse) {
inProgressFuture.complete(indexResponse);
}

@Override
public void onFailure(Exception e) {
logger.info("Could not process command: {}", command.getId(), e);
inProgressFuture.completeExceptionally(e);
executor.submit(
() -> {
try (ThreadContext.StoredContext ignored = this.threadPool.getThreadContext().stashContext()) {
RestStatus restStatus = client.index(request).actionGet().status();
future.complete(restStatus);
} catch (Exception e) {
future.completeExceptionally(e);
}
}
);
} catch (IOException e) {
logger.error("IOException occurred creating command details", e);
logger.error(
"Failed to index command with ID {}: {}", command.getId(), e);
}
return inProgressFuture.get().status();
return future;
}

/**
*
* @param command: A Command model object
* @param threadPool: An OpenSearch ThreadPool as passed to the createComponents() method
* @return A CompletableFuture with the RestStatus response from the operation
* @return
*/
public boolean indexTemplateExists(String template_name) {
Map<String, IndexTemplateMetadata> templates = this.clusterService
.state()
.metadata()
.templates();
logger.debug("Existing index templates: {} ", templates);

public CompletableFuture<RestStatus> asyncCreate(Command command, ThreadPool threadPool) {
CompletableFuture<RestStatus> future = new CompletableFuture<>();
ExecutorService executor = threadPool.executor(ThreadPool.Names.WRITE);
return templates.containsKey(template_name);
}

/**
* Inserts an index template
*
* @param templateName : The name if the index template to load
*/
public void putIndexTemplate(String templateName) {
ExecutorService executor = this.threadPool.executor(ThreadPool.Names.WRITE);
try {
IndexRequest request = new IndexRequest()
.index(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME)
.source(command.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(command.getId())
.create(true);
executor.submit(
() -> {
try (ThreadContext.StoredContext ignored = threadPool.getThreadContext().stashContext()) {
RestStatus restStatus = client.index(request).actionGet().status();
future.complete(restStatus);
} catch (Exception e) {
future.completeExceptionally(e);
}
// @throws IOException
Map<String, Object> template = IndexTemplateUtils.fromFile(templateName + ".json");

PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest()
.mapping(IndexTemplateUtils.get(template, "mappings"))
.settings(IndexTemplateUtils.get(template, "settings"))
.name(templateName)
.patterns((List<String>) template.get("index_patterns"));

executor.submit(() -> {
AcknowledgedResponse acknowledgedResponse = this.client.admin().indices().putTemplate(putIndexTemplateRequest).actionGet();
if (acknowledgedResponse.isAcknowledged()) {
logger.info(
"Index template created successfully: {}",
templateName
);
}
);
} catch (Exception e) {
logger.error(e);
});

} catch (IOException e) {
logger.error("Error reading index template from filesystem {}", templateName);
}
return future;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(VERSION, this.version);
return builder.endObject();
}

@Override
public String toString() {
return "Action{" +
"type='" + type + '\'' +
", args=" + args +
", version='" + version + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
*/
package com.wazuh.commandmanager.model;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.UUIDs;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
Expand All @@ -19,7 +21,7 @@
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

public class Command implements ToXContentObject {

public static final String NAME = "command";
public static final String ORDER_ID = "order_id";
public static final String REQUEST_ID = "request_id";
public static final String SOURCE = "source";
Expand Down Expand Up @@ -85,10 +87,12 @@ public static Command parse(XContentParser parser) throws IOException {
String user = null;
Action action = null;

// @TODO check if this call is necessary as ensureExpectedToken is invoked previously
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
// skips JSON's root level "command"
ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();

parser.nextToken();
switch (fieldName) {
case SOURCE:
Expand Down Expand Up @@ -132,6 +136,7 @@ public static Command parse(XContentParser parser) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();

builder.startObject(NAME);
builder.field(SOURCE, this.source);
builder.field(USER, this.user);
builder.field(TARGET, this.target);
Expand All @@ -141,6 +146,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(STATUS, this.status);
builder.field(ORDER_ID, this.orderId);
builder.field(REQUEST_ID, this.requestId);
builder.endObject();

return builder.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -39,17 +38,14 @@ public class RestPostCommandAction extends BaseRestHandler {
public static final String POST_COMMAND_ACTION_REQUEST_DETAILS = "post_command_action_request_details";
private static final Logger logger = LogManager.getLogger(RestPostCommandAction.class);
private final CommandIndex commandIndex;
private final ThreadPool threadPool;

/**
* Default constructor
*
* @param commandIndex persistence layer
* @param threadPool
*/
public RestPostCommandAction(CommandIndex commandIndex, ThreadPool threadPool) {
public RestPostCommandAction(CommandIndex commandIndex) {
this.commandIndex = commandIndex;
this.threadPool = threadPool;

}

Expand All @@ -60,21 +56,21 @@ public String getName() {
@Override
public List<Route> routes() {
return Collections.singletonList(
new Route(
POST,
String.format(
Locale.ROOT,
"%s",
CommandManagerPlugin.COMMAND_MANAGER_BASE_URI
new Route(
POST,
String.format(
Locale.ROOT,
"%s",
CommandManagerPlugin.COMMAND_MANAGER_BASE_URI
)
)
)
);
}

@Override
protected RestChannelConsumer prepareRequest(
final RestRequest restRequest,
final NodeClient client
final RestRequest restRequest,
final NodeClient client
) throws IOException {
// Get request details
XContentParser parser = restRequest.contentParser();
Expand All @@ -84,7 +80,7 @@ protected RestChannelConsumer prepareRequest(

// Send response
return channel -> {
commandIndex.asyncCreate(command, this.threadPool)
this.commandIndex.asyncCreate(command)
.thenAccept(restStatus -> {
try (XContentBuilder builder = channel.newBuilder()) {
builder.startObject();
Expand All @@ -94,7 +90,7 @@ protected RestChannelConsumer prepareRequest(
builder.endObject();
channel.sendResponse(new BytesRestResponse(restStatus, builder));
} catch (Exception e) {
logger.error("Error indexing command: ",e);
logger.error("Error indexing command: ", e);
}
}).exceptionally(e -> {
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
Expand Down
Loading