From 6e9756078207ac99535c87903b1a3342c2c5dfdc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Ruiz?= Date: Tue, 17 Sep 2024 13:55:54 +0200 Subject: [PATCH 1/6] Add PoC extending the Job-Scheduler plugin Not functional yet --- plugins/command-manager/build.gradle | 5 + .../CommandManagerJobParameter.java | 128 ++++++++++++++ .../CommandManagerJobRunner.java | 156 ++++++++++++++++++ .../commandmanager/CommandManagerPlugin.java | 123 +++++++++++++- ...rch.jobscheduler.spi.JobSchedulerExtension | 6 + 5 files changed, 416 insertions(+), 2 deletions(-) create mode 100644 plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobParameter.java create mode 100644 plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobRunner.java create mode 100644 plugins/command-manager/src/main/java/resources/META-INF/services/sample-extension-plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension diff --git a/plugins/command-manager/build.gradle b/plugins/command-manager/build.gradle index 7defcd1..13f5ec8 100644 --- a/plugins/command-manager/build.gradle +++ b/plugins/command-manager/build.gradle @@ -47,6 +47,7 @@ opensearchplugin { name pluginName description pluginDescription classname "${projectPath}.${pathToPlugin}.${pluginClassName}" + extendedPlugins = ['opensearch-job-scheduler'] licenseFile rootProject.file('LICENSE.txt') noticeFile rootProject.file('NOTICE.txt') } @@ -83,6 +84,10 @@ repositories { maven { url "https://plugins.gradle.org/m2/" } } +dependencies { + compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_version}.0" +} + test { include '**/*Tests.class' } diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobParameter.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobParameter.java new file mode 100644 index 0000000..a4c63ba --- /dev/null +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobParameter.java @@ -0,0 +1,128 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package com.wazuh.commandmanager; + +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.schedule.Schedule; + +import java.io.IOException; +import java.time.Instant; + +/** + * A sample job parameter. + *

+ * It adds "indexToWatch" field to {@link ScheduledJobParameter}, which stores the index + * the job runner will watch. + */ +public class CommandManagerJobParameter implements ScheduledJobParameter { + public static final String NAME_FIELD = "name"; + public static final String ENABLED_FILED = "enabled"; + public static final String LAST_UPDATE_TIME_FIELD = "last_update_time"; + public static final String LAST_UPDATE_TIME_FIELD_READABLE = "last_update_time_field"; + public static final String SCHEDULE_FIELD = "schedule"; + public static final String ENABLED_TIME_FILED = "enabled_time"; + public static final String ENABLED_TIME_FILED_READABLE = "enabled_time_field"; + public static final String INDEX_NAME_FIELD = "index_name_to_watch"; + + private String jobName; + private Instant lastUpdateTime; + private Instant enabledTime; + private boolean isEnabled; + private Schedule schedule; + private String indexToWatch; + + public CommandManagerJobParameter() {} + + public CommandManagerJobParameter(String name, String indexToWatch, Schedule schedule) { + this.jobName = name; + this.indexToWatch = indexToWatch; + this.schedule = schedule; + + Instant now = Instant.now(); + this.isEnabled = true; + this.enabledTime = now; + this.lastUpdateTime = now; + } + + + @Override + public String getName() { + return this.jobName; + } + + + @Override + public Instant getLastUpdateTime() { + return this.lastUpdateTime; + } + + + @Override + public Instant getEnabledTime() { + return this.enabledTime; + } + + + @Override + public Schedule getSchedule() { + return this.schedule; + } + + + @Override + public boolean isEnabled() { + return this.isEnabled; + } + + public String getIndexToWatch() { + return indexToWatch; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public void setLastUpdateTime(Instant lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + } + + public void setEnabledTime(Instant enabledTime) { + this.enabledTime = enabledTime; + } + + public void setEnabled(boolean enabled) { + isEnabled = enabled; + } + + public void setSchedule(Schedule schedule) { + this.schedule = schedule; + } + + public void setIndexToWatch(String indexToWatch) { + this.indexToWatch = indexToWatch; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(NAME_FIELD, this.jobName); + builder.field(ENABLED_FILED, this.isEnabled); + builder.field(SCHEDULE_FIELD, this.schedule); + builder.field(INDEX_NAME_FIELD, this.indexToWatch); + if (this.enabledTime != null) { + builder.timeField(ENABLED_TIME_FILED, ENABLED_TIME_FILED_READABLE, this.enabledTime.toEpochMilli()); + } + if (this.lastUpdateTime != null) { + builder.timeField(LAST_UPDATE_TIME_FIELD, LAST_UPDATE_TIME_FIELD_READABLE, this.lastUpdateTime.toEpochMilli()); + } + builder.endObject(); + + return builder; + } +} diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobRunner.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobRunner.java new file mode 100644 index 0000000..add75de --- /dev/null +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobRunner.java @@ -0,0 +1,156 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package com.wazuh.commandmanager; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.utils.LockService; +import org.opensearch.plugins.Plugin; +import org.opensearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.UUID; + +/** + * A sample job runner class. + *

+ * The job runner should be a singleton class if it uses OpenSearch client or other objects passed + * from OpenSearch. Because when registering the job runner to JobScheduler plugin, OpenSearch has + * not invoked plugins' createComponents() method. That is saying the plugin is not completely initialized, + * and the OpenSearch {@link org.opensearch.client.Client}, {@link ClusterService} and other objects + * are not available to plugin and this job runner. + *

+ * So we have to move this job runner initialization to {@link Plugin} createComponents() method, and using + * singleton job runner to ensure we register a usable job runner instance to JobScheduler plugin. + *

+ * This sample job runner takes the "indexToWatch" from job parameter and logs that index's shards. + */ +public class CommandManagerJobRunner implements ScheduledJobRunner { + + private static final Logger log = LogManager.getLogger(ScheduledJobRunner.class); + + private static CommandManagerJobRunner INSTANCE; + private ClusterService clusterService; + private ThreadPool threadPool; + private Client client; + + private CommandManagerJobRunner() { + // Singleton class, use getJobRunner method instead of constructor + } + + public static CommandManagerJobRunner getJobRunnerInstance() { + if (INSTANCE != null) { + return INSTANCE; + } + synchronized (CommandManagerJobRunner.class) { + if (INSTANCE != null) { + return INSTANCE; + } + INSTANCE = new CommandManagerJobRunner(); + return INSTANCE; + } + } + + public void setClusterService(ClusterService clusterService) { + this.clusterService = clusterService; + } + + public void setThreadPool(ThreadPool threadPool) { + this.threadPool = threadPool; + } + + public void setClient(Client client) { + this.client = client; + } + + + @Override + public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { + if (!(jobParameter instanceof CommandManagerJobParameter)) { + throw new IllegalStateException( + "Job parameter is not instance of SampleJobParameter, type: " + jobParameter.getClass().getCanonicalName() + ); + } + + if (this.clusterService == null) { + throw new IllegalStateException("ClusterService is not initialized."); + } + + if (this.threadPool == null) { + throw new IllegalStateException("ThreadPool is not initialized."); + } + + final LockService lockService = context.getLockService(); + + Runnable runnable = () -> { + if (jobParameter.getLockDurationSeconds() != null) { + lockService.acquireLock(jobParameter, context, ActionListener.wrap(lock -> { + if (lock == null) { + return; + } + + CommandManagerJobParameter parameter = (CommandManagerJobParameter) jobParameter; + StringBuilder msg = new StringBuilder(); + msg.append("Watching index ").append(parameter.getIndexToWatch()).append("\n"); + + List shardRoutingList = this.clusterService + .state() + .routingTable() + .allShards(parameter.getIndexToWatch()); + + for (ShardRouting shardRouting : shardRoutingList) { + msg.append(shardRouting.shardId().getId()) + .append("\t") + .append(shardRouting.currentNodeId()) + .append("\t") + .append(shardRouting.active() ? "active" : "inactive") + .append("\n"); + } + log.info(msg.toString()); + runTaskForIntegrationTests(parameter); + runTaskForLockIntegrationTests(parameter); + + lockService.release( + lock, + ActionListener.wrap(released -> { + log.info("Released lock for job {}", jobParameter.getName()); + }, exception -> { + throw new IllegalStateException("Failed to release lock."); + }) + ); + }, exception -> { + throw new IllegalStateException("Failed to acquire lock."); + })); + } + }; + + threadPool.generic().submit(runnable); + } + + private void runTaskForIntegrationTests(CommandManagerJobParameter jobParameter) { + this.client.index( + new IndexRequest(jobParameter.getIndexToWatch()).id(UUID.randomUUID().toString()) + .source("{\"message\": \"message\"}", XContentType.JSON) + ); + } + + private void runTaskForLockIntegrationTests(CommandManagerJobParameter jobParameter) throws InterruptedException { + if (jobParameter.getName().equals("sample-job-lock-test-it")) { + Thread.sleep(180000); + } + } +} diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java index b7aa839..c4f20c1 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java @@ -7,9 +7,128 @@ */ package com.wazuh.commandmanager; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.jobscheduler.spi.JobSchedulerExtension; +import org.opensearch.jobscheduler.spi.ScheduledJobParser; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.watcher.ResourceWatcherService; +import java.io.IOException; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.function.Supplier; -public class CommandManagerPlugin extends Plugin { - // Implement the relevant Plugin Interfaces here +/** + * The Command Manager plugin is a JobScheduler extension plugin. + *

+ * It uses ".commands" index to manage its scheduled jobs, and exposes a REST API + * endpoint using {\@link SampleExtensionRestHandler}. + */ +public class CommandManagerPlugin extends Plugin implements JobSchedulerExtension { + static final String JOB_INDEX_NAME = ".commands"; + private static final Logger log = LogManager.getLogger(CommandManagerPlugin.class); + + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier repositoriesServiceSupplier + ) { + CommandManagerJobRunner jobRunner = CommandManagerJobRunner.getJobRunnerInstance(); + jobRunner.setClusterService(clusterService); + jobRunner.setThreadPool(threadPool); + jobRunner.setClient(client); + + return Collections.emptyList(); + } + + @Override + public String getJobType() { + return "command_manager_scheduler_extension"; + } + + @Override + public String getJobIndex() { + return JOB_INDEX_NAME; + } + + @Override + public ScheduledJobRunner getJobRunner() { + return CommandManagerJobRunner.getJobRunnerInstance(); + } + + @Override + public ScheduledJobParser getJobParser() { + return (parser, id, jobDocVersion) -> { + CommandManagerJobParameter jobParameter = new CommandManagerJobParameter(); + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, + parser.nextToken(), + parser + ); + + while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case CommandManagerJobParameter.NAME_FIELD: + jobParameter.setJobName(parser.text()); + break; + case CommandManagerJobParameter.ENABLED_FILED: + jobParameter.setEnabled(parser.booleanValue()); + break; + case CommandManagerJobParameter.ENABLED_TIME_FILED: + jobParameter.setEnabledTime(parseInstantValue(parser)); + break; + case CommandManagerJobParameter.LAST_UPDATE_TIME_FIELD: + jobParameter.setLastUpdateTime(parseInstantValue(parser)); + break; + case CommandManagerJobParameter.SCHEDULE_FIELD: + jobParameter.setSchedule(ScheduleParser.parse(parser)); + break; + case CommandManagerJobParameter.INDEX_NAME_FIELD: + jobParameter.setIndexToWatch(parser.text()); + break; + default: + XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); + } + } + return jobParameter; + }; + } + + private Instant parseInstantValue(XContentParser parser) throws IOException { + if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) { + return null; + } + if (parser.currentToken().isValue()) { + return Instant.ofEpochMilli(parser.longValue()); + } + XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); + return null; + } } diff --git a/plugins/command-manager/src/main/java/resources/META-INF/services/sample-extension-plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension b/plugins/command-manager/src/main/java/resources/META-INF/services/sample-extension-plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension new file mode 100644 index 0000000..69bad7f --- /dev/null +++ b/plugins/command-manager/src/main/java/resources/META-INF/services/sample-extension-plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension @@ -0,0 +1,6 @@ +# +# Copyright Wazuh Contributors +# SPDX-License-Identifier: Apache-2.0 +# + +com.wazuh.commandmanager.CommandManagerPlugin \ No newline at end of file From e3c46a7e9c1e39c713423ce5ee537a102ecf3ff3 Mon Sep 17 00:00:00 2001 From: f-galland Date: Wed, 18 Sep 2024 12:15:34 -0300 Subject: [PATCH 2/6] Modify build.gradle to load job scheduler on gradlew run --- plugins/command-manager/build.gradle | 296 +++++++++++++++++++-------- 1 file changed, 211 insertions(+), 85 deletions(-) diff --git a/plugins/command-manager/build.gradle b/plugins/command-manager/build.gradle index 13f5ec8..e86e1cc 100644 --- a/plugins/command-manager/build.gradle +++ b/plugins/command-manager/build.gradle @@ -1,11 +1,44 @@ import org.opensearch.gradle.test.RestIntegTestTask +import java.util.concurrent.Callable -apply plugin: 'java' -apply plugin: 'idea' -apply plugin: 'eclipse' -apply plugin: 'opensearch.opensearchplugin' -apply plugin: 'opensearch.yaml-rest-test' -apply plugin: 'opensearch.pluginzip' +buildscript { + ext { + opensearch_group = "org.opensearch" + + isSnapshot = "true" == System.getProperty("build.snapshot", "true") + opensearch_version = System.getProperty("opensearch.version", "2.16.0-SNAPSHOT") + buildVersionQualifier = System.getProperty("build.version_qualifier", "") + // 2.0.0-rc1-SNAPSHOT -> 2.0.0.0-rc1-SNAPSHOT + version_tokens = opensearch_version.tokenize('-') + opensearch_build = version_tokens[0] + '.0' + if (buildVersionQualifier) { + opensearch_build += "-${buildVersionQualifier}" + } + if (isSnapshot) { + opensearch_build += "-SNAPSHOT" + } + job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build) + wazuh_version = System.getProperty("version", "5.0.0") + revision = System.getProperty("revision", "0") + } + + repositories { + mavenLocal() + maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } + mavenCentral() + maven { url "https://plugins.gradle.org/m2/" } + } + + dependencies { + classpath "${opensearch_group}.gradle:build-tools:${opensearch_version}" + } +} + +plugins { + id "com.netflix.nebula.ospackage-base" version "11.6.0" + //id "com.dorongold.task-tree" version "1.5" + id 'java-library' +} def pluginName = 'wazuh-indexer-command-manager' def pluginDescription = 'The Command Manager plugin handles and distributes commands across your Wazuh environment.' @@ -13,118 +46,211 @@ def projectPath = 'com.wazuh' def pathToPlugin = 'commandmanager' def pluginClassName = 'CommandManagerPlugin' +apply plugin: 'java' +apply plugin: 'idea' +apply plugin: 'opensearch.opensearchplugin' +apply plugin: 'opensearch.pluginzip' +apply plugin: 'opensearch.testclusters' + +opensearchplugin { + name pluginName + description pluginDescription + classname "${projectPath}.${pathToPlugin}.${pluginClassName}" + version = "${wazuh_version}" + ".${revision}" + extendedPlugins = ['opensearch-job-scheduler'] + licenseFile rootProject.file('LICENSE.txt') + noticeFile rootProject.file('NOTICE.txt') +} + publishing { - publications { - pluginZip(MavenPublication) { publication -> - pom { - name = pluginName - description = pluginDescription - licenses { - license { - name = "The Apache License, Version 2.0" - url = "http://www.apache.org/licenses/LICENSE-2.0.txt" - } - } - developers { - developer { - name = "Wazuh" - url = "https://wazuh.com" - } - } - } + publications { + pluginZip(MavenPublication) { publication -> + pom { + name = pluginName + description = pluginDescription + //groupId = "org.opensearch.plugin" + groupId = projectPath + licenses { + license { + name = 'The Apache License, Version 2.0' + url = 'http://www.apache.org/licenses/LICENSE-2.0.txt' + } + } + developers { + developer { + name = 'Wazuh' + url = 'https://www.wazuh.com' + } } + } } + } + repositories { + maven { + name = "Snapshots" // optional target repository name + url = "https://aws.oss.sonatype.org/content/repositories/snapshots" + credentials { + username "$System.env.SONATYPE_USERNAME" + password "$System.env.SONATYPE_PASSWORD" + } + } + } } -allprojects { - group = "${projectPath}" - version = "${wazuh_version}" + ".${revision}" - targetCompatibility = JavaVersion.VERSION_11 - sourceCompatibility = JavaVersion.VERSION_11 +configurations { + zipArchive } -opensearchplugin { - name pluginName - description pluginDescription - classname "${projectPath}.${pathToPlugin}.${pluginClassName}" - extendedPlugins = ['opensearch-job-scheduler'] - licenseFile rootProject.file('LICENSE.txt') - noticeFile rootProject.file('NOTICE.txt') +ext { + projectSubstitutions = [:] + licenseFile = rootProject.file('LICENSE.txt') + noticeFile = rootProject.file('NOTICE.txt') } -// This requires an additional Jar not published as part of build-tools -loggerUsageCheck.enabled = false - -// No need to validate pom, as we do not upload to maven/sonatype -validateNebulaPom.enabled = false - -buildscript { - ext { - opensearch_version = System.getProperty("opensearch.version", "2.16.0") - wazuh_version = System.getProperty("version", "5.0.0") - revision = System.getProperty("revision", "0") - } - - repositories { - mavenLocal() - maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } - mavenCentral() - maven { url "https://plugins.gradle.org/m2/" } - } +plugins.withId('java') { + sourceCompatibility = targetCompatibility = "11" +} - dependencies { - classpath "org.opensearch.gradle:build-tools:${opensearch_version}" - } +allprojects { + group = "org.opensearch" + version = "${opensearch_build}" + plugins.withId('java') { + sourceCompatibility = targetCompatibility = "11" + } } repositories { - mavenLocal() - maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } - mavenCentral() - maven { url "https://plugins.gradle.org/m2/" } + mavenLocal() + maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } + mavenCentral() + maven { url "https://plugins.gradle.org/m2/" } } dependencies { - compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_version}.0" + zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}" + implementation "org.opensearch:opensearch:${opensearch_version}" + compileOnly "${group}:opensearch-job-scheduler-spi:${job_scheduler_version}" } +javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code +licenseHeaders.enabled = true +// no need to validate pom, as we do not upload to maven/sonatype +validateNebulaPom.enabled = false +dependencyLicenses.enabled = false +thirdPartyAudit.enabled = false +// Allow @Test to be used in test classes not inherited from LuceneTestCase. +forbiddenApis.ignoreFailures = true +// Allow test cases to be named Tests without having to be inherited from LuceneTestCase. +testingConventions.enabled = false +loggerUsageCheck.enabled = false + test { - include '**/*Tests.class' + systemProperty 'tests.security.manager', 'false' + useJUnitPlatform() +} + +File repo = file("$buildDir/testclusters/repo") +def _numNodes = findProperty('numNodes') as Integer ?: 1 + +def opensearch_tmp_dir = rootProject.file('build/private/es_tmp').absoluteFile +opensearch_tmp_dir.mkdirs() + +// As of ES 7.7 the sample-extension-plugin is being added to the list of plugins for the testCluster during build before +// the job-scheduler plugin is causing build failures. +// The job-scheduler zip is added explicitly above but the sample-extension-plugin is added implicitly at some time during evaluation. +// Will need to do a deep dive to find out exactly what task adds the sample-extension-plugin and add job-scheduler there but a temporary hack is to +// reorder the plugins list after evaluation but prior to task execution when the plugins are installed. +afterEvaluate { + testClusters.integTest.nodes.each { node -> + def plugins = node.plugins + def firstPlugin = plugins.get(0) + plugins.remove(0) + plugins.add(firstPlugin) + } +} + +tasks.withType(licenseHeaders.class) { + additionalLicense 'AL ', 'Apache', 'Licensed under the Apache License, Version 2.0 (the "License")' } task integTest(type: RestIntegTestTask) { - description = "Run tests against a cluster" - testClassesDirs = sourceSets.test.output.classesDirs - classpath = sourceSets.test.runtimeClasspath + description = "Run tests against a cluster" + testClassesDirs = sourceSets.test.output.classesDirs + classpath = sourceSets.test.runtimeClasspath } tasks.named("check").configure { dependsOn(integTest) } integTest { - // The --debug-jvm command-line option makes the cluster debuggable; this makes the tests debuggable - if (System.getProperty("test.debug") != null) { - jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005' + dependsOn "bundlePlugin" + systemProperty 'tests.security.manager', 'false' + systemProperty 'java.io.tmpdir', opensearch_tmp_dir.absolutePath + + systemProperty "https", System.getProperty("https") + systemProperty "user", System.getProperty("user") + systemProperty "password", System.getProperty("password") + // Tell the test JVM if the cluster JVM is running under a debugger so that tests can use longer timeouts for + // requests. The 'doFirst' delays reading the debug setting on the cluster till execution time. + doFirst { + // Tell the test JVM if the cluster JVM is running under a debugger so that tests can + // use longer timeouts for requests. + def isDebuggingCluster = getDebug() || System.getProperty("test.debug") != null + systemProperty 'cluster.debug', isDebuggingCluster + // Set number of nodes system property to be used in tests + systemProperty 'cluster.number_of_nodes', "${_numNodes}" + // There seems to be an issue when running multi node run or integ tasks with unicast_hosts + // not being written, the waitForAllConditions ensures it's written + getClusters().forEach { cluster -> + cluster.waitForAllConditions() } + } + + // The -Dcluster.debug option makes the cluster debuggable; this makes the tests debuggable + if (System.getProperty("test.debug") != null) { + jvmArgs '-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=8000' + } } +Zip bundle = (Zip) project.getTasks().getByName("bundlePlugin"); +integTest.dependsOn(bundle) +integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile()))} + testClusters.integTest { - testDistribution = "INTEG_TEST" + testDistribution = "INTEG_TEST" + // need to install job-scheduler first, need to assemble job-scheduler first + plugin(provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.getSingleFile() + } + } + } + })) - // This installs our plugin into the testClusters - plugin(project.tasks.bundlePlugin.archiveFile) + // Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1 + if (_numNodes > 1) numberOfNodes = _numNodes + // When running integration tests it doesn't forward the --debug-jvm to the cluster anymore + // i.e. we have to use a custom property to flag when we want to debug opensearch JVM + // since we also support multi node integration tests we increase debugPort per node + if (System.getProperty("cluster.debug") != null) { + def debugPort = 5005 + nodes.forEach { node -> + node.jvmArgs("-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=*:${debugPort}") + debugPort += 1 + } + } + setting 'path.repo', repo.absolutePath } run { - useCluster testClusters.integTest -} - -// updateVersion: Task to auto update version to the next development iteration -task updateVersion { - onlyIf { System.getProperty('newVersion') } - doLast { - ext.newVersion = System.getProperty('newVersion') - println "Setting version to ${newVersion}." - // String tokenization to support -SNAPSHOT - ant.replaceregexp(file: 'build.gradle', match: '"opensearch.version", "\\d.*"', replace: '"opensearch.version", "' + newVersion.tokenize('-')[0] + '-SNAPSHOT"', flags: 'g', byline: true) + doFirst { + // There seems to be an issue when running multi node run or integ tasks with unicast_hosts + // not being written, the waitForAllConditions ensures it's written + getClusters().forEach { cluster -> + cluster.waitForAllConditions() } + } + useCluster testClusters.integTest } - From fa6368b6723b09eedb758bc20b64ea90f087458b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Ruiz?= Date: Tue, 17 Sep 2024 13:55:54 +0200 Subject: [PATCH 3/6] Add PoC extending the Job-Scheduler plugin Not functional yet --- plugins/command-manager/build.gradle | 5 + .../CommandManagerJobParameter.java | 128 ++++++++++++++ .../CommandManagerJobRunner.java | 156 ++++++++++++++++++ .../commandmanager/CommandManagerPlugin.java | 90 +++++++++- ...rch.jobscheduler.spi.JobSchedulerExtension | 6 + 5 files changed, 384 insertions(+), 1 deletion(-) create mode 100644 plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobParameter.java create mode 100644 plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobRunner.java create mode 100644 plugins/command-manager/src/main/java/resources/META-INF/services/sample-extension-plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension diff --git a/plugins/command-manager/build.gradle b/plugins/command-manager/build.gradle index 7defcd1..13f5ec8 100644 --- a/plugins/command-manager/build.gradle +++ b/plugins/command-manager/build.gradle @@ -47,6 +47,7 @@ opensearchplugin { name pluginName description pluginDescription classname "${projectPath}.${pathToPlugin}.${pluginClassName}" + extendedPlugins = ['opensearch-job-scheduler'] licenseFile rootProject.file('LICENSE.txt') noticeFile rootProject.file('NOTICE.txt') } @@ -83,6 +84,10 @@ repositories { maven { url "https://plugins.gradle.org/m2/" } } +dependencies { + compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_version}.0" +} + test { include '**/*Tests.class' } diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobParameter.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobParameter.java new file mode 100644 index 0000000..a4c63ba --- /dev/null +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobParameter.java @@ -0,0 +1,128 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package com.wazuh.commandmanager; + +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.schedule.Schedule; + +import java.io.IOException; +import java.time.Instant; + +/** + * A sample job parameter. + *

+ * It adds "indexToWatch" field to {@link ScheduledJobParameter}, which stores the index + * the job runner will watch. + */ +public class CommandManagerJobParameter implements ScheduledJobParameter { + public static final String NAME_FIELD = "name"; + public static final String ENABLED_FILED = "enabled"; + public static final String LAST_UPDATE_TIME_FIELD = "last_update_time"; + public static final String LAST_UPDATE_TIME_FIELD_READABLE = "last_update_time_field"; + public static final String SCHEDULE_FIELD = "schedule"; + public static final String ENABLED_TIME_FILED = "enabled_time"; + public static final String ENABLED_TIME_FILED_READABLE = "enabled_time_field"; + public static final String INDEX_NAME_FIELD = "index_name_to_watch"; + + private String jobName; + private Instant lastUpdateTime; + private Instant enabledTime; + private boolean isEnabled; + private Schedule schedule; + private String indexToWatch; + + public CommandManagerJobParameter() {} + + public CommandManagerJobParameter(String name, String indexToWatch, Schedule schedule) { + this.jobName = name; + this.indexToWatch = indexToWatch; + this.schedule = schedule; + + Instant now = Instant.now(); + this.isEnabled = true; + this.enabledTime = now; + this.lastUpdateTime = now; + } + + + @Override + public String getName() { + return this.jobName; + } + + + @Override + public Instant getLastUpdateTime() { + return this.lastUpdateTime; + } + + + @Override + public Instant getEnabledTime() { + return this.enabledTime; + } + + + @Override + public Schedule getSchedule() { + return this.schedule; + } + + + @Override + public boolean isEnabled() { + return this.isEnabled; + } + + public String getIndexToWatch() { + return indexToWatch; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public void setLastUpdateTime(Instant lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + } + + public void setEnabledTime(Instant enabledTime) { + this.enabledTime = enabledTime; + } + + public void setEnabled(boolean enabled) { + isEnabled = enabled; + } + + public void setSchedule(Schedule schedule) { + this.schedule = schedule; + } + + public void setIndexToWatch(String indexToWatch) { + this.indexToWatch = indexToWatch; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(NAME_FIELD, this.jobName); + builder.field(ENABLED_FILED, this.isEnabled); + builder.field(SCHEDULE_FIELD, this.schedule); + builder.field(INDEX_NAME_FIELD, this.indexToWatch); + if (this.enabledTime != null) { + builder.timeField(ENABLED_TIME_FILED, ENABLED_TIME_FILED_READABLE, this.enabledTime.toEpochMilli()); + } + if (this.lastUpdateTime != null) { + builder.timeField(LAST_UPDATE_TIME_FIELD, LAST_UPDATE_TIME_FIELD_READABLE, this.lastUpdateTime.toEpochMilli()); + } + builder.endObject(); + + return builder; + } +} diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobRunner.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobRunner.java new file mode 100644 index 0000000..add75de --- /dev/null +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerJobRunner.java @@ -0,0 +1,156 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package com.wazuh.commandmanager; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.utils.LockService; +import org.opensearch.plugins.Plugin; +import org.opensearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.UUID; + +/** + * A sample job runner class. + *

+ * The job runner should be a singleton class if it uses OpenSearch client or other objects passed + * from OpenSearch. Because when registering the job runner to JobScheduler plugin, OpenSearch has + * not invoked plugins' createComponents() method. That is saying the plugin is not completely initialized, + * and the OpenSearch {@link org.opensearch.client.Client}, {@link ClusterService} and other objects + * are not available to plugin and this job runner. + *

+ * So we have to move this job runner initialization to {@link Plugin} createComponents() method, and using + * singleton job runner to ensure we register a usable job runner instance to JobScheduler plugin. + *

+ * This sample job runner takes the "indexToWatch" from job parameter and logs that index's shards. + */ +public class CommandManagerJobRunner implements ScheduledJobRunner { + + private static final Logger log = LogManager.getLogger(ScheduledJobRunner.class); + + private static CommandManagerJobRunner INSTANCE; + private ClusterService clusterService; + private ThreadPool threadPool; + private Client client; + + private CommandManagerJobRunner() { + // Singleton class, use getJobRunner method instead of constructor + } + + public static CommandManagerJobRunner getJobRunnerInstance() { + if (INSTANCE != null) { + return INSTANCE; + } + synchronized (CommandManagerJobRunner.class) { + if (INSTANCE != null) { + return INSTANCE; + } + INSTANCE = new CommandManagerJobRunner(); + return INSTANCE; + } + } + + public void setClusterService(ClusterService clusterService) { + this.clusterService = clusterService; + } + + public void setThreadPool(ThreadPool threadPool) { + this.threadPool = threadPool; + } + + public void setClient(Client client) { + this.client = client; + } + + + @Override + public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { + if (!(jobParameter instanceof CommandManagerJobParameter)) { + throw new IllegalStateException( + "Job parameter is not instance of SampleJobParameter, type: " + jobParameter.getClass().getCanonicalName() + ); + } + + if (this.clusterService == null) { + throw new IllegalStateException("ClusterService is not initialized."); + } + + if (this.threadPool == null) { + throw new IllegalStateException("ThreadPool is not initialized."); + } + + final LockService lockService = context.getLockService(); + + Runnable runnable = () -> { + if (jobParameter.getLockDurationSeconds() != null) { + lockService.acquireLock(jobParameter, context, ActionListener.wrap(lock -> { + if (lock == null) { + return; + } + + CommandManagerJobParameter parameter = (CommandManagerJobParameter) jobParameter; + StringBuilder msg = new StringBuilder(); + msg.append("Watching index ").append(parameter.getIndexToWatch()).append("\n"); + + List shardRoutingList = this.clusterService + .state() + .routingTable() + .allShards(parameter.getIndexToWatch()); + + for (ShardRouting shardRouting : shardRoutingList) { + msg.append(shardRouting.shardId().getId()) + .append("\t") + .append(shardRouting.currentNodeId()) + .append("\t") + .append(shardRouting.active() ? "active" : "inactive") + .append("\n"); + } + log.info(msg.toString()); + runTaskForIntegrationTests(parameter); + runTaskForLockIntegrationTests(parameter); + + lockService.release( + lock, + ActionListener.wrap(released -> { + log.info("Released lock for job {}", jobParameter.getName()); + }, exception -> { + throw new IllegalStateException("Failed to release lock."); + }) + ); + }, exception -> { + throw new IllegalStateException("Failed to acquire lock."); + })); + } + }; + + threadPool.generic().submit(runnable); + } + + private void runTaskForIntegrationTests(CommandManagerJobParameter jobParameter) { + this.client.index( + new IndexRequest(jobParameter.getIndexToWatch()).id(UUID.randomUUID().toString()) + .source("{\"message\": \"message\"}", XContentType.JSON) + ); + } + + private void runTaskForLockIntegrationTests(CommandManagerJobParameter jobParameter) throws InterruptedException { + if (jobParameter.getName().equals("sample-job-lock-test-it")) { + Thread.sleep(180000); + } + } +} diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java index 267e0f4..38a83ba 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java @@ -7,8 +7,11 @@ */ package com.wazuh.commandmanager; + import com.wazuh.commandmanager.index.CommandIndex; import com.wazuh.commandmanager.rest.action.RestPostCommandAction; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNodes; @@ -19,8 +22,14 @@ import org.opensearch.common.settings.SettingsFilter; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; +import org.opensearch.jobscheduler.spi.JobSchedulerExtension; +import org.opensearch.jobscheduler.spi.ScheduledJobParser; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoriesService; @@ -30,6 +39,8 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; +import java.io.IOException; +import java.time.Instant; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -40,11 +51,16 @@ * receive raw commands from the Wazuh Server. These commands are processed, * indexed and sent back to the Server for its delivery to, in most cases, the * Agents. + *

+ * The Command Manager plugin is also a JobScheduler extension plugin. */ -public class CommandManagerPlugin extends Plugin implements ActionPlugin { +public class CommandManagerPlugin extends Plugin implements ActionPlugin, JobSchedulerExtension { public static final String COMMAND_MANAGER_BASE_URI = "/_plugins/_commandmanager"; public static final String COMMAND_MANAGER_INDEX_NAME = ".commands"; public static final String COMMAND_MANAGER_INDEX_TEMPLATE_NAME = "index-template-commands"; + public static final String JOB_INDEX_NAME = ".commands"; + + private static final Logger log = LogManager.getLogger(CommandManagerPlugin.class); private CommandIndex commandIndex; @@ -63,6 +79,12 @@ public Collection createComponents( Supplier repositoriesServiceSupplier ) { this.commandIndex = new CommandIndex(client, clusterService, threadPool); + + CommandManagerJobRunner jobRunner = CommandManagerJobRunner.getJobRunnerInstance(); + jobRunner.setClusterService(clusterService); + jobRunner.setThreadPool(threadPool); + jobRunner.setClient(client); + return Collections.emptyList(); } @@ -77,4 +99,70 @@ public List getRestHandlers( ) { return Collections.singletonList(new RestPostCommandAction(this.commandIndex)); } + + @Override + public String getJobType() { + return "command_manager_scheduler_extension"; + } + + @Override + public String getJobIndex() { + return JOB_INDEX_NAME; + } + + @Override + public ScheduledJobRunner getJobRunner() { + return CommandManagerJobRunner.getJobRunnerInstance(); + } + + @Override + public ScheduledJobParser getJobParser() { + return (parser, id, jobDocVersion) -> { + CommandManagerJobParameter jobParameter = new CommandManagerJobParameter(); + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, + parser.nextToken(), + parser + ); + + while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case CommandManagerJobParameter.NAME_FIELD: + jobParameter.setJobName(parser.text()); + break; + case CommandManagerJobParameter.ENABLED_FILED: + jobParameter.setEnabled(parser.booleanValue()); + break; + case CommandManagerJobParameter.ENABLED_TIME_FILED: + jobParameter.setEnabledTime(parseInstantValue(parser)); + break; + case CommandManagerJobParameter.LAST_UPDATE_TIME_FIELD: + jobParameter.setLastUpdateTime(parseInstantValue(parser)); + break; + case CommandManagerJobParameter.SCHEDULE_FIELD: + jobParameter.setSchedule(ScheduleParser.parse(parser)); + break; + case CommandManagerJobParameter.INDEX_NAME_FIELD: + jobParameter.setIndexToWatch(parser.text()); + break; + default: + XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); + } + } + return jobParameter; + }; + } + + private Instant parseInstantValue(XContentParser parser) throws IOException { + if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) { + return null; + } + if (parser.currentToken().isValue()) { + return Instant.ofEpochMilli(parser.longValue()); + } + XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); + return null; + } } diff --git a/plugins/command-manager/src/main/java/resources/META-INF/services/sample-extension-plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension b/plugins/command-manager/src/main/java/resources/META-INF/services/sample-extension-plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension new file mode 100644 index 0000000..69bad7f --- /dev/null +++ b/plugins/command-manager/src/main/java/resources/META-INF/services/sample-extension-plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension @@ -0,0 +1,6 @@ +# +# Copyright Wazuh Contributors +# SPDX-License-Identifier: Apache-2.0 +# + +com.wazuh.commandmanager.CommandManagerPlugin \ No newline at end of file From a98c74ec563204bee455ec2616da70759a52c401 Mon Sep 17 00:00:00 2001 From: f-galland Date: Wed, 18 Sep 2024 12:15:34 -0300 Subject: [PATCH 4/6] Modify build.gradle to load job scheduler on gradlew run --- plugins/command-manager/build.gradle | 296 +++++++++++++++++++-------- 1 file changed, 211 insertions(+), 85 deletions(-) diff --git a/plugins/command-manager/build.gradle b/plugins/command-manager/build.gradle index 13f5ec8..e86e1cc 100644 --- a/plugins/command-manager/build.gradle +++ b/plugins/command-manager/build.gradle @@ -1,11 +1,44 @@ import org.opensearch.gradle.test.RestIntegTestTask +import java.util.concurrent.Callable -apply plugin: 'java' -apply plugin: 'idea' -apply plugin: 'eclipse' -apply plugin: 'opensearch.opensearchplugin' -apply plugin: 'opensearch.yaml-rest-test' -apply plugin: 'opensearch.pluginzip' +buildscript { + ext { + opensearch_group = "org.opensearch" + + isSnapshot = "true" == System.getProperty("build.snapshot", "true") + opensearch_version = System.getProperty("opensearch.version", "2.16.0-SNAPSHOT") + buildVersionQualifier = System.getProperty("build.version_qualifier", "") + // 2.0.0-rc1-SNAPSHOT -> 2.0.0.0-rc1-SNAPSHOT + version_tokens = opensearch_version.tokenize('-') + opensearch_build = version_tokens[0] + '.0' + if (buildVersionQualifier) { + opensearch_build += "-${buildVersionQualifier}" + } + if (isSnapshot) { + opensearch_build += "-SNAPSHOT" + } + job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build) + wazuh_version = System.getProperty("version", "5.0.0") + revision = System.getProperty("revision", "0") + } + + repositories { + mavenLocal() + maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } + mavenCentral() + maven { url "https://plugins.gradle.org/m2/" } + } + + dependencies { + classpath "${opensearch_group}.gradle:build-tools:${opensearch_version}" + } +} + +plugins { + id "com.netflix.nebula.ospackage-base" version "11.6.0" + //id "com.dorongold.task-tree" version "1.5" + id 'java-library' +} def pluginName = 'wazuh-indexer-command-manager' def pluginDescription = 'The Command Manager plugin handles and distributes commands across your Wazuh environment.' @@ -13,118 +46,211 @@ def projectPath = 'com.wazuh' def pathToPlugin = 'commandmanager' def pluginClassName = 'CommandManagerPlugin' +apply plugin: 'java' +apply plugin: 'idea' +apply plugin: 'opensearch.opensearchplugin' +apply plugin: 'opensearch.pluginzip' +apply plugin: 'opensearch.testclusters' + +opensearchplugin { + name pluginName + description pluginDescription + classname "${projectPath}.${pathToPlugin}.${pluginClassName}" + version = "${wazuh_version}" + ".${revision}" + extendedPlugins = ['opensearch-job-scheduler'] + licenseFile rootProject.file('LICENSE.txt') + noticeFile rootProject.file('NOTICE.txt') +} + publishing { - publications { - pluginZip(MavenPublication) { publication -> - pom { - name = pluginName - description = pluginDescription - licenses { - license { - name = "The Apache License, Version 2.0" - url = "http://www.apache.org/licenses/LICENSE-2.0.txt" - } - } - developers { - developer { - name = "Wazuh" - url = "https://wazuh.com" - } - } - } + publications { + pluginZip(MavenPublication) { publication -> + pom { + name = pluginName + description = pluginDescription + //groupId = "org.opensearch.plugin" + groupId = projectPath + licenses { + license { + name = 'The Apache License, Version 2.0' + url = 'http://www.apache.org/licenses/LICENSE-2.0.txt' + } + } + developers { + developer { + name = 'Wazuh' + url = 'https://www.wazuh.com' + } } + } } + } + repositories { + maven { + name = "Snapshots" // optional target repository name + url = "https://aws.oss.sonatype.org/content/repositories/snapshots" + credentials { + username "$System.env.SONATYPE_USERNAME" + password "$System.env.SONATYPE_PASSWORD" + } + } + } } -allprojects { - group = "${projectPath}" - version = "${wazuh_version}" + ".${revision}" - targetCompatibility = JavaVersion.VERSION_11 - sourceCompatibility = JavaVersion.VERSION_11 +configurations { + zipArchive } -opensearchplugin { - name pluginName - description pluginDescription - classname "${projectPath}.${pathToPlugin}.${pluginClassName}" - extendedPlugins = ['opensearch-job-scheduler'] - licenseFile rootProject.file('LICENSE.txt') - noticeFile rootProject.file('NOTICE.txt') +ext { + projectSubstitutions = [:] + licenseFile = rootProject.file('LICENSE.txt') + noticeFile = rootProject.file('NOTICE.txt') } -// This requires an additional Jar not published as part of build-tools -loggerUsageCheck.enabled = false - -// No need to validate pom, as we do not upload to maven/sonatype -validateNebulaPom.enabled = false - -buildscript { - ext { - opensearch_version = System.getProperty("opensearch.version", "2.16.0") - wazuh_version = System.getProperty("version", "5.0.0") - revision = System.getProperty("revision", "0") - } - - repositories { - mavenLocal() - maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } - mavenCentral() - maven { url "https://plugins.gradle.org/m2/" } - } +plugins.withId('java') { + sourceCompatibility = targetCompatibility = "11" +} - dependencies { - classpath "org.opensearch.gradle:build-tools:${opensearch_version}" - } +allprojects { + group = "org.opensearch" + version = "${opensearch_build}" + plugins.withId('java') { + sourceCompatibility = targetCompatibility = "11" + } } repositories { - mavenLocal() - maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } - mavenCentral() - maven { url "https://plugins.gradle.org/m2/" } + mavenLocal() + maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } + mavenCentral() + maven { url "https://plugins.gradle.org/m2/" } } dependencies { - compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_version}.0" + zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}" + implementation "org.opensearch:opensearch:${opensearch_version}" + compileOnly "${group}:opensearch-job-scheduler-spi:${job_scheduler_version}" } +javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code +licenseHeaders.enabled = true +// no need to validate pom, as we do not upload to maven/sonatype +validateNebulaPom.enabled = false +dependencyLicenses.enabled = false +thirdPartyAudit.enabled = false +// Allow @Test to be used in test classes not inherited from LuceneTestCase. +forbiddenApis.ignoreFailures = true +// Allow test cases to be named Tests without having to be inherited from LuceneTestCase. +testingConventions.enabled = false +loggerUsageCheck.enabled = false + test { - include '**/*Tests.class' + systemProperty 'tests.security.manager', 'false' + useJUnitPlatform() +} + +File repo = file("$buildDir/testclusters/repo") +def _numNodes = findProperty('numNodes') as Integer ?: 1 + +def opensearch_tmp_dir = rootProject.file('build/private/es_tmp').absoluteFile +opensearch_tmp_dir.mkdirs() + +// As of ES 7.7 the sample-extension-plugin is being added to the list of plugins for the testCluster during build before +// the job-scheduler plugin is causing build failures. +// The job-scheduler zip is added explicitly above but the sample-extension-plugin is added implicitly at some time during evaluation. +// Will need to do a deep dive to find out exactly what task adds the sample-extension-plugin and add job-scheduler there but a temporary hack is to +// reorder the plugins list after evaluation but prior to task execution when the plugins are installed. +afterEvaluate { + testClusters.integTest.nodes.each { node -> + def plugins = node.plugins + def firstPlugin = plugins.get(0) + plugins.remove(0) + plugins.add(firstPlugin) + } +} + +tasks.withType(licenseHeaders.class) { + additionalLicense 'AL ', 'Apache', 'Licensed under the Apache License, Version 2.0 (the "License")' } task integTest(type: RestIntegTestTask) { - description = "Run tests against a cluster" - testClassesDirs = sourceSets.test.output.classesDirs - classpath = sourceSets.test.runtimeClasspath + description = "Run tests against a cluster" + testClassesDirs = sourceSets.test.output.classesDirs + classpath = sourceSets.test.runtimeClasspath } tasks.named("check").configure { dependsOn(integTest) } integTest { - // The --debug-jvm command-line option makes the cluster debuggable; this makes the tests debuggable - if (System.getProperty("test.debug") != null) { - jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005' + dependsOn "bundlePlugin" + systemProperty 'tests.security.manager', 'false' + systemProperty 'java.io.tmpdir', opensearch_tmp_dir.absolutePath + + systemProperty "https", System.getProperty("https") + systemProperty "user", System.getProperty("user") + systemProperty "password", System.getProperty("password") + // Tell the test JVM if the cluster JVM is running under a debugger so that tests can use longer timeouts for + // requests. The 'doFirst' delays reading the debug setting on the cluster till execution time. + doFirst { + // Tell the test JVM if the cluster JVM is running under a debugger so that tests can + // use longer timeouts for requests. + def isDebuggingCluster = getDebug() || System.getProperty("test.debug") != null + systemProperty 'cluster.debug', isDebuggingCluster + // Set number of nodes system property to be used in tests + systemProperty 'cluster.number_of_nodes', "${_numNodes}" + // There seems to be an issue when running multi node run or integ tasks with unicast_hosts + // not being written, the waitForAllConditions ensures it's written + getClusters().forEach { cluster -> + cluster.waitForAllConditions() } + } + + // The -Dcluster.debug option makes the cluster debuggable; this makes the tests debuggable + if (System.getProperty("test.debug") != null) { + jvmArgs '-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=8000' + } } +Zip bundle = (Zip) project.getTasks().getByName("bundlePlugin"); +integTest.dependsOn(bundle) +integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile()))} + testClusters.integTest { - testDistribution = "INTEG_TEST" + testDistribution = "INTEG_TEST" + // need to install job-scheduler first, need to assemble job-scheduler first + plugin(provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.getSingleFile() + } + } + } + })) - // This installs our plugin into the testClusters - plugin(project.tasks.bundlePlugin.archiveFile) + // Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1 + if (_numNodes > 1) numberOfNodes = _numNodes + // When running integration tests it doesn't forward the --debug-jvm to the cluster anymore + // i.e. we have to use a custom property to flag when we want to debug opensearch JVM + // since we also support multi node integration tests we increase debugPort per node + if (System.getProperty("cluster.debug") != null) { + def debugPort = 5005 + nodes.forEach { node -> + node.jvmArgs("-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=*:${debugPort}") + debugPort += 1 + } + } + setting 'path.repo', repo.absolutePath } run { - useCluster testClusters.integTest -} - -// updateVersion: Task to auto update version to the next development iteration -task updateVersion { - onlyIf { System.getProperty('newVersion') } - doLast { - ext.newVersion = System.getProperty('newVersion') - println "Setting version to ${newVersion}." - // String tokenization to support -SNAPSHOT - ant.replaceregexp(file: 'build.gradle', match: '"opensearch.version", "\\d.*"', replace: '"opensearch.version", "' + newVersion.tokenize('-')[0] + '-SNAPSHOT"', flags: 'g', byline: true) + doFirst { + // There seems to be an issue when running multi node run or integ tasks with unicast_hosts + // not being written, the waitForAllConditions ensures it's written + getClusters().forEach { cluster -> + cluster.waitForAllConditions() } + } + useCluster testClusters.integTest } - From 110298fe48afefde4898e1f98f5daa6a9978dc50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Ruiz?= Date: Mon, 14 Oct 2024 17:45:27 +0200 Subject: [PATCH 5/6] Reformat code --- .../java/com/wazuh/commandmanager/CommandManagerPlugin.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java index bf2a009..928eb56 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java @@ -14,7 +14,6 @@ import org.apache.logging.log4j.Logger; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; - import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; @@ -31,13 +30,11 @@ import org.opensearch.jobscheduler.spi.ScheduledJobParser; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; - import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; - import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; @@ -46,7 +43,6 @@ import java.time.Instant; import java.util.Collection; import java.util.Collections; - import java.util.List; import java.util.function.Supplier; From 7f03e482e760c0b1fd34163a0a41437d95def9a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Ruiz?= Date: Mon, 14 Oct 2024 18:16:54 +0200 Subject: [PATCH 6/6] Rebuild build.gradle --- plugins/command-manager/build.gradle | 270 ++++++++------------------- 1 file changed, 83 insertions(+), 187 deletions(-) diff --git a/plugins/command-manager/build.gradle b/plugins/command-manager/build.gradle index 79569e2..5820b8e 100644 --- a/plugins/command-manager/build.gradle +++ b/plugins/command-manager/build.gradle @@ -1,50 +1,6 @@ import org.opensearch.gradle.test.RestIntegTestTask -import java.util.concurrent.Callable - -buildscript { - ext { - opensearch_group = "org.opensearch" - - isSnapshot = "true" == System.getProperty("build.snapshot", "true") - opensearch_version = System.getProperty("opensearch.version", "2.16.0-SNAPSHOT") - buildVersionQualifier = System.getProperty("build.version_qualifier", "") - // 2.0.0-rc1-SNAPSHOT -> 2.0.0.0-rc1-SNAPSHOT - version_tokens = opensearch_version.tokenize('-') - opensearch_build = version_tokens[0] + '.0' - if (buildVersionQualifier) { - opensearch_build += "-${buildVersionQualifier}" - } - if (isSnapshot) { - opensearch_build += "-SNAPSHOT" - } - job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build) - wazuh_version = System.getProperty("version", "5.0.0") - revision = System.getProperty("revision", "0") - } - - repositories { - mavenLocal() - maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } - mavenCentral() - maven { url "https://plugins.gradle.org/m2/" } - } - - dependencies { - classpath "${opensearch_group}.gradle:build-tools:${opensearch_version}" - } -} - -plugins { - id "com.netflix.nebula.ospackage-base" version "11.6.0" - //id "com.dorongold.task-tree" version "1.5" - id 'java-library' -} -def pluginName = 'wazuh-indexer-command-manager' -def pluginDescription = 'The Command Manager plugin handles and distributes commands across your Wazuh environment.' -def projectPath = 'com.wazuh' -def pathToPlugin = 'commandmanager' -def pluginClassName = 'CommandManagerPlugin' +import java.util.concurrent.Callable apply plugin: 'java' apply plugin: 'idea' @@ -52,63 +8,33 @@ apply plugin: 'opensearch.opensearchplugin' apply plugin: 'opensearch.pluginzip' apply plugin: 'opensearch.testclusters' -opensearchplugin { - name pluginName - description pluginDescription - classname "${projectPath}.${pathToPlugin}.${pluginClassName}" - version = "${wazuh_version}" + ".${revision}" - extendedPlugins = ['opensearch-job-scheduler'] - licenseFile rootProject.file('LICENSE.txt') - noticeFile rootProject.file('NOTICE.txt') -} +def pluginName = 'wazuh-indexer-command-manager' +def pluginDescription = 'The Command Manager plugin handles and distributes commands across your Wazuh environment.' +def projectPath = 'com.wazuh' +def pathToPlugin = 'commandmanager' +def pluginClassName = 'CommandManagerPlugin' publishing { - publications { - pluginZip(MavenPublication) { publication -> - pom { - name = pluginName - description = pluginDescription - //groupId = "org.opensearch.plugin" - groupId = projectPath - licenses { - license { - name = 'The Apache License, Version 2.0' - url = 'http://www.apache.org/licenses/LICENSE-2.0.txt' - } - } - developers { - developer { - name = 'Wazuh' - url = 'https://www.wazuh.com' - } + publications { + pluginZip(MavenPublication) { publication -> + pom { + name = pluginName + description = pluginDescription + licenses { + license { + name = "The Apache License, Version 2.0" + url = "http://www.apache.org/licenses/LICENSE-2.0.txt" + } + } + developers { + developer { + name = "Wazuh" + url = "https://wazuh.com" + } + } + } } - } } - } - repositories { - maven { - name = "Snapshots" // optional target repository name - url = "https://aws.oss.sonatype.org/content/repositories/snapshots" - credentials { - username "$System.env.SONATYPE_USERNAME" - password "$System.env.SONATYPE_PASSWORD" - } - } - } -} - -configurations { - zipArchive -} - -ext { - projectSubstitutions = [:] - licenseFile = rootProject.file('LICENSE.txt') - noticeFile = rootProject.file('NOTICE.txt') -} - -plugins.withId('java') { - sourceCompatibility = targetCompatibility = "11" } allprojects { @@ -122,16 +48,22 @@ opensearchplugin { name pluginName description pluginDescription classname "${projectPath}.${pathToPlugin}.${pluginClassName}" + version = "${wazuh_version}" + ".${revision}" + extendedPlugins = ['opensearch-job-scheduler'] licenseFile rootProject.file('LICENSE.txt') noticeFile rootProject.file('NOTICE.txt') } +configurations { + zipArchive +} + def versions = [ - httpclient5: "5.4", - httpcore5: "5.3", - slf4j: "1.7.36", - log4j: "2.23.1", - conscrypt: "2.5.2" + httpclient5: "5.4", + httpcore5 : "5.3", + slf4j : "1.7.36", + log4j : "2.23.1", + conscrypt : "2.5.2" ] dependencies { @@ -141,6 +73,11 @@ dependencies { api "org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}" api "org.slf4j:slf4j-api:${versions.slf4j}" api "org.conscrypt:conscrypt-openjdk-uber:${versions.conscrypt}" + + /// Job Scheduler stuff + zipArchive group: 'org.opensearch.plugin', name: 'opensearch-job-scheduler', version: "${opensearch_build}" + implementation "org.opensearch:opensearch:${opensearch_version}" + compileOnly "org.opensearch:opensearch-job-scheduler-spi:${job_scheduler_version}" } // This requires an additional Jar not published as part of build-tools @@ -158,6 +95,8 @@ dependencyLicenses.enabled = false buildscript { ext { opensearch_version = System.getProperty("opensearch.version", "2.16.0") + opensearch_build = opensearch_version + ".0" + job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build) wazuh_version = System.getProperty("version", "5.0.0") revision = System.getProperty("revision", "0") } @@ -175,78 +114,42 @@ buildscript { } repositories { - mavenLocal() - maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } - mavenCentral() - maven { url "https://plugins.gradle.org/m2/" } -} - -dependencies { - zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}" - implementation "org.opensearch:opensearch:${opensearch_version}" - compileOnly "${group}:opensearch-job-scheduler-spi:${job_scheduler_version}" + mavenLocal() + maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } + mavenCentral() + maven { url "https://plugins.gradle.org/m2/" } } test { - systemProperty 'tests.security.manager', 'false' - useJUnitPlatform() + include '**/*Tests.class' } -File repo = file("$buildDir/testclusters/repo") -def _numNodes = findProperty('numNodes') as Integer ?: 1 - -def opensearch_tmp_dir = rootProject.file('build/private/es_tmp').absoluteFile -opensearch_tmp_dir.mkdirs() - // As of ES 7.7 the sample-extension-plugin is being added to the list of plugins for the testCluster during build before // the job-scheduler plugin is causing build failures. // The job-scheduler zip is added explicitly above but the sample-extension-plugin is added implicitly at some time during evaluation. // Will need to do a deep dive to find out exactly what task adds the sample-extension-plugin and add job-scheduler there but a temporary hack is to // reorder the plugins list after evaluation but prior to task execution when the plugins are installed. afterEvaluate { - testClusters.integTest.nodes.each { node -> - def plugins = node.plugins - def firstPlugin = plugins.get(0) - plugins.remove(0) - plugins.add(firstPlugin) - } + testClusters.integTest.nodes.each { node -> + def plugins = node.plugins + def firstPlugin = plugins.get(0) + plugins.remove(0) + plugins.add(firstPlugin) + } } task integTest(type: RestIntegTestTask) { - description = "Run tests against a cluster" - testClassesDirs = sourceSets.test.output.classesDirs - classpath = sourceSets.test.runtimeClasspath + description = "Run tests against a cluster" + testClassesDirs = sourceSets.test.output.classesDirs + classpath = sourceSets.test.runtimeClasspath } tasks.named("check").configure { dependsOn(integTest) } integTest { - dependsOn "bundlePlugin" - systemProperty 'tests.security.manager', 'false' - systemProperty 'java.io.tmpdir', opensearch_tmp_dir.absolutePath - - systemProperty "https", System.getProperty("https") - systemProperty "user", System.getProperty("user") - systemProperty "password", System.getProperty("password") - // Tell the test JVM if the cluster JVM is running under a debugger so that tests can use longer timeouts for - // requests. The 'doFirst' delays reading the debug setting on the cluster till execution time. - doFirst { - // Tell the test JVM if the cluster JVM is running under a debugger so that tests can - // use longer timeouts for requests. - def isDebuggingCluster = getDebug() || System.getProperty("test.debug") != null - systemProperty 'cluster.debug', isDebuggingCluster - // Set number of nodes system property to be used in tests - systemProperty 'cluster.number_of_nodes', "${_numNodes}" - // There seems to be an issue when running multi node run or integ tasks with unicast_hosts - // not being written, the waitForAllConditions ensures it's written - getClusters().forEach { cluster -> - cluster.waitForAllConditions() + // The -Dcluster.debug option makes the cluster debuggable; this makes the tests debuggable + if (System.getProperty("test.debug") != null) { + jvmArgs '-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=8000' } - } - - // The -Dcluster.debug option makes the cluster debuggable; this makes the tests debuggable - if (System.getProperty("test.debug") != null) { - jvmArgs '-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=8000' - } } Zip bundle = (Zip) project.getTasks().getByName("bundlePlugin"); @@ -254,42 +157,35 @@ integTest.dependsOn(bundle) integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile()))} testClusters.integTest { - testDistribution = "INTEG_TEST" - // need to install job-scheduler first, need to assemble job-scheduler first - plugin(provider(new Callable(){ - @Override - RegularFile call() throws Exception { - return new RegularFile() { + testDistribution = "INTEG_TEST" + // need to install job-scheduler first, need to assemble job-scheduler first + plugin(provider(new Callable() { @Override - File getAsFile() { - return configurations.zipArchive.asFileTree.getSingleFile() + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.getSingleFile() + } + } } - } - } - })) + })) - // Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1 - if (_numNodes > 1) numberOfNodes = _numNodes - // When running integration tests it doesn't forward the --debug-jvm to the cluster anymore - // i.e. we have to use a custom property to flag when we want to debug opensearch JVM - // since we also support multi node integration tests we increase debugPort per node - if (System.getProperty("cluster.debug") != null) { - def debugPort = 5005 - nodes.forEach { node -> - node.jvmArgs("-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=*:${debugPort}") - debugPort += 1 - } - } - setting 'path.repo', repo.absolutePath + // This installs our plugin into the testClusters + // plugin(project.tasks.bundlePlugin.archiveFile) } run { - doFirst { - // There seems to be an issue when running multi node run or integ tasks with unicast_hosts - // not being written, the waitForAllConditions ensures it's written - getClusters().forEach { cluster -> - cluster.waitForAllConditions() + useCluster testClusters.integTest +} + +// updateVersion: Task to auto update version to the next development iteration +task updateVersion { + onlyIf { System.getProperty('newVersion') } + doLast { + ext.newVersion = System.getProperty('newVersion') + println "Setting version to ${newVersion}." + // String tokenization to support -SNAPSHOT + ant.replaceregexp(file: 'build.gradle', match: '"opensearch.version", "\\d.*"', replace: '"opensearch.version", "' + newVersion.tokenize('-')[0] + '-SNAPSHOT"', flags: 'g', byline: true) } - } - useCluster testClusters.integTest }