Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle wildcard parameter in Connector deletion API #450

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 144 additions & 101 deletions src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.michelin.ns4kafka.service.ResourceQuotaService;
import com.michelin.ns4kafka.util.enumation.ApplyStatus;
import com.michelin.ns4kafka.util.exception.ResourceValidationException;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MutableHttpResponse;
Expand Down Expand Up @@ -89,7 +88,7 @@ public Mono<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connec

if (!connectorService.isNamespaceOwnerOfConnect(ns, connector.getMetadata().getName())) {
return Mono.error(new ResourceValidationException(connector,
invalidOwner(connector.getMetadata().getName())));
invalidOwner(connector.getMetadata().getName())));
}

// Set / Override name in spec.config.name, required for several Kafka Connect API calls
Expand All @@ -104,58 +103,58 @@ public Mono<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connec

// Validate locally
return connectorService.validateLocally(ns, connector)
.flatMap(validationErrors -> {
if (!validationErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(connector, validationErrors));
}

// Validate against connect rest API /validate
return connectorService.validateRemotely(ns, connector)
.flatMap(remoteValidationErrors -> {
if (!remoteValidationErrors.isEmpty()) {
return Mono.error(
new ResourceValidationException(connector, remoteValidationErrors));
}

// Augment with server side fields
connector.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
connector.getMetadata().setCluster(ns.getMetadata().getCluster());
connector.getMetadata().setNamespace(ns.getMetadata().getName());
connector.setStatus(Connector.ConnectorStatus.builder()
.state(Connector.TaskState.UNASSIGNED)
.build());

Optional<Connector> existingConnector =
connectorService.findByName(ns, connector.getMetadata().getName());
if (existingConnector.isPresent() && existingConnector.get().equals(connector)) {
return Mono.just(formatHttpResponse(existingConnector.get(), ApplyStatus.unchanged));
}

ApplyStatus status = existingConnector.isPresent() ? ApplyStatus.changed : ApplyStatus.created;

// Only check quota on connector creation
if (status.equals(ApplyStatus.created)) {
List<String> quotaErrors = resourceQuotaService.validateConnectorQuota(ns);
if (!quotaErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(connector, quotaErrors));
}
}

if (dryrun) {
return Mono.just(formatHttpResponse(connector, status));
}

sendEventLog(
connector,
status,
existingConnector.<Object>map(Connector::getSpec).orElse(null),
connector.getSpec(),
EMPTY_STRING
);

return Mono.just(formatHttpResponse(connectorService.createOrUpdate(connector), status));
});
});
.flatMap(validationErrors -> {
if (!validationErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(connector, validationErrors));
}

// Validate against connect rest API /validate
return connectorService.validateRemotely(ns, connector)
.flatMap(remoteValidationErrors -> {
if (!remoteValidationErrors.isEmpty()) {
return Mono.error(
new ResourceValidationException(connector, remoteValidationErrors));
}

// Augment with server side fields
connector.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
connector.getMetadata().setCluster(ns.getMetadata().getCluster());
connector.getMetadata().setNamespace(ns.getMetadata().getName());
connector.setStatus(Connector.ConnectorStatus.builder()
.state(Connector.TaskState.UNASSIGNED)
.build());

Optional<Connector> existingConnector =
connectorService.findByName(ns, connector.getMetadata().getName());
if (existingConnector.isPresent() && existingConnector.get().equals(connector)) {
return Mono.just(formatHttpResponse(existingConnector.get(), ApplyStatus.unchanged));
}

ApplyStatus status = existingConnector.isPresent() ? ApplyStatus.changed : ApplyStatus.created;

// Only check quota on connector creation
if (status.equals(ApplyStatus.created)) {
List<String> quotaErrors = resourceQuotaService.validateConnectorQuota(ns);
if (!quotaErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(connector, quotaErrors));
}
}

if (dryrun) {
return Mono.just(formatHttpResponse(connector, status));
}

sendEventLog(
connector,
status,
existingConnector.<Object>map(Connector::getSpec).orElse(null),
connector.getSpec(),
EMPTY_STRING
);

return Mono.just(formatHttpResponse(connectorService.createOrUpdate(connector), status));
});
});
}

/**
Expand All @@ -165,9 +164,11 @@ public Mono<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connec
* @param connector The current connector name to delete
* @param dryrun Run in dry mode or not
* @return A HTTP response
* @deprecated use bulkDelete instead.
*/
@Status(HttpStatus.NO_CONTENT)
@Delete("/{connector}{?dryrun}")
@Deprecated(since = "1.13.0")
public Mono<HttpResponse<Void>> delete(String namespace, String connector,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);
Expand All @@ -189,16 +190,58 @@ public Mono<HttpResponse<Void>> delete(String namespace, String connector,
Connector connectorToDelete = optionalConnector.get();

sendEventLog(
connectorToDelete,
ApplyStatus.deleted,
connectorToDelete.getSpec(),
null,
EMPTY_STRING
connectorToDelete,
ApplyStatus.deleted,
connectorToDelete.getSpec(),
null,
EMPTY_STRING
);

return connectorService
.delete(ns, optionalConnector.get())
.map(httpResponse -> HttpResponse.noContent());
.delete(ns, optionalConnector.get())
.map(httpResponse -> HttpResponse.noContent());
}

/**
* Delete connectors.
*
* @param namespace The current namespace
* @param name The name parameter
* @param dryrun Run in dry mode or not
* @return A HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Delete
public Mono<HttpResponse<Void>> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

List<Connector> connectors = connectorService.findByWildcardName(ns, name);

// Validate ownership
List<String> validationErrors = connectors.stream()
.filter(connector -> !connectorService.isNamespaceOwnerOfConnect(ns, connector.getMetadata().getName()))
.map(connector -> invalidOwner(connector.getMetadata().getName()))
.toList();

if (!validationErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(CONNECTOR, name, validationErrors));
}

if (connectors.isEmpty()) {
return Mono.just(HttpResponse.notFound());
}

if (dryrun) {
return Mono.just(HttpResponse.noContent());
}

return Flux.fromIterable(connectors)
.flatMap(connector -> {
sendEventLog(connector, ApplyStatus.deleted, connector.getSpec(), null, EMPTY_STRING);
return connectorService.delete(ns, connector);
})
.then(Mono.just(HttpResponse.noContent()));
}

/**
Expand Down Expand Up @@ -232,30 +275,30 @@ public Mono<MutableHttpResponse<ChangeConnectorState>> changeState(String namesp
case resume -> response = connectorService.resume(ns, optionalConnector.get());
default -> {
return Mono.error(
new IllegalStateException("Unspecified action " + state.getSpec().getAction()));
new IllegalStateException("Unspecified action " + state.getSpec().getAction()));
}
}

return response
.doOnSuccess(success -> {
state.setStatus(ChangeConnectorState.ChangeConnectorStateStatus.builder()
.success(true)
.code(success.status())
.build());
state.setMetadata(optionalConnector.get().getMetadata());
state.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
})
.doOnError(error -> {
state.setStatus(ChangeConnectorState.ChangeConnectorStateStatus.builder()
.success(false)
.code(HttpStatus.INTERNAL_SERVER_ERROR)
.errorMessage(error.getMessage())
.build());
state.setMetadata(optionalConnector.get().getMetadata());
state.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
})
.map(httpResponse -> HttpResponse.ok(state))
.onErrorReturn(HttpResponse.ok(state));
.doOnSuccess(success -> {
state.setStatus(ChangeConnectorState.ChangeConnectorStateStatus.builder()
.success(true)
.code(success.status())
.build());
state.setMetadata(optionalConnector.get().getMetadata());
state.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
})
.doOnError(error -> {
state.setStatus(ChangeConnectorState.ChangeConnectorStateStatus.builder()
.success(false)
.code(HttpStatus.INTERNAL_SERVER_ERROR)
.errorMessage(error.getMessage())
.build());
state.setMetadata(optionalConnector.get().getMetadata());
state.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
})
.map(httpResponse -> HttpResponse.ok(state))
.onErrorReturn(HttpResponse.ok(state));
}

/**
Expand All @@ -269,24 +312,24 @@ public Mono<MutableHttpResponse<ChangeConnectorState>> changeState(String namesp
public Flux<Connector> importResources(String namespace, @QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);
return connectorService.listUnsynchronizedConnectors(ns)
.map(unsynchronizedConnector -> {
unsynchronizedConnector.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
unsynchronizedConnector.getMetadata().setCluster(ns.getMetadata().getCluster());
unsynchronizedConnector.getMetadata().setNamespace(ns.getMetadata().getName());

if (dryrun) {
return unsynchronizedConnector;
}

sendEventLog(
unsynchronizedConnector,
ApplyStatus.created,
null,
unsynchronizedConnector.getSpec(),
EMPTY_STRING
);

return connectorService.createOrUpdate(unsynchronizedConnector);
});
.map(unsynchronizedConnector -> {
unsynchronizedConnector.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
unsynchronizedConnector.getMetadata().setCluster(ns.getMetadata().getCluster());
unsynchronizedConnector.getMetadata().setNamespace(ns.getMetadata().getName());

if (dryrun) {
return unsynchronizedConnector;
}

sendEventLog(
unsynchronizedConnector,
ApplyStatus.created,
null,
unsynchronizedConnector.getSpec(),
EMPTY_STRING
);

return connectorService.createOrUpdate(unsynchronizedConnector);
});
}
}
}
Loading
Loading