diff --git a/src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java b/src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java index 787bc3cb..badefe55 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java @@ -12,7 +12,6 @@ import com.michelin.ns4kafka.service.ResourceQuotaService; import com.michelin.ns4kafka.util.enumation.ApplyStatus; import com.michelin.ns4kafka.util.exception.ResourceValidationException; -import io.micronaut.core.util.StringUtils; import io.micronaut.http.HttpResponse; import io.micronaut.http.HttpStatus; import io.micronaut.http.MutableHttpResponse; @@ -89,7 +88,7 @@ public Mono> apply(String namespace, @Valid @Body Connec if (!connectorService.isNamespaceOwnerOfConnect(ns, connector.getMetadata().getName())) { return Mono.error(new ResourceValidationException(connector, - invalidOwner(connector.getMetadata().getName()))); + invalidOwner(connector.getMetadata().getName()))); } // Set / Override name in spec.config.name, required for several Kafka Connect API calls @@ -104,58 +103,58 @@ public Mono> apply(String namespace, @Valid @Body Connec // Validate locally return connectorService.validateLocally(ns, connector) - .flatMap(validationErrors -> { - if (!validationErrors.isEmpty()) { - return Mono.error(new ResourceValidationException(connector, validationErrors)); - } - - // Validate against connect rest API /validate - return connectorService.validateRemotely(ns, connector) - .flatMap(remoteValidationErrors -> { - if (!remoteValidationErrors.isEmpty()) { - return Mono.error( - new ResourceValidationException(connector, remoteValidationErrors)); - } - - // Augment with server side fields - connector.getMetadata().setCreationTimestamp(Date.from(Instant.now())); - connector.getMetadata().setCluster(ns.getMetadata().getCluster()); - connector.getMetadata().setNamespace(ns.getMetadata().getName()); - connector.setStatus(Connector.ConnectorStatus.builder() - .state(Connector.TaskState.UNASSIGNED) - .build()); - - Optional existingConnector = - connectorService.findByName(ns, connector.getMetadata().getName()); - if (existingConnector.isPresent() && existingConnector.get().equals(connector)) { - return Mono.just(formatHttpResponse(existingConnector.get(), ApplyStatus.unchanged)); - } - - ApplyStatus status = existingConnector.isPresent() ? ApplyStatus.changed : ApplyStatus.created; - - // Only check quota on connector creation - if (status.equals(ApplyStatus.created)) { - List quotaErrors = resourceQuotaService.validateConnectorQuota(ns); - if (!quotaErrors.isEmpty()) { - return Mono.error(new ResourceValidationException(connector, quotaErrors)); - } - } - - if (dryrun) { - return Mono.just(formatHttpResponse(connector, status)); - } - - sendEventLog( - connector, - status, - existingConnector.map(Connector::getSpec).orElse(null), - connector.getSpec(), - EMPTY_STRING - ); - - return Mono.just(formatHttpResponse(connectorService.createOrUpdate(connector), status)); - }); - }); + .flatMap(validationErrors -> { + if (!validationErrors.isEmpty()) { + return Mono.error(new ResourceValidationException(connector, validationErrors)); + } + + // Validate against connect rest API /validate + return connectorService.validateRemotely(ns, connector) + .flatMap(remoteValidationErrors -> { + if (!remoteValidationErrors.isEmpty()) { + return Mono.error( + new ResourceValidationException(connector, remoteValidationErrors)); + } + + // Augment with server side fields + connector.getMetadata().setCreationTimestamp(Date.from(Instant.now())); + connector.getMetadata().setCluster(ns.getMetadata().getCluster()); + connector.getMetadata().setNamespace(ns.getMetadata().getName()); + connector.setStatus(Connector.ConnectorStatus.builder() + .state(Connector.TaskState.UNASSIGNED) + .build()); + + Optional existingConnector = + connectorService.findByName(ns, connector.getMetadata().getName()); + if (existingConnector.isPresent() && existingConnector.get().equals(connector)) { + return Mono.just(formatHttpResponse(existingConnector.get(), ApplyStatus.unchanged)); + } + + ApplyStatus status = existingConnector.isPresent() ? ApplyStatus.changed : ApplyStatus.created; + + // Only check quota on connector creation + if (status.equals(ApplyStatus.created)) { + List quotaErrors = resourceQuotaService.validateConnectorQuota(ns); + if (!quotaErrors.isEmpty()) { + return Mono.error(new ResourceValidationException(connector, quotaErrors)); + } + } + + if (dryrun) { + return Mono.just(formatHttpResponse(connector, status)); + } + + sendEventLog( + connector, + status, + existingConnector.map(Connector::getSpec).orElse(null), + connector.getSpec(), + EMPTY_STRING + ); + + return Mono.just(formatHttpResponse(connectorService.createOrUpdate(connector), status)); + }); + }); } /** @@ -165,9 +164,11 @@ public Mono> apply(String namespace, @Valid @Body Connec * @param connector The current connector name to delete * @param dryrun Run in dry mode or not * @return A HTTP response + * @deprecated use bulkDelete instead. */ @Status(HttpStatus.NO_CONTENT) @Delete("/{connector}{?dryrun}") + @Deprecated(since = "1.13.0") public Mono> delete(String namespace, String connector, @QueryValue(defaultValue = "false") boolean dryrun) { Namespace ns = getNamespace(namespace); @@ -189,16 +190,58 @@ public Mono> delete(String namespace, String connector, Connector connectorToDelete = optionalConnector.get(); sendEventLog( - connectorToDelete, - ApplyStatus.deleted, - connectorToDelete.getSpec(), - null, - EMPTY_STRING + connectorToDelete, + ApplyStatus.deleted, + connectorToDelete.getSpec(), + null, + EMPTY_STRING ); return connectorService - .delete(ns, optionalConnector.get()) - .map(httpResponse -> HttpResponse.noContent()); + .delete(ns, optionalConnector.get()) + .map(httpResponse -> HttpResponse.noContent()); + } + + /** + * Delete connectors. + * + * @param namespace The current namespace + * @param name The name parameter + * @param dryrun Run in dry mode or not + * @return A HTTP response + */ + @Status(HttpStatus.NO_CONTENT) + @Delete + public Mono> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name, + @QueryValue(defaultValue = "false") boolean dryrun) { + Namespace ns = getNamespace(namespace); + + List connectors = connectorService.findByWildcardName(ns, name); + + // Validate ownership + List validationErrors = connectors.stream() + .filter(connector -> !connectorService.isNamespaceOwnerOfConnect(ns, connector.getMetadata().getName())) + .map(connector -> invalidOwner(connector.getMetadata().getName())) + .toList(); + + if (!validationErrors.isEmpty()) { + return Mono.error(new ResourceValidationException(CONNECTOR, name, validationErrors)); + } + + if (connectors.isEmpty()) { + return Mono.just(HttpResponse.notFound()); + } + + if (dryrun) { + return Mono.just(HttpResponse.noContent()); + } + + return Flux.fromIterable(connectors) + .flatMap(connector -> { + sendEventLog(connector, ApplyStatus.deleted, connector.getSpec(), null, EMPTY_STRING); + return connectorService.delete(ns, connector); + }) + .then(Mono.just(HttpResponse.noContent())); } /** @@ -232,30 +275,30 @@ public Mono> changeState(String namesp case resume -> response = connectorService.resume(ns, optionalConnector.get()); default -> { return Mono.error( - new IllegalStateException("Unspecified action " + state.getSpec().getAction())); + new IllegalStateException("Unspecified action " + state.getSpec().getAction())); } } return response - .doOnSuccess(success -> { - state.setStatus(ChangeConnectorState.ChangeConnectorStateStatus.builder() - .success(true) - .code(success.status()) - .build()); - state.setMetadata(optionalConnector.get().getMetadata()); - state.getMetadata().setCreationTimestamp(Date.from(Instant.now())); - }) - .doOnError(error -> { - state.setStatus(ChangeConnectorState.ChangeConnectorStateStatus.builder() - .success(false) - .code(HttpStatus.INTERNAL_SERVER_ERROR) - .errorMessage(error.getMessage()) - .build()); - state.setMetadata(optionalConnector.get().getMetadata()); - state.getMetadata().setCreationTimestamp(Date.from(Instant.now())); - }) - .map(httpResponse -> HttpResponse.ok(state)) - .onErrorReturn(HttpResponse.ok(state)); + .doOnSuccess(success -> { + state.setStatus(ChangeConnectorState.ChangeConnectorStateStatus.builder() + .success(true) + .code(success.status()) + .build()); + state.setMetadata(optionalConnector.get().getMetadata()); + state.getMetadata().setCreationTimestamp(Date.from(Instant.now())); + }) + .doOnError(error -> { + state.setStatus(ChangeConnectorState.ChangeConnectorStateStatus.builder() + .success(false) + .code(HttpStatus.INTERNAL_SERVER_ERROR) + .errorMessage(error.getMessage()) + .build()); + state.setMetadata(optionalConnector.get().getMetadata()); + state.getMetadata().setCreationTimestamp(Date.from(Instant.now())); + }) + .map(httpResponse -> HttpResponse.ok(state)) + .onErrorReturn(HttpResponse.ok(state)); } /** @@ -269,24 +312,24 @@ public Mono> changeState(String namesp public Flux importResources(String namespace, @QueryValue(defaultValue = "false") boolean dryrun) { Namespace ns = getNamespace(namespace); return connectorService.listUnsynchronizedConnectors(ns) - .map(unsynchronizedConnector -> { - unsynchronizedConnector.getMetadata().setCreationTimestamp(Date.from(Instant.now())); - unsynchronizedConnector.getMetadata().setCluster(ns.getMetadata().getCluster()); - unsynchronizedConnector.getMetadata().setNamespace(ns.getMetadata().getName()); - - if (dryrun) { - return unsynchronizedConnector; - } - - sendEventLog( - unsynchronizedConnector, - ApplyStatus.created, - null, - unsynchronizedConnector.getSpec(), - EMPTY_STRING - ); - - return connectorService.createOrUpdate(unsynchronizedConnector); - }); + .map(unsynchronizedConnector -> { + unsynchronizedConnector.getMetadata().setCreationTimestamp(Date.from(Instant.now())); + unsynchronizedConnector.getMetadata().setCluster(ns.getMetadata().getCluster()); + unsynchronizedConnector.getMetadata().setNamespace(ns.getMetadata().getName()); + + if (dryrun) { + return unsynchronizedConnector; + } + + sendEventLog( + unsynchronizedConnector, + ApplyStatus.created, + null, + unsynchronizedConnector.getSpec(), + EMPTY_STRING + ); + + return connectorService.createOrUpdate(unsynchronizedConnector); + }); } -} +} \ No newline at end of file diff --git a/src/test/java/com/michelin/ns4kafka/controller/ConnectorControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/ConnectorControllerTest.java index 3e0a4cef..4e1d24eb 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/ConnectorControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/ConnectorControllerTest.java @@ -62,16 +62,16 @@ class ConnectorControllerTest { @Test void shouldListConnectorsWhenEmpty() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.findByWildcardName(ns, "*")) - .thenReturn(List.of()); + .thenReturn(List.of()); assertTrue(connectorController.list("test", "*").isEmpty()); } @@ -79,11 +79,11 @@ void shouldListConnectorsWhenEmpty() { @Test void shouldListConnectorsWithWildcard() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); Connector connector1 = Connector.builder().metadata(Metadata.builder().name("connect1").build()).build(); Connector connector2 = Connector.builder().metadata(Metadata.builder().name("connect2").build()).build(); @@ -97,11 +97,11 @@ void shouldListConnectorsWithWildcard() { @Test void shouldListConnectorWithNameParameter() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); Connector connector = Connector.builder().metadata(Metadata.builder().name("connect1").build()).build(); @@ -115,16 +115,16 @@ void shouldListConnectorWithNameParameter() { @SuppressWarnings("deprecation") void shouldGetConnectorWhenEmpty() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.findByName(ns, "missing")) - .thenReturn(Optional.empty()); + .thenReturn(Optional.empty()); Optional actual = connectorController.get("test", "missing"); assertTrue(actual.isEmpty()); @@ -134,21 +134,21 @@ void shouldGetConnectorWhenEmpty() { @SuppressWarnings("deprecation") void shouldGetConnector() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.findByName(ns, "connect1")) - .thenReturn(Optional.of( - Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .build())); + .thenReturn(Optional.of( + Connector.builder() + .metadata(Metadata.builder() + .name("connect1") + .build()) + .build())); Optional actual = connectorController.get("test", "connect1"); assertTrue(actual.isPresent()); @@ -156,355 +156,491 @@ void shouldGetConnector() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteConnectorWhenNotOwned() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(false); + .thenReturn(false); StepVerifier.create(connectorController.delete("test", "connect1", false)) - .consumeErrorWith(error -> { - assertEquals(ResourceValidationException.class, error.getClass()); - assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); - assertEquals("Invalid value \"connect1\" for field \"name\": namespace is not owner of the resource.", - ((ResourceValidationException) error).getValidationErrors().getFirst()); - }) - .verify(); + .consumeErrorWith(error -> { + assertEquals(ResourceValidationException.class, error.getClass()); + assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); + assertEquals("Invalid value \"connect1\" for field \"name\": namespace is not owner of the resource.", + ((ResourceValidationException) error).getValidationErrors().getFirst()); + }) + .verify(); } @Test + @SuppressWarnings("deprecation") void shouldDeleteConnector() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); Connector connector = Connector.builder().metadata(Metadata.builder().name("connect1").build()).build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(true); + .thenReturn(true); when(connectorService.findByName(ns, "connect1")) - .thenReturn(Optional.of(connector)); + .thenReturn(Optional.of(connector)); when(connectorService.delete(ns, connector)) - .thenReturn(Mono.just(HttpResponse.noContent())); + .thenReturn(Mono.just(HttpResponse.noContent())); when(securityService.username()).thenReturn(Optional.of("test-user")); when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); doNothing().when(applicationEventPublisher).publishEvent(any()); StepVerifier.create(connectorController.delete("test", "connect1", false)) - .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) - .verifyComplete(); + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); } @Test + @SuppressWarnings("deprecation") void shouldDeleteConnectorInDryRunMode() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); Connector connector = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.findByName(ns, "connect1")) - .thenReturn(Optional.of(connector)); + .thenReturn(Optional.of(connector)); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(true); + .thenReturn(true); StepVerifier.create(connectorController.delete("test", "connect1", true)) - .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) - .verifyComplete(); + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); verify(connectorService, never()).delete(any(), any()); } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteConnectorWhenNotFound() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.findByName(ns, "connect1")) - .thenReturn(Optional.empty()); + .thenReturn(Optional.empty()); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(true); + .thenReturn(true); StepVerifier.create(connectorController.delete("test", "connect1", true)) - .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) - .verifyComplete(); + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); + + verify(connectorService, never()).delete(any(), any()); + } + + @Test + void shouldDeleteConnectors() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + Connector connector1 = Connector.builder().metadata(Metadata.builder().name("connect1").build()).build(); + Connector connector2 = Connector.builder().metadata(Metadata.builder().name("connect2").build()).build(); + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) + .thenReturn(true); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect2")) + .thenReturn(true); + when(connectorService.findByWildcardName(ns, "connect*")) + .thenReturn(List.of(connector1, connector2)); + when(connectorService.delete(ns, connector1)) + .thenReturn(Mono.just(HttpResponse.noContent())); + when(connectorService.delete(ns, connector2)) + .thenReturn(Mono.just(HttpResponse.noContent())); + when(securityService.username()).thenReturn(Optional.of("test-user")); + when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); + doNothing().when(applicationEventPublisher).publishEvent(any()); + + StepVerifier.create(connectorController.bulkDelete("test", "connect*", false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + } + + @Test + void shouldNotDeleteConnectorsWhenNotFound() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + when(connectorService.findByWildcardName(ns, "connect*")) + .thenReturn(List.of()); + + StepVerifier.create(connectorController.bulkDelete("test", "connect*", true)) + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); verify(connectorService, never()).delete(any(), any()); } + @Test + void shouldDeleteConnectorsInDryRunMode() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + Connector connector1 = Connector.builder() + .metadata(Metadata.builder() + .name("connect1") + .build()) + .build(); + + Connector connector2 = Connector.builder() + .metadata(Metadata.builder() + .name("connect2") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + when(connectorService.findByWildcardName(ns, "connect*")) + .thenReturn(List.of(connector1, connector2)); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) + .thenReturn(true); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect2")) + .thenReturn(true); + + StepVerifier.create(connectorController.bulkDelete("test", "connect*", true)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + + verify(connectorService, never()).delete(any(), any()); + } + + @Test + void shouldNotDeleteConnectorsWhenNotOwned() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + Connector connector1 = Connector.builder() + .metadata(Metadata.builder() + .name("connect1") + .build()) + .build(); + + Connector connector2 = Connector.builder() + .metadata(Metadata.builder() + .name("connect2") + .build()) + .build(); + + when(connectorService.findByWildcardName(ns, "connect*")) + .thenReturn(List.of(connector1, connector2)); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) + .thenReturn(false); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect2")) + .thenReturn(true); + + StepVerifier.create(connectorController.bulkDelete("test", "connect*", false)) + .consumeErrorWith(error -> { + assertEquals(ResourceValidationException.class, error.getClass()); + assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); + assertEquals( + "Invalid value \"connect1\" for field \"name\": namespace is not owner of the resource.", + ((ResourceValidationException) error).getValidationErrors().getFirst()); + }) + .verify(); + } + @Test void shouldNotCreateConnectorWhenNotOwner() { Connector connector = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .build(); Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(false); + .thenReturn(false); StepVerifier.create(connectorController.apply("test", connector, false)) - .consumeErrorWith(error -> { - assertEquals(ResourceValidationException.class, error.getClass()); - assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); - assertEquals("Invalid value \"connect1\" for field \"name\": namespace is not owner of the resource.", - ((ResourceValidationException) error).getValidationErrors().getFirst()); - }) - .verify(); + .consumeErrorWith(error -> { + assertEquals(ResourceValidationException.class, error.getClass()); + assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); + assertEquals("Invalid value \"connect1\" for field \"name\": namespace is not owner of the resource.", + ((ResourceValidationException) error).getValidationErrors().getFirst()); + }) + .verify(); } @Test void shouldNotCreateConnectorWhenNotValidatedLocally() { Connector connector = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .spec(Connector.ConnectorSpec.builder() - .config(new HashMap<>()) - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .spec(Connector.ConnectorSpec.builder() + .config(new HashMap<>()) + .build()) + .build(); Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(true); + .thenReturn(true); when(connectorService.validateLocally(ns, connector)) - .thenReturn(Mono.just(List.of("Local Validation Error 1"))); + .thenReturn(Mono.just(List.of("Local Validation Error 1"))); StepVerifier.create(connectorController.apply("test", connector, false)) - .consumeErrorWith(error -> { - assertEquals(ResourceValidationException.class, error.getClass()); - assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); - assertEquals("Local Validation Error 1", - ((ResourceValidationException) error).getValidationErrors().getFirst()); - }) - .verify(); + .consumeErrorWith(error -> { + assertEquals(ResourceValidationException.class, error.getClass()); + assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); + assertEquals("Local Validation Error 1", + ((ResourceValidationException) error).getValidationErrors().getFirst()); + }) + .verify(); } @Test void shouldNotCreateConnectorWhenNotValidatedRemotely() { Connector connector = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .spec(Connector.ConnectorSpec.builder() - .config(new HashMap<>()) - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .spec(Connector.ConnectorSpec.builder() + .config(new HashMap<>()) + .build()) + .build(); Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(true); + .thenReturn(true); when(connectorService.validateLocally(ns, connector)) - .thenReturn(Mono.just(List.of())); + .thenReturn(Mono.just(List.of())); when(connectorService.validateRemotely(ns, connector)) - .thenReturn(Mono.just(List.of("Remote Validation Error 1"))); + .thenReturn(Mono.just(List.of("Remote Validation Error 1"))); StepVerifier.create(connectorController.apply("test", connector, false)) - .consumeErrorWith(error -> { - assertEquals(ResourceValidationException.class, error.getClass()); - assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); - assertEquals("Remote Validation Error 1", - ((ResourceValidationException) error).getValidationErrors().getFirst()); - }) - .verify(); + .consumeErrorWith(error -> { + assertEquals(ResourceValidationException.class, error.getClass()); + assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); + assertEquals("Remote Validation Error 1", + ((ResourceValidationException) error).getValidationErrors().getFirst()); + }) + .verify(); } @Test void shouldCreateConnector() { Connector connector = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .spec(Connector.ConnectorSpec.builder() - .config(new HashMap<>()) - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .spec(Connector.ConnectorSpec.builder() + .config(new HashMap<>()) + .build()) + .build(); Connector expected = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .spec(Connector.ConnectorSpec.builder() - .config(Map.of("name", "connect1")) - .build()) - .status(Connector.ConnectorStatus.builder() - .state(Connector.TaskState.UNASSIGNED) - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .spec(Connector.ConnectorSpec.builder() + .config(Map.of("name", "connect1")) + .build()) + .status(Connector.ConnectorStatus.builder() + .state(Connector.TaskState.UNASSIGNED) + .build()) + .build(); Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(true); + .thenReturn(true); when(connectorService.validateLocally(ns, connector)) - .thenReturn(Mono.just(List.of())); + .thenReturn(Mono.just(List.of())); when(connectorService.validateRemotely(ns, connector)) - .thenReturn(Mono.just(List.of())); + .thenReturn(Mono.just(List.of())); when(resourceQuotaService.validateConnectorQuota(any())) - .thenReturn(List.of()); + .thenReturn(List.of()); when(securityService.username()) - .thenReturn(Optional.of("test-user")); + .thenReturn(Optional.of("test-user")); when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)) - .thenReturn(false); + .thenReturn(false); doNothing().when(applicationEventPublisher).publishEvent(any()); when(connectorService.createOrUpdate(connector)) - .thenReturn(expected); + .thenReturn(expected); StepVerifier.create(connectorController.apply("test", connector, false)) - .consumeNextWith(response -> { - assertEquals("created", response.header("X-Ns4kafka-Result")); - assertTrue(response.getBody().isPresent()); - assertEquals(expected.getStatus().getState(), response.getBody().get().getStatus().getState()); - }) - .verifyComplete(); + .consumeNextWith(response -> { + assertEquals("created", response.header("X-Ns4kafka-Result")); + assertTrue(response.getBody().isPresent()); + assertEquals(expected.getStatus().getState(), response.getBody().get().getStatus().getState()); + }) + .verifyComplete(); } @Test void shouldNotCreateConnectorWhenQuotaValidationFails() { Connector connector = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .spec(Connector.ConnectorSpec.builder() - .config(new HashMap<>()) - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .spec(Connector.ConnectorSpec.builder() + .config(new HashMap<>()) + .build()) + .build(); Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(true); + .thenReturn(true); when(connectorService.validateLocally(ns, connector)) - .thenReturn(Mono.just(List.of())); + .thenReturn(Mono.just(List.of())); when(connectorService.validateRemotely(ns, connector)) - .thenReturn(Mono.just(List.of())); + .thenReturn(Mono.just(List.of())); when(resourceQuotaService.validateConnectorQuota(ns)) - .thenReturn(List.of("Quota error")); + .thenReturn(List.of("Quota error")); StepVerifier.create(connectorController.apply("test", connector, false)) - .consumeErrorWith(error -> { - assertEquals(ResourceValidationException.class, error.getClass()); - assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); - assertEquals("Quota error", ((ResourceValidationException) error).getValidationErrors().getFirst()); - }) - .verify(); + .consumeErrorWith(error -> { + assertEquals(ResourceValidationException.class, error.getClass()); + assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); + assertEquals("Quota error", ((ResourceValidationException) error).getValidationErrors().getFirst()); + }) + .verify(); } @Test void shouldConnectorBeUnchangedWhenAlreadyExists() { Connector connector = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .spec(Connector.ConnectorSpec.builder() - .config(new HashMap<>()) - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .spec(Connector.ConnectorSpec.builder() + .config(new HashMap<>()) + .build()) + .build(); Connector expected = Connector.builder() - .metadata(Metadata.builder() - .namespace("test") - .cluster("local") - .name("connect1") - .build()) - .spec(Connector.ConnectorSpec.builder() - .config(Map.of("name", "connect1")) - .build()) - .status(Connector.ConnectorStatus.builder() - .state(Connector.TaskState.UNASSIGNED) - .build()) - .build(); + .metadata(Metadata.builder() + .namespace("test") + .cluster("local") + .name("connect1") + .build()) + .spec(Connector.ConnectorSpec.builder() + .config(Map.of("name", "connect1")) + .build()) + .status(Connector.ConnectorStatus.builder() + .state(Connector.TaskState.UNASSIGNED) + .build()) + .build(); Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(true); + .thenReturn(true); when(connectorService.validateLocally(ns, connector)) - .thenReturn(Mono.just(List.of())); + .thenReturn(Mono.just(List.of())); when(connectorService.validateRemotely(ns, connector)) - .thenReturn(Mono.just(List.of())); + .thenReturn(Mono.just(List.of())); when(connectorService.findByName(ns, "connect1")) - .thenReturn(Optional.of(connector)); + .thenReturn(Optional.of(connector)); StepVerifier.create(connectorController.apply("test", connector, false)) - .consumeNextWith(response -> { - assertEquals("unchanged", response.header("X-Ns4kafka-Result")); - assertTrue(response.getBody().isPresent()); - assertEquals(expected.getStatus().getState(), response.getBody().get().getStatus().getState()); - }) - .verifyComplete(); + .consumeNextWith(response -> { + assertEquals("unchanged", response.header("X-Ns4kafka-Result")); + assertTrue(response.getBody().isPresent()); + assertEquals(expected.getStatus().getState(), response.getBody().get().getStatus().getState()); + }) + .verifyComplete(); verify(connectorService, never()).createOrUpdate(ArgumentMatchers.any()); } @@ -512,96 +648,96 @@ void shouldConnectorBeUnchangedWhenAlreadyExists() { @Test void shouldChangeConnector() { Connector connector = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .spec(Connector.ConnectorSpec.builder() - .config(new HashMap<>()) - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .spec(Connector.ConnectorSpec.builder() + .config(new HashMap<>()) + .build()) + .build(); Connector connectorOld = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .labels(Map.of("label", "labelValue")) - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .labels(Map.of("label", "labelValue")) + .build()) + .build(); Connector expected = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .labels(Map.of("label", "labelValue")) - .build()) - .spec(Connector.ConnectorSpec.builder() - .config(Map.of("name", "connect1")) - .build()) - .status(Connector.ConnectorStatus.builder() - .state(Connector.TaskState.UNASSIGNED) - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .labels(Map.of("label", "labelValue")) + .build()) + .spec(Connector.ConnectorSpec.builder() + .config(Map.of("name", "connect1")) + .build()) + .status(Connector.ConnectorStatus.builder() + .state(Connector.TaskState.UNASSIGNED) + .build()) + .build(); Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(true); + .thenReturn(true); when(connectorService.validateLocally(ns, connector)) - .thenReturn(Mono.just(List.of())); + .thenReturn(Mono.just(List.of())); when(connectorService.validateRemotely(ns, connector)) - .thenReturn(Mono.just(List.of())); + .thenReturn(Mono.just(List.of())); when(connectorService.findByName(ns, "connect1")) - .thenReturn(Optional.of(connectorOld)); + .thenReturn(Optional.of(connectorOld)); when(securityService.username()).thenReturn(Optional.of("test-user")); when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); doNothing().when(applicationEventPublisher).publishEvent(any()); when(connectorService.createOrUpdate(connector)) - .thenReturn(expected); + .thenReturn(expected); StepVerifier.create(connectorController.apply("test", connector, false)) - .consumeNextWith(response -> { - assertEquals("changed", response.header("X-Ns4kafka-Result")); - assertTrue(response.getBody().isPresent()); - assertEquals(expected.getStatus().getState(), response.getBody().get().getStatus().getState()); - }) - .verifyComplete(); + .consumeNextWith(response -> { + assertEquals("changed", response.header("X-Ns4kafka-Result")); + assertTrue(response.getBody().isPresent()); + assertEquals(expected.getStatus().getState(), response.getBody().get().getStatus().getState()); + }) + .verifyComplete(); } @Test void shouldCreateConnectorInDryRunMode() { Connector connector = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .spec(Connector.ConnectorSpec.builder() - .config(new HashMap<>()) - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .spec(Connector.ConnectorSpec.builder() + .config(new HashMap<>()) + .build()) + .build(); Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(true); + .thenReturn(true); when(connectorService.validateLocally(ns, connector)) - .thenReturn(Mono.just(List.of())); + .thenReturn(Mono.just(List.of())); when(connectorService.validateRemotely(ns, connector)) - .thenReturn(Mono.just(List.of())); + .thenReturn(Mono.just(List.of())); StepVerifier.create(connectorController.apply("test", connector, true)) - .consumeNextWith(response -> assertEquals("created", response.header("X-Ns4kafka-Result"))) - .verifyComplete(); + .consumeNextWith(response -> assertEquals("created", response.header("X-Ns4kafka-Result"))) + .verifyComplete(); verify(connectorService, never()).createOrUpdate(connector); } @@ -609,73 +745,73 @@ void shouldCreateConnectorInDryRunMode() { @Test void shouldImportConnectors() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); Connector connector1 = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .build(); Connector connector2 = Connector.builder() - .metadata(Metadata.builder() - .name("connect2") - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect2") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.listUnsynchronizedConnectors(ns)) - .thenReturn(Flux.fromIterable(List.of(connector1, connector2))); + .thenReturn(Flux.fromIterable(List.of(connector1, connector2))); when(connectorService.createOrUpdate(connector1)) - .thenReturn(connector1); + .thenReturn(connector1); when(connectorService.createOrUpdate(connector2)) - .thenReturn(connector2); + .thenReturn(connector2); StepVerifier.create(connectorController.importResources("test", false)) - .consumeNextWith(connect1 -> assertEquals("connect1", connect1.getMetadata().getName())) - .consumeNextWith(connect2 -> assertEquals("connect2", connect2.getMetadata().getName())) - .verifyComplete(); + .consumeNextWith(connect1 -> assertEquals("connect1", connect1.getMetadata().getName())) + .consumeNextWith(connect2 -> assertEquals("connect2", connect2.getMetadata().getName())) + .verifyComplete(); } @Test void shouldImportConnectorInDryRunMode() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); Connector connector1 = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .build(); Connector connector2 = Connector.builder() - .metadata(Metadata.builder() - .name("connect2") - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect2") + .build()) + .build(); Connector connector3 = Connector.builder() - .metadata(Metadata.builder() - .name("connect3") - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect3") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.listUnsynchronizedConnectors(ns)) - .thenReturn(Flux.fromIterable(List.of(connector1, connector2))); + .thenReturn(Flux.fromIterable(List.of(connector1, connector2))); StepVerifier.create(connectorController.importResources("test", true)) - .consumeNextWith(connect1 -> assertEquals("connect1", connect1.getMetadata().getName())) - .consumeNextWith(connect2 -> assertEquals("connect2", connect2.getMetadata().getName())) - .verifyComplete(); + .consumeNextWith(connect1 -> assertEquals("connect1", connect1.getMetadata().getName())) + .consumeNextWith(connect2 -> assertEquals("connect2", connect2.getMetadata().getName())) + .verifyComplete(); verify(connectorService, never()).createOrUpdate(connector1); verify(connectorService, never()).createOrUpdate(connector2); @@ -685,64 +821,64 @@ void shouldImportConnectorInDryRunMode() { @Test void shouldNotRestartConnectorWhenNotOwned() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(false); + .thenReturn(false); ChangeConnectorState restart = ChangeConnectorState.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder() - .action(ChangeConnectorState.ConnectorAction.restart) - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder() + .action(ChangeConnectorState.ConnectorAction.restart) + .build()) + .build(); StepVerifier.create(connectorController.changeState("test", "connect1", restart)) - .consumeErrorWith(error -> { - assertEquals(ResourceValidationException.class, error.getClass()); - assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); - assertEquals("Invalid value \"connect1\" for field \"name\": namespace is not owner of the resource.", - ((ResourceValidationException) error).getValidationErrors().getFirst()); - }) - .verify(); + .consumeErrorWith(error -> { + assertEquals(ResourceValidationException.class, error.getClass()); + assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); + assertEquals("Invalid value \"connect1\" for field \"name\": namespace is not owner of the resource.", + ((ResourceValidationException) error).getValidationErrors().getFirst()); + }) + .verify(); } @Test void shouldNotRestartConnectorWhenNotExist() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(true); + .thenReturn(true); when(connectorService.findByName(ns, "connect1")) - .thenReturn(Optional.empty()); + .thenReturn(Optional.empty()); ChangeConnectorState restart = ChangeConnectorState.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder() - .action(ChangeConnectorState.ConnectorAction.restart) - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder() + .action(ChangeConnectorState.ConnectorAction.restart) + .build()) + .build(); StepVerifier.create(connectorController.changeState("test", "connect1", restart)) - .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) - .verifyComplete(); + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); verify(connectorService, never()).restart(ArgumentMatchers.any(), ArgumentMatchers.any()); } @@ -750,173 +886,173 @@ void shouldNotRestartConnectorWhenNotExist() { @Test void shouldHandleExceptionWhenRestartingConnector() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); Connector connector = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(true); + .thenReturn(true); when(connectorService.findByName(ns, "connect1")) - .thenReturn(Optional.of(connector)); + .thenReturn(Optional.of(connector)); when(connectorService.restart(ArgumentMatchers.any(), ArgumentMatchers.any())) - .thenReturn( - Mono.error(new HttpClientResponseException("Rebalancing", HttpResponse.status(HttpStatus.CONFLICT)))); + .thenReturn( + Mono.error(new HttpClientResponseException("Rebalancing", HttpResponse.status(HttpStatus.CONFLICT)))); ChangeConnectorState restart = ChangeConnectorState.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder() - .action(ChangeConnectorState.ConnectorAction.restart) - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder() + .action(ChangeConnectorState.ConnectorAction.restart) + .build()) + .build(); StepVerifier.create(connectorController.changeState("test", "connect1", restart)) - .consumeNextWith(response -> { - assertTrue(response.getBody().isPresent()); - assertFalse(response.getBody().get().getStatus().isSuccess()); - assertNotNull(response.body()); - assertEquals("Rebalancing", response.body().getStatus().getErrorMessage()); - }) - .verifyComplete(); + .consumeNextWith(response -> { + assertTrue(response.getBody().isPresent()); + assertFalse(response.getBody().get().getStatus().isSuccess()); + assertNotNull(response.body()); + assertEquals("Rebalancing", response.body().getStatus().getErrorMessage()); + }) + .verifyComplete(); } @Test void shouldRestartConnector() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); Connector connector = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(true); + .thenReturn(true); when(connectorService.findByName(ns, "connect1")) - .thenReturn(Optional.of(connector)); + .thenReturn(Optional.of(connector)); when(connectorService.restart(ArgumentMatchers.any(), ArgumentMatchers.any())) - .thenReturn(Mono.just(HttpResponse.noContent())); + .thenReturn(Mono.just(HttpResponse.noContent())); ChangeConnectorState changeConnectorState = ChangeConnectorState.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder() - .action(ChangeConnectorState.ConnectorAction.restart) - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder() + .action(ChangeConnectorState.ConnectorAction.restart) + .build()) + .build(); StepVerifier.create(connectorController.changeState("test", "connect1", changeConnectorState)) - .consumeNextWith(response -> { - assertTrue(response.getBody().isPresent()); - assertTrue(response.getBody().get().getStatus().isSuccess()); - assertEquals(HttpStatus.NO_CONTENT, response.body().getStatus().getCode()); - assertEquals("connect1", response.body().getMetadata().getName()); - }) - .verifyComplete(); + .consumeNextWith(response -> { + assertTrue(response.getBody().isPresent()); + assertTrue(response.getBody().get().getStatus().isSuccess()); + assertEquals(HttpStatus.NO_CONTENT, response.body().getStatus().getCode()); + assertEquals("connect1", response.body().getMetadata().getName()); + }) + .verifyComplete(); } @Test void shouldPauseConnector() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); Connector connector = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(true); + .thenReturn(true); when(connectorService.findByName(ns, "connect1")) - .thenReturn(Optional.of(connector)); + .thenReturn(Optional.of(connector)); when(connectorService.pause(ArgumentMatchers.any(), ArgumentMatchers.any())) - .thenReturn(Mono.just(HttpResponse.noContent())); + .thenReturn(Mono.just(HttpResponse.noContent())); ChangeConnectorState changeConnectorState = ChangeConnectorState.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder() - .action(ChangeConnectorState.ConnectorAction.pause) - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder() + .action(ChangeConnectorState.ConnectorAction.pause) + .build()) + .build(); StepVerifier.create(connectorController.changeState("test", "connect1", changeConnectorState)) - .consumeNextWith(response -> { - assertTrue(response.getBody().isPresent()); - assertTrue(response.getBody().get().getStatus().isSuccess()); - assertEquals(HttpStatus.NO_CONTENT, response.body().getStatus().getCode()); - assertEquals("connect1", response.body().getMetadata().getName()); - }) - .verifyComplete(); + .consumeNextWith(response -> { + assertTrue(response.getBody().isPresent()); + assertTrue(response.getBody().get().getStatus().isSuccess()); + assertEquals(HttpStatus.NO_CONTENT, response.body().getStatus().getCode()); + assertEquals("connect1", response.body().getMetadata().getName()); + }) + .verifyComplete(); } @Test void shouldResumeConnector() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); Connector connector = Connector.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) - .thenReturn(true); + .thenReturn(true); when(connectorService.findByName(ns, "connect1")) - .thenReturn(Optional.of(connector)); + .thenReturn(Optional.of(connector)); when(connectorService.resume(ArgumentMatchers.any(), ArgumentMatchers.any())) - .thenReturn(Mono.just(HttpResponse.noContent())); + .thenReturn(Mono.just(HttpResponse.noContent())); ChangeConnectorState changeConnectorState = ChangeConnectorState.builder() - .metadata(Metadata.builder() - .name("connect1") - .build()) - .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder() - .action(ChangeConnectorState.ConnectorAction.resume) - .build()) - .build(); + .metadata(Metadata.builder() + .name("connect1") + .build()) + .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder() + .action(ChangeConnectorState.ConnectorAction.resume) + .build()) + .build(); StepVerifier.create(connectorController.changeState("test", "connect1", changeConnectorState)) - .consumeNextWith(response -> { - assertTrue(response.getBody().isPresent()); - assertTrue(response.getBody().get().getStatus().isSuccess()); - assertEquals(HttpStatus.NO_CONTENT, response.body().getStatus().getCode()); - assertEquals("connect1", response.body().getMetadata().getName()); - }) - .verifyComplete(); + .consumeNextWith(response -> { + assertTrue(response.getBody().isPresent()); + assertTrue(response.getBody().get().getStatus().isSuccess()); + assertEquals(HttpStatus.NO_CONTENT, response.body().getStatus().getCode()); + assertEquals("connect1", response.body().getMetadata().getName()); + }) + .verifyComplete(); } -} +} \ No newline at end of file