Skip to content

Commit

Permalink
Implement Job Scheduler logic (#103)
Browse files Browse the repository at this point in the history
* Add PoC extending the Job-Scheduler plugin

Not functional yet

* Modify build.gradle to load job scheduler on gradlew run

* Add PoC extending the Job-Scheduler plugin

Not functional yet

* Modify build.gradle to load job scheduler on gradlew run

* Reformat code

* Rebuild build.gradle

* Move JobSchedulerExtension to the right resources directory

* Fix logger class name

* Expose a POST endpoint to schedule tasks.

* Init jobScheduler

* Check if runner interface is being called.

* Try getting runjob() executed

* Correct META-INF directory contents

* Create Job on plugin startup

* Move create() to its own JobDocument class.

* Fix switch statement

* Schedule job on cluster startup.

* Make sure to schedule the job only on new clusters.

* Split job indexing to a separate method

* Create a job that logs the search results on the .commands index.

* Make the command manager run searches on the commands index.

* Check if index exists before running search.

* Wrap searchResponse in CompletableFuture

* Retrieve commands sorted by ascending timeout time.

* Fix sort field

* Adding scroll logic

* Start implementing PIT search

* Send the Search Response to a test REST endpoint

* Change the command.status of submitted commands to DONE

* Fix build.gradle after merging master

* Remove old RestPostCommandAction class after master merge

* Tidy up after master merge

* Fix gradlew run error

* Apply spotless

* Switch to synchronous code

* Adding pagination through PointInTime classes

* Adding pagination through PointInTime classes

* Only set sort fields for the first page.

* Fix empty search response errors

* Improve while loop

* Make PointInTime a non-singleton class

* Simplificar get instancia. Agregar cabecera de licencia faltante.

* Fix linter warnings

* Block until PointInTimeBuilder is obtained

* Create one pit per search

* Make SearchJob a non-singleton class

* Indexing operation timeout should be set in ms

* Indexing operation timeout should be set in ms

* Remove mockito explicit dependency

* Removing settings object

* Removing unneeded CommandManagerJobRunner object

* JavaDocs for scheduleCommandJob method

* Use fqn for job index name constant

* Add javadocs for parseInstantValue

* Add javadocs for CommandManagerJobParameter

* Add javadocs for CommandManagerJobRunner

* Remove unneeded parameter

* Add javadocs to JobDocument class

* Add javadocs to JobDocument's create() method

* Rename SearchJob to SearchThread and make it implement Runnable

* Improve exception handling

* Improve exception handling

* Use URI and credentials in settings

* Rename updateStatusField to setSentStatus and improve it

* Make "command" a constant

* Make queries constant

* Refactor totalHits()

* Remove unneeded getters

* Improve buildPit() exception handling

* Load settings from environment

* Fix JavaDocs

* Improve searchAfter logic

* Fixing errors after merge

* Fix build.gradle errors

Applies Spotless

* Fix settings related errors

* Fix http client permission issues

* Send orders array with proper authentication

* Only set status to SENT when a non-error response has been received

* Add missing import

* Use https for the Management API mock

---------

Signed-off-by: Álex Ruiz <[email protected]>
Signed-off-by: Fede Galland <[email protected]>
Co-authored-by: Álex Ruiz <[email protected]>
Co-authored-by: Malena Casas <[email protected]>
  • Loading branch information
3 people authored Dec 4, 2024
1 parent 7f24209 commit 270e9b5
Show file tree
Hide file tree
Showing 7 changed files with 790 additions and 19 deletions.
51 changes: 36 additions & 15 deletions plugins/command-manager/build.gradle
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@

import org.opensearch.gradle.test.RestIntegTestTask
import java.util.concurrent.Callable

buildscript {
ext {
opensearch_version = System.getProperty("opensearch.version", "2.18.0-SNAPSHOT")
opensearch_no_snapshot = opensearch_version.replace("-SNAPSHOT","")
opensearch_build = opensearch_no_snapshot + ".0"
job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build)
wazuh_version = System.getProperty("version", "5.0.0")
revision = System.getProperty("revision", "0")
}
Expand All @@ -24,7 +28,7 @@ apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'eclipse'
apply plugin: 'opensearch.opensearchplugin'
apply plugin: 'opensearch.yaml-rest-test'
apply plugin: 'opensearch.rest-test'
apply plugin: 'opensearch.pluginzip'

def pluginName = 'wazuh-indexer-command-manager'
Expand Down Expand Up @@ -67,6 +71,7 @@ opensearchplugin {
name pluginName
description pluginDescription
classname "${projectPath}.${pathToPlugin}.${pluginClassName}"
extendedPlugins = ['opensearch-job-scheduler']
licenseFile rootProject.file('LICENSE.txt')
noticeFile rootProject.file('NOTICE.txt')
}
Expand All @@ -88,6 +93,10 @@ def versions = [
imposter: "4.1.2"
]

configurations {
zipArchive
}

dependencies {
implementation "org.apache.httpcomponents.client5:httpclient5:${versions.httpclient5}"
implementation "org.apache.httpcomponents.core5:httpcore5-h2:${versions.httpcore5}"
Expand All @@ -99,6 +108,11 @@ dependencies {
implementation "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
implementation "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"

// Job Scheduler stuff
zipArchive group: 'org.opensearch.plugin', name: 'opensearch-job-scheduler', version: "${opensearch_build}"
// implementation "org.opensearch:opensearch:${opensearch_version}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${job_scheduler_version}"

testImplementation "junit:junit:${versions.junit}"

// imposter
Expand Down Expand Up @@ -134,30 +148,37 @@ test {
jvmArgs "-Djava.security.policy=./plugins/command-manager/src/main/plugin-metadata/plugin-security.policy/plugin-security.policy"
}

task integTest(type: RestIntegTestTask) {
description = "Run tests against a cluster"
testClassesDirs = sourceSets.test.output.classesDirs
classpath = sourceSets.test.runtimeClasspath
// Resolves the opensearch-job-scheduler plugin zip from the 'zipArchive' configuration
// so it can be installed into the test clusters alongside this plugin.
// Returned as a lazy Provider<RegularFile> so resolution happens at execution time.
def getJobSchedulerPlugin() {
    provider(new Callable<RegularFile>() {
        @Override
        RegularFile call() throws Exception {
            return new RegularFile() {
                @Override
                File getAsFile() {
                    // Locate the single job-scheduler zip among the resolved artifacts.
                    // singleFile fails loudly if zero or more than one file matches.
                    return configurations.zipArchive.asFileTree.matching {
                        include '**/opensearch-job-scheduler*'
                    }.singleFile
                }
            }
        }
    })
}
tasks.named("check").configure { dependsOn(integTest) }

integTest {
testClusters.integTest {
plugin(getJobSchedulerPlugin())
testDistribution = "ARCHIVE"
// This installs our plugin into the testClusters
plugin(project.tasks.bundlePlugin.archiveFile)

// The --debug-jvm command-line option makes the cluster debuggable; this makes the tests debuggable
if (System.getProperty("test.debug") != null) {
jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005'
}
}

testClusters.integTest {
testDistribution = "INTEG_TEST"
//testDistribution = "ARCHIVE"
// This installs our plugin into the testClusters
plugin(project.tasks.bundlePlugin.archiveFile)

// add customized keystore
keystore 'm_api.auth.username', 'admin'
keystore 'm_api.auth.password', 'test'
keystore 'm_api.uri', 'http://127.0.0.1:55000' // base URI of the M_API
keystore 'm_api.uri', 'https://127.0.0.1:55000' // base URI of the M_API
}

run {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,83 @@
*/
package com.wazuh.commandmanager;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.*;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.ReloadablePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.*;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import com.wazuh.commandmanager.index.CommandIndex;
import com.wazuh.commandmanager.jobscheduler.CommandManagerJobParameter;
import com.wazuh.commandmanager.jobscheduler.CommandManagerJobRunner;
import com.wazuh.commandmanager.jobscheduler.JobDocument;
import com.wazuh.commandmanager.rest.RestPostCommandAction;
import com.wazuh.commandmanager.settings.PluginSettings;
import com.wazuh.commandmanager.utils.httpclient.HttpRestClient;

/**
* The Command Manager plugin exposes an HTTP API with a single endpoint to receive raw commands
* from the Wazuh Server. These commands are processed, indexed and sent back to the Server for its
* delivery to, in most cases, the Agents.
* delivery to, in most cases, the Agents. The Command Manager plugin exposes an HTTP API with a
* single endpoint to receive raw commands from the Wazuh Server. These commands are processed,
* indexed and sent back to the Server for its delivery to, in most cases, the Agents.
*
* <p>The Command Manager plugin is also a JobScheduler extension plugin.
*/
public class CommandManagerPlugin extends Plugin implements ActionPlugin, ReloadablePlugin {
public class CommandManagerPlugin extends Plugin
implements ActionPlugin, ReloadablePlugin, JobSchedulerExtension {
public static final String COMMAND_MANAGER_BASE_URI = "/_plugins/_command_manager";
public static final String COMMANDS_URI = COMMAND_MANAGER_BASE_URI + "/commands";
public static final String COMMAND_MANAGER_INDEX_NAME = ".commands";
public static final String COMMAND_MANAGER_INDEX_TEMPLATE_NAME = "index-template-commands";
public static final String COMMAND_DOCUMENT_PARENT_OBJECT_NAME = "command";
public static final String JOB_INDEX_NAME = ".scheduled-commands";
public static final Integer JOB_PERIOD_MINUTES = 1;
public static final Integer PAGE_SIZE = 2;
public static final Long DEFAULT_TIMEOUT_SECONDS = 20L;
public static final TimeValue PIT_KEEPALIVE_SECONDS = TimeValue.timeValueSeconds(30L);

private static final Logger log = LogManager.getLogger(CommandManagerPlugin.class);
public static final String JOB_TYPE = "command_manager_scheduler_extension";

private CommandIndex commandIndex;
private JobDocument jobDocument;

@Override
public Collection<Object> createComponents(
Expand All @@ -68,9 +102,50 @@ public Collection<Object> createComponents(
this.commandIndex = new CommandIndex(client, clusterService, threadPool);
PluginSettings.getInstance(environment.settings());

// JobSchedulerExtension stuff
CommandManagerJobRunner.getInstance()
.setThreadPool(threadPool)
.setClient(client)
.setClusterService(clusterService)
.setEnvironment(environment);

scheduleCommandJob(client, clusterService, threadPool);

return Collections.emptyList();
}

/**
 * Indexes a document into the jobs index, so that the Job Scheduler plugin can run it.
 *
 * <p>The job is scheduled only once: by the elected cluster manager node of a brand-new
 * cluster, to avoid indexing duplicate job documents.
 *
 * @param client: The cluster client, used for indexing
 * @param clusterService: Provides the addListener method. We use it to determine if this is a
 *     new cluster.
 * @param threadPool: Used by jobDocument to create the document in a thread.
 */
private void scheduleCommandJob(
        Client client, ClusterService clusterService, ThreadPool threadPool) {
    clusterService.addListener(
            event -> {
                // Only the cluster manager of a freshly created cluster schedules the job.
                if (event.localNodeClusterManager() && event.isNewCluster()) {
                    jobDocument = JobDocument.getInstance();
                    CompletableFuture<IndexResponse> indexResponseCompletableFuture =
                            jobDocument.create(
                                    client,
                                    threadPool,
                                    UUIDs.base64UUID(),
                                    getJobType(),
                                    JOB_PERIOD_MINUTES);
                    indexResponseCompletableFuture
                            .thenAccept(
                                    indexResponse ->
                                            log.info(
                                                    "Scheduled task successfully, response: {}",
                                                    indexResponse.getResult().toString()))
                            // A failed indexing operation was previously swallowed by the
                            // future; surface it so operators can see the job was not
                            // scheduled.
                            .exceptionally(
                                    e -> {
                                        log.error("Failed to schedule the command job", e);
                                        return null;
                                    });
                }
            });
}

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
Expand Down Expand Up @@ -100,6 +175,75 @@ public void reload(Settings settings) {
// xxxService.refreshAndClearCache(commandManagerSettings);
}

/**
 * Returns the job type identifier under which this extension registers with the Job
 * Scheduler plugin. Must match the value declared in the job documents.
 *
 * @return the constant job type string.
 */
@Override
public String getJobType() {
    return CommandManagerPlugin.JOB_TYPE;
}

/**
 * Returns the name of the index the Job Scheduler plugin watches for this extension's
 * job documents.
 *
 * @return the scheduled commands jobs index name.
 */
@Override
public String getJobIndex() {
    return CommandManagerPlugin.JOB_INDEX_NAME;
}

/**
 * Returns the runner the Job Scheduler plugin invokes on every scheduled execution of
 * this extension's jobs.
 *
 * @return the singleton CommandManagerJobRunner instance.
 */
@Override
public ScheduledJobRunner getJobRunner() {
    // Downgraded from info to debug: this was a development-time trace message
    // ("check if runner interface is being called") and adds no value at the
    // default log level.
    log.debug("getJobRunner() executed");
    return CommandManagerJobRunner.getInstance();
}

/**
 * Builds the parser the Job Scheduler plugin uses to deserialize documents from the jobs
 * index back into {@link CommandManagerJobParameter} instances.
 *
 * @return a ScheduledJobParser that maps each known field of the job document onto the
 *     corresponding job parameter setter, and fails on unknown fields.
 */
@Override
public ScheduledJobParser getJobParser() {
    // Downgraded from info to debug: development-time trace message only.
    log.debug("getJobParser() executed");
    return (parser, id, jobDocVersion) -> {
        CommandManagerJobParameter jobParameter = new CommandManagerJobParameter();
        // The job document must be a JSON object.
        XContentParserUtils.ensureExpectedToken(
                XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

        while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) {
            String fieldName = parser.currentName();
            parser.nextToken();
            switch (fieldName) {
                case CommandManagerJobParameter.NAME_FIELD:
                    jobParameter.setJobName(parser.text());
                    break;
                case CommandManagerJobParameter.ENABLED_FIELD:
                    jobParameter.setEnabled(parser.booleanValue());
                    break;
                case CommandManagerJobParameter.ENABLED_TIME_FIELD:
                    jobParameter.setEnabledTime(parseInstantValue(parser));
                    break;
                case CommandManagerJobParameter.LAST_UPDATE_TIME_FIELD:
                    jobParameter.setLastUpdateTime(parseInstantValue(parser));
                    break;
                case CommandManagerJobParameter.SCHEDULE_FIELD:
                    jobParameter.setSchedule(ScheduleParser.parse(parser));
                    break;
                default:
                    // Unknown fields are treated as a malformed job document.
                    XContentParserUtils.throwUnknownToken(
                            parser.currentToken(), parser.getTokenLocation());
            }
        }
        return jobParameter;
    };
}

/**
 * Returns the proper Instant object with milliseconds from the Unix epoch when the current
 * token actually holds a value.
 *
 * @param parser: The parser as provided by JobScheduler
 * @return an Instant built from the token's long value, or null when the token is an
 *     explicit JSON null.
 * @throws IOException propagated from the underlying parser.
 */
private Instant parseInstantValue(XContentParser parser) throws IOException {
    // Explicit JSON null: the field is present but unset.
    if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) {
        return null;
    }
    if (parser.currentToken().isValue()) {
        // Timestamps are stored as milliseconds since the Unix epoch.
        return Instant.ofEpochMilli(parser.longValue());
    }
    // Any non-value token (object, array, field name) is malformed input.
    XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
    return null; // unreachable: throwUnknownToken always throws
}

/**
* Close the resources opened by this plugin.
*
Expand Down
Loading

0 comments on commit 270e9b5

Please sign in to comment.