diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java index e60340ce..d52301ba 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java @@ -75,6 +75,7 @@ public class CommandManagerPlugin extends Plugin public static final String COMMAND_MANAGER_INDEX_TEMPLATE_NAME = "index-template-commands"; public static final String COMMAND_DOCUMENT_PARENT_OBJECT_NAME = "command"; public static final String JOB_INDEX_NAME = ".scheduled-commands"; + public static final String JOB_INDEX_TEMPLATE_NAME = "index-template-scheduled-commands"; public static final Integer JOB_PERIOD_MINUTES = 1; public static final Integer PAGE_SIZE = 100; public static final Long DEFAULT_TIMEOUT_SECONDS = 20L; @@ -130,6 +131,7 @@ private void scheduleCommandJob( jobDocument = JobDocument.getInstance(); CompletableFuture<IndexResponse> indexResponseCompletableFuture = jobDocument.create( + clusterService, client, threadPool, UUIDs.base64UUID(), diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/index/CommandIndex.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/index/CommandIndex.java index 4024c182..8db3fd56 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/index/CommandIndex.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/index/CommandIndex.java @@ -74,10 +74,12 @@ public CompletableFuture<RestStatus> asyncCreate(Document document) { try (ThreadContext.StoredContext ignored = this.threadPool.getThreadContext().stashContext()) { // Create index template if it does not exist. - if (!indexTemplateExists( + if (!IndexTemplateUtils.indexTemplateExists( + this.clusterService, CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME)) { - putIndexTemplate( - CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME); + IndexTemplateUtils.putIndexTemplate( + this.client, + CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME); } else { log.info( "Index template {} already exists. Skipping creation.", @@ -126,9 +128,11 @@ public CompletableFuture<RestStatus> asyncBulkCreate(ArrayList<Document> documen try (ThreadContext.StoredContext ignored = this.threadPool.getThreadContext().stashContext()) { // Create index template if it does not exist. - if (!indexTemplateExists( + if (!IndexTemplateUtils.indexTemplateExists( + this.clusterService, CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME)) { - putIndexTemplate( + IndexTemplateUtils.putIndexTemplate( + this.client, CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME); } else { log.info( @@ -166,7 +170,6 @@ public boolean indexTemplateExists(String template_name) { * @param templateName : The name if the index template to load */ public void putIndexTemplate(String templateName) { - ExecutorService executor = this.threadPool.executor(ThreadPool.Names.WRITE); try { // @throws IOException Map<String, Object> template = IndexTemplateUtils.fromFile(templateName + ".json"); diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/JobDocument.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/JobDocument.java index cf776d39..5522c9c9 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/JobDocument.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/JobDocument.java @@ -8,11 +8,13 @@ */ package com.wazuh.commandmanager.jobscheduler; +import com.wazuh.commandmanager.utils.IndexTemplateUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; @@ -49,7 +51,7 @@ public static JobDocument getInstance() { * @return a CompletableFuture that will hold the IndexResponse. */ public CompletableFuture<IndexResponse> create( - Client client, ThreadPool threadPool, String id, String jobName, Integer interval) { + ClusterService clusterService, Client client, ThreadPool threadPool, String id, String jobName, Integer interval) { CompletableFuture<IndexResponse> completableFuture = new CompletableFuture<>(); ExecutorService executorService = threadPool.executor(ThreadPool.Names.WRITE); CommandManagerJobParameter jobParameter = @@ -65,7 +67,14 @@ public CompletableFuture<IndexResponse> create( executorService.submit( () -> { try (ThreadContext.StoredContext ignored = - threadPool.getThreadContext().stashContext()) { + threadPool.getThreadContext().stashContext()) { + if (!IndexTemplateUtils.indexTemplateExists(clusterService,CommandManagerPlugin.JOB_INDEX_TEMPLATE_NAME)) { + IndexTemplateUtils.putIndexTemplate(client, CommandManagerPlugin.JOB_INDEX_TEMPLATE_NAME); + } else { + log.info( + "Index template {} already exists. Skipping creation.", + CommandManagerPlugin.JOB_INDEX_NAME); + } IndexResponse indexResponse = client.index(indexRequest).actionGet(); completableFuture.complete(indexResponse); } catch (Exception e) { diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/utils/IndexTemplateUtils.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/utils/IndexTemplateUtils.java index 409b133c..6af5616a 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/utils/IndexTemplateUtils.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/utils/IndexTemplateUtils.java @@ -8,6 +8,14 @@ */ package com.wazuh.commandmanager.utils; +import com.wazuh.commandmanager.index.CommandIndex; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexTemplateMetadata; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -15,12 +23,14 @@ import java.io.IOException; import java.io.InputStream; +import java.util.List; import java.util.Map; import reactor.util.annotation.NonNull; /** Util functions to parse and manage index templates files. */ public class IndexTemplateUtils { + private static final Logger log = LogManager.getLogger(IndexTemplateUtils.class); /** Default constructor */ public IndexTemplateUtils() {} @@ -70,4 +80,46 @@ public static Map<String, Object> toMap(InputStream is) throws IOException { public static Map<String, Object> get(Map<String, Object> map, String key) { return (Map<String, Object>) map.get(key); } + + /** + * Checks for the existence of the given index template in the cluster. + * + * @param clusterService The cluster service used to check the node's existence + * @param templateName index template name within the resources folder + * @return whether the index template exists. + */ + public static boolean indexTemplateExists(ClusterService clusterService, String templateName) { + Map<String, IndexTemplateMetadata> templates = + clusterService.state().metadata().templates(); + log.debug("Existing index templates: {} ", templates); + + return templates.containsKey(templateName); + } + + /** + * Inserts an index template + * @param templateName : The name if the index template to load + */ + public static void putIndexTemplate(Client client, String templateName) { + try { + // @throws IOException + Map<String, Object> template = IndexTemplateUtils.fromFile(templateName + ".json"); + + PutIndexTemplateRequest putIndexTemplateRequest = + new PutIndexTemplateRequest() + .mapping(IndexTemplateUtils.get(template, "mappings")) + .settings(IndexTemplateUtils.get(template, "settings")) + .name(templateName) + .patterns((List<String>) template.get("index_patterns")); + + AcknowledgedResponse acknowledgedResponse = + client.admin().indices().putTemplate(putIndexTemplateRequest).actionGet(); + if (acknowledgedResponse.isAcknowledged()) { + log.info("Index template [{}] created successfully", templateName); + } + + } catch (IOException e) { + log.error("Error reading index template [{}] from filesystem", templateName); + } + } } diff --git a/plugins/command-manager/src/main/resources/index-template-scheduled-commands.json b/plugins/command-manager/src/main/resources/index-template-scheduled-commands.json new file mode 100644 index 00000000..232dbe73 --- /dev/null +++ b/plugins/command-manager/src/main/resources/index-template-scheduled-commands.json @@ -0,0 +1,51 @@ +{ + "index_patterns": [ + ".scheduled-commands" + ], + "mappings": { + "dynamic": "strict", + "properties": { + "name": { + "type": "keyword" + }, + "enabled": { + "type": "boolean" + }, + "schedule": { + "properties": { + "interval": { + "properties": { + "start_time": { + "type": "date", + "format": "epoch_millis" + }, + "period": { + "type": "integer" + }, + "unit": { + "type": "keyword" + } + } + } + } + }, + "enabled_time": { + "type": "date", + "format": "epoch_millis" + }, + "last_update_time": { + "type": "date", + "format": "epoch_millis" + } + } + }, + "order": 1, + "settings": { + "index": { + "hidden": true, + "number_of_replicas": "0", + "number_of_shards": "1", + "refresh_interval": "5s" + } + } +}