diff --git a/plugins/command-manager/build.gradle b/plugins/command-manager/build.gradle index 69655c32..6b5864d1 100644 --- a/plugins/command-manager/build.gradle +++ b/plugins/command-manager/build.gradle @@ -1,9 +1,13 @@ import org.opensearch.gradle.test.RestIntegTestTask +import java.util.concurrent.Callable buildscript { ext { opensearch_version = System.getProperty("opensearch.version", "2.18.0-SNAPSHOT") + opensearch_no_snapshot = opensearch_version.replace("-SNAPSHOT","") + opensearch_build = opensearch_no_snapshot + ".0" + job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build) wazuh_version = System.getProperty("version", "5.0.0") revision = System.getProperty("revision", "0") } @@ -24,7 +28,7 @@ apply plugin: 'java' apply plugin: 'idea' apply plugin: 'eclipse' apply plugin: 'opensearch.opensearchplugin' -apply plugin: 'opensearch.yaml-rest-test' +apply plugin: 'opensearch.rest-test' apply plugin: 'opensearch.pluginzip' def pluginName = 'wazuh-indexer-command-manager' @@ -67,6 +71,7 @@ opensearchplugin { name pluginName description pluginDescription classname "${projectPath}.${pathToPlugin}.${pluginClassName}" + extendedPlugins = ['opensearch-job-scheduler'] licenseFile rootProject.file('LICENSE.txt') noticeFile rootProject.file('NOTICE.txt') } @@ -88,6 +93,10 @@ def versions = [ imposter: "4.1.2" ] +configurations { + zipArchive +} + dependencies { implementation "org.apache.httpcomponents.client5:httpclient5:${versions.httpclient5}" implementation "org.apache.httpcomponents.core5:httpcore5-h2:${versions.httpcore5}" @@ -99,6 +108,11 @@ dependencies { implementation "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}" implementation "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}" + // Job Scheduler stuff + zipArchive group: 'org.opensearch.plugin', name: 'opensearch-job-scheduler', version: "${opensearch_build}" + // implementation "org.opensearch:opensearch:${opensearch_version}" + compileOnly "org.opensearch:opensearch-job-scheduler-spi:${job_scheduler_version}" + testImplementation "junit:junit:${versions.junit}" // imposter @@ -134,30 +148,37 @@ test { jvmArgs "-Djava.security.policy=./plugins/command-manager/src/main/plugin-metadata/plugin-security.policy/plugin-security.policy" } -task integTest(type: RestIntegTestTask) { - description = "Run tests against a cluster" - testClassesDirs = sourceSets.test.output.classesDirs - classpath = sourceSets.test.runtimeClasspath +def getJobSchedulerPlugin() { + provider(new Callable() { + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.matching { + include '**/opensearch-job-scheduler*' + }.singleFile + } + } + } + }) } -tasks.named("check").configure { dependsOn(integTest) } -integTest { +testClusters.integTest { + plugin(getJobSchedulerPlugin()) + testDistribution = "ARCHIVE" + // This installs our plugin into the testClusters + plugin(project.tasks.bundlePlugin.archiveFile) + // The --debug-jvm command-line option makes the cluster debuggable; this makes the tests debuggable if (System.getProperty("test.debug") != null) { jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005' } -} - -testClusters.integTest { - testDistribution = "INTEG_TEST" - //testDistribution = "ARCHIVE" - // This installs our plugin into the testClusters - plugin(project.tasks.bundlePlugin.archiveFile) // add customized keystore keystore 'm_api.auth.username', 'admin' keystore 'm_api.auth.password', 'test' - keystore 'm_api.uri', 'http://127.0.0.1:55000' // base URI of the M_API + keystore 'm_api.uri', 'https://127.0.0.1:55000' // base URI of the M_API } run { diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java index 71b805fd..66444ff7 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java @@ -8,33 +8,52 @@ */ package com.wazuh.commandmanager; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.index.IndexResponse; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.UUIDs; import org.opensearch.common.settings.*; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; +import org.opensearch.jobscheduler.spi.JobSchedulerExtension; +import org.opensearch.jobscheduler.spi.ScheduledJobParser; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.ReloadablePlugin; import org.opensearch.repositories.RepositoriesService; -import org.opensearch.rest.RestController; -import org.opensearch.rest.RestHandler; +import org.opensearch.rest.*; import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; import java.io.IOException; +import java.time.Instant; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import com.wazuh.commandmanager.index.CommandIndex; +import com.wazuh.commandmanager.jobscheduler.CommandManagerJobParameter; +import com.wazuh.commandmanager.jobscheduler.CommandManagerJobRunner; +import com.wazuh.commandmanager.jobscheduler.JobDocument; import com.wazuh.commandmanager.rest.RestPostCommandAction; import com.wazuh.commandmanager.settings.PluginSettings; import com.wazuh.commandmanager.utils.httpclient.HttpRestClient; @@ -42,15 +61,30 @@ /** * The Command Manager plugin exposes an HTTP API with a single endpoint to receive raw commands * from the Wazuh Server. These commands are processed, indexed and sent back to the Server for its - * delivery to, in most cases, the Agents. + * delivery to, in most cases, the Agents. The Command Manager plugin exposes an HTTP API with a + * single endpoint to receive raw commands from the Wazuh Server. These commands are processed, + * indexed and sent back to the Server for its delivery to, in most cases, the Agents. + * + *

The Command Manager plugin is also a JobScheduler extension plugin. */ -public class CommandManagerPlugin extends Plugin implements ActionPlugin, ReloadablePlugin { +public class CommandManagerPlugin extends Plugin + implements ActionPlugin, ReloadablePlugin, JobSchedulerExtension { public static final String COMMAND_MANAGER_BASE_URI = "/_plugins/_command_manager"; public static final String COMMANDS_URI = COMMAND_MANAGER_BASE_URI + "/commands"; public static final String COMMAND_MANAGER_INDEX_NAME = ".commands"; public static final String COMMAND_MANAGER_INDEX_TEMPLATE_NAME = "index-template-commands"; + public static final String COMMAND_DOCUMENT_PARENT_OBJECT_NAME = "command"; + public static final String JOB_INDEX_NAME = ".scheduled-commands"; + public static final Integer JOB_PERIOD_MINUTES = 1; + public static final Integer PAGE_SIZE = 2; + public static final Long DEFAULT_TIMEOUT_SECONDS = 20L; + public static final TimeValue PIT_KEEPALIVE_SECONDS = TimeValue.timeValueSeconds(30L); + + private static final Logger log = LogManager.getLogger(CommandManagerPlugin.class); + public static final String JOB_TYPE = "command_manager_scheduler_extension"; private CommandIndex commandIndex; + private JobDocument jobDocument; @Override public Collection createComponents( @@ -68,9 +102,50 @@ public Collection createComponents( this.commandIndex = new CommandIndex(client, clusterService, threadPool); PluginSettings.getInstance(environment.settings()); + // JobSchedulerExtension stuff + CommandManagerJobRunner.getInstance() + .setThreadPool(threadPool) + .setClient(client) + .setClusterService(clusterService) + .setEnvironment(environment); + + scheduleCommandJob(client, clusterService, threadPool); + return Collections.emptyList(); } + /** + * Indexes a document into the jobs index, so that JobScheduler plugin can run it + * + * @param client: The cluster client, used for indexing + * @param clusterService: Provides the addListener method. We use it to determine if this is a + * new cluster. + * @param threadPool: Used by jobDocument to create the document in a thread. + */ + private void scheduleCommandJob( + Client client, ClusterService clusterService, ThreadPool threadPool) { + clusterService.addListener( + event -> { + if (event.localNodeClusterManager() && event.isNewCluster()) { + jobDocument = JobDocument.getInstance(); + CompletableFuture indexResponseCompletableFuture = + jobDocument.create( + client, + threadPool, + UUIDs.base64UUID(), + getJobType(), + JOB_PERIOD_MINUTES); + indexResponseCompletableFuture.thenAccept( + indexResponse -> { + log.info( + "Scheduled task successfully, response: {}", + indexResponse.getResult().toString()); + }); + } + }); + } + + @Override public List getRestHandlers( Settings settings, RestController restController, @@ -100,6 +175,75 @@ public void reload(Settings settings) { // xxxService.refreshAndClearCache(commandManagerSettings); } + @Override + public String getJobType() { + return CommandManagerPlugin.JOB_TYPE; + } + + @Override + public String getJobIndex() { + return CommandManagerPlugin.JOB_INDEX_NAME; + } + + @Override + public ScheduledJobRunner getJobRunner() { + log.info("getJobRunner() executed"); + return CommandManagerJobRunner.getInstance(); + } + + @Override + public ScheduledJobParser getJobParser() { + log.info("getJobParser() executed"); + return (parser, id, jobDocVersion) -> { + CommandManagerJobParameter jobParameter = new CommandManagerJobParameter(); + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + + while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case CommandManagerJobParameter.NAME_FIELD: + jobParameter.setJobName(parser.text()); + break; + case CommandManagerJobParameter.ENABLED_FIELD: + jobParameter.setEnabled(parser.booleanValue()); + break; + case CommandManagerJobParameter.ENABLED_TIME_FIELD: + jobParameter.setEnabledTime(parseInstantValue(parser)); + break; + case CommandManagerJobParameter.LAST_UPDATE_TIME_FIELD: + jobParameter.setLastUpdateTime(parseInstantValue(parser)); + break; + case CommandManagerJobParameter.SCHEDULE_FIELD: + jobParameter.setSchedule(ScheduleParser.parse(parser)); + break; + default: + XContentParserUtils.throwUnknownToken( + parser.currentToken(), parser.getTokenLocation()); + } + } + return jobParameter; + }; + } + + /** + * Returns the proper Instant object with milliseconds from the Unix epoch when the current + * token actually holds a value. + * + * @param parser: The parser as provided by JobScheduler + */ + private Instant parseInstantValue(XContentParser parser) throws IOException { + if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) { + return null; + } + if (parser.currentToken().isValue()) { + return Instant.ofEpochMilli(parser.longValue()); + } + XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); + return null; + } + /** * Close the resources opened by this plugin. * diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobParameter.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobParameter.java new file mode 100644 index 00000000..c6da74a3 --- /dev/null +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobParameter.java @@ -0,0 +1,113 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package com.wazuh.commandmanager.jobscheduler; + +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.schedule.Schedule; + +import java.io.IOException; +import java.time.Instant; + +/** A model for the parameters and schema to be indexed to the jobs index. */ +public class CommandManagerJobParameter implements ScheduledJobParameter { + public static final String NAME_FIELD = "name"; + public static final String ENABLED_FIELD = "enabled"; + public static final String LAST_UPDATE_TIME_FIELD = "last_update_time"; + public static final String LAST_UPDATE_TIME_FIELD_READABLE = "last_update_time_field"; + public static final String SCHEDULE_FIELD = "schedule"; + public static final String ENABLED_TIME_FIELD = "enabled_time"; + public static final String ENABLED_TIME_FIELD_READABLE = "enabled_time_field"; + + private String jobName; + private Instant lastUpdateTime; + private Instant enabledTime; + private boolean isEnabled; + private Schedule schedule; + + public CommandManagerJobParameter() {} + + public CommandManagerJobParameter(String jobName, Schedule schedule) { + this.jobName = jobName; + this.schedule = schedule; + + Instant now = Instant.now(); + this.isEnabled = true; + this.enabledTime = now; + this.lastUpdateTime = now; + } + + @Override + public String getName() { + return this.jobName; + } + + @Override + public Instant getLastUpdateTime() { + return this.lastUpdateTime; + } + + @Override + public Instant getEnabledTime() { + return this.enabledTime; + } + + @Override + public Schedule getSchedule() { + return this.schedule; + } + + @Override + public boolean isEnabled() { + return this.isEnabled; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public void setLastUpdateTime(Instant lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + } + + public void setEnabledTime(Instant enabledTime) { + this.enabledTime = enabledTime; + } + + public void setEnabled(boolean enabled) { + isEnabled = enabled; + } + + public void setSchedule(Schedule schedule) { + this.schedule = schedule; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(NAME_FIELD, this.jobName); + builder.field(ENABLED_FIELD, this.isEnabled); + builder.field(SCHEDULE_FIELD, this.schedule); + if (this.enabledTime != null) { + builder.timeField( + ENABLED_TIME_FIELD, + ENABLED_TIME_FIELD_READABLE, + this.enabledTime.toEpochMilli()); + } + if (this.lastUpdateTime != null) { + builder.timeField( + LAST_UPDATE_TIME_FIELD, + LAST_UPDATE_TIME_FIELD_READABLE, + this.lastUpdateTime.toEpochMilli()); + } + builder.endObject(); + + return builder; + } +} diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobRunner.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobRunner.java new file mode 100644 index 00000000..3f2e32e0 --- /dev/null +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/CommandManagerJobRunner.java @@ -0,0 +1,93 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package com.wazuh.commandmanager.jobscheduler; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.env.Environment; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.threadpool.ThreadPool; + +import com.wazuh.commandmanager.CommandManagerPlugin; + +/** + * Implements the ScheduledJobRunner interface, which exposes the runJob() method, which executes + * the job's logic in its own thread. + */ +public class CommandManagerJobRunner implements ScheduledJobRunner { + + private static final Logger log = LogManager.getLogger(CommandManagerJobRunner.class); + private static CommandManagerJobRunner INSTANCE; + private ThreadPool threadPool; + private ClusterService clusterService; + + private Client client; + private Environment environment; + + private CommandManagerJobRunner() { + // Singleton class, use getJobRunner method instead of constructor + } + + public static CommandManagerJobRunner getInstance() { + log.info("Getting Job Runner Instance"); + if (INSTANCE != null) { + return INSTANCE; + } + synchronized (CommandManagerJobRunner.class) { + if (INSTANCE != null) { + return INSTANCE; + } + INSTANCE = new CommandManagerJobRunner(); + return INSTANCE; + } + } + + private boolean commandManagerIndexExists() { + return this.clusterService + .state() + .routingTable() + .hasIndex(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME); + } + + @Override + public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { + if (!commandManagerIndexExists()) { + log.info( + "{} index not yet created, not running command manager jobs", + CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME); + return; + } + SearchThread searchThread = new SearchThread(this.client); + threadPool.generic().submit(searchThread); + } + + public CommandManagerJobRunner setClusterService(ClusterService clusterService) { + this.clusterService = clusterService; + return getInstance(); + } + + public CommandManagerJobRunner setClient(Client client) { + this.client = client; + return getInstance(); + } + + public CommandManagerJobRunner setEnvironment(Environment environment) { + this.environment = environment; + return getInstance(); + } + + public CommandManagerJobRunner setThreadPool(ThreadPool threadPool) { + this.threadPool = threadPool; + return getInstance(); + } +} diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/JobDocument.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/JobDocument.java new file mode 100644 index 00000000..cf776d39 --- /dev/null +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/JobDocument.java @@ -0,0 +1,80 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package com.wazuh.commandmanager.jobscheduler; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.client.Client; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; + +import com.wazuh.commandmanager.CommandManagerPlugin; + +/** Indexes the command job to the Jobs index. */ +public class JobDocument { + private static final Logger log = LogManager.getLogger(JobDocument.class); + private static final JobDocument INSTANCE = new JobDocument(); + + private JobDocument() {} + + public static JobDocument getInstance() { + log.info("Getting JobDocument Instance"); + return INSTANCE; + } + + /** + * Writes a CommandManagerJobParameter type document to the jobs index + * + * @param client: The cluster's client + * @param threadPool: The cluster's threadPool + * @param id: The job ID to be used + * @param jobName: The name of the job + * @param interval: The interval the action is expected to run at + * @return a CompletableFuture that will hold the IndexResponse. + */ + public CompletableFuture create( + Client client, ThreadPool threadPool, String id, String jobName, Integer interval) { + CompletableFuture completableFuture = new CompletableFuture<>(); + ExecutorService executorService = threadPool.executor(ThreadPool.Names.WRITE); + CommandManagerJobParameter jobParameter = + new CommandManagerJobParameter( + jobName, new IntervalSchedule(Instant.now(), interval, ChronoUnit.MINUTES)); + try { + IndexRequest indexRequest = + new IndexRequest() + .index(CommandManagerPlugin.JOB_INDEX_NAME) + .id(id) + .source(jobParameter.toXContent(JsonXContent.contentBuilder(), null)) + .create(true); + executorService.submit( + () -> { + try (ThreadContext.StoredContext ignored = + threadPool.getThreadContext().stashContext()) { + IndexResponse indexResponse = client.index(indexRequest).actionGet(); + completableFuture.complete(indexResponse); + } catch (Exception e) { + completableFuture.completeExceptionally(e); + } + }); + } catch (IOException e) { + log.error("Failed to index command with ID {}: {}", id, e); + } + return completableFuture; + } +} diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java new file mode 100644 index 00000000..052386b3 --- /dev/null +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java @@ -0,0 +1,314 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package com.wazuh.commandmanager.jobscheduler; + +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.core5.net.URIBuilder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; +import org.opensearch.search.builder.PointInTimeBuilder; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.search.sort.SortOrder; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.*; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import com.wazuh.commandmanager.CommandManagerPlugin; +import com.wazuh.commandmanager.model.Command; +import com.wazuh.commandmanager.model.Status; +import com.wazuh.commandmanager.settings.PluginSettings; +import com.wazuh.commandmanager.utils.httpclient.AuthHttpRestClient; + +/** + * The class in charge of searching and managing commands in {@link + * com.wazuh.commandmanager.model.Status#PENDING} status and of submitting them to the destination + * client. + */ +public class SearchThread implements Runnable { + public static final String COMMAND_STATUS_FIELD = + Command.COMMAND + "." + Command.STATUS + ".keyword"; + public static final String COMMAND_ORDER_ID_FIELD = + Command.COMMAND + "." + Command.ORDER_ID + ".keyword"; + public static final String COMMAND_TIMEOUT_FIELD = Command.COMMAND + "." + Command.TIMEOUT; + private static final Logger log = LogManager.getLogger(SearchThread.class); + public static final String ORDERS_OBJECT = "/orders"; + private final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + private final Client client; + private SearchResponse currentPage = null; + + public SearchThread(Client client) { + this.client = client; + } + + /** + * Retrieves a nested value from a {@code Map} in a (somewhat) safe way. + * + * @param map The parent map to look at. + * @param key The key our nested object is found under. + * @param type The type we expect the nested object to be of. + * @param The type of the nested object. + * @return the nested object cast into the proper type. + */ + public static T getNestedObject(Map map, String key, Class type) { + Object value = map.get(key); + if (type.isInstance(value)) { + return type.cast(value); + } else { + throw new ClassCastException( + "Expected " + + type + + " but found " + + (value != null ? value.getClass() : "null")); + } + } + + /** + * Iterates over search results, updating their status field and submitting them to the + * destination + * + * @param searchResponse The search results page + * @throws IllegalStateException Rethrown from setSentStatus() + */ + public void handlePage(SearchResponse searchResponse) throws IllegalStateException { + SearchHits searchHits = searchResponse.getHits(); + for (SearchHit hit : searchHits) { + SimpleHttpResponse response = deliverOrders(hit); + if (response == null) { + return; + } + if (RestStatus.fromCode(response.getCode()) == RestStatus.CREATED + | RestStatus.fromCode(response.getCode()) == RestStatus.ACCEPTED + | RestStatus.fromCode(response.getCode()) == RestStatus.OK) { + setSentStatus(hit); + } + } + } + + /** + * Send the command order over HTTP + * + * @param hit The command order + */ + @SuppressWarnings("unchecked") + private SimpleHttpResponse deliverOrders(SearchHit hit) { + try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) { + PluginSettings settings = PluginSettings.getInstance(); + String orders = + xContentBuilder + .map( + Collections.singletonMap( + "orders", + new Object[] { + getNestedObject( + hit.getSourceAsMap(), + Command.COMMAND, + Map.class) + })) + .toString(); + URI uri = new URIBuilder(settings.getUri() + SearchThread.ORDERS_OBJECT).build(); + return AccessController.doPrivileged( + (PrivilegedAction) + () -> AuthHttpRestClient.getInstance().post(uri, orders, hit.getId())); + } catch (IOException e) { + log.error("Error parsing hit contents: {}", e.getMessage()); + } catch (URISyntaxException e) { + log.error("Invalid URI: {}", e.getMessage()); + } + return null; + } + + /** + * Retrieves the hit's contents and updates the {@link com.wazuh.commandmanager.model.Status} + * field to {@link com.wazuh.commandmanager.model.Status#SENT}. + * + * @param hit The page's result we are to update. + * @throws IllegalStateException Raised by {@link + * org.opensearch.common.action.ActionFuture#actionGet(long)}. + */ + @SuppressWarnings("unchecked") + private void setSentStatus(SearchHit hit) throws IllegalStateException { + Map commandMap = + getNestedObject( + hit.getSourceAsMap(), + CommandManagerPlugin.COMMAND_DOCUMENT_PARENT_OBJECT_NAME, + Map.class); + commandMap.put(Command.STATUS, Status.SENT); + hit.getSourceAsMap() + .put(CommandManagerPlugin.COMMAND_DOCUMENT_PARENT_OBJECT_NAME, commandMap); + IndexRequest indexRequest = + new IndexRequest() + .index(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME) + .source(hit.getSourceAsMap()) + .id(hit.getId()); + this.client + .index(indexRequest) + .actionGet(CommandManagerPlugin.DEFAULT_TIMEOUT_SECONDS * 1000); + } + + /** + * Runs a PIT style query against the Commands index. + * + * @param pointInTimeBuilder A pit builder object used to run the query. + * @param searchAfter An array of objects containing the last page's values of the sort fields. + * @return The search response. + * @throws IllegalStateException Raised by {@link + * org.opensearch.common.action.ActionFuture#actionGet(long)}. + */ + public SearchResponse pitQuery(PointInTimeBuilder pointInTimeBuilder, Object[] searchAfter) + throws IllegalStateException { + SearchRequest searchRequest = + new SearchRequest(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME); + TermQueryBuilder termQueryBuilder = + QueryBuilders.termQuery(SearchThread.COMMAND_STATUS_FIELD, Status.PENDING); + TimeValue timeout = + TimeValue.timeValueSeconds(CommandManagerPlugin.DEFAULT_TIMEOUT_SECONDS); + this.searchSourceBuilder + .query(termQueryBuilder) + .size(CommandManagerPlugin.PAGE_SIZE) + .trackTotalHits(true) + .timeout(timeout) + .pointInTimeBuilder(pointInTimeBuilder); + if (this.searchSourceBuilder.sorts() == null) { + this.searchSourceBuilder + .sort(SearchThread.COMMAND_ORDER_ID_FIELD, SortOrder.ASC) + .sort(SearchThread.COMMAND_TIMEOUT_FIELD, SortOrder.ASC); + } + if (searchAfter.length > 0) { + this.searchSourceBuilder.searchAfter(searchAfter); + } + searchRequest.source(this.searchSourceBuilder); + return this.client.search(searchRequest).actionGet(timeout); + } + + @Override + public void run() { + long consumableHits = 0L; + boolean firstPage = true; + PointInTimeBuilder pointInTimeBuilder = buildPit(); + try { + do { + this.currentPage = + pitQuery( + pointInTimeBuilder, + getSearchAfter(this.currentPage).orElse(new Object[0])); + if (firstPage) { + consumableHits = totalHits(); + firstPage = false; + } + if (consumableHits > 0) { + handlePage(this.currentPage); + consumableHits -= getPageLength(); + } + } while (consumableHits > 0); + } catch (ArrayIndexOutOfBoundsException e) { + log.error("ArrayIndexOutOfBoundsException retrieving page: {}", e.getMessage()); + } catch (IllegalStateException e) { + log.error("IllegalStateException retrieving page: {}", e.getMessage()); + } catch (Exception e) { + log.error("Generic exception retrieving page: {}", e.getMessage()); + } + } + + private long getPageLength() { + return this.currentPage.getHits().getHits().length; + } + + private long totalHits() { + if (this.currentPage.getHits().getTotalHits() != null) { + return this.currentPage.getHits().getTotalHits().value; + } else { + return 0; + } + } + + /** + * Gets the sort values of the last hit of a page. It is used by a PIT search to get the next + * page of results. + * + * @param searchResponse The current page + * @return The values of the sort fields of the last hit of a page whenever present. Otherwise, + * an empty Optional. + */ + private Optional getSearchAfter(SearchResponse searchResponse) { + if (searchResponse == null) { + return Optional.empty(); + } + try { + List hits = Arrays.asList(searchResponse.getHits().getHits()); + if (hits.isEmpty()) { + log.warn("Empty hits page, not getting searchAfter values"); + return Optional.empty(); + } + return Optional.ofNullable(hits.get(hits.size() - 1).getSortValues()); + } catch (NullPointerException | NoSuchElementException e) { + log.error("Could not get the page's searchAfter values: {}", e.getMessage()); + return Optional.empty(); + } + } + + /** + * Prepares a PointInTimeBuilder object to be used in a search. + * + * @return a PointInTimeBuilder or null. + */ + private PointInTimeBuilder buildPit() { + CompletableFuture future = new CompletableFuture<>(); + ActionListener actionListener = + new ActionListener<>() { + @Override + public void onResponse(CreatePitResponse createPitResponse) { + future.complete(createPitResponse); + } + + @Override + public void onFailure(Exception e) { + log.error(e.getMessage()); + future.completeExceptionally(e); + } + }; + this.client.createPit( + new CreatePitRequest( + CommandManagerPlugin.PIT_KEEPALIVE_SECONDS, + false, + CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME), + actionListener); + try { + return new PointInTimeBuilder(future.get().getId()); + } catch (CancellationException e) { + log.error("Building PIT was cancelled: {}", e.getMessage()); + } catch (ExecutionException e) { + log.error("Error building PIT: {}", e.getMessage()); + } catch (InterruptedException e) { + log.error("Building PIT was interrupted: {}", e.getMessage()); + } + return null; + } +} diff --git a/plugins/command-manager/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension b/plugins/command-manager/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension new file mode 100644 index 00000000..69bad7f1 --- /dev/null +++ b/plugins/command-manager/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension @@ -0,0 +1,6 @@ +# +# Copyright Wazuh Contributors +# SPDX-License-Identifier: Apache-2.0 +# + +com.wazuh.commandmanager.CommandManagerPlugin \ No newline at end of file