From 148df59034aafac72091759d9324085d1242ed48 Mon Sep 17 00:00:00 2001 From: Miles Garnsey <11435896+Miles-Garnsey@users.noreply.github.com> Date: Thu, 31 Aug 2023 11:44:35 +1000 Subject: [PATCH] Reaper endpoints: Async Repair Endpoint (#358) * Allow `repair()` RPC method to take more arguments (as required by Reaper). Get the old NodeOpsResources class calling the new RPC method signature. --------- Co-authored-by: Erik Merkle * Cancel repairs endpoint (#368) * Add cancelAllRepairs endpoint and requisite NodeOps. --- .../com/datastax/mgmtapi/NodeOpsProvider.java | 183 ++++++++++-------- management-api-server/doc/openapi.json | 119 +++++++++++- .../mgmtapi/resources/NodeOpsResources.java | 10 +- .../resources/common/BaseResources.java | 1 + .../resources/v1/NodeOpsResources.java | 8 +- .../resources/v2/RepairResourcesV2.java | 140 ++++++++++++++ .../v2/models/RepairParallelism.java | 39 ++++ .../resources/v2/models/RepairRequest.java | 93 +++++++++ .../v2/models/RepairRequestResponse.java | 48 +++++ .../resources/v2/models/RingRange.java | 57 ++++++ .../mgmtapi/BaseDockerIntegrationTest.java | 2 +- .../mgmtapi/K8OperatorResourcesTest.java | 16 +- .../resources/v2/RepairResourcesV2Test.java | 115 +++++++++++ 13 files changed, 741 insertions(+), 90 deletions(-) create mode 100644 management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/RepairResourcesV2.java create mode 100644 management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairParallelism.java create mode 100644 management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairRequest.java create mode 100644 management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairRequestResponse.java create mode 100644 management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RingRange.java create mode 100644 management-api-server/src/test/java/com/datastax/mgmtapi/resources/v2/RepairResourcesV2Test.java diff --git a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java index d4cd749a..8d6fed3c 100644 --- a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java +++ b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java @@ -746,96 +746,123 @@ public String repair( @RpcParam(name = "keyspaceName") String keyspace, @RpcParam(name = "tables") List tables, @RpcParam(name = "full") Boolean full, - @RpcParam(name = "notifications") boolean notifications) + @RpcParam(name = "notifications") boolean notifications, + @RpcParam(name = "repairParallelism") String repairParallelism, + @RpcParam(name = "datacenters") List datacenters, + @RpcParam(name = "associatedTokens") String ringRangeString, + @RpcParam(name = "repairThreadCount") Integer repairThreadCount) throws IOException { // At least one keyspace is required - if (keyspace != null) { - // create the repair spec - Map repairSpec = new HashMap<>(); - - // add any specified tables to the repair spec - if (tables != null && !tables.isEmpty()) { - // set the tables/column families - repairSpec.put(RepairOption.COLUMNFAMILIES_KEY, String.join(",", tables)); + assert (keyspace != null); + Map repairSpec = new HashMap<>(); + // add tables/column families + if (tables != null && !tables.isEmpty()) { + repairSpec.put(RepairOption.COLUMNFAMILIES_KEY, String.join(",", tables)); + } + // set incremental reapir + repairSpec.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!full)); + // Parallelism should be set if it's requested OR if incremental repair is requested. + if (!full) { + // Incremental repair requested, make sure parallelism is correct + if (repairParallelism != null + && !RepairParallelism.PARALLEL.getName().equals(repairParallelism)) { + throw new IOException( + "Invalid repair combination. Incremental repair if Parallelism is not set"); } - - // handle incremental vs full - boolean isIncremental = Boolean.FALSE.equals(full); - repairSpec.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(isIncremental)); - if (isIncremental) { - // incremental repairs will fail if parallelism is not set - repairSpec.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.getName()); + // Incremental repair and parallelism should be set + repairSpec.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.getName()); + } + if (repairThreadCount != null) { + // if specified, the value should be at least 1 + if (repairThreadCount.compareTo(Integer.valueOf(0)) <= 0) { + throw new IOException( + "Invalid repari thread count: " + + repairThreadCount + + ". Value should be greater than 0"); } + repairSpec.put(RepairOption.JOB_THREADS_KEY, repairThreadCount.toString()); + } + repairSpec.put(RepairOption.TRACE_KEY, Boolean.toString(Boolean.FALSE)); - // Since Cassandra provides us with a async, we don't need to use our executor interface for - // this. - final int repairJobId = - ShimLoader.instance.get().getStorageService().repairAsync(keyspace, repairSpec); + if (ringRangeString != null && !ringRangeString.isEmpty()) { + repairSpec.put(RepairOption.RANGES_KEY, ringRangeString); + } + // add datacenters to the repair spec + if (datacenters != null && !datacenters.isEmpty()) { + repairSpec.put(RepairOption.DATACENTERS_KEY, String.join(",", datacenters)); + } - if (!notifications) { - return Integer.valueOf(repairJobId).toString(); - } + // Since Cassandra provides us with a async, we don't need to use our executor interface for + // this. + final int repairJobId = + ShimLoader.instance.get().getStorageService().repairAsync(keyspace, repairSpec); - String jobId = String.format("repair-%d", repairJobId); - final Job job = service.createJob("repair", jobId); + if (!notifications) { + return Integer.valueOf(repairJobId).toString(); + } - if (repairJobId == 0) { - // Job is done and won't continue - job.setStatusChange(ProgressEventType.COMPLETE, ""); - job.setStatus(Job.JobStatus.COMPLETED); - job.setFinishedTime(System.currentTimeMillis()); - service.updateJob(job); - return job.getJobId(); - } - - ShimLoader.instance - .get() - .getStorageService() - .addNotificationListener( - (notification, handback) -> { - if (notification.getType().equals("progress")) { - Map data = (Map) notification.getUserData(); - ProgressEventType progress = ProgressEventType.values()[data.get("type")]; - - switch (progress) { - case START: - job.setStatusChange(progress, notification.getMessage()); - job.setStartTime(System.currentTimeMillis()); - break; - case NOTIFICATION: - case PROGRESS: - break; - case ERROR: - case ABORT: - job.setError(new RuntimeException(notification.getMessage())); - job.setStatus(Job.JobStatus.ERROR); - job.setFinishedTime(System.currentTimeMillis()); - break; - case SUCCESS: - job.setStatusChange(progress, notification.getMessage()); - // SUCCESS / ERROR does not mean the job has completed yet (COMPLETE is that) - break; - case COMPLETE: - job.setStatusChange(progress, notification.getMessage()); - job.setStatus(Job.JobStatus.COMPLETED); - job.setFinishedTime(System.currentTimeMillis()); - break; - } - service.updateJob(job); - } - }, - (NotificationFilter) - notification -> { - final int repairNo = - Integer.parseInt(((String) notification.getSource()).split(":")[1]); - return repairNo == repairJobId; - }, - null); + String jobId = String.format("repair-%d", repairJobId); + final Job job = service.createJob("repair", jobId); + if (repairJobId == 0) { + // Job is done and won't continue + job.setStatusChange(ProgressEventType.COMPLETE, ""); + job.setStatus(Job.JobStatus.COMPLETED); + job.setFinishedTime(System.currentTimeMillis()); + service.updateJob(job); return job.getJobId(); } - throw new RuntimeException("At least one keyspace must be defined"); + ShimLoader.instance + .get() + .getStorageService() + .addNotificationListener( + (notification, handback) -> { + if (notification.getType().equals("progress")) { + Map data = (Map) notification.getUserData(); + ProgressEventType progress = ProgressEventType.values()[data.get("type")]; + + switch (progress) { + case START: + job.setStatusChange(progress, notification.getMessage()); + job.setStartTime(System.currentTimeMillis()); + break; + case NOTIFICATION: + case PROGRESS: + break; + case ERROR: + case ABORT: + job.setError(new RuntimeException(notification.getMessage())); + job.setStatus(Job.JobStatus.ERROR); + job.setFinishedTime(System.currentTimeMillis()); + break; + case SUCCESS: + job.setStatusChange(progress, notification.getMessage()); + // SUCCESS / ERROR does not mean the job has completed yet (COMPLETE is that) + break; + case COMPLETE: + job.setStatusChange(progress, notification.getMessage()); + job.setStatus(Job.JobStatus.COMPLETED); + job.setFinishedTime(System.currentTimeMillis()); + break; + } + service.updateJob(job); + } + }, + (NotificationFilter) + notification -> { + final int repairNo = + Integer.parseInt(((String) notification.getSource()).split(":")[1]); + return repairNo == repairJobId; + }, + null); + + return job.getJobId(); + } + + @Rpc(name = "stopAllRepairs") + public void stopAllRepairs() { + ShimLoader.instance.get().getStorageService().forceTerminateAllRepairSessions(); } @Rpc(name = "move") diff --git a/management-api-server/doc/openapi.json b/management-api-server/doc/openapi.json index cc1eb308..f5e8043f 100644 --- a/management-api-server/doc/openapi.json +++ b/management-api-server/doc/openapi.json @@ -1740,6 +1740,75 @@ }, "summary" : "Rewrite sstables (for the requested tables) that are not on the current version (thus upgrading them to said current version). This operation is asynchronous and returns immediately." } + }, + "/api/v2/repairs" : { + "delete" : { + "operationId" : "deleteRepairsV2", + "responses" : { + "202" : { + "content" : { + "application/json" : { + "example" : "Accepted", + "schema" : { + "$ref" : "#/components/schemas/RepairRequestResponse" + } + } + }, + "description" : "Cancel repairs Successfully requested" + } + }, + "summary" : "Cancel all repairs" + }, + "put" : { + "operationId" : "putRepairV2", + "requestBody" : { + "content" : { + "application/json" : { + "schema" : { + "$ref" : "#/components/schemas/RepairRequest" + } + } + } + }, + "responses" : { + "202" : { + "content" : { + "application/json" : { + "example" : "Accepted", + "schema" : { + "$ref" : "#/components/schemas/RepairRequestResponse" + } + } + }, + "description" : "Repair Successfully requested" + }, + "400" : { + "content" : { + "text/plain" : { + "example" : "keyspace must be specified", + "schema" : { + "type" : "string", + "enum" : [ "OK", "Created", "Accepted", "No Content", "Reset Content", "Partial Content", "Moved Permanently", "Found", "See Other", "Not Modified", "Use Proxy", "Temporary Redirect", "Bad Request", "Unauthorized", "Payment Required", "Forbidden", "Not Found", "Method Not Allowed", "Not Acceptable", "Proxy Authentication Required", "Request Timeout", "Conflict", "Gone", "Length Required", "Precondition Failed", "Request Entity Too Large", "Request-URI Too Long", "Unsupported Media Type", "Requested Range Not Satisfiable", "Expectation Failed", "Precondition Required", "Too Many Requests", "Request Header Fields Too Large", "Internal Server Error", "Not Implemented", "Bad Gateway", "Service Unavailable", "Gateway Timeout", "HTTP Version Not Supported", "Network Authentication Required" ] + } + } + }, + "description" : "Repair request missing Keyspace name" + }, + "500" : { + "content" : { + "text/plain" : { + "example" : "internal error, we did not receive the expected repair ID from Cassandra.", + "schema" : { + "type" : "string", + "enum" : [ "OK", "Created", "Accepted", "No Content", "Reset Content", "Partial Content", "Moved Permanently", "Found", "See Other", "Not Modified", "Use Proxy", "Temporary Redirect", "Bad Request", "Unauthorized", "Payment Required", "Forbidden", "Not Found", "Method Not Allowed", "Not Acceptable", "Proxy Authentication Required", "Request Timeout", "Conflict", "Gone", "Length Required", "Precondition Failed", "Request Entity Too Large", "Request-URI Too Long", "Unsupported Media Type", "Requested Range Not Satisfiable", "Expectation Failed", "Precondition Required", "Too Many Requests", "Request Header Fields Too Large", "Internal Server Error", "Not Implemented", "Bad Gateway", "Service Unavailable", "Gateway Timeout", "HTTP Version Not Supported", "Network Authentication Required" ] + } + } + }, + "description" : "internal error, we did not receive the expected repair ID from Cassandra." + } + }, + "summary" : "Initiate a new repair" + } } }, "components" : { @@ -2014,12 +2083,35 @@ "RepairRequest" : { "type" : "object", "properties" : { - "full" : { + "associated_tokens" : { + "type" : "array", + "items" : { + "$ref" : "#/components/schemas/RingRange" + } + }, + "datacenters" : { + "type" : "array", + "items" : { + "type" : "string" + } + }, + "full_repair" : { "type" : "boolean" }, - "keyspace_name" : { + "keyspace" : { "type" : "string" }, + "notifications" : { + "type" : "boolean" + }, + "repair_parallelism" : { + "type" : "string", + "enum" : [ "sequential", "parallel", "dc_parallel" ] + }, + "repair_thread_count" : { + "type" : "integer", + "format" : "int32" + }, "tables" : { "type" : "array", "items" : { @@ -2027,7 +2119,16 @@ } } }, - "required" : [ "keyspace_name" ] + "required" : [ "keyspace" ] + }, + "RepairRequestResponse" : { + "type" : "object", + "properties" : { + "repair_id" : { + "type" : "string" + } + }, + "required" : [ "repair_id" ] }, "ReplicationSetting" : { "type" : "object", @@ -2042,6 +2143,18 @@ }, "required" : [ "dc_name", "replication_factor" ] }, + "RingRange" : { + "type" : "object", + "properties" : { + "end" : { + "type" : "integer" + }, + "start" : { + "type" : "integer" + } + }, + "required" : [ "end", "start" ] + }, "ScrubRequest" : { "type" : "object", "properties" : { diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/NodeOpsResources.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/NodeOpsResources.java index bef1a627..ecc581f2 100644 --- a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/NodeOpsResources.java +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/NodeOpsResources.java @@ -503,11 +503,17 @@ public Response repair(RepairRequest repairRequest) { } app.cqlService.executePreparedStatement( app.dbUnixSocketFile, - "CALL NodeOps.repair(?, ?, ?, ?)", + "CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?, ?)", repairRequest.keyspaceName, repairRequest.tables, repairRequest.full, - false); + false, + // The default repair does not allow for specifying things like parallelism, + // threadCounts, source DCs or ranges etc. + null, + null, + null, + null); return Response.ok("OK").build(); }); diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/common/BaseResources.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/common/BaseResources.java index 6d4b227d..ac4a4c27 100644 --- a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/common/BaseResources.java +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/common/BaseResources.java @@ -65,6 +65,7 @@ protected Response handle(Callable action) { .entity("Internal connection to Cassandra closed") .build(); } catch (Throwable t) { + t.printStackTrace(); return Response.status(HttpStatus.SC_INTERNAL_SERVER_ERROR) .entity(t.getLocalizedMessage()) .build(); diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v1/NodeOpsResources.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v1/NodeOpsResources.java index cd7a016c..0b1c70a8 100644 --- a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v1/NodeOpsResources.java +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v1/NodeOpsResources.java @@ -154,11 +154,15 @@ public Response repair(RepairRequest repairRequest) { ResponseTools.getSingleRowStringResponse( app.dbUnixSocketFile, app.cqlService, - "CALL NodeOps.repair(?, ?, ?, ?)", + "CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?, ?)", repairRequest.keyspaceName, repairRequest.tables, repairRequest.full, - true)) + true, + null, + null, + null, + null)) .build(); }); } diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/RepairResourcesV2.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/RepairResourcesV2.java new file mode 100644 index 00000000..98e2559a --- /dev/null +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/RepairResourcesV2.java @@ -0,0 +1,140 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package com.datastax.mgmtapi.resources.v2; + +import com.datastax.mgmtapi.ManagementApplication; +import com.datastax.mgmtapi.resources.common.BaseResources; +import com.datastax.mgmtapi.resources.v2.models.RepairParallelism; +import com.datastax.mgmtapi.resources.v2.models.RepairRequest; +import com.datastax.mgmtapi.resources.v2.models.RepairRequestResponse; +import com.datastax.mgmtapi.resources.v2.models.RingRange; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.ExampleObject; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import java.util.List; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +@Path("/api/v2/repairs") +public class RepairResourcesV2 extends BaseResources { + + public RepairResourcesV2(ManagementApplication application) { + super(application); + } + + @PUT + @Operation(summary = "Initiate a new repair", operationId = "putRepairV2") + @Produces(MediaType.APPLICATION_JSON) + @Consumes("application/json") + @ApiResponse( + responseCode = "202", + description = "Repair Successfully requested", + content = + @Content( + mediaType = MediaType.APPLICATION_JSON, + schema = @Schema(implementation = RepairRequestResponse.class), + examples = @ExampleObject(value = "Accepted"))) + @ApiResponse( + responseCode = "400", + description = "Repair request missing Keyspace name", + content = + @Content( + mediaType = MediaType.TEXT_PLAIN, + schema = @Schema(implementation = Response.Status.class), + examples = @ExampleObject(value = "keyspace must be specified"))) + @ApiResponse( + responseCode = "500", + description = "internal error, we did not receive the expected repair ID from Cassandra.", + content = + @Content( + mediaType = MediaType.TEXT_PLAIN, + schema = @Schema(implementation = Response.Status.class), + examples = + @ExampleObject( + value = + "internal error, we did not receive the expected repair ID from Cassandra."))) + public final Response repair(RepairRequest request) { + return handle( + () -> { + if (request.keyspace == null) { + return Response.status(Response.Status.BAD_REQUEST) + .entity("keyspaceName must be specified") + .build(); + } + + ResultSet res = + app.cqlService.executePreparedStatement( + app.dbUnixSocketFile, + "CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?, ?)", + request.keyspace, + request.tables, + request.fullRepair, + request.notifications, + getParallelismName(request.repairParallelism), + request.datacenters, + getRingRangeString(request.associatedTokens), + request.repairThreadCount); + try { + Row row = res.one(); + String repairID = row.getString(0); + return Response.accepted(new RepairRequestResponse(repairID)).build(); + } catch (Exception e) { + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity("Repair request failed: " + e.getMessage()) + .build(); + } + }); + } + + @DELETE + @Operation(summary = "Cancel all repairs", operationId = "deleteRepairsV2") + @Produces(MediaType.APPLICATION_JSON) + @ApiResponse( + responseCode = "202", + description = "Cancel repairs Successfully requested", + content = + @Content( + mediaType = MediaType.APPLICATION_JSON, + schema = @Schema(implementation = RepairRequestResponse.class), + examples = @ExampleObject(value = "Accepted"))) + public Response cancelAllRepairs() { + return handle( + () -> { + app.cqlService.executePreparedStatement( + app.dbUnixSocketFile, "CALL NodeOps.stopAllRepairs()"); + return Response.accepted().build(); + }); + } + + private String getParallelismName(RepairParallelism parallelism) { + return parallelism != null ? parallelism.getName() : null; + } + + private String getRingRangeString(List associatedTokens) { + if (associatedTokens != null && !associatedTokens.isEmpty()) { + StringBuilder sb = new StringBuilder(); + for (RingRange ringRange : associatedTokens) { + sb.append(toRangeString(ringRange)).append(","); + } + // remove trailing comma + return sb.substring(0, sb.length() - 2); + } + return null; + } + + private String toRangeString(RingRange ringRange) { + return String.join(":", ringRange.start.toString(), ringRange.end.toString()); + } +} diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairParallelism.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairParallelism.java new file mode 100644 index 00000000..69670d73 --- /dev/null +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairParallelism.java @@ -0,0 +1,39 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ + +package com.datastax.mgmtapi.resources.v2.models; + +import com.fasterxml.jackson.annotation.JsonValue; + +public enum RepairParallelism { + SEQUENTIAL("sequential"), + PARALLEL("parallel"), + DATACENTER_AWARE("dc_parallel"); + + private final String name; + + public static RepairParallelism fromName(String name) { + if (PARALLEL.getName().equals(name)) { + return PARALLEL; + } else { + return DATACENTER_AWARE.getName().equals(name) ? DATACENTER_AWARE : SEQUENTIAL; + } + } + + private RepairParallelism(String name) { + this.name = name; + } + + @JsonValue + public String getName() { + return this.name; + } + + @Override + public String toString() { + return this.getName(); + } +} diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairRequest.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairRequest.java new file mode 100644 index 00000000..f6dd47ce --- /dev/null +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairRequest.java @@ -0,0 +1,93 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package com.datastax.mgmtapi.resources.v2.models; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import java.util.Objects; +import javax.annotation.Nullable; + +public class RepairRequest { + + @JsonProperty(value = "keyspace", required = true) + public final String keyspace; + + @Nullable + @JsonProperty(value = "tables") + public final List tables; + + @JsonProperty(value = "full_repair", defaultValue = "true") + public final Boolean fullRepair; + + @JsonProperty(value = "notifications", defaultValue = "true") + public final Boolean notifications; + + @Nullable + @JsonProperty(value = "associated_tokens") + public final List associatedTokens; + + @Nullable + @JsonProperty(value = "repair_parallelism") + public final RepairParallelism repairParallelism; + + @Nullable + @JsonProperty(value = "datacenters") + public final List datacenters; + + @Nullable + @JsonProperty(value = "repair_thread_count") + public final Integer repairThreadCount; + + @JsonCreator + public RepairRequest( + @JsonProperty(value = "keyspace", required = true) String keyspace, + @JsonProperty(value = "tables") List tables, + @JsonProperty(value = "full_repair", defaultValue = "true") Boolean fullRepair, + @JsonProperty(value = "notifications", defaultValue = "true") boolean notifications, + @JsonProperty(value = "associated_tokens") List associatedTokens, + @JsonProperty(value = "repair_parallelism") RepairParallelism repairParallelism, + @JsonProperty(value = "datacenters") List datacenters, + @JsonProperty(value = "repair_thread_count") Integer repairThreadCount) { + this.keyspace = keyspace; + this.tables = tables; + this.fullRepair = fullRepair; + this.notifications = notifications; + this.associatedTokens = associatedTokens; + this.datacenters = datacenters; + this.repairParallelism = repairParallelism; + this.repairThreadCount = repairThreadCount; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RepairRequest other = (RepairRequest) o; + return Objects.equals(keyspace, other.keyspace) + && Objects.equals(tables, other.tables) + && Objects.equals(fullRepair, other.fullRepair) + && Objects.equals(associatedTokens, other.associatedTokens) + && Objects.equals(datacenters, other.datacenters) + && Objects.equals(repairParallelism, other.repairParallelism) + && Objects.equals(repairThreadCount, other.repairThreadCount); + } + + @Override + public int hashCode() { + return Objects.hashCode(keyspace) + + Objects.hashCode(tables) + + Objects.hashCode(fullRepair) + + Objects.hashCode(associatedTokens) + + Objects.hashCode(datacenters) + + Objects.hashCode(repairParallelism) + + Objects.hashCode(repairThreadCount); + } +} diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairRequestResponse.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairRequestResponse.java new file mode 100644 index 00000000..0e57f45b --- /dev/null +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairRequestResponse.java @@ -0,0 +1,48 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ + +package com.datastax.mgmtapi.resources.v2.models; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Objects; + +public class RepairRequestResponse { + @JsonProperty(value = "repair_id", required = true) + public final String repairID; + + @JsonCreator + public RepairRequestResponse( + @JsonProperty(value = "repair_id", required = true) String repairID) { + this.repairID = repairID; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + return Objects.equals(repairID, ((RepairRequestResponse) o).repairID); + } + + @Override + public int hashCode() { + return Objects.hashCode(repairID); + } + + @Override + public String toString() { + try { + return new ObjectMapper().writeValueAsString(this); + } catch (Exception e) { + return this.repairID; + } + } +} diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RingRange.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RingRange.java new file mode 100644 index 00000000..0b31708e --- /dev/null +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RingRange.java @@ -0,0 +1,57 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ + +package com.datastax.mgmtapi.resources.v2.models; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.math.BigInteger; +import java.util.Comparator; +import java.util.Objects; + +public final class RingRange { + public static final Comparator START_COMPARATOR = + (RingRange o1, RingRange o2) -> o1.start.compareTo(o2.start); + + @JsonProperty(value = "start", required = true) + public final BigInteger start; + + @JsonProperty(value = "end", required = true) + public final BigInteger end; + + public RingRange( + @JsonProperty(value = "start", required = true) BigInteger start, + @JsonProperty(value = "end", required = true) BigInteger end) { + this.start = start; + this.end = end; + } + + public RingRange(String... range) { + start = new BigInteger(range[0]); + end = new BigInteger(range[1]); + } + + public BigInteger getStart() { + return start; + } + + public BigInteger getEnd() { + return end; + } + + @Override + public int hashCode() { + return Objects.hashCode(start) + Objects.hashCode(end); + } + + @Override + public boolean equals(Object o) { + if (o == null || !(o instanceof RingRange)) { + return false; + } + RingRange other = (RingRange) o; + return Objects.equals(start, other.start) && Objects.equals(end, other.end); + } +} diff --git a/management-api-server/src/test/java/com/datastax/mgmtapi/BaseDockerIntegrationTest.java b/management-api-server/src/test/java/com/datastax/mgmtapi/BaseDockerIntegrationTest.java index 1fe48ad4..bf934ee9 100644 --- a/management-api-server/src/test/java/com/datastax/mgmtapi/BaseDockerIntegrationTest.java +++ b/management-api-server/src/test/java/com/datastax/mgmtapi/BaseDockerIntegrationTest.java @@ -60,7 +60,7 @@ protected void failed(Throwable e, Description description) { System.err.flush(); if (null != docker) { - int numberOfLines = 100; + int numberOfLines = 1000; System.out.printf("=====> Showing last %d entries of system.log%n", numberOfLines); docker.tailSystemLog(numberOfLines); System.out.printf("=====> End of last %d entries of system.log%n", numberOfLines); diff --git a/management-api-server/src/test/java/com/datastax/mgmtapi/K8OperatorResourcesTest.java b/management-api-server/src/test/java/com/datastax/mgmtapi/K8OperatorResourcesTest.java index 7de92708..ba3dddfc 100644 --- a/management-api-server/src/test/java/com/datastax/mgmtapi/K8OperatorResourcesTest.java +++ b/management-api-server/src/test/java/com/datastax/mgmtapi/K8OperatorResourcesTest.java @@ -1659,11 +1659,15 @@ public void testRepair() throws Exception { verify(context.cqlService) .executePreparedStatement( any(), - eq("CALL NodeOps.repair(?, ?, ?, ?)"), + eq("CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?, ?)"), eq("test_ks"), eq(null), eq(true), - eq(false)); + eq(false), + eq(null), + eq(null), + eq(null), + eq(null)); } @Test @@ -1698,11 +1702,15 @@ public void testRepairAsync() throws Exception { verify(context.cqlService) .executePreparedStatement( any(), - eq("CALL NodeOps.repair(?, ?, ?, ?)"), + eq("CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?, ?)"), eq("test_ks"), eq(null), eq(true), - eq(true)); + eq(true), + eq(null), + eq(null), + eq(null), + eq(null)); } @Test diff --git a/management-api-server/src/test/java/com/datastax/mgmtapi/resources/v2/RepairResourcesV2Test.java b/management-api-server/src/test/java/com/datastax/mgmtapi/resources/v2/RepairResourcesV2Test.java new file mode 100644 index 00000000..12abe689 --- /dev/null +++ b/management-api-server/src/test/java/com/datastax/mgmtapi/resources/v2/RepairResourcesV2Test.java @@ -0,0 +1,115 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package com.datastax.mgmtapi.resources.v2; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.datastax.mgmtapi.CqlService; +import com.datastax.mgmtapi.ManagementApplication; +import com.datastax.mgmtapi.resources.v2.models.RepairParallelism; +import com.datastax.mgmtapi.resources.v2.models.RepairRequest; +import com.datastax.mgmtapi.resources.v2.models.RepairRequestResponse; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import javax.ws.rs.core.Response; +import org.junit.Test; + +public class RepairResourcesV2Test { + + @Test + public void testRepairResourcesSuccess() throws Exception { + CqlService mockCqlService = mock(CqlService.class); + ManagementApplication app = + new ManagementApplication( + null, null, new File("/tmp/cassandra.sock"), mockCqlService, null); + ResultSet mockResultSet = mock(ResultSet.class); + Row mockRow = mock(Row.class); + when(mockResultSet.one()).thenReturn(mockRow); + when(mockRow.getString(anyInt())).thenReturn("mockRepairID"); + when(mockCqlService.executePreparedStatement( + any(), anyString(), any(), any(), any(), any(), any(), any(), any(), any())) + .thenReturn(mockResultSet); + RepairResourcesV2 unit = new RepairResourcesV2(app); + RepairRequest req = + new RepairRequest( + "keyspace", + Collections.singletonList("table1"), + false, + true, + Collections.EMPTY_LIST, + RepairParallelism.DATACENTER_AWARE, + Collections.EMPTY_LIST, + 1); + Response resp = unit.repair(req); + assertEquals(202, resp.getStatus()); + assertEquals("mockRepairID", ((RepairRequestResponse) resp.getEntity()).repairID); + verify(mockCqlService) + .executePreparedStatement( + any(), + eq("CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?, ?)"), + eq("keyspace"), + eq(Collections.singletonList("table1")), + eq(false), + eq(true), + eq(RepairParallelism.DATACENTER_AWARE.getName()), + eq(Collections.EMPTY_LIST), + eq(null), + eq(Integer.valueOf(1))); + } + + @Test + public void testRepairResourcesFail() throws Exception { + CqlService mockCqlService = mock(CqlService.class); + ManagementApplication app = + new ManagementApplication( + null, null, new File("/tmp/cassandra.sock"), mockCqlService, null); + ResultSet mockResultSet = mock(ResultSet.class); + Row mockRow = mock(Row.class); + when(mockRow.getString(anyString())).thenReturn("mockrepairID"); + when(mockCqlService.executePreparedStatement( + any(), anyString(), any(), any(), any(), any(), any(), any(), any(), any())) + .thenReturn(mockResultSet); + RepairResourcesV2 unit = new RepairResourcesV2(app); + List tables = new ArrayList<>(); + tables.add("table1"); + tables.add("table2"); + RepairRequest req = + new RepairRequest( + "", + tables, + false, + true, + Collections.EMPTY_LIST, + RepairParallelism.DATACENTER_AWARE, + Collections.EMPTY_LIST, + 1); + Response resp = unit.repair(req); + assertEquals(500, resp.getStatus()); + } + + @Test + public void testCancelAllRepairs() throws Exception { + CqlService mockCqlService = mock(CqlService.class); + ManagementApplication app = + new ManagementApplication( + null, null, new File("/tmp/cassandra.sock"), mockCqlService, null); + RepairResourcesV2 unit = new RepairResourcesV2(app); + Response resp = unit.cancelAllRepairs(); + assertEquals(202, resp.getStatus()); + verify(mockCqlService).executePreparedStatement(any(), eq("CALL NodeOps.stopAllRepairs()")); + } +}