Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement commands index template creation #82

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
f8ff61d
Adding POST endpoint
f-galland Sep 18, 2024
e44bfce
Merge branch 'master' into 69-create-command-manager-endpoint
AlexRuiz7 Sep 19, 2024
cb47c41
Revert guava dependency
AlexRuiz7 Sep 19, 2024
789fae1
Add required fields if not present in the request
f-galland Sep 19, 2024
05e1a0b
Adding PostCommandRequest class to validate input
f-galland Sep 19, 2024
117b5c0
Adding CommandDetails model class
f-galland Sep 19, 2024
b7b28d3
Adding CommandManagerService class to handle CRUD operations
f-galland Sep 19, 2024
49db3b7
Rewrite prepareRequest following opensearch's common practices
f-galland Sep 19, 2024
fbe7360
Remove unused imports
f-galland Sep 19, 2024
3491748
Change POST endpoint to /_plugins/_commandmanager
f-galland Sep 19, 2024
e5e9714
Instantiating commandManagerService from CommandManagerPlugin class
f-galland Sep 19, 2024
4755141
Removing update functionality
f-galland Sep 20, 2024
2675b9c
Generate order and request ids randomly
f-galland Sep 20, 2024
7b81541
Make the document _id be a concatenation of the orderId and requestId…
f-galland Sep 20, 2024
73328d1
Refactor
AlexRuiz7 Sep 23, 2024
4206794
Merge branch 'master' into 69-create-command-manager-endpoint
AlexRuiz7 Sep 23, 2024
434f4ba
Merge branch 'master' into 69-create-command-manager-endpoint
AlexRuiz7 Sep 24, 2024
5e597e8
More refactor
AlexRuiz7 Sep 25, 2024
64d3d45
Merge branch '69-create-command-manager-endpoint' of github.com:wazuh…
AlexRuiz7 Sep 25, 2024
698bb71
Remove unused imports
AlexRuiz7 Sep 25, 2024
e3dd5bb
Go back to using an ActionListener to create a document.
f-galland Sep 25, 2024
5505f70
Change Action.args to list of strings
AlexRuiz7 Sep 26, 2024
0d0901a
Return the operation's RestStatus
f-galland Sep 26, 2024
89bb6f5
Add logging
f-galland Sep 26, 2024
7b1988c
Add yaml REST tests
f-galland Sep 26, 2024
6379bee
Replace document's ID with UUID
AlexRuiz7 Sep 27, 2024
3408112
Make threadpool accessible to RestPostCommandAction
f-galland Sep 27, 2024
48630ab
Make document creation threaded
f-galland Sep 27, 2024
41ba7c3
Add a proper status reply
f-galland Sep 27, 2024
84f4f55
Rename old create() method
f-galland Sep 27, 2024
1278e38
Create index-template-commands on POST request
AlexRuiz7 Sep 30, 2024
01158d2
Merge branch 'master' into 42-implement-commands-index-template-creation
AlexRuiz7 Sep 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,16 @@
import java.util.List;
import java.util.function.Supplier;


/**
* The Command Manager plugin exposes an HTTP API with a single endpoint to
* receive raw commands from the Wazuh Server. These commands are processed,
* indexed and sent back to the Server for its delivery to, in most cases, the
* Agents.
*/
public class CommandManagerPlugin extends Plugin implements ActionPlugin {
public static final String COMMAND_MANAGER_BASE_URI = "/_plugins/_commandmanager";
public static final String COMMAND_MANAGER_INDEX_NAME = "command-manager";
public static final String COMMAND_MANAGER_INDEX_NAME = ".commands";
public static final String COMMAND_MANAGER_INDEX_TEMPLATE_NAME = "index-template-commands";

private CommandIndex commandIndex;
private ThreadPool threadPool;
Expand All @@ -57,8 +63,7 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.commandIndex = new CommandIndex(client);
this.threadPool = threadPool;
this.commandIndex = new CommandIndex(client, clusterService, threadPool);
return Collections.emptyList();
}

Expand All @@ -71,6 +76,6 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return Collections.singletonList(new RestPostCommandAction(this.commandIndex, this.threadPool));
return Collections.singletonList(new RestPostCommandAction(this.commandIndex));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,103 +9,132 @@

import com.wazuh.commandmanager.CommandManagerPlugin;
import com.wazuh.commandmanager.model.Command;
import com.wazuh.commandmanager.utils.IndexTemplateUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

public class CommandIndex implements IndexingOperationListener {

private static final Logger logger = LogManager.getLogger(CommandIndex.class);

private final Client client;
private final ClusterService clusterService;
private final ThreadPool threadPool;

/**
* @param client
* Default constructor
*
* @param client OpenSearch client.
* @param clusterService OpenSearch cluster service.
* @param threadPool An OpenSearch ThreadPool.
*/
public CommandIndex(Client client) {
public CommandIndex(Client client, ClusterService clusterService, ThreadPool threadPool) {
this.client = client;
this.clusterService = clusterService;
this.threadPool = threadPool;
}

/**
* @param command a Command class command
* @return Indexing operation RestStatus response
* @throws ExecutionException
* @param command: A Command model object
* @return A CompletableFuture with the RestStatus response from the operation
*/
public RestStatus create(Command command) throws ExecutionException, InterruptedException {
CompletableFuture<IndexResponse> inProgressFuture = new CompletableFuture<>();
public CompletableFuture<RestStatus> asyncCreate(Command command) {
CompletableFuture<RestStatus> future = new CompletableFuture<>();
ExecutorService executor = this.threadPool.executor(ThreadPool.Names.WRITE);

// Create index template if it does not exist.
if (!indexTemplateExists(CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME)) {
putIndexTemplate(CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME);
} else {
logger.info(String.format(
"Index template %s already exists. Skipping creation.",
CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME
));
}

logger.debug("Indexing command {}", command);
try {
logger.info("Creating request for command: {}", command.getId());
IndexRequest request = new IndexRequest()
.index(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME)
.source(command.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(command.getId())
.create(true);

client.index(
request,
new ActionListener<>() {
@Override
public void onResponse(IndexResponse indexResponse) {
inProgressFuture.complete(indexResponse);
}

@Override
public void onFailure(Exception e) {
logger.info("Could not process command: {}", command.getId(), e);
inProgressFuture.completeExceptionally(e);
executor.submit(
() -> {
try (ThreadContext.StoredContext ignored = this.threadPool.getThreadContext().stashContext()) {
RestStatus restStatus = client.index(request).actionGet().status();
future.complete(restStatus);
} catch (Exception e) {
future.completeExceptionally(e);
}
}
);
} catch (IOException e) {
logger.error("IOException occurred creating command details", e);
logger.error(e);
}
return inProgressFuture.get().status();
return future;
}

/**
*
* @param command: A Command model object
* @param threadPool: An OpenSearch ThreadPool as passed to the createComponents() method
* @return A CompletableFuture with the RestStatus response from the operation
* @return
*/
public boolean indexTemplateExists(String template_name) {
Map<String, IndexTemplateMetadata> templates = this.clusterService
.state()
.metadata()
.templates();
logger.debug("Existing index templates: {} ", templates);

public CompletableFuture<RestStatus> asyncCreate(Command command, ThreadPool threadPool) {
CompletableFuture<RestStatus> future = new CompletableFuture<>();
ExecutorService executor = threadPool.executor(ThreadPool.Names.WRITE);
return templates.containsKey(template_name);
}

/**
* Inserts an index template
*
* @param templateName : The name if the index template to load
*/
public void putIndexTemplate(String templateName) {
ExecutorService executor = this.threadPool.executor(ThreadPool.Names.WRITE);
try {
IndexRequest request = new IndexRequest()
.index(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME)
.source(command.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(command.getId())
.create(true);
executor.submit(
() -> {
try (ThreadContext.StoredContext ignored = threadPool.getThreadContext().stashContext()) {
RestStatus restStatus = client.index(request).actionGet().status();
future.complete(restStatus);
} catch (Exception e) {
future.completeExceptionally(e);
}
// @throws IOException
Map<String, Object> template = IndexTemplateUtils.fromFile(templateName + ".json");

PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest()
.mapping(IndexTemplateUtils.get(template, "mappings"))
.settings(IndexTemplateUtils.get(template, "settings"))
.name(templateName)
.patterns((List<String>) template.get("index_patterns"));

executor.submit(() -> {
AcknowledgedResponse acknowledgedResponse = this.client.admin().indices().putTemplate(putIndexTemplateRequest).actionGet();
if (acknowledgedResponse.isAcknowledged()) {
logger.info(
"Index template created successfully: {}",
templateName
);
}
);
} catch (Exception e) {
logger.error(e);
});

} catch (IOException e) {
logger.error("Error reading index template from filesystem {}", templateName);
}
return future;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(VERSION, this.version);
return builder.endObject();
}

@Override
public String toString() {
return "Action{" +
"type='" + type + '\'' +
", args=" + args +
", version='" + version + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

public class Command implements ToXContentObject {

public static final String NAME = "command";
public static final String ORDER_ID = "order_id";
public static final String REQUEST_ID = "request_id";
public static final String SOURCE = "source";
Expand Down Expand Up @@ -85,8 +85,9 @@ public static Command parse(XContentParser parser) throws IOException {
String user = null;
Action action = null;

// @TODO check if this call is necessary as ensureExpectedToken is invoked previously
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
// skips JSON's root level "command"
ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();
Expand Down Expand Up @@ -132,6 +133,7 @@ public static Command parse(XContentParser parser) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();

builder.startObject(NAME);
builder.field(SOURCE, this.source);
builder.field(USER, this.user);
builder.field(TARGET, this.target);
Expand All @@ -141,6 +143,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(STATUS, this.status);
builder.field(ORDER_ID, this.orderId);
builder.field(REQUEST_ID, this.requestId);
builder.endObject();

return builder.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -39,18 +38,14 @@ public class RestPostCommandAction extends BaseRestHandler {
public static final String POST_COMMAND_ACTION_REQUEST_DETAILS = "post_command_action_request_details";
private static final Logger logger = LogManager.getLogger(RestPostCommandAction.class);
private final CommandIndex commandIndex;
private final ThreadPool threadPool;

/**
* Default constructor
*
* @param commandIndex persistence layer
* @param threadPool
*/
public RestPostCommandAction(CommandIndex commandIndex, ThreadPool threadPool) {
public RestPostCommandAction(CommandIndex commandIndex) {
this.commandIndex = commandIndex;
this.threadPool = threadPool;

}

public String getName() {
Expand All @@ -60,21 +55,21 @@ public String getName() {
@Override
public List<Route> routes() {
return Collections.singletonList(
new Route(
POST,
String.format(
Locale.ROOT,
"%s",
CommandManagerPlugin.COMMAND_MANAGER_BASE_URI
new Route(
POST,
String.format(
Locale.ROOT,
"%s",
CommandManagerPlugin.COMMAND_MANAGER_BASE_URI
)
)
)
);
}

@Override
protected RestChannelConsumer prepareRequest(
final RestRequest restRequest,
final NodeClient client
final RestRequest restRequest,
final NodeClient client
) throws IOException {
// Get request details
XContentParser parser = restRequest.contentParser();
Expand All @@ -84,7 +79,7 @@ protected RestChannelConsumer prepareRequest(

// Send response
return channel -> {
commandIndex.asyncCreate(command, this.threadPool)
this.commandIndex.asyncCreate(command)
.thenAccept(restStatus -> {
try (XContentBuilder builder = channel.newBuilder()) {
builder.startObject();
Expand All @@ -94,7 +89,7 @@ protected RestChannelConsumer prepareRequest(
builder.endObject();
channel.sendResponse(new BytesRestResponse(restStatus, builder));
} catch (Exception e) {
logger.error("Error indexing command: ",e);
logger.error("Error indexing command: ", e);
}
}).exceptionally(e -> {
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
Expand Down
Loading