Skip to content

Commit

Permalink
Adds reprovision API to support updating search pipelines, ingest pip…
Browse files Browse the repository at this point in the history
…elines index settings (#804)

* Initial commit, Adds ReprovisionWorkflowTransportAction, reprovision param for RestCreateWorkflowAction, creates and registers Update Ingest/Search pipeline steps in WorkflowResources, registers update steps in WorkflowStepFactory

Signed-off-by: Joshua Palis <[email protected]>

* Initial reprovisiontransportaction implementation, Added UpdateIndexStep, improved WorkflowProcessSorter.createReprovisionSequence

Signed-off-by: Joshua Palis <[email protected]>

* Implements Update index Step to support updating index settings, modifies updating resource created script to remove error if any

Signed-off-by: Joshua Palis <[email protected]>

* Improves workflow node comparision

Signed-off-by: Joshua Palis <[email protected]>

* Adding comments

Signed-off-by: Joshua Palis <[email protected]>

* Fixing tests, adding javadocs

Signed-off-by: Joshua Palis <[email protected]>

* Adding changelog

Signed-off-by: Joshua Palis <[email protected]>

* Updating parse utils, RestCreateWorkflowAction, CreateWorkflowTransportAction tests. Adding check for reprovision without workflowID.

Signed-off-by: Joshua Palis <[email protected]>

* Adding update step and get resource step tests

Signed-off-by: Joshua Palis <[email protected]>

* Adding check for filtered setting list size

Signed-off-by: Joshua Palis <[email protected]>

* Addign reprovision workflow transport action tests

Signed-off-by: Joshua Palis <[email protected]>

* Adding tests for reprovision sequence creation

Signed-off-by: Joshua Palis <[email protected]>

* Addressing comments

Signed-off-by: Joshua Palis <[email protected]>

* Changing GetResourceStep to WorkflowDataStep

Signed-off-by: Joshua Palis <[email protected]>

* Addressing PR comments

Signed-off-by: Joshua Palis <[email protected]>

* Fixing state check for reprovision transport action

Signed-off-by: Joshua Palis <[email protected]>

* Adding state eror check to reprovision transport action to remove error field

Signed-off-by: Joshua Palis <[email protected]>

* removing error check from flowframeworkindices handler

Signed-off-by: Joshua Palis <[email protected]>

* Adding check for no updated settings

Signed-off-by: Joshua Palis <[email protected]>

* refactor reprovision sequence creation

Signed-off-by: Joshua Palis <[email protected]>

* Fixing workflowrequest serialization

Signed-off-by: Joshua Palis <[email protected]>

* Addressing PR comments

Signed-off-by: Joshua Palis <[email protected]>

* Moving flattenSettings method to ParseUtils, added flatten settings tests

Signed-off-by: Joshua Palis <[email protected]>

* updating workflowrequest

Signed-off-by: Joshua Palis <[email protected]>

* fixing workflowrequest

Signed-off-by: Joshua Palis <[email protected]>

* spotlessApply

Signed-off-by: Joshua Palis <[email protected]>

---------

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis authored Aug 5, 2024
1 parent 7ec848a commit 48c7019
Show file tree
Hide file tree
Showing 34 changed files with 2,936 additions and 118 deletions.
10 changes: 1 addition & 9 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,11 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.14...2.x)
### Features
- Support editing of certain workflow fields on a provisioned workflow ([#757](https://github.com/opensearch-project/flow-framework/pull/757))
- Add allow_delete parameter to Deprovision API ([#763](https://github.com/opensearch-project/flow-framework/pull/763))
- Adds reprovision API to support updating search pipelines, ingest pipelines index settings ([#804](https://github.com/opensearch-project/flow-framework/pull/804))

### Enhancements
- Register system index descriptors through SystemIndexPlugin.getSystemIndexDescriptors ([#750](https://github.com/opensearch-project/flow-framework/pull/750))

### Bug Fixes
- Handle Not Found exceptions as successful deletions for agents and models ([#805](https://github.com/opensearch-project/flow-framework/pull/805))
- Wrap CreateIndexRequest mappings in _doc key as required ([#809](https://github.com/opensearch-project/flow-framework/pull/809))
- Have FlowFrameworkException status recognized by ExceptionsHelper ([#811](https://github.com/opensearch-project/flow-framework/pull/811))

### Infrastructure
### Documentation
### Maintenance
### Refactoring
- Improve Template and WorkflowState builders ([#778](https://github.com/opensearch-project/flow-framework/pull/778))
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ dependencies {
implementation "jakarta.json.bind:jakarta.json.bind-api:3.0.1"
implementation "org.glassfish:jakarta.json:2.0.1"
implementation "org.eclipse:yasson:3.0.3"
implementation "com.google.code.gson:gson:2.10.1"

// ZipArchive dependencies used for integration tests
zipArchive group: 'org.opensearch.plugin', name:'opensearch-ml-plugin', version: "${opensearch_build}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.opensearch.flowframework.transport.GetWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.ReprovisionWorkflowAction;
import org.opensearch.flowframework.transport.ReprovisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.SearchWorkflowAction;
import org.opensearch.flowframework.transport.SearchWorkflowStateAction;
import org.opensearch.flowframework.transport.SearchWorkflowStateTransportAction;
Expand Down Expand Up @@ -170,7 +172,8 @@ public List<RestHandler> getRestHandlers(
new ActionHandler<>(GetWorkflowStateAction.INSTANCE, GetWorkflowStateTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowStepAction.INSTANCE, GetWorkflowStepTransportAction.class),
new ActionHandler<>(SearchWorkflowStateAction.INSTANCE, SearchWorkflowStateTransportAction.class)
new ActionHandler<>(SearchWorkflowStateAction.INSTANCE, SearchWorkflowStateTransportAction.class),
new ActionHandler<>(ReprovisionWorkflowAction.INSTANCE, ReprovisionWorkflowTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ private CommonValue() {}
public static final String WORKFLOW_STEP = "workflow_step";
/** The param name for default use case, used by the create workflow API */
public static final String USE_CASE = "use_case";
/** The param name for reprovisioning, used by the create workflow API */
public static final String REPROVISION_WORKFLOW = "reprovision";

/*
* Constants associated with plugin configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import org.opensearch.flowframework.workflow.RegisterRemoteModelStep;
import org.opensearch.flowframework.workflow.ReindexStep;
import org.opensearch.flowframework.workflow.UndeployModelStep;
import org.opensearch.flowframework.workflow.UpdateIndexStep;
import org.opensearch.flowframework.workflow.UpdateIngestPipelineStep;
import org.opensearch.flowframework.workflow.UpdateSearchPipelineStep;

import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -43,29 +46,39 @@
public enum WorkflowResources {

/** Workflow steps for creating/deleting a connector and associated created resource */
CREATE_CONNECTOR(CreateConnectorStep.NAME, WorkflowResources.CONNECTOR_ID, DeleteConnectorStep.NAME),
CREATE_CONNECTOR(CreateConnectorStep.NAME, null, DeleteConnectorStep.NAME, WorkflowResources.CONNECTOR_ID),
/** Workflow steps for registering/deleting a remote model and associated created resource */
REGISTER_REMOTE_MODEL(RegisterRemoteModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME),
REGISTER_REMOTE_MODEL(RegisterRemoteModelStep.NAME, null, DeleteModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for registering/deleting a local model and associated created resource */
REGISTER_LOCAL_MODEL(RegisterLocalCustomModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME),
REGISTER_LOCAL_MODEL(RegisterLocalCustomModelStep.NAME, null, DeleteModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for registering/deleting a local sparse encoding model and associated created resource */
REGISTER_LOCAL_SPARSE_ENCODING_MODEL(RegisterLocalSparseEncodingModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME),
REGISTER_LOCAL_SPARSE_ENCODING_MODEL(RegisterLocalSparseEncodingModelStep.NAME, null, DeleteModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for registering/deleting a local OpenSearch provided pretrained model and associated created resource */
REGISTER_LOCAL_PRETRAINED_MODEL(RegisterLocalPretrainedModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME),
REGISTER_LOCAL_PRETRAINED_MODEL(RegisterLocalPretrainedModelStep.NAME, null, DeleteModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for registering/deleting a model group and associated created resource */
REGISTER_MODEL_GROUP(RegisterModelGroupStep.NAME, WorkflowResources.MODEL_GROUP_ID, NoOpStep.NAME),
REGISTER_MODEL_GROUP(RegisterModelGroupStep.NAME, null, NoOpStep.NAME, WorkflowResources.MODEL_GROUP_ID),
/** Workflow steps for deploying/undeploying a model and associated created resource */
DEPLOY_MODEL(DeployModelStep.NAME, WorkflowResources.MODEL_ID, UndeployModelStep.NAME),
DEPLOY_MODEL(DeployModelStep.NAME, null, UndeployModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_INGEST_PIPELINE(CreateIngestPipelineStep.NAME, WorkflowResources.PIPELINE_ID, DeleteIngestPipelineStep.NAME),
CREATE_INGEST_PIPELINE(
CreateIngestPipelineStep.NAME,
UpdateIngestPipelineStep.NAME,
DeleteIngestPipelineStep.NAME,
WorkflowResources.PIPELINE_ID
),
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, DeleteSearchPipelineStep.NAME),
CREATE_SEARCH_PIPELINE(
CreateSearchPipelineStep.NAME,
UpdateSearchPipelineStep.NAME,
DeleteSearchPipelineStep.NAME,
WorkflowResources.PIPELINE_ID
),
/** Workflow steps for creating an index and associated created resource */
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, DeleteIndexStep.NAME),
CREATE_INDEX(CreateIndexStep.NAME, UpdateIndexStep.NAME, DeleteIndexStep.NAME, WorkflowResources.INDEX_NAME),
/** Workflow steps for reindex a source index to destination index and associated created resource */
REINDEX(ReindexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME),
REINDEX(ReindexStep.NAME, null, NoOpStep.NAME, WorkflowResources.INDEX_NAME),
/** Workflow steps for registering/deleting an agent and the associated created resource */
REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME);
REGISTER_AGENT(RegisterAgentStep.NAME, null, DeleteAgentStep.NAME, WorkflowResources.AGENT_ID);

/** Connector Id for a remote model connector */
public static final String CONNECTOR_ID = "connector_id";
Expand All @@ -80,34 +93,37 @@ public enum WorkflowResources {
/** Agent Id */
public static final String AGENT_ID = "agent_id";

private final String workflowStep;
private final String resourceCreated;
private final String createStep;
private final String updateStep;
private final String deprovisionStep;
private final String resourceCreated;

private static final Logger logger = LogManager.getLogger(WorkflowResources.class);
private static final Set<String> allResources = Stream.of(values())
.map(WorkflowResources::getResourceCreated)
.collect(Collectors.toSet());

WorkflowResources(String workflowStep, String resourceCreated, String deprovisionStep) {
this.workflowStep = workflowStep;
this.resourceCreated = resourceCreated;
WorkflowResources(String createStep, String updateStep, String deprovisionStep, String resourceCreated) {
this.createStep = createStep;
this.updateStep = updateStep;
this.deprovisionStep = deprovisionStep;
this.resourceCreated = resourceCreated;
}

/**
* Returns the workflowStep for the given enum Constant
* @return the workflowStep of this data.
* Returns the create step for the given enum Constant
* @return the create step of this data.
*/
public String getWorkflowStep() {
return workflowStep;
public String getCreateStep() {
return createStep;
}

/**
* Returns the resourceCreated for the given enum Constant
* @return the resourceCreated of this data.
* Returns the updateStep for the given enum Constant
* @return the updateStep of this data.
*/
public String getResourceCreated() {
return resourceCreated;
public String getUpdateStep() {
return updateStep;
}

/**
Expand All @@ -118,6 +134,14 @@ public String getDeprovisionStep() {
return deprovisionStep;
}

/**
* Returns the resourceCreated for the given enum Constant
* @return the resourceCreated of this data.
*/
public String getResourceCreated() {
return resourceCreated;
}

/**
* Gets the resources created type based on the workflowStep.
* @param workflowStep workflow step name
Expand All @@ -127,7 +151,9 @@ public String getDeprovisionStep() {
public static String getResourceByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (workflowStep.equals(mapping.getWorkflowStep()) || workflowStep.equals(mapping.getDeprovisionStep())) {
if (workflowStep.equals(mapping.getCreateStep())
|| workflowStep.equals(mapping.getDeprovisionStep())
|| workflowStep.equals(mapping.getUpdateStep())) {
return mapping.getResourceCreated();
}
}
Expand All @@ -145,7 +171,7 @@ public static String getResourceByWorkflowStep(String workflowStep) throws FlowF
public static String getDeprovisionStepByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getWorkflowStep().equals(workflowStep)) {
if (mapping.getCreateStep().equals(workflowStep)) {
return mapping.getDeprovisionStep();
}
}
Expand All @@ -154,6 +180,24 @@ public static String getDeprovisionStepByWorkflowStep(String workflowStep) throw
throw new FlowFrameworkException("Unable to find deprovision step for step: " + workflowStep, RestStatus.BAD_REQUEST);
}

/**
* Gets the update step type based on the workflowStep.
* @param workflowStep workflow step name
* @return the corresponding step to update
* @throws FlowFrameworkException if workflow step doesn't exist in enum
*/
public static String getUpdateStepByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getCreateStep().equals(workflowStep)) {
return mapping.getUpdateStep();
}
}
}
logger.error("Unable to find update step for step: {}", workflowStep);
throw new FlowFrameworkException("Unable to find update step for step: " + workflowStep, RestStatus.BAD_REQUEST);
}

/**
* Returns all the possible resource created types in enum
* @return a set of all the resource created types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ public void updateResourceInStateIndex(
Script script = new Script(
ScriptType.INLINE,
"painless",
"ctx._source.resources_created.add(params.newResource)",
"ctx._source.resources_created.add(params.newResource);",
Collections.singletonMap("newResource", newResource.resourceMap())
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW;
import static org.opensearch.flowframework.common.CommonValue.REPROVISION_WORKFLOW;
import static org.opensearch.flowframework.common.CommonValue.UPDATE_WORKFLOW_FIELDS;
import static org.opensearch.flowframework.common.CommonValue.USE_CASE;
import static org.opensearch.flowframework.common.CommonValue.VALIDATION;
Expand Down Expand Up @@ -74,7 +75,7 @@ public List<Route> routes() {
return List.of(
// Create new workflow
new Route(RestRequest.Method.POST, String.format(Locale.ROOT, "%s", WORKFLOW_URI)),
// Update use case template
// Update use case template/ reprovision existing workflow
new Route(RestRequest.Method.PUT, String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, WORKFLOW_ID))
);
}
Expand All @@ -84,8 +85,10 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
String workflowId = request.param(WORKFLOW_ID);
String[] validation = request.paramAsStringArray(VALIDATION, new String[] { "all" });
boolean provision = request.paramAsBoolean(PROVISION_WORKFLOW, false);
boolean reprovision = request.paramAsBoolean(REPROVISION_WORKFLOW, false);
boolean updateFields = request.paramAsBoolean(UPDATE_WORKFLOW_FIELDS, false);
String useCase = request.param(USE_CASE);

// If provisioning, consume all other params and pass to provision transport action
Map<String, String> params = provision
? request.params()
Expand All @@ -108,28 +111,32 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
);
}
if (!provision && !params.isEmpty()) {
// Consume params and content so custom exception is processed
params.keySet().stream().forEach(request::param);
request.content();
FlowFrameworkException ffe = new FlowFrameworkException(
"Only the parameters " + request.consumedParams() + " are permitted unless the provision parameter is set to true.",
RestStatus.BAD_REQUEST
);
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
return processError(ffe, params, request);
}
if (provision && updateFields) {
// Consume params and content so custom exception is processed
params.keySet().stream().forEach(request::param);
request.content();
FlowFrameworkException ffe = new FlowFrameworkException(
"You can not use both the " + PROVISION_WORKFLOW + " and " + UPDATE_WORKFLOW_FIELDS + " parameters in the same request.",
RestStatus.BAD_REQUEST
);
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
return processError(ffe, params, request);
}
if (reprovision && workflowId == null) {
FlowFrameworkException ffe = new FlowFrameworkException(
"You can not use the " + REPROVISION_WORKFLOW + " parameter to create a new template.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}
if (reprovision && useCase != null) {
FlowFrameworkException ffe = new FlowFrameworkException(
"You cannot use the " + REPROVISION_WORKFLOW + " and " + USE_CASE + " parameters in the same request.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}
try {
Template template;
Expand Down Expand Up @@ -213,7 +220,8 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
provision || updateFields,
params,
useCase,
useCaseDefaultsMap
useCaseDefaultsMap,
reprovision
);

return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
Expand Down Expand Up @@ -249,4 +257,13 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
);
}
}

private RestChannelConsumer processError(FlowFrameworkException ffe, Map<String, String> params, RestRequest request) {
// Consume params and content so custom exception is processed
params.keySet().stream().forEach(request::param);
request.content();
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
}
Loading

0 comments on commit 48c7019

Please sign in to comment.