Skip to content

Commit

Permalink
Merge pull request cdapio#15317 from cdapio/operation-store
Browse files Browse the repository at this point in the history
CDAP-20803: API, DB and Proto Schema for Long Running Operation
  • Loading branch information
samdgupi authored Sep 22, 2023
2 parents ec02b03 + 1941854 commit 30c8856
Show file tree
Hide file tree
Showing 9 changed files with 851 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.gateway.handlers;

import com.google.gson.Gson;
import com.google.inject.Inject;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.gateway.handlers.util.AbstractAppFabricHttpHandler;
import io.cdap.cdap.proto.operationrun.OperationRun;
import io.cdap.http.HttpHandler;
import io.cdap.http.HttpResponder;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.util.ArrayList;
import java.util.List;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;

/**
* The {@link HttpHandler} for handling REST calls to namespace endpoints.
*/
@Path(Constants.Gateway.API_VERSION_3 + "/namespaces/{namespace-id}/operations")
public class OperationRunHttpHandler extends AbstractAppFabricHttpHandler {

private static final Gson GSON = new Gson();

@Inject
OperationRunHttpHandler() {
}

/**
* API to fetch all running operations in a namespace.
*
* @param namespaceId Namespace to fetch runs from
* @param pageToken the token identifier for the current page requested in a paginated
* request
* @param pageSize the number of application details returned in a paginated request
* @param filter optional filters in EBNF grammar. Currently Only one status and one type
* filter is supported with AND expression.
*/
@GET
@Path("/")
public void scanOperations(HttpRequest request, HttpResponder responder,
@PathParam("namespace-id") String namespaceId,
@QueryParam("pageToken") String pageToken,
@QueryParam("pageSize") Integer pageSize,
@QueryParam("filter") String filter) {
// TODO(samik, CDAP-20812) fetch the operation runs from store
List<OperationRun> runs = new ArrayList<>();
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(runs));
}

/**
* API to fetch operation run by id.
*
* @param namespaceId Namespace to fetch runs from
* @param runId id of the operation run
*/
@GET
@Path("/{id}")
public void getOperationRun(HttpRequest request, HttpResponder responder,
@PathParam("namespace-id") String namespaceId,
@PathParam("id") String runId) {
// // TODO(samik, CDAP-20813) fetch the operation runs from store
OperationRun run = null;
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(run));
}

/**
* API to stop operation run by id.
*
* @param namespaceId Namespace to fetch runs from
* @param runId id of the operation run
*/
@POST
@Path("/{id}/stop")
public void failOperation(FullHttpRequest request, HttpResponder responder,
@PathParam("namespace-id") String namespaceId,
@PathParam("id") String runId) {
// // TODO(samik, CDAP-20814) send the message to stop the operation
responder.sendString(HttpResponseStatus.OK,
String.format("Updated status for operation run %s in namespace '%s'.", runId,
namespaceId));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.store;

import com.google.common.base.Objects;
import com.google.gson.annotations.SerializedName;
import io.cdap.cdap.proto.operationrun.OperationRun;
import java.util.Arrays;
import javax.annotation.Nullable;

/**
* Store the meta information about operation runs in CDAP. This class contains all information the
* system needs about a run, which includes information that should not be exposed to users.
* {@link io.cdap.cdap.proto.operationrun.OperationRun} contains fields that are exposed to users,
* so everything else like the user request and principal goes here
*
* @param <T> The type of the operation request
*/
public class OperationRunDetail<T> {

@SerializedName("run")
private final OperationRun run;

// sourceid refers to the tms message id which has updated this run details
@SerializedName("sourceid")
@Nullable
private final byte[] sourceId;

@SerializedName("principal")
@Nullable
private final String principal;

@SerializedName("request")
private final T request;

protected OperationRunDetail(OperationRun run, byte[] sourceId, @Nullable String principal,
T request) {
this.run = run;
this.sourceId = sourceId;
this.principal = principal;
this.request = request;
}

@Nullable
public byte[] getSourceId() {
return sourceId;
}

@Nullable
public String getPrincipal() {
return principal;
}

public T getRequest() {
return request;
}

public OperationRun getRun() {
return run;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

OperationRunDetail<T> that = (OperationRunDetail<T>) o;
return Objects.equal(this.getRun(), that.getRun())
&& Arrays.equals(this.getSourceId(), that.getSourceId())
&& Objects.equal(this.getRequest(), that.getRequest())
&& Objects.equal(this.getPrincipal(), that.getPrincipal());
}

@Override
public int hashCode() {
return Objects.hashCode(super.hashCode(), Arrays.hashCode(getSourceId()), getRequest(),
getPrincipal());
}

/**
* Builder to create a OperationRunDetail.
*/
public static <T> Builder<T> builder() {
return new Builder<>();
}

/**
* Builder to create OperationRunDetail.
*
* @param detail existing detail to copy fields from.
*/
public static <T> Builder<T> builder(OperationRunDetail<T> detail) {
return new Builder<>(detail);
}

/**
* Builds RunRecordMetas.
*/
public static class Builder<T> {

protected OperationRun run;
protected byte[] sourceId;
protected String principal;
protected T request;

protected Builder() {
}

protected Builder(OperationRunDetail<T> detail) {
sourceId = detail.getSourceId();
principal = detail.getPrincipal();
request = detail.getRequest();
run = detail.getRun();
}

public Builder<T> setSourceId(byte[] sourceId) {
this.sourceId = sourceId;
return this;
}

public Builder<T> setPrincipal(String principal) {
this.principal = principal;
return this;
}

public Builder<T> setRequest(T request) {
this.request = request;
return this;
}

public Builder<T> setRun(OperationRun run) {
this.run = run;
return this;
}

/**
* Validates input and returns a OperationRunDetail.
*/
public OperationRunDetail<T> build() {
if (request == null) {
throw new IllegalArgumentException("Operation run request must be specified.");
}
if (sourceId == null) {
throw new IllegalArgumentException("Operation run source id must be specified.");
}
if (run == null) {
throw new IllegalArgumentException("Operation run must be specified.");
}

return new OperationRunDetail<>(run, sourceId, principal, request);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public static void createAllTables(StructuredTableAdmin tableAdmin) throws IOExc
TetheringStore.create(tableAdmin);
AppStateStore.create(tableAdmin);
CredentialProviderStore.create(tableAdmin);
OperationRunsStore.create(tableAdmin);
}

/**
Expand Down Expand Up @@ -1323,4 +1324,46 @@ public static void create(StructuredTableAdmin tableAdmin) throws IOException {
createIfNotExists(tableAdmin, IDENTITY_TABLE_SPEC);
}
}

/**
* Schemas for operation runs.
*/
public static final class OperationRunsStore {
public static final StructuredTableId OPERATION_RUNS =
new StructuredTableId("operation_runs");

public static final String ID_FIELD = "id";
public static final String NAMESPACE_FIELD = "namespace";
public static final String TYPE_FIELD = "type";
public static final String STATUS_FIELD = "status";
public static final String START_TIME_FIELD = "start_time";
public static final String UPDATE_TIME_FIELD = "update_time";
// contains serialized OperationRunDetail
public static final String DETAILS_FIELD = "details";
public static final StructuredTableSpecification OPERATION_RUNS_TABLE_SPEC =
new StructuredTableSpecification.Builder()
.withId(OPERATION_RUNS)
.withFields(
Fields.stringType(ID_FIELD),
Fields.stringType(NAMESPACE_FIELD),
Fields.stringType(TYPE_FIELD),
Fields.stringType(STATUS_FIELD),
Fields.longType(START_TIME_FIELD),
Fields.longType(UPDATE_TIME_FIELD),
Fields.stringType(DETAILS_FIELD)
)
.withPrimaryKeys(NAMESPACE_FIELD, ID_FIELD)
.withIndexes(TYPE_FIELD, STATUS_FIELD, START_TIME_FIELD)
.build();

/**
* Creates operation store tables.
*
* @param tableAdmin The table admin to use.
* @throws IOException If table creation fails.
*/
public static void create(StructuredTableAdmin tableAdmin) throws IOException {
createIfNotExists(tableAdmin, OPERATION_RUNS_TABLE_SPEC);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.proto.operationrun;

import java.util.Collection;
import java.util.Objects;

/**
* Error representation for Operation Run.
*/
public class OperationError {

private final String message;
private final Collection<OperationResourceScopedError> details;

public OperationError(String message, Collection<OperationResourceScopedError> details) {
this.message = message;
this.details = details;
}

public String getMessage() {
return message;
}

public Collection<OperationResourceScopedError> getDetails() {
return details;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

OperationError that = (OperationError) o;

return this.details.equals(that.details);
}

@Override
public int hashCode() {
return Objects.hash(details);
}
}
Loading

0 comments on commit 30c8856

Please sign in to comment.