Skip to content

Commit

Permalink
Creating scheduled commands index template upon job startup
Browse files Browse the repository at this point in the history
  • Loading branch information
f-galland committed Dec 6, 2024
1 parent 2ba70f5 commit 7434a9c
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class CommandManagerPlugin extends Plugin
public static final String COMMAND_MANAGER_INDEX_TEMPLATE_NAME = "index-template-commands";
public static final String COMMAND_DOCUMENT_PARENT_OBJECT_NAME = "command";
public static final String JOB_INDEX_NAME = ".scheduled-commands";
public static final String JOB_INDEX_TEMPLATE_NAME = "index-template-scheduled-commands";
public static final Integer JOB_PERIOD_MINUTES = 1;
public static final Integer PAGE_SIZE = 100;
public static final Long DEFAULT_TIMEOUT_SECONDS = 20L;
Expand Down Expand Up @@ -130,6 +131,7 @@ private void scheduleCommandJob(
jobDocument = JobDocument.getInstance();
CompletableFuture<IndexResponse> indexResponseCompletableFuture =
jobDocument.create(
clusterService,
client,
threadPool,
UUIDs.base64UUID(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ public CompletableFuture<RestStatus> asyncCreate(Document document) {
try (ThreadContext.StoredContext ignored =
this.threadPool.getThreadContext().stashContext()) {
// Create index template if it does not exist.
if (!indexTemplateExists(
if (!IndexTemplateUtils.indexTemplateExists(
this.clusterService,
CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME)) {
putIndexTemplate(
CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME);
IndexTemplateUtils.putIndexTemplate(
this.client,
CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME);
} else {
log.info(
"Index template {} already exists. Skipping creation.",
Expand Down Expand Up @@ -126,9 +128,11 @@ public CompletableFuture<RestStatus> asyncBulkCreate(ArrayList<Document> documen
try (ThreadContext.StoredContext ignored =
this.threadPool.getThreadContext().stashContext()) {
// Create index template if it does not exist.
if (!indexTemplateExists(
if (!IndexTemplateUtils.indexTemplateExists(
this.clusterService,
CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME)) {
putIndexTemplate(
IndexTemplateUtils.putIndexTemplate(
this.client,
CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME);
} else {
log.info(
Expand Down Expand Up @@ -166,7 +170,6 @@ public boolean indexTemplateExists(String template_name) {
* @param templateName : The name if the index template to load
*/
public void putIndexTemplate(String templateName) {
ExecutorService executor = this.threadPool.executor(ThreadPool.Names.WRITE);
try {
// @throws IOException
Map<String, Object> template = IndexTemplateUtils.fromFile(templateName + ".json");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
*/
package com.wazuh.commandmanager.jobscheduler;

import com.wazuh.commandmanager.utils.IndexTemplateUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
Expand Down Expand Up @@ -49,7 +51,7 @@ public static JobDocument getInstance() {
* @return a CompletableFuture that will hold the IndexResponse.
*/
public CompletableFuture<IndexResponse> create(
Client client, ThreadPool threadPool, String id, String jobName, Integer interval) {
ClusterService clusterService, Client client, ThreadPool threadPool, String id, String jobName, Integer interval) {
CompletableFuture<IndexResponse> completableFuture = new CompletableFuture<>();
ExecutorService executorService = threadPool.executor(ThreadPool.Names.WRITE);
CommandManagerJobParameter jobParameter =
Expand All @@ -65,7 +67,14 @@ public CompletableFuture<IndexResponse> create(
executorService.submit(
() -> {
try (ThreadContext.StoredContext ignored =
threadPool.getThreadContext().stashContext()) {
threadPool.getThreadContext().stashContext()) {
if (!IndexTemplateUtils.indexTemplateExists(clusterService,CommandManagerPlugin.JOB_INDEX_TEMPLATE_NAME)) {
IndexTemplateUtils.putIndexTemplate(client, CommandManagerPlugin.JOB_INDEX_TEMPLATE_NAME);
} else {
log.info(
"Index template {} already exists. Skipping creation.",
CommandManagerPlugin.JOB_INDEX_NAME);
}
IndexResponse indexResponse = client.index(indexRequest).actionGet();
completableFuture.complete(indexResponse);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,29 @@
*/
package com.wazuh.commandmanager.utils;

import com.wazuh.commandmanager.index.CommandIndex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;

import reactor.util.annotation.NonNull;

/** Util functions to parse and manage index templates files. */
public class IndexTemplateUtils {
private static final Logger log = LogManager.getLogger(IndexTemplateUtils.class);

/** Default constructor */
public IndexTemplateUtils() {}
Expand Down Expand Up @@ -70,4 +80,46 @@ public static Map<String, Object> toMap(InputStream is) throws IOException {
public static Map<String, Object> get(Map<String, Object> map, String key) {
return (Map<String, Object>) map.get(key);
}

/**
* Checks for the existence of the given index template in the cluster.
*
* @param clusterService The cluster service used to check the node's existence
* @param templateName index template name within the resources folder
* @return whether the index template exists.
*/
public static boolean indexTemplateExists(ClusterService clusterService, String templateName) {
Map<String, IndexTemplateMetadata> templates =
clusterService.state().metadata().templates();
log.debug("Existing index templates: {} ", templates);

return templates.containsKey(templateName);
}

/**
* Inserts an index template
* @param templateName : The name if the index template to load
*/
public static void putIndexTemplate(Client client, String templateName) {
try {
// @throws IOException
Map<String, Object> template = IndexTemplateUtils.fromFile(templateName + ".json");

PutIndexTemplateRequest putIndexTemplateRequest =
new PutIndexTemplateRequest()
.mapping(IndexTemplateUtils.get(template, "mappings"))
.settings(IndexTemplateUtils.get(template, "settings"))
.name(templateName)
.patterns((List<String>) template.get("index_patterns"));

AcknowledgedResponse acknowledgedResponse =
client.admin().indices().putTemplate(putIndexTemplateRequest).actionGet();
if (acknowledgedResponse.isAcknowledged()) {
log.info("Index template [{}] created successfully", templateName);
}

} catch (IOException e) {
log.error("Error reading index template [{}] from filesystem", templateName);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"index_patterns": [
".scheduled-commands"
],
"mappings": {
"dynamic": "strict",
"properties": {
"name": {
"type": "keyword"
},
"enabled": {
"type": "boolean"
},
"schedule": {
"properties": {
"interval": {
"properties": {
"start_time": {
"type": "date",
"format": "epoch_millis"
},
"period": {
"type": "integer"
},
"unit": {
"type": "keyword"
}
}
}
}
},
"enabled_time": {
"type": "date",
"format": "epoch_millis"
},
"last_update_time": {
"type": "date",
"format": "epoch_millis"
}
}
},
"order": 1,
"settings": {
"index": {
"hidden": true,
"number_of_replicas": "0",
"number_of_shards": "1",
"refresh_interval": "5s"
}
}
}

0 comments on commit 7434a9c

Please sign in to comment.