Skip to content

Commit

Permalink
delete connectors api with wildcard
Browse files Browse the repository at this point in the history
  • Loading branch information
adriencalime committed Sep 30, 2024
1 parent 5286f98 commit 594499f
Show file tree
Hide file tree
Showing 2 changed files with 770 additions and 591 deletions.
245 changes: 144 additions & 101 deletions src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.michelin.ns4kafka.service.ResourceQuotaService;
import com.michelin.ns4kafka.util.enumation.ApplyStatus;
import com.michelin.ns4kafka.util.exception.ResourceValidationException;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MutableHttpResponse;
Expand Down Expand Up @@ -89,7 +88,7 @@ public Mono<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connec

if (!connectorService.isNamespaceOwnerOfConnect(ns, connector.getMetadata().getName())) {
return Mono.error(new ResourceValidationException(connector,
invalidOwner(connector.getMetadata().getName())));
invalidOwner(connector.getMetadata().getName())));
}

// Set / Override name in spec.config.name, required for several Kafka Connect API calls
Expand All @@ -104,58 +103,58 @@ public Mono<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connec

// Validate locally
return connectorService.validateLocally(ns, connector)
.flatMap(validationErrors -> {
if (!validationErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(connector, validationErrors));
}

// Validate against connect rest API /validate
return connectorService.validateRemotely(ns, connector)
.flatMap(remoteValidationErrors -> {
if (!remoteValidationErrors.isEmpty()) {
return Mono.error(
new ResourceValidationException(connector, remoteValidationErrors));
}

// Augment with server side fields
connector.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
connector.getMetadata().setCluster(ns.getMetadata().getCluster());
connector.getMetadata().setNamespace(ns.getMetadata().getName());
connector.setStatus(Connector.ConnectorStatus.builder()
.state(Connector.TaskState.UNASSIGNED)
.build());

Optional<Connector> existingConnector =
connectorService.findByName(ns, connector.getMetadata().getName());
if (existingConnector.isPresent() && existingConnector.get().equals(connector)) {
return Mono.just(formatHttpResponse(existingConnector.get(), ApplyStatus.unchanged));
}

ApplyStatus status = existingConnector.isPresent() ? ApplyStatus.changed : ApplyStatus.created;

// Only check quota on connector creation
if (status.equals(ApplyStatus.created)) {
List<String> quotaErrors = resourceQuotaService.validateConnectorQuota(ns);
if (!quotaErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(connector, quotaErrors));
}
}

if (dryrun) {
return Mono.just(formatHttpResponse(connector, status));
}

sendEventLog(
connector,
status,
existingConnector.<Object>map(Connector::getSpec).orElse(null),
connector.getSpec(),
EMPTY_STRING
);

return Mono.just(formatHttpResponse(connectorService.createOrUpdate(connector), status));
});
});
.flatMap(validationErrors -> {
if (!validationErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(connector, validationErrors));
}

// Validate against connect rest API /validate
return connectorService.validateRemotely(ns, connector)
.flatMap(remoteValidationErrors -> {
if (!remoteValidationErrors.isEmpty()) {
return Mono.error(
new ResourceValidationException(connector, remoteValidationErrors));
}

// Augment with server side fields
connector.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
connector.getMetadata().setCluster(ns.getMetadata().getCluster());
connector.getMetadata().setNamespace(ns.getMetadata().getName());
connector.setStatus(Connector.ConnectorStatus.builder()
.state(Connector.TaskState.UNASSIGNED)
.build());

Optional<Connector> existingConnector =
connectorService.findByName(ns, connector.getMetadata().getName());
if (existingConnector.isPresent() && existingConnector.get().equals(connector)) {
return Mono.just(formatHttpResponse(existingConnector.get(), ApplyStatus.unchanged));
}

ApplyStatus status = existingConnector.isPresent() ? ApplyStatus.changed : ApplyStatus.created;

// Only check quota on connector creation
if (status.equals(ApplyStatus.created)) {
List<String> quotaErrors = resourceQuotaService.validateConnectorQuota(ns);
if (!quotaErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(connector, quotaErrors));
}
}

if (dryrun) {
return Mono.just(formatHttpResponse(connector, status));
}

sendEventLog(
connector,
status,
existingConnector.<Object>map(Connector::getSpec).orElse(null),
connector.getSpec(),
EMPTY_STRING
);

return Mono.just(formatHttpResponse(connectorService.createOrUpdate(connector), status));
});
});
}

/**
Expand All @@ -165,9 +164,11 @@ public Mono<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connec
* @param connector The current connector name to delete
* @param dryrun Run in dry mode or not
* @return A HTTP response
* @deprecated use bulkDelete instead.
*/
@Status(HttpStatus.NO_CONTENT)
@Delete("/{connector}{?dryrun}")
@Deprecated(since = "1.13.0")
public Mono<HttpResponse<Void>> delete(String namespace, String connector,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);
Expand All @@ -189,16 +190,58 @@ public Mono<HttpResponse<Void>> delete(String namespace, String connector,
Connector connectorToDelete = optionalConnector.get();

sendEventLog(
connectorToDelete,
ApplyStatus.deleted,
connectorToDelete.getSpec(),
null,
EMPTY_STRING
connectorToDelete,
ApplyStatus.deleted,
connectorToDelete.getSpec(),
null,
EMPTY_STRING
);

return connectorService
.delete(ns, optionalConnector.get())
.map(httpResponse -> HttpResponse.noContent());
.delete(ns, optionalConnector.get())
.map(httpResponse -> HttpResponse.noContent());
}

/**
* Delete connectors.
*
* @param namespace The current namespace
* @param name The name parameter
* @param dryrun Run in dry mode or not
* @return A HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Delete
public Mono<HttpResponse<Void>> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

List<Connector> connectors = connectorService.findByWildcardName(ns, name);

// Validate ownership
List<String> validationErrors = connectors.stream()
.filter(connector -> !connectorService.isNamespaceOwnerOfConnect(ns, connector.getMetadata().getName()))
.map(connector -> invalidOwner(connector.getMetadata().getName()))
.toList();

if (!validationErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(CONNECTOR, name, validationErrors));
}

if (connectors.isEmpty()) {
return Mono.just(HttpResponse.notFound());
}

if (dryrun) {
return Mono.just(HttpResponse.noContent());
}

return Flux.fromIterable(connectors)
.flatMap(connector -> {
sendEventLog(connector, ApplyStatus.deleted, connector.getSpec(), null, EMPTY_STRING);
return connectorService.delete(ns, connector);
})
.then(Mono.just(HttpResponse.noContent()));
}

/**
Expand Down Expand Up @@ -232,30 +275,30 @@ public Mono<MutableHttpResponse<ChangeConnectorState>> changeState(String namesp
case resume -> response = connectorService.resume(ns, optionalConnector.get());
default -> {
return Mono.error(
new IllegalStateException("Unspecified action " + state.getSpec().getAction()));
new IllegalStateException("Unspecified action " + state.getSpec().getAction()));
}
}

return response
.doOnSuccess(success -> {
state.setStatus(ChangeConnectorState.ChangeConnectorStateStatus.builder()
.success(true)
.code(success.status())
.build());
state.setMetadata(optionalConnector.get().getMetadata());
state.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
})
.doOnError(error -> {
state.setStatus(ChangeConnectorState.ChangeConnectorStateStatus.builder()
.success(false)
.code(HttpStatus.INTERNAL_SERVER_ERROR)
.errorMessage(error.getMessage())
.build());
state.setMetadata(optionalConnector.get().getMetadata());
state.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
})
.map(httpResponse -> HttpResponse.ok(state))
.onErrorReturn(HttpResponse.ok(state));
.doOnSuccess(success -> {
state.setStatus(ChangeConnectorState.ChangeConnectorStateStatus.builder()
.success(true)
.code(success.status())
.build());
state.setMetadata(optionalConnector.get().getMetadata());
state.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
})
.doOnError(error -> {
state.setStatus(ChangeConnectorState.ChangeConnectorStateStatus.builder()
.success(false)
.code(HttpStatus.INTERNAL_SERVER_ERROR)
.errorMessage(error.getMessage())
.build());
state.setMetadata(optionalConnector.get().getMetadata());
state.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
})
.map(httpResponse -> HttpResponse.ok(state))
.onErrorReturn(HttpResponse.ok(state));
}

/**
Expand All @@ -269,24 +312,24 @@ public Mono<MutableHttpResponse<ChangeConnectorState>> changeState(String namesp
public Flux<Connector> importResources(String namespace, @QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);
return connectorService.listUnsynchronizedConnectors(ns)
.map(unsynchronizedConnector -> {
unsynchronizedConnector.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
unsynchronizedConnector.getMetadata().setCluster(ns.getMetadata().getCluster());
unsynchronizedConnector.getMetadata().setNamespace(ns.getMetadata().getName());

if (dryrun) {
return unsynchronizedConnector;
}

sendEventLog(
unsynchronizedConnector,
ApplyStatus.created,
null,
unsynchronizedConnector.getSpec(),
EMPTY_STRING
);

return connectorService.createOrUpdate(unsynchronizedConnector);
});
.map(unsynchronizedConnector -> {
unsynchronizedConnector.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
unsynchronizedConnector.getMetadata().setCluster(ns.getMetadata().getCluster());
unsynchronizedConnector.getMetadata().setNamespace(ns.getMetadata().getName());

if (dryrun) {
return unsynchronizedConnector;
}

sendEventLog(
unsynchronizedConnector,
ApplyStatus.created,
null,
unsynchronizedConnector.getSpec(),
EMPTY_STRING
);

return connectorService.createOrUpdate(unsynchronizedConnector);
});
}
}
}
Loading

0 comments on commit 594499f

Please sign in to comment.