diff --git a/plugins/command-manager/build.gradle b/plugins/command-manager/build.gradle index 3c6ab2c..5820b8e 100644 --- a/plugins/command-manager/build.gradle +++ b/plugins/command-manager/build.gradle @@ -1,11 +1,12 @@ import org.opensearch.gradle.test.RestIntegTestTask +import java.util.concurrent.Callable + apply plugin: 'java' apply plugin: 'idea' -apply plugin: 'eclipse' apply plugin: 'opensearch.opensearchplugin' -apply plugin: 'opensearch.yaml-rest-test' apply plugin: 'opensearch.pluginzip' +apply plugin: 'opensearch.testclusters' def pluginName = 'wazuh-indexer-command-manager' def pluginDescription = 'The Command Manager plugin handles and distributes commands across your Wazuh environment.' @@ -47,16 +48,22 @@ opensearchplugin { name pluginName description pluginDescription classname "${projectPath}.${pathToPlugin}.${pluginClassName}" + version = "${wazuh_version}" + ".${revision}" + extendedPlugins = ['opensearch-job-scheduler'] licenseFile rootProject.file('LICENSE.txt') noticeFile rootProject.file('NOTICE.txt') } +configurations { + zipArchive +} + def versions = [ - httpclient5: "5.4", - httpcore5: "5.3", - slf4j: "1.7.36", - log4j: "2.23.1", - conscrypt: "2.5.2" + httpclient5: "5.4", + httpcore5 : "5.3", + slf4j : "1.7.36", + log4j : "2.23.1", + conscrypt : "2.5.2" ] dependencies { @@ -66,6 +73,11 @@ dependencies { api "org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}" api "org.slf4j:slf4j-api:${versions.slf4j}" api "org.conscrypt:conscrypt-openjdk-uber:${versions.conscrypt}" + + /// Job Scheduler stuff + zipArchive group: 'org.opensearch.plugin', name: 'opensearch-job-scheduler', version: "${opensearch_build}" + implementation "org.opensearch:opensearch:${opensearch_version}" + compileOnly "org.opensearch:opensearch-job-scheduler-spi:${job_scheduler_version}" } // This requires an additional Jar not published as part of build-tools @@ -83,6 +95,8 @@ dependencyLicenses.enabled = false buildscript { ext { opensearch_version = System.getProperty("opensearch.version", "2.16.0") + opensearch_build = opensearch_version + ".0" + job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build) wazuh_version = System.getProperty("version", "5.0.0") revision = System.getProperty("revision", "0") } @@ -110,6 +124,20 @@ test { include '**/*Tests.class' } +// As of ES 7.7 the sample-extension-plugin is being added to the list of plugins for the testCluster during build before +// the job-scheduler plugin is causing build failures. +// The job-scheduler zip is added explicitly above but the sample-extension-plugin is added implicitly at some time during evaluation. +// Will need to do a deep dive to find out exactly what task adds the sample-extension-plugin and add job-scheduler there but a temporary hack is to +// reorder the plugins list after evaluation but prior to task execution when the plugins are installed. +afterEvaluate { + testClusters.integTest.nodes.each { node -> + def plugins = node.plugins + def firstPlugin = plugins.get(0) + plugins.remove(0) + plugins.add(firstPlugin) + } +} + task integTest(type: RestIntegTestTask) { description = "Run tests against a cluster" testClassesDirs = sourceSets.test.output.classesDirs @@ -118,17 +146,33 @@ task integTest(type: RestIntegTestTask) { tasks.named("check").configure { dependsOn(integTest) } integTest { - // The --debug-jvm command-line option makes the cluster debuggable; this makes the tests debuggable + // The -Dcluster.debug option makes the cluster debuggable; this makes the tests debuggable if (System.getProperty("test.debug") != null) { - jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005' + jvmArgs '-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=8000' } } +Zip bundle = (Zip) project.getTasks().getByName("bundlePlugin"); +integTest.dependsOn(bundle) +integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile()))} + testClusters.integTest { testDistribution = "INTEG_TEST" + // need to install job-scheduler first, need to assemble job-scheduler first + plugin(provider(new Callable() { + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.getSingleFile() + } + } + } + })) // This installs our plugin into the testClusters - plugin(project.tasks.bundlePlugin.archiveFile) + // plugin(project.tasks.bundlePlugin.archiveFile) } run { @@ -141,7 +185,7 @@ task updateVersion { doLast { ext.newVersion = System.getProperty('newVersion') println "Setting version to ${newVersion}." - // String tokenization to support -SNAPSHOT + // String tokenization to support -SNAPSHOT ant.replaceregexp(file: 'build.gradle', match: '"opensearch.version", "\\d.*"', replace: '"opensearch.version", "' + newVersion.tokenize('-')[0] + '-SNAPSHOT"', flags: 'g', byline: true) } -} \ No newline at end of file +} diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobParameter.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobParameter.java new file mode 100644 index 0000000..a4c63ba --- /dev/null +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobParameter.java @@ -0,0 +1,128 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package com.wazuh.commandmanager; + +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.schedule.Schedule; + +import java.io.IOException; +import java.time.Instant; + +/** + * A sample job parameter. + *

+ * It adds "indexToWatch" field to {@link ScheduledJobParameter}, which stores the index + * the job runner will watch. + */ +public class CommandManagerJobParameter implements ScheduledJobParameter { + public static final String NAME_FIELD = "name"; + public static final String ENABLED_FILED = "enabled"; + public static final String LAST_UPDATE_TIME_FIELD = "last_update_time"; + public static final String LAST_UPDATE_TIME_FIELD_READABLE = "last_update_time_field"; + public static final String SCHEDULE_FIELD = "schedule"; + public static final String ENABLED_TIME_FILED = "enabled_time"; + public static final String ENABLED_TIME_FILED_READABLE = "enabled_time_field"; + public static final String INDEX_NAME_FIELD = "index_name_to_watch"; + + private String jobName; + private Instant lastUpdateTime; + private Instant enabledTime; + private boolean isEnabled; + private Schedule schedule; + private String indexToWatch; + + public CommandManagerJobParameter() {} + + public CommandManagerJobParameter(String name, String indexToWatch, Schedule schedule) { + this.jobName = name; + this.indexToWatch = indexToWatch; + this.schedule = schedule; + + Instant now = Instant.now(); + this.isEnabled = true; + this.enabledTime = now; + this.lastUpdateTime = now; + } + + + @Override + public String getName() { + return this.jobName; + } + + + @Override + public Instant getLastUpdateTime() { + return this.lastUpdateTime; + } + + + @Override + public Instant getEnabledTime() { + return this.enabledTime; + } + + + @Override + public Schedule getSchedule() { + return this.schedule; + } + + + @Override + public boolean isEnabled() { + return this.isEnabled; + } + + public String getIndexToWatch() { + return indexToWatch; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public void setLastUpdateTime(Instant lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + } + + public void setEnabledTime(Instant enabledTime) { + this.enabledTime = enabledTime; + } + + public void setEnabled(boolean enabled) { + isEnabled = enabled; + } + + public void setSchedule(Schedule schedule) { + this.schedule = schedule; + } + + public void setIndexToWatch(String indexToWatch) { + this.indexToWatch = indexToWatch; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(NAME_FIELD, this.jobName); + builder.field(ENABLED_FILED, this.isEnabled); + builder.field(SCHEDULE_FIELD, this.schedule); + builder.field(INDEX_NAME_FIELD, this.indexToWatch); + if (this.enabledTime != null) { + builder.timeField(ENABLED_TIME_FILED, ENABLED_TIME_FILED_READABLE, this.enabledTime.toEpochMilli()); + } + if (this.lastUpdateTime != null) { + builder.timeField(LAST_UPDATE_TIME_FIELD, LAST_UPDATE_TIME_FIELD_READABLE, this.lastUpdateTime.toEpochMilli()); + } + builder.endObject(); + + return builder; + } +} diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobRunner.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobRunner.java new file mode 100644 index 0000000..add75de --- /dev/null +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobRunner.java @@ -0,0 +1,156 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package com.wazuh.commandmanager; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.utils.LockService; +import org.opensearch.plugins.Plugin; +import org.opensearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.UUID; + +/** + * A sample job runner class. + *

+ * The job runner should be a singleton class if it uses OpenSearch client or other objects passed + * from OpenSearch. Because when registering the job runner to JobScheduler plugin, OpenSearch has + * not invoked plugins' createComponents() method. That is saying the plugin is not completely initialized, + * and the OpenSearch {@link org.opensearch.client.Client}, {@link ClusterService} and other objects + * are not available to plugin and this job runner. + *

+ * So we have to move this job runner initialization to {@link Plugin} createComponents() method, and using + * singleton job runner to ensure we register a usable job runner instance to JobScheduler plugin. + *

+ * This sample job runner takes the "indexToWatch" from job parameter and logs that index's shards. + */ +public class CommandManagerJobRunner implements ScheduledJobRunner { + + private static final Logger log = LogManager.getLogger(ScheduledJobRunner.class); + + private static CommandManagerJobRunner INSTANCE; + private ClusterService clusterService; + private ThreadPool threadPool; + private Client client; + + private CommandManagerJobRunner() { + // Singleton class, use getJobRunner method instead of constructor + } + + public static CommandManagerJobRunner getJobRunnerInstance() { + if (INSTANCE != null) { + return INSTANCE; + } + synchronized (CommandManagerJobRunner.class) { + if (INSTANCE != null) { + return INSTANCE; + } + INSTANCE = new CommandManagerJobRunner(); + return INSTANCE; + } + } + + public void setClusterService(ClusterService clusterService) { + this.clusterService = clusterService; + } + + public void setThreadPool(ThreadPool threadPool) { + this.threadPool = threadPool; + } + + public void setClient(Client client) { + this.client = client; + } + + + @Override + public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { + if (!(jobParameter instanceof CommandManagerJobParameter)) { + throw new IllegalStateException( + "Job parameter is not instance of SampleJobParameter, type: " + jobParameter.getClass().getCanonicalName() + ); + } + + if (this.clusterService == null) { + throw new IllegalStateException("ClusterService is not initialized."); + } + + if (this.threadPool == null) { + throw new IllegalStateException("ThreadPool is not initialized."); + } + + final LockService lockService = context.getLockService(); + + Runnable runnable = () -> { + if (jobParameter.getLockDurationSeconds() != null) { + lockService.acquireLock(jobParameter, context, ActionListener.wrap(lock -> { + if (lock == null) { + return; + } + + CommandManagerJobParameter parameter = (CommandManagerJobParameter) jobParameter; + StringBuilder msg = new StringBuilder(); + msg.append("Watching index ").append(parameter.getIndexToWatch()).append("\n"); + + List shardRoutingList = this.clusterService + .state() + .routingTable() + .allShards(parameter.getIndexToWatch()); + + for (ShardRouting shardRouting : shardRoutingList) { + msg.append(shardRouting.shardId().getId()) + .append("\t") + .append(shardRouting.currentNodeId()) + .append("\t") + .append(shardRouting.active() ? "active" : "inactive") + .append("\n"); + } + log.info(msg.toString()); + runTaskForIntegrationTests(parameter); + runTaskForLockIntegrationTests(parameter); + + lockService.release( + lock, + ActionListener.wrap(released -> { + log.info("Released lock for job {}", jobParameter.getName()); + }, exception -> { + throw new IllegalStateException("Failed to release lock."); + }) + ); + }, exception -> { + throw new IllegalStateException("Failed to acquire lock."); + })); + } + }; + + threadPool.generic().submit(runnable); + } + + private void runTaskForIntegrationTests(CommandManagerJobParameter jobParameter) { + this.client.index( + new IndexRequest(jobParameter.getIndexToWatch()).id(UUID.randomUUID().toString()) + .source("{\"message\": \"message\"}", XContentType.JSON) + ); + } + + private void runTaskForLockIntegrationTests(CommandManagerJobParameter jobParameter) throws InterruptedException { + if (jobParameter.getName().equals("sample-job-lock-test-it")) { + Thread.sleep(180000); + } + } +} diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java index 073216c..99d958d 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java @@ -7,8 +7,11 @@ */ package com.wazuh.commandmanager; + import com.wazuh.commandmanager.index.CommandIndex; import com.wazuh.commandmanager.rest.action.RestPostCommandAction; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import com.wazuh.commandmanager.utils.httpclient.HttpRestClient; import com.wazuh.commandmanager.utils.httpclient.HttpRestClientDemo; import org.opensearch.client.Client; @@ -21,8 +24,14 @@ import org.opensearch.common.settings.SettingsFilter; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; +import org.opensearch.jobscheduler.spi.JobSchedulerExtension; +import org.opensearch.jobscheduler.spi.ScheduledJobParser; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoriesService; @@ -33,6 +42,7 @@ import org.opensearch.watcher.ResourceWatcherService; import java.io.IOException; +import java.time.Instant; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -43,11 +53,16 @@ * receive raw commands from the Wazuh Server. These commands are processed, * indexed and sent back to the Server for its delivery to, in most cases, the * Agents. + *

+ * The Command Manager plugin is also a JobScheduler extension plugin. */ -public class CommandManagerPlugin extends Plugin implements ActionPlugin { +public class CommandManagerPlugin extends Plugin implements ActionPlugin, JobSchedulerExtension { public static final String COMMAND_MANAGER_BASE_URI = "/_plugins/_commandmanager"; public static final String COMMAND_MANAGER_INDEX_NAME = ".commands"; public static final String COMMAND_MANAGER_INDEX_TEMPLATE_NAME = "index-template-commands"; + public static final String JOB_INDEX_NAME = ".commands"; + + private static final Logger log = LogManager.getLogger(CommandManagerPlugin.class); private CommandIndex commandIndex; @@ -67,13 +82,21 @@ public Collection createComponents( ) { this.commandIndex = new CommandIndex(client, clusterService, threadPool); + // JobSchedulerExtension stuff + CommandManagerJobRunner jobRunner = CommandManagerJobRunner.getJobRunnerInstance(); + jobRunner.setClusterService(clusterService); + jobRunner.setThreadPool(threadPool); + jobRunner.setClient(client); + // HttpRestClient stuff String uri = "https://httpbin.org/post"; String payload = "{\"message\": \"Hello world!\"}"; HttpRestClientDemo.run(uri, payload); + return Collections.emptyList(); } + @Override public List getRestHandlers( Settings settings, RestController restController, @@ -86,6 +109,72 @@ public List getRestHandlers( return Collections.singletonList(new RestPostCommandAction(this.commandIndex)); } + @Override + public String getJobType() { + return "command_manager_scheduler_extension"; + } + + @Override + public String getJobIndex() { + return JOB_INDEX_NAME; + } + + @Override + public ScheduledJobRunner getJobRunner() { + return CommandManagerJobRunner.getJobRunnerInstance(); + } + + @Override + public ScheduledJobParser getJobParser() { + return (parser, id, jobDocVersion) -> { + CommandManagerJobParameter jobParameter = new CommandManagerJobParameter(); + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, + parser.nextToken(), + parser + ); + + while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case CommandManagerJobParameter.NAME_FIELD: + jobParameter.setJobName(parser.text()); + break; + case CommandManagerJobParameter.ENABLED_FILED: + jobParameter.setEnabled(parser.booleanValue()); + break; + case CommandManagerJobParameter.ENABLED_TIME_FILED: + jobParameter.setEnabledTime(parseInstantValue(parser)); + break; + case CommandManagerJobParameter.LAST_UPDATE_TIME_FIELD: + jobParameter.setLastUpdateTime(parseInstantValue(parser)); + break; + case CommandManagerJobParameter.SCHEDULE_FIELD: + jobParameter.setSchedule(ScheduleParser.parse(parser)); + break; + case CommandManagerJobParameter.INDEX_NAME_FIELD: + jobParameter.setIndexToWatch(parser.text()); + break; + default: + XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); + } + } + return jobParameter; + }; + } + + private Instant parseInstantValue(XContentParser parser) throws IOException { + if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) { + return null; + } + if (parser.currentToken().isValue()) { + return Instant.ofEpochMilli(parser.longValue()); + } + XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); + return null; + } + /** * Close the resources opened by this plugin. * diff --git a/plugins/command-manager/src/main/java/resources/META-INF/services/sample-extension-plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension b/plugins/command-manager/src/main/java/resources/META-INF/services/sample-extension-plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension new file mode 100644 index 0000000..69bad7f --- /dev/null +++ b/plugins/command-manager/src/main/java/resources/META-INF/services/sample-extension-plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension @@ -0,0 +1,6 @@ +# +# Copyright Wazuh Contributors +# SPDX-License-Identifier: Apache-2.0 +# + +com.wazuh.commandmanager.CommandManagerPlugin \ No newline at end of file