Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Job Scheduler logic #103

Merged
merged 90 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
90 commits
Select commit Hold shift + click to select a range
6e97560
Add PoC extending the Job-Scheduler plugin
AlexRuiz7 Sep 17, 2024
e3c46a7
Modify build.gradle to load job scheduler on gradlew run
f-galland Sep 18, 2024
fa6368b
Add PoC extending the Job-Scheduler plugin
AlexRuiz7 Sep 17, 2024
a98c74e
Modify build.gradle to load job scheduler on gradlew run
f-galland Sep 18, 2024
e6ddd40
Merge branch '65-job-scheduler-research' of github.com:wazuh/wazuh-in…
AlexRuiz7 Oct 14, 2024
110298f
Reformat code
AlexRuiz7 Oct 14, 2024
0504324
Merge remote-tracking branch 'origin' into 65-job-scheduler-research
AlexRuiz7 Oct 14, 2024
7f03e48
Rebuild build.gradle
AlexRuiz7 Oct 14, 2024
5afd106
Move JobSchedulerExtension to the right resources directory
f-galland Oct 14, 2024
3dc160d
Fix logger class name
f-galland Oct 14, 2024
8a1fad7
Expose a POST endpoint to schedule tasks.
f-galland Oct 16, 2024
bc2af95
Init jobScheduler
AlexRuiz7 Oct 17, 2024
b8b58f2
Check if runner interface is being called.
f-galland Oct 18, 2024
8311b4a
Try getting runjob() executed
f-galland Oct 21, 2024
b6dbdaa
Correct META-INF directory contents
f-galland Oct 22, 2024
dc61799
Merge branch 'job-scheduler-runjob-tests' into 87-implement-job-schdu…
f-galland Oct 22, 2024
925ffa5
Create Job on plugin startup
f-galland Oct 22, 2024
b90b44e
Move create() to its own JobDocument class.
f-galland Oct 22, 2024
abf32d2
Fix switch statement
f-galland Oct 22, 2024
8ea3334
Schedule job on cluster startup.
f-galland Oct 22, 2024
8c7268e
Make sure to schedule the job only on new clusters.
f-galland Oct 22, 2024
a69f4fc
Split job indexing to a separate method
f-galland Oct 23, 2024
a1f431c
Create a job that logs the search results on the .commands index.
f-galland Oct 23, 2024
bfbbbdf
Make the command manager run searches on the commands index.
f-galland Oct 23, 2024
a12b3cd
Check if index exists before running search.
f-galland Oct 25, 2024
ec5f041
Wrap searchResponse in CompletableFuture
f-galland Oct 25, 2024
4d41276
Retrieve commands sorted by ascending timeout time.
f-galland Oct 25, 2024
271d4d7
Fix sort field
f-galland Oct 25, 2024
0aae9ff
Adding scroll logic
f-galland Oct 28, 2024
a96841b
Start implementing PIT search
f-galland Oct 30, 2024
d6d93d5
Send the Search Response to a test REST endpoint
f-galland Oct 30, 2024
e4d3d8d
Change the command.status of submitted commands to DONE
f-galland Oct 31, 2024
40785d2
Merging master branch
f-galland Oct 31, 2024
57ef87b
Fix build.gradle after merging master
f-galland Oct 31, 2024
b749750
Remove old RestPostCommandAction class after master merge
f-galland Oct 31, 2024
c8015ab
Tidy up after master merge
f-galland Oct 31, 2024
87bfd63
Fix gradlew run error
f-galland Oct 31, 2024
0b5296f
Merge branch 'master' into 87-implement-job-schduler-logic
AlexRuiz7 Oct 31, 2024
fe4e00f
Apply spotless
AlexRuiz7 Oct 31, 2024
df2b5f3
Switch to synchronous code
f-galland Nov 1, 2024
c01846b
Adding pagination through PointInTime classes
f-galland Nov 4, 2024
f8ad72f
Adding pagination through PointInTime classes
f-galland Nov 4, 2024
bd64900
Only set sort fields for the first page.
f-galland Nov 4, 2024
e8417e6
Fix empty search response errors
f-galland Nov 5, 2024
9ac1039
Improve while loop
f-galland Nov 5, 2024
3fc33ea
Make PointInTime a non-singleton class
f-galland Nov 5, 2024
db50b92
Simplificar get instancia. Agregar cabecera de licencia faltante.
mcasas993 Nov 6, 2024
110b2ce
Fix linter warnings
f-galland Nov 6, 2024
21d1d68
Merge branch 'master' into 87-implement-job-schduler-logic
AlexRuiz7 Nov 7, 2024
052c573
Merge branch 'master' into 87-implement-job-schduler-logic
f-galland Nov 14, 2024
5c700dc
Block until PointInTimeBuilder is obtained
f-galland Nov 21, 2024
a354269
Create one pit per search
f-galland Nov 21, 2024
50b13b5
Make SearchJob a non-singleton class
f-galland Nov 22, 2024
2fbcc4f
Indexing operation timeout should be set in ms
f-galland Nov 25, 2024
c96abbb
Indexing operation timeout should be set in ms
f-galland Nov 25, 2024
c18d12c
Merge branch 'master' into 87-implement-job-schduler-logic
f-galland Nov 25, 2024
b7753cf
Remove mockito explicit dependency
f-galland Nov 28, 2024
2050572
Removing settings object
f-galland Nov 28, 2024
762e289
Removing unneeded CommandManagerJobRunner object
f-galland Nov 28, 2024
6272f90
JavaDocs for scheduleCommandJob method
f-galland Nov 28, 2024
0d16987
Use fqn for job index name constant
f-galland Nov 28, 2024
f76c6da
Add javadocs for parseInstantValue
f-galland Nov 28, 2024
a457fe7
Add javadocs for CommandManagerJobParameter
f-galland Nov 28, 2024
01be3a2
Add javadocs for CommandManagerJobRunner
f-galland Nov 28, 2024
0f352f6
Remove unneeded parameter
f-galland Nov 28, 2024
799f836
Add javadocs to JobDocument class
f-galland Nov 28, 2024
b24f590
Add javadocs to JobDocument's create() method
f-galland Nov 28, 2024
3c35372
Rename SearchJob to SearchThread and make it implement Runnable
f-galland Nov 28, 2024
becae9b
Improve exception handling
f-galland Nov 28, 2024
8a83185
Improve exception handling
f-galland Nov 28, 2024
264d419
Use URI and credentials in settings
f-galland Nov 28, 2024
7d645fa
Rename updateStatusField to setSentStatus and improve it
f-galland Nov 28, 2024
4f20899
Make "command" a constant
f-galland Nov 28, 2024
fef8e3c
Make queries constant
f-galland Nov 28, 2024
1afe9d6
Refactor totalHits()
f-galland Nov 28, 2024
10cfda9
Remove unneeded getters
f-galland Nov 28, 2024
6a9180d
Improve buildPit() exception handling
f-galland Nov 28, 2024
dd3a5f9
Load settings from environment
f-galland Nov 28, 2024
d422f79
Fix JavaDocs
AlexRuiz7 Nov 28, 2024
cf97af4
Improve searchAfter logic
f-galland Nov 29, 2024
6f53458
Merging changes
f-galland Nov 29, 2024
f15e517
Fixing errors after merge
f-galland Nov 29, 2024
c72cc4c
Fix build.gradle errors
AlexRuiz7 Nov 29, 2024
6dfddf8
Fix settings related errors
f-galland Nov 29, 2024
2e128fd
Fix http client permission issues
f-galland Dec 3, 2024
7b22e58
Send orders array with proper authentication
f-galland Dec 3, 2024
a535aff
Only set status to SENT when a non-error response has been received
f-galland Dec 3, 2024
e41b5ae
Merge branch 'master' into 87-implement-job-schduler-logic
AlexRuiz7 Dec 3, 2024
354f855
Add missing import
mcasas993 Dec 3, 2024
d6a6e2f
Use https for the Management API mock
AlexRuiz7 Dec 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 36 additions & 15 deletions plugins/command-manager/build.gradle
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@

import org.opensearch.gradle.test.RestIntegTestTask
import java.util.concurrent.Callable

buildscript {
ext {
opensearch_version = System.getProperty("opensearch.version", "2.18.0-SNAPSHOT")
opensearch_no_snapshot = opensearch_version.replace("-SNAPSHOT","")
opensearch_build = opensearch_no_snapshot + ".0"
job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build)
wazuh_version = System.getProperty("version", "5.0.0")
revision = System.getProperty("revision", "0")
}
Expand All @@ -24,7 +28,7 @@ apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'eclipse'
apply plugin: 'opensearch.opensearchplugin'
apply plugin: 'opensearch.yaml-rest-test'
apply plugin: 'opensearch.rest-test'
apply plugin: 'opensearch.pluginzip'

def pluginName = 'wazuh-indexer-command-manager'
Expand Down Expand Up @@ -67,6 +71,7 @@ opensearchplugin {
name pluginName
description pluginDescription
classname "${projectPath}.${pathToPlugin}.${pluginClassName}"
extendedPlugins = ['opensearch-job-scheduler']
licenseFile rootProject.file('LICENSE.txt')
noticeFile rootProject.file('NOTICE.txt')
}
Expand All @@ -88,6 +93,10 @@ def versions = [
imposter: "4.1.2"
]

configurations {
zipArchive
}

dependencies {
implementation "org.apache.httpcomponents.client5:httpclient5:${versions.httpclient5}"
implementation "org.apache.httpcomponents.core5:httpcore5-h2:${versions.httpcore5}"
Expand All @@ -99,6 +108,11 @@ dependencies {
implementation "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
implementation "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"

// Job Scheduler stuff
zipArchive group: 'org.opensearch.plugin', name: 'opensearch-job-scheduler', version: "${opensearch_build}"
// implementation "org.opensearch:opensearch:${opensearch_version}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${job_scheduler_version}"

testImplementation "junit:junit:${versions.junit}"

// imposter
Expand Down Expand Up @@ -134,30 +148,37 @@ test {
jvmArgs "-Djava.security.policy=./plugins/command-manager/src/main/plugin-metadata/plugin-security.policy/plugin-security.policy"
}

task integTest(type: RestIntegTestTask) {
description = "Run tests against a cluster"
testClassesDirs = sourceSets.test.output.classesDirs
classpath = sourceSets.test.runtimeClasspath
def getJobSchedulerPlugin() {
provider(new Callable<RegularFile>() {
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.matching {
include '**/opensearch-job-scheduler*'
}.singleFile
}
}
}
})
}
tasks.named("check").configure { dependsOn(integTest) }

integTest {
testClusters.integTest {
plugin(getJobSchedulerPlugin())
testDistribution = "ARCHIVE"
// This installs our plugin into the testClusters
plugin(project.tasks.bundlePlugin.archiveFile)

// The --debug-jvm command-line option makes the cluster debuggable; this makes the tests debuggable
if (System.getProperty("test.debug") != null) {
jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005'
}
}

testClusters.integTest {
testDistribution = "INTEG_TEST"
//testDistribution = "ARCHIVE"
// This installs our plugin into the testClusters
plugin(project.tasks.bundlePlugin.archiveFile)

// add customized keystore
keystore 'm_api.auth.username', 'admin'
keystore 'm_api.auth.password', 'test'
keystore 'm_api.uri', 'http://127.0.0.1:55000' // base URI of the M_API
keystore 'm_api.uri', 'https://127.0.0.1:55000' // base URI of the M_API
}

run {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,83 @@
*/
package com.wazuh.commandmanager;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.*;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.ReloadablePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.*;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import com.wazuh.commandmanager.index.CommandIndex;
import com.wazuh.commandmanager.jobscheduler.CommandManagerJobParameter;
import com.wazuh.commandmanager.jobscheduler.CommandManagerJobRunner;
import com.wazuh.commandmanager.jobscheduler.JobDocument;
import com.wazuh.commandmanager.rest.RestPostCommandAction;
import com.wazuh.commandmanager.settings.PluginSettings;
import com.wazuh.commandmanager.utils.httpclient.HttpRestClient;

/**
* The Command Manager plugin exposes an HTTP API with a single endpoint to receive raw commands
* from the Wazuh Server. These commands are processed, indexed and sent back to the Server for its
* delivery to, in most cases, the Agents.
* delivery to, in most cases, the Agents. The Command Manager plugin exposes an HTTP API with a
* single endpoint to receive raw commands from the Wazuh Server. These commands are processed,
* indexed and sent back to the Server for its delivery to, in most cases, the Agents.
*
* <p>The Command Manager plugin is also a JobScheduler extension plugin.
*/
public class CommandManagerPlugin extends Plugin implements ActionPlugin, ReloadablePlugin {
public class CommandManagerPlugin extends Plugin
implements ActionPlugin, ReloadablePlugin, JobSchedulerExtension {
public static final String COMMAND_MANAGER_BASE_URI = "/_plugins/_command_manager";
public static final String COMMANDS_URI = COMMAND_MANAGER_BASE_URI + "/commands";
public static final String COMMAND_MANAGER_INDEX_NAME = ".commands";
public static final String COMMAND_MANAGER_INDEX_TEMPLATE_NAME = "index-template-commands";
public static final String COMMAND_DOCUMENT_PARENT_OBJECT_NAME = "command";
public static final String JOB_INDEX_NAME = ".scheduled-commands";
public static final Integer JOB_PERIOD_MINUTES = 1;
public static final Integer PAGE_SIZE = 2;
public static final Long DEFAULT_TIMEOUT_SECONDS = 20L;
public static final TimeValue PIT_KEEPALIVE_SECONDS = TimeValue.timeValueSeconds(30L);
AlexRuiz7 marked this conversation as resolved.
Show resolved Hide resolved

private static final Logger log = LogManager.getLogger(CommandManagerPlugin.class);
public static final String JOB_TYPE = "command_manager_scheduler_extension";

private CommandIndex commandIndex;
private JobDocument jobDocument;

@Override
public Collection<Object> createComponents(
Expand All @@ -68,9 +102,50 @@ public Collection<Object> createComponents(
this.commandIndex = new CommandIndex(client, clusterService, threadPool);
PluginSettings.getInstance(environment.settings());

// JobSchedulerExtension stuff
CommandManagerJobRunner.getInstance()
.setThreadPool(threadPool)
.setClient(client)
.setClusterService(clusterService)
.setEnvironment(environment);

scheduleCommandJob(client, clusterService, threadPool);

return Collections.emptyList();
}

/**
* Indexes a document into the jobs index, so that JobScheduler plugin can run it
*
* @param client: The cluster client, used for indexing
* @param clusterService: Provides the addListener method. We use it to determine if this is a
* new cluster.
* @param threadPool: Used by jobDocument to create the document in a thread.
*/
private void scheduleCommandJob(
f-galland marked this conversation as resolved.
Show resolved Hide resolved
Client client, ClusterService clusterService, ThreadPool threadPool) {
clusterService.addListener(
event -> {
if (event.localNodeClusterManager() && event.isNewCluster()) {
jobDocument = JobDocument.getInstance();
CompletableFuture<IndexResponse> indexResponseCompletableFuture =
jobDocument.create(
client,
threadPool,
UUIDs.base64UUID(),
getJobType(),
JOB_PERIOD_MINUTES);
indexResponseCompletableFuture.thenAccept(
indexResponse -> {
log.info(
"Scheduled task successfully, response: {}",
indexResponse.getResult().toString());
});
}
});
}

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
Expand Down Expand Up @@ -100,6 +175,75 @@ public void reload(Settings settings) {
// xxxService.refreshAndClearCache(commandManagerSettings);
}

@Override
public String getJobType() {
return CommandManagerPlugin.JOB_TYPE;
}

@Override
public String getJobIndex() {
return CommandManagerPlugin.JOB_INDEX_NAME;
}

@Override
public ScheduledJobRunner getJobRunner() {
log.info("getJobRunner() executed");
return CommandManagerJobRunner.getInstance();
}

@Override
public ScheduledJobParser getJobParser() {
log.info("getJobParser() executed");
return (parser, id, jobDocVersion) -> {
CommandManagerJobParameter jobParameter = new CommandManagerJobParameter();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case CommandManagerJobParameter.NAME_FIELD:
jobParameter.setJobName(parser.text());
break;
case CommandManagerJobParameter.ENABLED_FIELD:
jobParameter.setEnabled(parser.booleanValue());
break;
case CommandManagerJobParameter.ENABLED_TIME_FIELD:
jobParameter.setEnabledTime(parseInstantValue(parser));
break;
case CommandManagerJobParameter.LAST_UPDATE_TIME_FIELD:
jobParameter.setLastUpdateTime(parseInstantValue(parser));
break;
case CommandManagerJobParameter.SCHEDULE_FIELD:
jobParameter.setSchedule(ScheduleParser.parse(parser));
break;
default:
XContentParserUtils.throwUnknownToken(
parser.currentToken(), parser.getTokenLocation());
}
}
return jobParameter;
};
}

/**
* Returns the proper Instant object with milliseconds from the Unix epoch when the current
* token actually holds a value.
*
* @param parser: The parser as provided by JobScheduler
*/
private Instant parseInstantValue(XContentParser parser) throws IOException {
f-galland marked this conversation as resolved.
Show resolved Hide resolved
if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) {
return null;
}
if (parser.currentToken().isValue()) {
return Instant.ofEpochMilli(parser.longValue());
}
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
return null;
}

/**
* Close the resources opened by this plugin.
*
Expand Down
Loading