From 4c1248f6c0b849465b40687727aabdb5d6e1981d Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 2 Sep 2024 17:51:32 +0200 Subject: [PATCH 01/11] Bump org.mockito:mockito-junit-jupiter from 5.12.0 to 5.13.0 (#435) Bumps [org.mockito:mockito-junit-jupiter](https://github.com/mockito/mockito) from 5.12.0 to 5.13.0. - [Release notes](https://github.com/mockito/mockito/releases) - [Commits](https://github.com/mockito/mockito/compare/v5.12.0...v5.13.0) --- updated-dependencies: - dependency-name: org.mockito:mockito-junit-jupiter dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 038497b3..62b4656a 100644 --- a/build.gradle +++ b/build.gradle @@ -49,7 +49,7 @@ dependencies { testImplementation("org.testcontainers:junit-jupiter:1.20.0") testImplementation("org.testcontainers:testcontainers:1.20.0") testImplementation("org.testcontainers:kafka:1.20.0") - testImplementation("org.mockito:mockito-junit-jupiter:5.12.0") + testImplementation("org.mockito:mockito-junit-jupiter:5.13.0") testImplementation("org.junit.jupiter:junit-jupiter-params:5.11.0") testImplementation("io.projectreactor:reactor-test") From be1db73b89f942a525b5a397d07c1c910b828857 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 2 Sep 2024 18:05:46 +0200 Subject: [PATCH 02/11] Bump org.testcontainers:kafka from 1.20.0 to 1.20.1 (#436) Bumps [org.testcontainers:kafka](https://github.com/testcontainers/testcontainers-java) from 1.20.0 to 1.20.1. - [Release notes](https://github.com/testcontainers/testcontainers-java/releases) - [Changelog](https://github.com/testcontainers/testcontainers-java/blob/main/CHANGELOG.md) - [Commits](https://github.com/testcontainers/testcontainers-java/compare/1.20.0...1.20.1) --- updated-dependencies: - dependency-name: org.testcontainers:kafka dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 62b4656a..217fc7a6 100644 --- a/build.gradle +++ b/build.gradle @@ -48,7 +48,7 @@ dependencies { testImplementation("org.mockito:mockito-core") testImplementation("org.testcontainers:junit-jupiter:1.20.0") testImplementation("org.testcontainers:testcontainers:1.20.0") - testImplementation("org.testcontainers:kafka:1.20.0") + testImplementation("org.testcontainers:kafka:1.20.1") testImplementation("org.mockito:mockito-junit-jupiter:5.13.0") testImplementation("org.junit.jupiter:junit-jupiter-params:5.11.0") testImplementation("io.projectreactor:reactor-test") From 64c9369d0a6899aa8ebd7e0845d752dce3e4c25c Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 2 Sep 2024 18:05:53 +0200 Subject: [PATCH 03/11] Bump org.testcontainers:junit-jupiter from 1.20.0 to 1.20.1 (#437) Bumps [org.testcontainers:junit-jupiter](https://github.com/testcontainers/testcontainers-java) from 1.20.0 to 1.20.1. - [Release notes](https://github.com/testcontainers/testcontainers-java/releases) - [Changelog](https://github.com/testcontainers/testcontainers-java/blob/main/CHANGELOG.md) - [Commits](https://github.com/testcontainers/testcontainers-java/compare/1.20.0...1.20.1) --- updated-dependencies: - dependency-name: org.testcontainers:junit-jupiter dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 217fc7a6..18a2b3d0 100644 --- a/build.gradle +++ b/build.gradle @@ -46,7 +46,7 @@ dependencies { runtimeOnly("ch.qos.logback:logback-classic") testImplementation("org.mockito:mockito-core") - testImplementation("org.testcontainers:junit-jupiter:1.20.0") + testImplementation("org.testcontainers:junit-jupiter:1.20.1") testImplementation("org.testcontainers:testcontainers:1.20.0") testImplementation("org.testcontainers:kafka:1.20.1") testImplementation("org.mockito:mockito-junit-jupiter:5.13.0") From 2211969e43d959e4cd9fc5ae82f9fdd4f6df789f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 2 Sep 2024 18:06:30 +0200 Subject: [PATCH 04/11] Bump pl.allegro.tech.build.axion-release from 1.18.5 to 1.18.7 (#439) Bumps [pl.allegro.tech.build.axion-release](https://github.com/allegro/axion-release-plugin) from 1.18.5 to 1.18.7. - [Release notes](https://github.com/allegro/axion-release-plugin/releases) - [Commits](https://github.com/allegro/axion-release-plugin/compare/v1.18.5...v1.18.7) --- updated-dependencies: - dependency-name: pl.allegro.tech.build.axion-release dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 18a2b3d0..fd20bcb0 100644 --- a/build.gradle +++ b/build.gradle @@ -3,7 +3,7 @@ plugins { id("io.micronaut.application") version "4.4.2" id("jacoco") id("org.sonarqube") version "5.1.0.4882" - id("pl.allegro.tech.build.axion-release") version "1.18.5" + id("pl.allegro.tech.build.axion-release") version "1.18.7" id("checkstyle") } From 87eb7f43216562b0f8e12f8beabf89f7a0f20527 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 2 Sep 2024 18:28:47 +0200 Subject: [PATCH 05/11] Bump org.testcontainers:testcontainers from 1.20.0 to 1.20.1 (#438) Bumps [org.testcontainers:testcontainers](https://github.com/testcontainers/testcontainers-java) from 1.20.0 to 1.20.1. - [Release notes](https://github.com/testcontainers/testcontainers-java/releases) - [Changelog](https://github.com/testcontainers/testcontainers-java/blob/main/CHANGELOG.md) - [Commits](https://github.com/testcontainers/testcontainers-java/compare/1.20.0...1.20.1) --- updated-dependencies: - dependency-name: org.testcontainers:testcontainers dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index fd20bcb0..102254ed 100644 --- a/build.gradle +++ b/build.gradle @@ -47,7 +47,7 @@ dependencies { testImplementation("org.mockito:mockito-core") testImplementation("org.testcontainers:junit-jupiter:1.20.1") - testImplementation("org.testcontainers:testcontainers:1.20.0") + testImplementation("org.testcontainers:testcontainers:1.20.1") testImplementation("org.testcontainers:kafka:1.20.1") testImplementation("org.mockito:mockito-junit-jupiter:5.13.0") testImplementation("org.junit.jupiter:junit-jupiter-params:5.11.0") From d529e596c8623caea01d0041d330f121e176a925 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 9 Sep 2024 18:30:05 +0200 Subject: [PATCH 06/11] Bump pl.allegro.tech.build.axion-release from 1.18.7 to 1.18.8 (#440) Bumps [pl.allegro.tech.build.axion-release](https://github.com/allegro/axion-release-plugin) from 1.18.7 to 1.18.8. - [Release notes](https://github.com/allegro/axion-release-plugin/releases) - [Commits](https://github.com/allegro/axion-release-plugin/compare/v1.18.7...v1.18.8) --- updated-dependencies: - dependency-name: pl.allegro.tech.build.axion-release dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 102254ed..020da2e2 100644 --- a/build.gradle +++ b/build.gradle @@ -3,7 +3,7 @@ plugins { id("io.micronaut.application") version "4.4.2" id("jacoco") id("org.sonarqube") version "5.1.0.4882" - id("pl.allegro.tech.build.axion-release") version "1.18.7" + id("pl.allegro.tech.build.axion-release") version "1.18.8" id("checkstyle") } From eb5eb065c03148bfc6576b10459b1eb949beeb27 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 16 Sep 2024 17:15:39 +0200 Subject: [PATCH 07/11] Bump io.confluent:kafka-schema-registry-client from 7.7.0 to 7.7.1 (#441) Bumps [io.confluent:kafka-schema-registry-client](https://github.com/confluentinc/schema-registry) from 7.7.0 to 7.7.1. - [Commits](https://github.com/confluentinc/schema-registry/commits) --- updated-dependencies: - dependency-name: io.confluent:kafka-schema-registry-client dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 020da2e2..f4b9303d 100644 --- a/build.gradle +++ b/build.gradle @@ -37,7 +37,7 @@ dependencies { implementation("io.swagger.core.v3:swagger-annotations") implementation("jakarta.annotation:jakarta.annotation-api") implementation("jakarta.validation:jakarta.validation-api") - implementation('io.confluent:kafka-schema-registry-client:7.7.0') + implementation('io.confluent:kafka-schema-registry-client:7.7.1') compileOnly("org.projectlombok:lombok") From 9bf19908ee52c70a36d9ee19d196c8920940f230 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 23 Sep 2024 20:49:10 +0200 Subject: [PATCH 08/11] Bump pl.allegro.tech.build.axion-release from 1.18.8 to 1.18.9 (#445) Bumps [pl.allegro.tech.build.axion-release](https://github.com/allegro/axion-release-plugin) from 1.18.8 to 1.18.9. - [Release notes](https://github.com/allegro/axion-release-plugin/releases) - [Commits](https://github.com/allegro/axion-release-plugin/compare/v1.18.8...v1.18.9) --- updated-dependencies: - dependency-name: pl.allegro.tech.build.axion-release dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index f4b9303d..45cee23d 100644 --- a/build.gradle +++ b/build.gradle @@ -3,7 +3,7 @@ plugins { id("io.micronaut.application") version "4.4.2" id("jacoco") id("org.sonarqube") version "5.1.0.4882" - id("pl.allegro.tech.build.axion-release") version "1.18.8" + id("pl.allegro.tech.build.axion-release") version "1.18.9" id("checkstyle") } From 5286f9849679d3c01aab9577d9b61f72dda4d97f Mon Sep 17 00:00:00 2001 From: adriencalime <110117127+adriencalime@users.noreply.github.com> Date: Wed, 25 Sep 2024 00:16:33 +0200 Subject: [PATCH 09/11] Handle wildcard parameter in namespace deletion API (#442) --- .../controller/NamespaceController.java | 62 +++++++++- .../controller/NamespaceControllerTest.java | 117 ++++++++++++++++++ 2 files changed, 173 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/NamespaceController.java b/src/main/java/com/michelin/ns4kafka/controller/NamespaceController.java index 1ee8be86..5ea2167e 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/NamespaceController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/NamespaceController.java @@ -120,8 +120,10 @@ public HttpResponse apply(@Valid @Body Namespace namespace, * @param namespace The namespace * @param dryrun Is dry run mode or not ? * @return An HTTP response + * @deprecated use bulkDelete instead. */ @Delete("/{namespace}{?dryrun}") + @Deprecated(since = "1.13.0") public HttpResponse delete(String namespace, @QueryValue(defaultValue = "false") boolean dryrun) { Optional optionalNamespace = namespaceService.findByName(namespace); if (optionalNamespace.isEmpty()) { @@ -141,17 +143,65 @@ public HttpResponse delete(String namespace, @QueryValue(defaultValue = "f return HttpResponse.noContent(); } - var namespaceToDelete = optionalNamespace.get(); + performDeletion(optionalNamespace.get()); + return HttpResponse.noContent(); + } + + /** + * Delete namespaces. + * + * @param dryrun Is dry run mode or not ? + * @param name The name parameter + * @return An HTTP response + */ + @Delete + public HttpResponse bulkDelete(@QueryValue(defaultValue = "*") String name, + @QueryValue(defaultValue = "false") boolean dryrun) { + List namespaces = namespaceService.findByWildcardName(name); + if (namespaces.isEmpty()) { + return HttpResponse.notFound(); + } + + List namespaceResources = namespaces + .stream() + .flatMap(namespace -> namespaceService.findAllResourcesByNamespace(namespace) + .stream()) + .toList(); + + if (!namespaceResources.isEmpty()) { + List validationErrors = namespaceResources + .stream() + .map(FormatErrorUtils::invalidNamespaceDeleteOperation) + .toList(); + + throw new ResourceValidationException( + NAMESPACE, + String.join(",", namespaces.stream().map(namespace -> namespace.getMetadata().getName()).toList()), + validationErrors + ); + } + if (dryrun) { + return HttpResponse.noContent(); + } + + namespaces.forEach(this::performDeletion); + return HttpResponse.noContent(); + } + + /** + * Perform the deletion of the namespace and send an event log. + * + * @param namespace The namespace to delete + */ + private void performDeletion(Namespace namespace) { sendEventLog( - namespaceToDelete, + namespace, ApplyStatus.deleted, - namespaceToDelete.getSpec(), + namespace.getSpec(), null, EMPTY_STRING ); - - namespaceService.delete(optionalNamespace.get()); - return HttpResponse.noContent(); + namespaceService.delete(namespace); } } diff --git a/src/test/java/com/michelin/ns4kafka/controller/NamespaceControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/NamespaceControllerTest.java index d992c1e7..078e0cb9 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/NamespaceControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/NamespaceControllerTest.java @@ -291,6 +291,7 @@ void shouldUpdateNamespaceInDryRunMode() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteNamespace() { Namespace existing = Namespace.builder() .metadata(Metadata.builder() @@ -317,6 +318,7 @@ void shouldDeleteNamespace() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteNamespaceInDryRunMode() { Namespace existing = Namespace.builder() .metadata(Metadata.builder() @@ -340,6 +342,7 @@ void shouldDeleteNamespaceInDryRunMode() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteNamespaceWhenNotFound() { when(namespaceService.findByName("namespace")) .thenReturn(Optional.empty()); @@ -351,6 +354,7 @@ void shouldNotDeleteNamespaceWhenNotFound() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteNamespaceWhenResourcesAreStillLinkedWithIt() { Namespace existing = Namespace.builder() .metadata(Metadata.builder() @@ -371,4 +375,117 @@ void shouldNotDeleteNamespaceWhenResourcesAreStillLinkedWithIt() { () -> namespaceController.delete("namespace", false)); verify(namespaceService, never()).delete(any()); } + + @Test + void shouldDeleteNamespaces() { + Namespace namespace1 = Namespace.builder() + .metadata(Metadata.builder() + .name("namespace1") + .cluster("local") + .build()) + .spec(Namespace.NamespaceSpec.builder() + .kafkaUser("user") + .build()) + .build(); + + Namespace namespace2 = Namespace.builder() + .metadata(Metadata.builder() + .name("namespace2") + .cluster("local") + .build()) + .spec(Namespace.NamespaceSpec.builder() + .kafkaUser("user") + .build()) + .build(); + + when(namespaceService.findByWildcardName("namespace*")) + .thenReturn(List.of(namespace1, namespace2)); + when(namespaceService.findAllResourcesByNamespace(namespace1)) + .thenReturn(List.of()); + when(namespaceService.findAllResourcesByNamespace(namespace2)) + .thenReturn(List.of()); + when(securityService.username()) + .thenReturn(Optional.of("test-user")); + when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)) + .thenReturn(false); + + doNothing().when(applicationEventPublisher).publishEvent(any()); + var result = namespaceController.bulkDelete("namespace*", false); + assertEquals(HttpResponse.noContent().getStatus(), result.getStatus()); + } + + @Test + void shouldDeleteNamespacesInDryRunMode() { + Namespace namespace1 = Namespace.builder() + .metadata(Metadata.builder() + .name("namespace1") + .cluster("local") + .build()) + .spec(Namespace.NamespaceSpec.builder() + .kafkaUser("user") + .build()) + .build(); + + Namespace namespace2 = Namespace.builder() + .metadata(Metadata.builder() + .name("namespace2") + .cluster("local") + .build()) + .spec(Namespace.NamespaceSpec.builder() + .kafkaUser("user") + .build()) + .build(); + + when(namespaceService.findByWildcardName("namespace*")) + .thenReturn(List.of(namespace1, namespace2)); + when(namespaceService.findAllResourcesByNamespace(namespace1)) + .thenReturn(List.of()); + when(namespaceService.findAllResourcesByNamespace(namespace2)) + .thenReturn(List.of()); + + var result = namespaceController.bulkDelete("namespace*", true); + verify(namespaceService, never()).delete(any()); + assertEquals(HttpResponse.noContent().getStatus(), result.getStatus()); + } + + @Test + void shouldNotDeleteNamespacesWhenResourcesAreStillLinkedWithIt() { + Namespace namespace1 = Namespace.builder() + .metadata(Metadata.builder() + .name("namespace1") + .cluster("local") + .build()) + .spec(Namespace.NamespaceSpec.builder() + .kafkaUser("user") + .build()) + .build(); + + Namespace namespace2 = Namespace.builder() + .metadata(Metadata.builder() + .name("namespace2") + .cluster("local") + .build()) + .spec(Namespace.NamespaceSpec.builder() + .kafkaUser("user") + .build()) + .build(); + + when(namespaceService.findByWildcardName("namespace*")) + .thenReturn(List.of(namespace1, namespace2)); + when(namespaceService.findAllResourcesByNamespace(namespace1)) + .thenReturn(List.of("Topic/topic1")); + when(namespaceService.findAllResourcesByNamespace(namespace2)) + .thenReturn(List.of()); + + assertThrows(ResourceValidationException.class, + () -> namespaceController.bulkDelete("namespace*", false)); + verify(namespaceService, never()).delete(any()); + } + + @Test + void shouldNotDeleteNamespacesWhenPatternMatchesNothing() { + when(namespaceService.findByWildcardName("namespace*")).thenReturn(List.of()); + var result = namespaceController.bulkDelete("namespace*", false); + assertEquals(HttpResponse.notFound().getStatus(), result.getStatus()); + } } From e0afe72a84095d13e833fb128480b8cd5d9b9df6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20GREFFIER?= Date: Mon, 30 Sep 2024 15:47:22 +0200 Subject: [PATCH 10/11] Fix restart operation not restarting all connector tasks (#448) --- .../ns4kafka/service/ConnectorService.java | 22 +++---- .../client/connect/KafkaConnectClient.java | 7 +- .../connect/entities/ConnectorStateInfo.java | 3 +- .../service/ConnectorServiceTest.java | 65 ++++++++++++++++--- 4 files changed, 71 insertions(+), 26 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java b/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java index cb307201..575c50ef 100644 --- a/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java +++ b/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java @@ -256,20 +256,14 @@ public Flux listUnsynchronizedConnectors(Namespace namespace) { public Mono> restart(Namespace namespace, Connector connector) { return kafkaConnectClient.status(namespace.getMetadata().getCluster(), connector.getSpec().getConnectCluster(), connector.getMetadata().getName()) - .flatMap(status -> { - Flux> responses = Flux.fromIterable(status.tasks()) - .flatMap(task -> kafkaConnectClient.restart(namespace.getMetadata().getCluster(), - connector.getSpec().getConnectCluster(), connector.getMetadata().getName(), task.getId())) - .map(response -> { - log.info("Success restarting connector [{}] on namespace [{}] connect [{}]", - connector.getMetadata().getName(), - namespace.getMetadata().getName(), - connector.getSpec().getConnectCluster()); - return HttpResponse.ok(); - }); - - return Mono.from(responses); - }); + .flatMap(status -> Flux.fromIterable(status.tasks()) + .flatMap(task -> kafkaConnectClient.restart(namespace.getMetadata().getCluster(), + connector.getSpec().getConnectCluster(), connector.getMetadata().getName(), task.getId())) + .doOnNext(restart -> log.info("Success restarting connector [{}] on namespace [{}] connect [{}]", + connector.getMetadata().getName(), + namespace.getMetadata().getName(), + connector.getSpec().getConnectCluster())) + .then(Mono.just(HttpResponse.ok()))); } /** diff --git a/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java b/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java index dca32ed1..3f2a9818 100644 --- a/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java +++ b/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java @@ -37,13 +37,17 @@ @Singleton public class KafkaConnectClient { private static final String CONNECTORS = "/connectors/"; + @Inject ConnectClusterRepository connectClusterRepository; + @Inject @Client(id = "kafka-connect") private HttpClient httpClient; + @Inject private List managedClusterProperties; + @Inject private SecurityProperties securityProperties; @@ -222,7 +226,8 @@ public Mono> resume(String kafkaCluster, String connectCluste * @return The Kafka Connect configuration */ public KafkaConnectClient.KafkaConnectHttpConfig getKafkaConnectConfig(String kafkaCluster, String connectCluster) { - Optional config = managedClusterProperties.stream() + Optional config = managedClusterProperties + .stream() .filter(kafkaAsyncExecutorConfig -> kafkaAsyncExecutorConfig.getName().equals(kafkaCluster)) .findFirst(); diff --git a/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorStateInfo.java b/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorStateInfo.java index 32a4bbc6..d639aa45 100644 --- a/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorStateInfo.java +++ b/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorStateInfo.java @@ -67,7 +67,8 @@ public ConnectorState(@JsonProperty("state") String state, @JsonProperty("worker public static class TaskState extends AbstractState implements Comparable { private final int id; - public TaskState(@JsonProperty("id") int id, @JsonProperty("state") String state, + public TaskState(@JsonProperty("id") int id, + @JsonProperty("state") String state, @JsonProperty("worker_id") String worker, @JsonProperty("msg") String msg) { super(state, worker, msg); diff --git a/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java index db5dd59d..447f84c1 100644 --- a/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java @@ -1,7 +1,11 @@ package com.michelin.ns4kafka.service; +import static com.michelin.ns4kafka.service.client.connect.entities.ConnectorType.SOURCE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -21,6 +25,7 @@ import com.michelin.ns4kafka.service.client.connect.entities.ConfigKeyInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConfigValueInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorPluginInfo; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorStateInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorType; import com.michelin.ns4kafka.service.executor.ConnectorAsyncExecutor; import com.michelin.ns4kafka.validation.ConnectValidator; @@ -35,7 +40,6 @@ import java.util.Optional; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentMatchers; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -868,11 +872,7 @@ void shouldNotValidateRemotelyWhenErrorHappens() { List.of(new ConfigInfo(new ConfigKeyInfo(null, null, false, null, null, null, null, 0, null, null, null), new ConfigValueInfo(null, null, null, List.of("error_message"), true)))); - when(kafkaConnectClient.validate( - ArgumentMatchers.eq("local"), - ArgumentMatchers.eq("local-name"), - ArgumentMatchers.any(), - ArgumentMatchers.any())) + when(kafkaConnectClient.validate(eq("local"), eq("local-name"), any(), any())) .thenReturn(Mono.just(configInfos)); StepVerifier.create(connectorService.validateRemotely(ns, connector)) @@ -908,10 +908,10 @@ void shouldValidateRemotely() { ConfigInfos configInfos = new ConfigInfos("name", 1, List.of(), List.of()); when(kafkaConnectClient.validate( - ArgumentMatchers.eq("local"), - ArgumentMatchers.eq("local-name"), - ArgumentMatchers.any(), - ArgumentMatchers.any())) + eq("local"), + eq("local-name"), + any(), + any())) .thenReturn(Mono.just(configInfos)); StepVerifier.create(connectorService.validateRemotely(ns, connector)) @@ -1305,4 +1305,49 @@ void shouldNotDeleteConnectorWhenConnectClusterReturnsError() { verify(connectorRepository, never()).delete(connector); } + + @Test + void shouldRestartAllTasksOfConnector() { + Namespace namespace = Namespace.builder() + .metadata(Metadata.builder() + .name("namespace") + .cluster("local") + .build()) + .build(); + + Connector connector = Connector.builder() + .metadata(Metadata.builder() + .name("ns-connect1") + .build()) + .spec(Connector.ConnectorSpec.builder() + .connectCluster("local-name") + .build()) + .build(); + + when(kafkaConnectClient.status(namespace.getMetadata().getCluster(), connector.getSpec().getConnectCluster(), + connector.getMetadata().getName())) + .thenReturn(Mono.just(new ConnectorStateInfo( + "connector", + new ConnectorStateInfo.ConnectorState("RUNNING", "worker", "message"), + List.of( + new ConnectorStateInfo.TaskState(0, "RUNNING", "worker", "message"), + new ConnectorStateInfo.TaskState(1, "RUNNING", "worker", "message"), + new ConnectorStateInfo.TaskState(2, "RUNNING", "worker", "message") + ), + SOURCE))); + + when(kafkaConnectClient.restart(any(), any(), any(), anyInt())) + .thenReturn(Mono.just(HttpResponse.ok())); + + StepVerifier.create(connectorService.restart(namespace, connector)) + .consumeNextWith(response -> assertEquals(HttpStatus.OK, response.getStatus())) + .verifyComplete(); + + verify(kafkaConnectClient).restart(namespace.getMetadata().getCluster(), + connector.getSpec().getConnectCluster(), connector.getMetadata().getName(), 0); + verify(kafkaConnectClient).restart(namespace.getMetadata().getCluster(), + connector.getSpec().getConnectCluster(), connector.getMetadata().getName(), 1); + verify(kafkaConnectClient).restart(namespace.getMetadata().getCluster(), + connector.getSpec().getConnectCluster(), connector.getMetadata().getName(), 2); + } } From 9152742092f89edf2638deb7b18b4ef1e2181b4e Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 30 Sep 2024 16:51:40 +0200 Subject: [PATCH 11/11] Bump org.mockito:mockito-junit-jupiter from 5.13.0 to 5.14.1 (#453) Bumps [org.mockito:mockito-junit-jupiter](https://github.com/mockito/mockito) from 5.13.0 to 5.14.1. - [Release notes](https://github.com/mockito/mockito/releases) - [Commits](https://github.com/mockito/mockito/compare/v5.13.0...v5.14.1) --- updated-dependencies: - dependency-name: org.mockito:mockito-junit-jupiter dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 45cee23d..f7b93c47 100644 --- a/build.gradle +++ b/build.gradle @@ -49,7 +49,7 @@ dependencies { testImplementation("org.testcontainers:junit-jupiter:1.20.1") testImplementation("org.testcontainers:testcontainers:1.20.1") testImplementation("org.testcontainers:kafka:1.20.1") - testImplementation("org.mockito:mockito-junit-jupiter:5.13.0") + testImplementation("org.mockito:mockito-junit-jupiter:5.14.1") testImplementation("org.junit.jupiter:junit-jupiter-params:5.11.0") testImplementation("io.projectreactor:reactor-test")