Skip to content

Commit

Permalink
Reaper endpoints: Async Repair Endpoint (#358)
Browse files Browse the repository at this point in the history
* Allow `repair()` RPC method to take more arguments (as required by Reaper). Get the old NodeOpsResources class calling the new RPC method signature.

---------

Co-authored-by: Erik Merkle <[email protected]>

* Cancel repairs endpoint (#368)

* Add cancelAllRepairs endpoint and requisite NodeOps.
  • Loading branch information
Miles-Garnsey authored Aug 31, 2023
1 parent da8d906 commit 148df59
Show file tree
Hide file tree
Showing 13 changed files with 741 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -746,96 +746,123 @@ public String repair(
@RpcParam(name = "keyspaceName") String keyspace,
@RpcParam(name = "tables") List<String> tables,
@RpcParam(name = "full") Boolean full,
@RpcParam(name = "notifications") boolean notifications)
@RpcParam(name = "notifications") boolean notifications,
@RpcParam(name = "repairParallelism") String repairParallelism,
@RpcParam(name = "datacenters") List<String> datacenters,
@RpcParam(name = "associatedTokens") String ringRangeString,
@RpcParam(name = "repairThreadCount") Integer repairThreadCount)
throws IOException {
// At least one keyspace is required
if (keyspace != null) {
// create the repair spec
Map<String, String> repairSpec = new HashMap<>();

// add any specified tables to the repair spec
if (tables != null && !tables.isEmpty()) {
// set the tables/column families
repairSpec.put(RepairOption.COLUMNFAMILIES_KEY, String.join(",", tables));
assert (keyspace != null);
Map<String, String> repairSpec = new HashMap<>();
// add tables/column families
if (tables != null && !tables.isEmpty()) {
repairSpec.put(RepairOption.COLUMNFAMILIES_KEY, String.join(",", tables));
}
// set incremental repair
repairSpec.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!full));
// Parallelism should be set if it's requested OR if incremental repair is requested.
if (!full) {
// Incremental repair requested, make sure parallelism is correct
if (repairParallelism != null
&& !RepairParallelism.PARALLEL.getName().equals(repairParallelism)) {
throw new IOException(
"Invalid repair combination. Incremental repair if Parallelism is not set");
}

// handle incremental vs full
boolean isIncremental = Boolean.FALSE.equals(full);
repairSpec.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(isIncremental));
if (isIncremental) {
// incremental repairs will fail if parallelism is not set
repairSpec.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.getName());
// Incremental repair and parallelism should be set
repairSpec.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.getName());
}
if (repairThreadCount != null) {
// if specified, the value should be at least 1
if (repairThreadCount.compareTo(Integer.valueOf(0)) <= 0) {
throw new IOException(
"Invalid repari thread count: "
+ repairThreadCount
+ ". Value should be greater than 0");
}
repairSpec.put(RepairOption.JOB_THREADS_KEY, repairThreadCount.toString());
}
repairSpec.put(RepairOption.TRACE_KEY, Boolean.toString(Boolean.FALSE));

// Since Cassandra provides us with an async API, we don't need to use our executor interface
// for this.
final int repairJobId =
ShimLoader.instance.get().getStorageService().repairAsync(keyspace, repairSpec);
if (ringRangeString != null && !ringRangeString.isEmpty()) {
repairSpec.put(RepairOption.RANGES_KEY, ringRangeString);
}
// add datacenters to the repair spec
if (datacenters != null && !datacenters.isEmpty()) {
repairSpec.put(RepairOption.DATACENTERS_KEY, String.join(",", datacenters));
}

if (!notifications) {
return Integer.valueOf(repairJobId).toString();
}
// Since Cassandra provides us with an async API, we don't need to use our executor interface
// for this.
final int repairJobId =
ShimLoader.instance.get().getStorageService().repairAsync(keyspace, repairSpec);

String jobId = String.format("repair-%d", repairJobId);
final Job job = service.createJob("repair", jobId);
if (!notifications) {
return Integer.valueOf(repairJobId).toString();
}

if (repairJobId == 0) {
// Job is done and won't continue
job.setStatusChange(ProgressEventType.COMPLETE, "");
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
service.updateJob(job);
return job.getJobId();
}

ShimLoader.instance
.get()
.getStorageService()
.addNotificationListener(
(notification, handback) -> {
if (notification.getType().equals("progress")) {
Map<String, Integer> data = (Map<String, Integer>) notification.getUserData();
ProgressEventType progress = ProgressEventType.values()[data.get("type")];

switch (progress) {
case START:
job.setStatusChange(progress, notification.getMessage());
job.setStartTime(System.currentTimeMillis());
break;
case NOTIFICATION:
case PROGRESS:
break;
case ERROR:
case ABORT:
job.setError(new RuntimeException(notification.getMessage()));
job.setStatus(Job.JobStatus.ERROR);
job.setFinishedTime(System.currentTimeMillis());
break;
case SUCCESS:
job.setStatusChange(progress, notification.getMessage());
// SUCCESS / ERROR does not mean the job has completed yet (COMPLETE is that)
break;
case COMPLETE:
job.setStatusChange(progress, notification.getMessage());
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
break;
}
service.updateJob(job);
}
},
(NotificationFilter)
notification -> {
final int repairNo =
Integer.parseInt(((String) notification.getSource()).split(":")[1]);
return repairNo == repairJobId;
},
null);
String jobId = String.format("repair-%d", repairJobId);
final Job job = service.createJob("repair", jobId);

if (repairJobId == 0) {
// Job is done and won't continue
job.setStatusChange(ProgressEventType.COMPLETE, "");
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
service.updateJob(job);
return job.getJobId();
}

throw new RuntimeException("At least one keyspace must be defined");
ShimLoader.instance
.get()
.getStorageService()
.addNotificationListener(
(notification, handback) -> {
if (notification.getType().equals("progress")) {
Map<String, Integer> data = (Map<String, Integer>) notification.getUserData();
ProgressEventType progress = ProgressEventType.values()[data.get("type")];

switch (progress) {
case START:
job.setStatusChange(progress, notification.getMessage());
job.setStartTime(System.currentTimeMillis());
break;
case NOTIFICATION:
case PROGRESS:
break;
case ERROR:
case ABORT:
job.setError(new RuntimeException(notification.getMessage()));
job.setStatus(Job.JobStatus.ERROR);
job.setFinishedTime(System.currentTimeMillis());
break;
case SUCCESS:
job.setStatusChange(progress, notification.getMessage());
// SUCCESS / ERROR does not mean the job has completed yet (COMPLETE is that)
break;
case COMPLETE:
job.setStatusChange(progress, notification.getMessage());
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
break;
}
service.updateJob(job);
}
},
(NotificationFilter)
notification -> {
final int repairNo =
Integer.parseInt(((String) notification.getSource()).split(":")[1]);
return repairNo == repairJobId;
},
null);

return job.getJobId();
}

/**
 * RPC method exposed under the name "stopAllRepairs".
 *
 * <p>Looks up the storage service through {@code ShimLoader.instance.get()} and invokes {@code
 * forceTerminateAllRepairSessions()} on it. Takes no arguments and returns nothing to the caller.
 */
@Rpc(name = "stopAllRepairs")
public void stopAllRepairs() {
ShimLoader.instance.get().getStorageService().forceTerminateAllRepairSessions();
}

@Rpc(name = "move")
Expand Down
119 changes: 116 additions & 3 deletions management-api-server/doc/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -1740,6 +1740,75 @@
},
"summary" : "Rewrite sstables (for the requested tables) that are not on the current version (thus upgrading them to said current version). This operation is asynchronous and returns immediately."
}
},
"/api/v2/repairs" : {
"delete" : {
"operationId" : "deleteRepairsV2",
"responses" : {
"202" : {
"content" : {
"application/json" : {
"example" : "Accepted",
"schema" : {
"$ref" : "#/components/schemas/RepairRequestResponse"
}
}
},
"description" : "Cancel repairs Successfully requested"
}
},
"summary" : "Cancel all repairs"
},
"put" : {
"operationId" : "putRepairV2",
"requestBody" : {
"content" : {
"application/json" : {
"schema" : {
"$ref" : "#/components/schemas/RepairRequest"
}
}
}
},
"responses" : {
"202" : {
"content" : {
"application/json" : {
"example" : "Accepted",
"schema" : {
"$ref" : "#/components/schemas/RepairRequestResponse"
}
}
},
"description" : "Repair Successfully requested"
},
"400" : {
"content" : {
"text/plain" : {
"example" : "keyspace must be specified",
"schema" : {
"type" : "string",
"enum" : [ "OK", "Created", "Accepted", "No Content", "Reset Content", "Partial Content", "Moved Permanently", "Found", "See Other", "Not Modified", "Use Proxy", "Temporary Redirect", "Bad Request", "Unauthorized", "Payment Required", "Forbidden", "Not Found", "Method Not Allowed", "Not Acceptable", "Proxy Authentication Required", "Request Timeout", "Conflict", "Gone", "Length Required", "Precondition Failed", "Request Entity Too Large", "Request-URI Too Long", "Unsupported Media Type", "Requested Range Not Satisfiable", "Expectation Failed", "Precondition Required", "Too Many Requests", "Request Header Fields Too Large", "Internal Server Error", "Not Implemented", "Bad Gateway", "Service Unavailable", "Gateway Timeout", "HTTP Version Not Supported", "Network Authentication Required" ]
}
}
},
"description" : "Repair request missing Keyspace name"
},
"500" : {
"content" : {
"text/plain" : {
"example" : "internal error, we did not receive the expected repair ID from Cassandra.",
"schema" : {
"type" : "string",
"enum" : [ "OK", "Created", "Accepted", "No Content", "Reset Content", "Partial Content", "Moved Permanently", "Found", "See Other", "Not Modified", "Use Proxy", "Temporary Redirect", "Bad Request", "Unauthorized", "Payment Required", "Forbidden", "Not Found", "Method Not Allowed", "Not Acceptable", "Proxy Authentication Required", "Request Timeout", "Conflict", "Gone", "Length Required", "Precondition Failed", "Request Entity Too Large", "Request-URI Too Long", "Unsupported Media Type", "Requested Range Not Satisfiable", "Expectation Failed", "Precondition Required", "Too Many Requests", "Request Header Fields Too Large", "Internal Server Error", "Not Implemented", "Bad Gateway", "Service Unavailable", "Gateway Timeout", "HTTP Version Not Supported", "Network Authentication Required" ]
}
}
},
"description" : "internal error, we did not receive the expected repair ID from Cassandra."
}
},
"summary" : "Initiate a new repair"
}
}
},
"components" : {
Expand Down Expand Up @@ -2014,20 +2083,52 @@
"RepairRequest" : {
"type" : "object",
"properties" : {
"full" : {
"associated_tokens" : {
"type" : "array",
"items" : {
"$ref" : "#/components/schemas/RingRange"
}
},
"datacenters" : {
"type" : "array",
"items" : {
"type" : "string"
}
},
"full_repair" : {
"type" : "boolean"
},
"keyspace_name" : {
"keyspace" : {
"type" : "string"
},
"notifications" : {
"type" : "boolean"
},
"repair_parallelism" : {
"type" : "string",
"enum" : [ "sequential", "parallel", "dc_parallel" ]
},
"repair_thread_count" : {
"type" : "integer",
"format" : "int32"
},
"tables" : {
"type" : "array",
"items" : {
"type" : "string"
}
}
},
"required" : [ "keyspace_name" ]
"required" : [ "keyspace" ]
},
"RepairRequestResponse" : {
"type" : "object",
"properties" : {
"repair_id" : {
"type" : "string"
}
},
"required" : [ "repair_id" ]
},
"ReplicationSetting" : {
"type" : "object",
Expand All @@ -2042,6 +2143,18 @@
},
"required" : [ "dc_name", "replication_factor" ]
},
"RingRange" : {
"type" : "object",
"properties" : {
"end" : {
"type" : "integer"
},
"start" : {
"type" : "integer"
}
},
"required" : [ "end", "start" ]
},
"ScrubRequest" : {
"type" : "object",
"properties" : {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,11 +503,17 @@ public Response repair(RepairRequest repairRequest) {
}
app.cqlService.executePreparedStatement(
app.dbUnixSocketFile,
"CALL NodeOps.repair(?, ?, ?, ?)",
"CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?, ?)",
repairRequest.keyspaceName,
repairRequest.tables,
repairRequest.full,
false);
false,
// The default repair does not allow for specifying things like parallelism,
// threadCounts, source DCs or ranges etc.
null,
null,
null,
null);

return Response.ok("OK").build();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ protected Response handle(Callable<Response> action) {
.entity("Internal connection to Cassandra closed")
.build();
} catch (Throwable t) {
t.printStackTrace();
return Response.status(HttpStatus.SC_INTERNAL_SERVER_ERROR)
.entity(t.getLocalizedMessage())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,15 @@ public Response repair(RepairRequest repairRequest) {
ResponseTools.getSingleRowStringResponse(
app.dbUnixSocketFile,
app.cqlService,
"CALL NodeOps.repair(?, ?, ?, ?)",
"CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?, ?)",
repairRequest.keyspaceName,
repairRequest.tables,
repairRequest.full,
true))
true,
null,
null,
null,
null))
.build();
});
}
Expand Down
Loading

0 comments on commit 148df59

Please sign in to comment.