Skip to content

Commit

Permalink
Try fix by deleting 1 with wildcard
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasCAI-mlv committed Oct 21, 2024
1 parent d28bd5a commit 662e773
Show file tree
Hide file tree
Showing 23 changed files with 291 additions and 266 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public Optional<Connector> get(String namespace, String connector) {
*
* @param namespace The namespace
* @param connector The connector to create
* @param dryrun Does the creation is a dry run
* @param dryrun Is dry run mode or not?
* @return The created connector
*/
@Post("{?dryrun}")
Expand Down Expand Up @@ -162,7 +162,7 @@ public Mono<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connec
*
* @param namespace The current namespace
* @param connector The current connector name to delete
* @param dryrun Run in dry mode or not
* @param dryrun Is dry run mode or not?
* @return A HTTP response
* @deprecated use {@link #bulkDelete(String, String, boolean)} instead.
*/
Expand Down Expand Up @@ -206,14 +206,14 @@ public Mono<HttpResponse<Void>> delete(String namespace, String connector,
* Delete connectors.
*
* @param namespace The current namespace
* @param name The name parameter
* @param dryrun Run in dry mode or not
* @param name The name parameter
* @param dryrun Run in dry mode or not?
* @return A HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Status(HttpStatus.OK)
@Delete
public Mono<HttpResponse<Void>> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
public Mono<HttpResponse<List<Connector>>> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

List<Connector> connectors = connectorService.findByWildcardName(ns, name);
Expand All @@ -233,15 +233,15 @@ public Mono<HttpResponse<Void>> bulkDelete(String namespace, @QueryValue(default
}

if (dryrun) {
return Mono.just(HttpResponse.noContent());
return Mono.just(HttpResponse.ok(connectors));
}

return Flux.fromIterable(connectors)
.flatMap(connector -> {
sendEventLog(connector, ApplyStatus.deleted, connector.getSpec(), null, EMPTY_STRING);
return connectorService.delete(ns, connector);
})
.then(Mono.just(HttpResponse.noContent()));
.then(Mono.just(HttpResponse.ok(connectors)));
}

/**
Expand Down Expand Up @@ -305,7 +305,7 @@ public Mono<MutableHttpResponse<ChangeConnectorState>> changeState(String namesp
* Import unsynchronized connectors.
*
* @param namespace The namespace
* @param dryrun Is dry run mode or not ?
* @param dryrun Is dry run mode or not?
* @return The list of imported connectors
*/
@Post("/_/import{?dryrun}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class ConsumerGroupController extends NamespacedResourceController {
* @param namespace The namespace
* @param consumerGroup The consumer group
* @param consumerGroupResetOffsets The information about how to reset
* @param dryrun Is dry run mode or not ?
* @param dryrun Is dry run mode or not?
* @return The reset offsets response
*/
@Post("/{consumerGroup}/reset{?dryrun}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public Optional<Namespace> get(String namespace) {
* Create a namespace.
*
* @param namespace The namespace
* @param dryrun Does the creation is a dry run
* @param dryrun Is dry run mode or not?
* @return The created namespace
*/
@Post("{?dryrun}")
Expand Down Expand Up @@ -118,19 +118,21 @@ public HttpResponse<Namespace> apply(@Valid @Body Namespace namespace,
* Delete a namespace.
*
* @param namespace The namespace
* @param dryrun Is dry run mode or not ?
* @param dryrun Is dry run mode or not?
* @return An HTTP response
* @deprecated use bulkDelete instead.
*/
@Delete("/{namespace}{?dryrun}")
@Deprecated(since = "1.13.0")
public HttpResponse<Void> delete(String namespace, @QueryValue(defaultValue = "false") boolean dryrun) {
Optional<Namespace> optionalNamespace = namespaceService.findByName(namespace);

if (optionalNamespace.isEmpty()) {
return HttpResponse.notFound();
}

List<String> namespaceResources = namespaceService.findAllResourcesByNamespace(optionalNamespace.get());

if (!namespaceResources.isEmpty()) {
List<String> validationErrors = namespaceResources
.stream()
Expand All @@ -143,21 +145,31 @@ public HttpResponse<Void> delete(String namespace, @QueryValue(defaultValue = "f
return HttpResponse.noContent();
}

performDeletion(optionalNamespace.get());
sendEventLog(
optionalNamespace.get(),
ApplyStatus.deleted,
optionalNamespace.get().getSpec(),
null,
EMPTY_STRING
);

namespaceService.delete(optionalNamespace.get());

return HttpResponse.noContent();
}

/**
* Delete namespaces.
*
* @param dryrun Is dry run mode or not ?
* @param name The name parameter
* @param dryrun Is dry run mode or not?
* @param name The name parameter
* @return An HTTP response
*/
@Delete
public HttpResponse<Void> bulkDelete(@QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
public HttpResponse<List<Namespace>> bulkDelete(@QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
List<Namespace> namespaces = namespaceService.findByWildcardName(name);

if (namespaces.isEmpty()) {
return HttpResponse.notFound();
}
Expand All @@ -182,26 +194,21 @@ public HttpResponse<Void> bulkDelete(@QueryValue(defaultValue = "*") String name
}

if (dryrun) {
return HttpResponse.noContent();
return HttpResponse.ok(namespaces);
}

namespaces.forEach(this::performDeletion);
return HttpResponse.noContent();
}
namespaces.forEach(namespace -> {
sendEventLog(
namespace,
ApplyStatus.deleted,
namespace.getSpec(),
null,
EMPTY_STRING
);

/**
* Perform the deletion of the namespace and send an event log.
*
* @param namespace The namespace to delete
*/
private void performDeletion(Namespace namespace) {
sendEventLog(
namespace,
ApplyStatus.deleted,
namespace.getSpec(),
null,
EMPTY_STRING
);
namespaceService.delete(namespace);
namespaceService.delete(namespace);
});

return HttpResponse.ok(namespaces);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public Optional<RoleBinding> get(String namespace, String name) {
*
* @param namespace The namespace
* @param roleBinding The role binding
* @param dryrun Does the creation is a dry run
* @param dryrun Is dry run mode or not?
* @return The created role binding
*/
@Post("{?dryrun}")
Expand Down Expand Up @@ -147,17 +147,18 @@ public HttpResponse<Void> delete(String namespace, String name,
* @param dryrun Is dry run mode or not?
* @return An HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Status(HttpStatus.OK)
@Delete
public HttpResponse<Void> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
public HttpResponse<List<RoleBinding>> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
List<RoleBinding> roleBindings = roleBindingService.findByWildcardName(namespace, name);

if (roleBindings.isEmpty()) {
return HttpResponse.notFound();
}

if (dryrun) {
return HttpResponse.noContent();
return HttpResponse.ok(roleBindings);
}

roleBindings.forEach(roleBinding -> {
Expand All @@ -171,6 +172,6 @@ public HttpResponse<Void> bulkDelete(String namespace, @QueryValue(defaultValue
roleBindingService.delete(roleBinding);
});

return HttpResponse.noContent();
return HttpResponse.ok(roleBindings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.time.Instant;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -88,7 +89,7 @@ public Mono<Schema> get(String namespace, String subject) {
*
* @param namespace The namespace
* @param schema The schema to create
* @param dryrun Does the creation is a dry run
* @param dryrun Is dry run mode or not?
* @return The created schema
*/
@Post
Expand Down Expand Up @@ -161,12 +162,12 @@ public Mono<HttpResponse<Schema>> apply(String namespace, @Valid @Body Schema sc
* @param dryrun Run in dry mode or not?
* @return A HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Status(HttpStatus.OK)
@Delete
public Mono<HttpResponse<Void>> bulkDelete(String namespace,
@QueryValue(defaultValue = "*") String name,
@QueryValue("version") Optional<String> versionOptional,
@QueryValue(defaultValue = "false") boolean dryrun) {
public Mono<HttpResponse<List<Schema>>> bulkDelete(String namespace,
@QueryValue(defaultValue = "*") String name,
@QueryValue("version") Optional<String> versionOptional,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

return schemaService.findByWildcardName(ns, name)
Expand All @@ -176,17 +177,22 @@ public Mono<HttpResponse<Void>> bulkDelete(String namespace,
.map(Optional::of)
.defaultIfEmpty(Optional.empty()))
.collectList()
.flatMap(schemas -> {
if (schemas.isEmpty() || schemas.stream().anyMatch(Optional::isEmpty)) {
.flatMap(optionalSchemas -> {
if (optionalSchemas.isEmpty() || optionalSchemas.stream().anyMatch(Optional::isEmpty)) {
return Mono.just(HttpResponse.notFound());
}

List<Schema> schemas = optionalSchemas
.stream()
.filter(Optional::isPresent)
.map(Optional::get)
.toList();

if (dryrun) {
return Mono.just(HttpResponse.noContent());
return Mono.just(HttpResponse.ok(schemas));
}

return Flux.fromIterable(schemas)
.map(Optional::get)
.flatMap(schema -> (versionOptional.isEmpty()
? schemaService.deleteAllVersions(ns, schema.getMetadata().getName()) :
schemaService.deleteVersion(ns, schema.getMetadata().getName(), versionOptional.get()))
Expand All @@ -201,7 +207,7 @@ public Mono<HttpResponse<Void>> bulkDelete(String namespace,
);
return Mono.just(HttpResponse.noContent());
}))
.then(Mono.just(HttpResponse.noContent()));
.then(Mono.just(HttpResponse.ok(schemas)));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,19 +155,18 @@ HttpResponse<Void> delete(String namespace, String stream, @QueryValue(defaultVa
* @param dryrun Is dry run mode or not?
* @return An HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Status(HttpStatus.OK)
@Delete
HttpResponse<Void> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
HttpResponse<List<KafkaStream>> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

List<KafkaStream> kafkaStreams = streamService.findByWildcardName(ns, name);

List<String> validationErrors = kafkaStreams.stream()
.filter(kafkaStream ->
!streamService.isNamespaceOwnerOfKafkaStream(ns, kafkaStream.getMetadata().getName()))
.map(kafkaStream -> invalidOwner(kafkaStream.getMetadata().getName()))
.toList();
.filter(kafkaStream ->
!streamService.isNamespaceOwnerOfKafkaStream(ns, kafkaStream.getMetadata().getName()))
.map(kafkaStream -> invalidOwner(kafkaStream.getMetadata().getName()))
.toList();

if (!validationErrors.isEmpty()) {
throw new ResourceValidationException(KAFKA_STREAM, name, validationErrors);
Expand All @@ -178,8 +177,9 @@ HttpResponse<Void> bulkDelete(String namespace, @QueryValue(defaultValue = "*")
}

if (dryrun) {
return HttpResponse.noContent();
return HttpResponse.ok(kafkaStreams);
}

kafkaStreams.forEach(kafkaStream -> {
sendEventLog(
kafkaStream,
Expand All @@ -188,9 +188,10 @@ HttpResponse<Void> bulkDelete(String namespace, @QueryValue(defaultValue = "*")
null,
EMPTY_STRING
);

streamService.delete(ns, kafkaStream);
});

return HttpResponse.noContent();
return HttpResponse.ok(kafkaStreams);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ public HttpResponse<AccessControlEntry> apply(Authentication authentication, Str
* @return An HTTP response
*/
@Delete
@Status(HttpStatus.NO_CONTENT)
public HttpResponse<Void> bulkDelete(Authentication authentication, String namespace,
@QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
@Status(HttpStatus.OK)
public HttpResponse<List<AccessControlEntry>> bulkDelete(Authentication authentication, String namespace,
@QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {

Namespace ns = getNamespace(namespace);
List<AccessControlEntry> acls = aclService.findAllGrantedByNamespaceByWildcardName(ns, name);
Expand All @@ -192,7 +192,7 @@ public HttpResponse<Void> bulkDelete(Authentication authentication, String names
}

if (dryrun) {
return HttpResponse.noContent();
return HttpResponse.ok(acls);
}

acls.forEach(acl -> {
Expand All @@ -205,7 +205,7 @@ public HttpResponse<Void> bulkDelete(Authentication authentication, String names
aclService.delete(acl);
});

return HttpResponse.noContent();
return HttpResponse.ok(acls);
}

/**
Expand Down
Loading

0 comments on commit 662e773

Please sign in to comment.