Skip to content

Commit

Permalink
Merge branch 'master' into enhancement/155-sort-commands-by-delivery-…
Browse files Browse the repository at this point in the history
…time
  • Loading branch information
f-galland authored Dec 9, 2024
2 parents 1478f92 + a17d08a commit 2c351d1
Show file tree
Hide file tree
Showing 10 changed files with 229 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class CommandManagerPlugin extends Plugin
public static final String COMMAND_MANAGER_INDEX_TEMPLATE_NAME = "index-template-commands";
public static final String COMMAND_DOCUMENT_PARENT_OBJECT_NAME = "command";
public static final String JOB_INDEX_NAME = ".scheduled-commands";
public static final String JOB_INDEX_TEMPLATE_NAME = "index-template-scheduled-commands";
public static final Integer JOB_PERIOD_MINUTES = 1;
public static final Integer PAGE_SIZE = 100;
public static final Long DEFAULT_TIMEOUT_SECONDS = 20L;
Expand Down Expand Up @@ -130,6 +131,7 @@ private void scheduleCommandJob(
jobDocument = JobDocument.getInstance();
CompletableFuture<IndexResponse> indexResponseCompletableFuture =
jobDocument.create(
clusterService,
client,
threadPool,
UUIDs.base64UUID(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ public CompletableFuture<RestStatus> asyncCreate(Document document) {
try (ThreadContext.StoredContext ignored =
this.threadPool.getThreadContext().stashContext()) {
// Create index template if it does not exist.
if (!indexTemplateExists(
if (!IndexTemplateUtils.indexTemplateExists(
this.clusterService,
CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME)) {
putIndexTemplate(
CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME);
IndexTemplateUtils.putIndexTemplate(
this.client,
CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME);
} else {
log.info(
"Index template {} already exists. Skipping creation.",
Expand Down Expand Up @@ -126,9 +128,11 @@ public CompletableFuture<RestStatus> asyncBulkCreate(ArrayList<Document> documen
try (ThreadContext.StoredContext ignored =
this.threadPool.getThreadContext().stashContext()) {
// Create index template if it does not exist.
if (!indexTemplateExists(
if (!IndexTemplateUtils.indexTemplateExists(
this.clusterService,
CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME)) {
putIndexTemplate(
IndexTemplateUtils.putIndexTemplate(
this.client,
CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME);
} else {
log.info(
Expand Down Expand Up @@ -166,7 +170,6 @@ public boolean indexTemplateExists(String template_name) {
* @param templateName : The name if the index template to load
*/
public void putIndexTemplate(String templateName) {
ExecutorService executor = this.threadPool.executor(ThreadPool.Names.WRITE);
try {
// @throws IOException
Map<String, Object> template = IndexTemplateUtils.fromFile(templateName + ".json");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
*/
package com.wazuh.commandmanager.jobscheduler;

import com.wazuh.commandmanager.utils.IndexTemplateUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
Expand Down Expand Up @@ -49,7 +51,7 @@ public static JobDocument getInstance() {
* @return a CompletableFuture that will hold the IndexResponse.
*/
public CompletableFuture<IndexResponse> create(
Client client, ThreadPool threadPool, String id, String jobName, Integer interval) {
ClusterService clusterService, Client client, ThreadPool threadPool, String id, String jobName, Integer interval) {
CompletableFuture<IndexResponse> completableFuture = new CompletableFuture<>();
ExecutorService executorService = threadPool.executor(ThreadPool.Names.WRITE);
CommandManagerJobParameter jobParameter =
Expand All @@ -65,7 +67,14 @@ public CompletableFuture<IndexResponse> create(
executorService.submit(
() -> {
try (ThreadContext.StoredContext ignored =
threadPool.getThreadContext().stashContext()) {
threadPool.getThreadContext().stashContext()) {
if (!IndexTemplateUtils.indexTemplateExists(clusterService,CommandManagerPlugin.JOB_INDEX_TEMPLATE_NAME)) {
IndexTemplateUtils.putIndexTemplate(client, CommandManagerPlugin.JOB_INDEX_TEMPLATE_NAME);
} else {
log.info(
"Index template {} already exists. Skipping creation.",
CommandManagerPlugin.JOB_INDEX_NAME);
}
IndexResponse indexResponse = client.index(indexRequest).actionGet();
completableFuture.complete(indexResponse);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@
*/
public class SearchThread implements Runnable {
public static final String COMMAND_STATUS_FIELD =
Command.COMMAND + "." + Command.STATUS + ".keyword";
Command.COMMAND + "." + Command.STATUS;
public static final String COMMAND_ORDER_ID_FIELD =
Command.COMMAND + "." + Command.ORDER_ID + ".keyword";
Command.COMMAND + "." + Command.ORDER_ID;
public static final String COMMAND_TIMEOUT_FIELD = Command.COMMAND + "." + Command.TIMEOUT;
public static final String DELIVERY_TIMESTAMP_FIELD = Document.DELIVERY_TIMESTAMP;
private static final Logger log = LogManager.getLogger(SearchThread.class);
Expand All @@ -81,14 +81,24 @@ public SearchThread(Client client) {
*/
public static <T> T getNestedObject(Map<String, Object> map, String key, Class<T> type) {
Object value = map.get(key);
if (value == null) {
return null;
}
if (type.isInstance(value)) {
// Make a defensive copy for supported types like Map or List
if (value instanceof Map) {
return type.cast(new HashMap<>((Map<?, ?>) value));
} else if (value instanceof List) {
return type.cast(new ArrayList<>((List<?>) value));
}
// Return the value directly if it is immutable (e.g., String, Integer)
return type.cast(value);
} else {
throw new ClassCastException(
"Expected "
+ type
+ " but found "
+ (value != null ? value.getClass() : "null"));
"Expected "
+ type.getName()
+ " but found "
+ value.getClass().getName());
}
}

Expand All @@ -103,16 +113,13 @@ public static <T> T getNestedObject(Map<String, Object> map, String key, Class<T
public void handlePage(SearchResponse searchResponse) throws IllegalStateException {
SearchHits searchHits = searchResponse.getHits();
ArrayList<Object> orders = new ArrayList<>();

for (SearchHit hit : searchHits) {
// Create a JSON representation of each hit and add it to the orders array.
Map<String, Object> orderMap =
getNestedObject(hit.getSourceAsMap(), Command.COMMAND, Map.class);
// Add document id to the object.
orderMap.put("document_id", hit.getId());
orders.add(orderMap);
Map<String, Object> orderMap = getNestedObject(hit.getSourceAsMap(), Command.COMMAND, Map.class);
if (orderMap != null) {
orderMap.put("document_id", hit.getId());
orders.add(orderMap);
}
}

String payload = null;
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
payload = builder.map(Collections.singletonMap("orders", orders)).toString();
Expand Down Expand Up @@ -162,21 +169,21 @@ private SimpleHttpResponse deliverOrders(String orders) {
@SuppressWarnings("unchecked")
private void setSentStatus(SearchHit hit) throws IllegalStateException {
Map<String, Object> commandMap =
getNestedObject(
hit.getSourceAsMap(),
CommandManagerPlugin.COMMAND_DOCUMENT_PARENT_OBJECT_NAME,
Map.class);
getNestedObject(
hit.getSourceAsMap(),
CommandManagerPlugin.COMMAND_DOCUMENT_PARENT_OBJECT_NAME,
Map.class);
commandMap.put(Command.STATUS, Status.SENT);
hit.getSourceAsMap()
.put(CommandManagerPlugin.COMMAND_DOCUMENT_PARENT_OBJECT_NAME, commandMap);
.put(CommandManagerPlugin.COMMAND_DOCUMENT_PARENT_OBJECT_NAME, commandMap);
IndexRequest indexRequest =
new IndexRequest()
.index(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME)
.source(hit.getSourceAsMap())
.id(hit.getId());
new IndexRequest()
.index(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME)
.source(hit.getSourceAsMap())
.id(hit.getId());
this.client
.index(indexRequest)
.actionGet(CommandManagerPlugin.DEFAULT_TIMEOUT_SECONDS * 1000);
.index(indexRequest)
.actionGet(CommandManagerPlugin.DEFAULT_TIMEOUT_SECONDS * 1000);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,29 @@
*/
package com.wazuh.commandmanager.utils;

import com.wazuh.commandmanager.index.CommandIndex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;

import reactor.util.annotation.NonNull;

/** Util functions to parse and manage index templates files. */
public class IndexTemplateUtils {
private static final Logger log = LogManager.getLogger(IndexTemplateUtils.class);

/** Default constructor */
public IndexTemplateUtils() {}
Expand Down Expand Up @@ -70,4 +80,46 @@ public static Map<String, Object> toMap(InputStream is) throws IOException {
public static Map<String, Object> get(Map<String, Object> map, String key) {
return (Map<String, Object>) map.get(key);
}

/**
* Checks for the existence of the given index template in the cluster.
*
* @param clusterService The cluster service used to check the node's existence
* @param templateName index template name within the resources folder
* @return whether the index template exists.
*/
public static boolean indexTemplateExists(ClusterService clusterService, String templateName) {
Map<String, IndexTemplateMetadata> templates =
clusterService.state().metadata().templates();
log.debug("Existing index templates: {} ", templates);

return templates.containsKey(templateName);
}

/**
* Inserts an index template
* @param templateName : The name if the index template to load
*/
public static void putIndexTemplate(Client client, String templateName) {
try {
// @throws IOException
Map<String, Object> template = IndexTemplateUtils.fromFile(templateName + ".json");

PutIndexTemplateRequest putIndexTemplateRequest =
new PutIndexTemplateRequest()
.mapping(IndexTemplateUtils.get(template, "mappings"))
.settings(IndexTemplateUtils.get(template, "settings"))
.name(templateName)
.patterns((List<String>) template.get("index_patterns"));

AcknowledgedResponse acknowledgedResponse =
client.admin().indices().putTemplate(putIndexTemplateRequest).actionGet();
if (acknowledgedResponse.isAcknowledged()) {
log.info("Index template [{}] created successfully", templateName);
}

} catch (IOException e) {
log.error("Error reading index template [{}] from filesystem", templateName);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"index_patterns": [
".scheduled-commands"
],
"mappings": {
"dynamic": "strict",
"properties": {
"name": {
"type": "keyword"
},
"enabled": {
"type": "boolean"
},
"schedule": {
"properties": {
"interval": {
"properties": {
"start_time": {
"type": "date",
"format": "epoch_millis"
},
"period": {
"type": "integer"
},
"unit": {
"type": "keyword"
}
}
}
}
},
"enabled_time": {
"type": "date",
"format": "epoch_millis"
},
"last_update_time": {
"type": "date",
"format": "epoch_millis"
}
}
},
"order": 1,
"settings": {
"index": {
"hidden": true,
"number_of_replicas": "0",
"number_of_shards": "1",
"refresh_interval": "5s"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public WazuhIndices(Client client, ClusterService clusterService) {
this.indexTemplates.put("index-template-agent", ".agents");
this.indexTemplates.put("index-template-alerts", "wazuh-alerts-5.x-0001");
this.indexTemplates.put("index-template-commands", ".commands");
this.indexTemplates.put("index-template-scheduled-commands", ".scheduled-commands");
this.indexTemplates.put("index-template-fim", "wazuh-states-fim");
this.indexTemplates.put("index-template-hardware", "wazuh-states-inventory-hardware");
this.indexTemplates.put("index-template-hotfixes", "wazuh-states-inventory-hotfixes");
Expand Down
8 changes: 8 additions & 0 deletions plugins/setup/src/main/resources/index-template-ports.json
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,14 @@
}
}
},
"interface": {
"properties": {
"state": {
"ignore_above": 1024,
"type": "keyword"
}
}
},
"network": {
"properties": {
"protocol": {
Expand Down
12 changes: 12 additions & 0 deletions plugins/setup/src/main/resources/index-template-processes.json
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,18 @@
}
}
},
"tty": {
"properties": {
"char_device": {
"properties": {
"major": {
"type": "long"
}
}
}
},
"type": "object"
},
"user": {
"properties": {
"id": {
Expand Down
Loading

0 comments on commit 2c351d1

Please sign in to comment.