Skip to content

Commit

Permalink
Switch to synchronous code
Browse files Browse the repository at this point in the history
  • Loading branch information
f-galland committed Nov 1, 2024
1 parent fe4e00f commit df2b5f3
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class CommandManagerJobRunner implements ScheduledJobRunner {
private ClusterService clusterService;

private Client client;
private final SearchJob searchJob = SearchJob.getSearchJobInstance();
private final SearchJob searchJob = SearchJob.getInstance();

private CommandManagerJobRunner() {
// Singleton class, use getJobRunner method instead of constructor
Expand Down Expand Up @@ -55,33 +55,33 @@ private boolean indexExists(String indexName) {
public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) {
if (!indexExists(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME)) {
log.info(
"{} index not yet created, not running command manager jobs",
CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME);
"{} index not yet created, not running command manager jobs",
CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME);
return;
}
Runnable runnable =
() -> {
this.searchJob.setClient(client);
this.searchJob.setThreadPool(threadPool);
// this.searchJob.scrollSearchJob(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME, CommandManagerPlugin.COMMAND_BATCH_SIZE);
this.searchJob
.search(
CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME,
CommandManagerPlugin.COMMAND_BATCH_SIZE)
.thenAccept(
searchResponse -> {
try {
this.searchJob.handleSearchResponse(searchResponse);
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.exceptionally(
e -> {
log.error(e.getMessage());
return null;
});
};
() -> {
this.searchJob.setClient(client);
this.searchJob.setThreadPool(threadPool);
// this.searchJob.scrollSearchJob(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME, CommandManagerPlugin.COMMAND_BATCH_SIZE);
this.searchJob
.simpleSearch(
CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME,
CommandManagerPlugin.COMMAND_BATCH_SIZE)
.thenAccept(
searchResponse -> {
try {
this.searchJob.handleFirstPage(searchResponse);
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.exceptionally(
e -> {
log.error(e.getMessage());
return null;
});
};
threadPool.generic().submit(runnable);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package com.wazuh.commandmanager.jobscheduler;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.search.builder.PointInTimeBuilder;

import javax.swing.*;
import java.sql.Time;

public class PointInTime {
private static final Logger log = LogManager.getLogger(PointInTime.class);
private static PointInTime INSTANCE;
private String id;
private CreatePitRequest createPitRequest;
private PointInTimeBuilder pointInTimeBuilder;
private CreatePitResponse createPitResponse;
private TimeValue keepAlive = TimeValue.timeValueSeconds(60L);

public PointInTimeBuilder createPit(Client client, String index) {
Boolean allowPartialPitCreation = false;
setCreatePitRequest(
new CreatePitRequest(getKeepAlive(), allowPartialPitCreation, index)
);
client.createPit(
getCreatePitRequest(),
new ActionListener<>() {
@Override
public void onResponse(CreatePitResponse createPitResponse) {
setCreatePitResponse(createPitResponse);
setId(createPitResponse.getId());
setPointInTimeBuilder(
new PointInTimeBuilder(createPitResponse.getId())
);
getPointInTimeBuilder().setKeepAlive(getKeepAlive());
}

@Override
public void onFailure(Exception e) {
log.error(e);
}
});
return getPointInTimeBuilder();
}


public CreatePitResponse getCreatePitResponse() {
return createPitResponse;
}

public void setCreatePitResponse(CreatePitResponse createPitResponse) {
this.createPitResponse = createPitResponse;
}

public PointInTimeBuilder getPointInTimeBuilder() {
return pointInTimeBuilder;
}

public void setPointInTimeBuilder(PointInTimeBuilder pointInTimeBuilder) {
this.pointInTimeBuilder = pointInTimeBuilder;
}

public PointInTime() {
}

public CreatePitRequest getCreatePitRequest() {
return createPitRequest;
}

public void setCreatePitRequest(CreatePitRequest createPitRequest) {
this.createPitRequest = createPitRequest;
}

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public TimeValue getKeepAlive() {
return keepAlive;
}

public void setKeepAlive(TimeValue keepAlive) {
this.keepAlive = keepAlive;
}

public static PointInTime getInstance() {
log.info("Getting Job Runner Instance");
if (INSTANCE != null) {
return INSTANCE;
}
synchronized (SearchJob.class) {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new PointInTime();
return INSTANCE;
}
}
}
Loading

0 comments on commit df2b5f3

Please sign in to comment.