Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix EventLog display with resource version #454

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
id("io.micronaut.application") version "4.4.2"
id("jacoco")
id("org.sonarqube") version "5.1.0.4882"
id("pl.allegro.tech.build.axion-release") version "1.18.5"
id("pl.allegro.tech.build.axion-release") version "1.18.9"
id("checkstyle")
}

Expand Down Expand Up @@ -37,7 +37,7 @@ dependencies {
implementation("io.swagger.core.v3:swagger-annotations")
implementation("jakarta.annotation:jakarta.annotation-api")
implementation("jakarta.validation:jakarta.validation-api")
implementation('io.confluent:kafka-schema-registry-client:7.7.0')
implementation('io.confluent:kafka-schema-registry-client:7.7.1')


compileOnly("org.projectlombok:lombok")
Expand All @@ -46,10 +46,10 @@ dependencies {
runtimeOnly("ch.qos.logback:logback-classic")

testImplementation("org.mockito:mockito-core")
testImplementation("org.testcontainers:junit-jupiter:1.20.0")
testImplementation("org.testcontainers:testcontainers:1.20.0")
testImplementation("org.testcontainers:kafka:1.20.0")
testImplementation("org.mockito:mockito-junit-jupiter:5.12.0")
testImplementation("org.testcontainers:junit-jupiter:1.20.1")
testImplementation("org.testcontainers:testcontainers:1.20.1")
testImplementation("org.testcontainers:kafka:1.20.1")
testImplementation("org.mockito:mockito-junit-jupiter:5.14.1")
testImplementation("org.junit.jupiter:junit-jupiter-params:5.11.0")
testImplementation("io.projectreactor:reactor-test")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,10 @@ public HttpResponse<Namespace> apply(@Valid @Body Namespace namespace,
* @param namespace The namespace
* @param dryrun Is dry run mode or not ?
* @return An HTTP response
* @deprecated use bulkDelete instead.
*/
@Delete("/{namespace}{?dryrun}")
@Deprecated(since = "1.13.0")
public HttpResponse<Void> delete(String namespace, @QueryValue(defaultValue = "false") boolean dryrun) {
Optional<Namespace> optionalNamespace = namespaceService.findByName(namespace);
if (optionalNamespace.isEmpty()) {
Expand All @@ -141,17 +143,65 @@ public HttpResponse<Void> delete(String namespace, @QueryValue(defaultValue = "f
return HttpResponse.noContent();
}

var namespaceToDelete = optionalNamespace.get();
performDeletion(optionalNamespace.get());
return HttpResponse.noContent();
}

/**
* Delete namespaces.
*
* @param dryrun Is dry run mode or not ?
* @param name The name parameter
* @return An HTTP response
*/
@Delete
public HttpResponse<Void> bulkDelete(@QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
List<Namespace> namespaces = namespaceService.findByWildcardName(name);
if (namespaces.isEmpty()) {
return HttpResponse.notFound();
}

List<String> namespaceResources = namespaces
.stream()
.flatMap(namespace -> namespaceService.findAllResourcesByNamespace(namespace)
.stream())
.toList();

if (!namespaceResources.isEmpty()) {
List<String> validationErrors = namespaceResources
.stream()
.map(FormatErrorUtils::invalidNamespaceDeleteOperation)
.toList();

throw new ResourceValidationException(
NAMESPACE,
String.join(",", namespaces.stream().map(namespace -> namespace.getMetadata().getName()).toList()),
validationErrors
);
}

if (dryrun) {
return HttpResponse.noContent();
}

namespaces.forEach(this::performDeletion);
return HttpResponse.noContent();
}

/**
* Perform the deletion of the namespace and send an event log.
*
* @param namespace The namespace to delete
*/
private void performDeletion(Namespace namespace) {
sendEventLog(
namespaceToDelete,
namespace,
ApplyStatus.deleted,
namespaceToDelete.getSpec(),
namespace.getSpec(),
null,
EMPTY_STRING
);

namespaceService.delete(optionalNamespace.get());
return HttpResponse.noContent();
namespaceService.delete(namespace);
}
}
22 changes: 8 additions & 14 deletions src/main/java/com/michelin/ns4kafka/service/ConnectorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -256,20 +256,14 @@ public Flux<Connector> listUnsynchronizedConnectors(Namespace namespace) {
public Mono<HttpResponse<Void>> restart(Namespace namespace, Connector connector) {
return kafkaConnectClient.status(namespace.getMetadata().getCluster(), connector.getSpec().getConnectCluster(),
connector.getMetadata().getName())
.flatMap(status -> {
Flux<HttpResponse<Void>> responses = Flux.fromIterable(status.tasks())
.flatMap(task -> kafkaConnectClient.restart(namespace.getMetadata().getCluster(),
connector.getSpec().getConnectCluster(), connector.getMetadata().getName(), task.getId()))
.map(response -> {
log.info("Success restarting connector [{}] on namespace [{}] connect [{}]",
connector.getMetadata().getName(),
namespace.getMetadata().getName(),
connector.getSpec().getConnectCluster());
return HttpResponse.ok();
});

return Mono.from(responses);
});
.flatMap(status -> Flux.fromIterable(status.tasks())
.flatMap(task -> kafkaConnectClient.restart(namespace.getMetadata().getCluster(),
connector.getSpec().getConnectCluster(), connector.getMetadata().getName(), task.getId()))
.doOnNext(restart -> log.info("Success restarting connector [{}] on namespace [{}] connect [{}]",
connector.getMetadata().getName(),
namespace.getMetadata().getName(),
connector.getSpec().getConnectCluster()))
.then(Mono.just(HttpResponse.ok())));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@
@Singleton
public class KafkaConnectClient {
private static final String CONNECTORS = "/connectors/";

@Inject
ConnectClusterRepository connectClusterRepository;

@Inject
@Client(id = "kafka-connect")
private HttpClient httpClient;

@Inject
private List<ManagedClusterProperties> managedClusterProperties;

@Inject
private SecurityProperties securityProperties;

Expand Down Expand Up @@ -222,7 +226,8 @@ public Mono<HttpResponse<Void>> resume(String kafkaCluster, String connectCluste
* @return The Kafka Connect configuration
*/
public KafkaConnectClient.KafkaConnectHttpConfig getKafkaConnectConfig(String kafkaCluster, String connectCluster) {
Optional<ManagedClusterProperties> config = managedClusterProperties.stream()
Optional<ManagedClusterProperties> config = managedClusterProperties
.stream()
.filter(kafkaAsyncExecutorConfig -> kafkaAsyncExecutorConfig.getName().equals(kafkaCluster))
.findFirst();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public ConnectorState(@JsonProperty("state") String state, @JsonProperty("worker
public static class TaskState extends AbstractState implements Comparable<TaskState> {
private final int id;

public TaskState(@JsonProperty("id") int id, @JsonProperty("state") String state,
public TaskState(@JsonProperty("id") int id,
@JsonProperty("state") String state,
@JsonProperty("worker_id") String worker,
@JsonProperty("msg") String msg) {
super(state, worker, msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ void shouldUpdateNamespaceInDryRunMode() {
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteNamespace() {
Namespace existing = Namespace.builder()
.metadata(Metadata.builder()
Expand All @@ -317,6 +318,7 @@ void shouldDeleteNamespace() {
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteNamespaceInDryRunMode() {
Namespace existing = Namespace.builder()
.metadata(Metadata.builder()
Expand All @@ -340,6 +342,7 @@ void shouldDeleteNamespaceInDryRunMode() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteNamespaceWhenNotFound() {
when(namespaceService.findByName("namespace"))
.thenReturn(Optional.empty());
Expand All @@ -351,6 +354,7 @@ void shouldNotDeleteNamespaceWhenNotFound() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteNamespaceWhenResourcesAreStillLinkedWithIt() {
Namespace existing = Namespace.builder()
.metadata(Metadata.builder()
Expand All @@ -371,4 +375,117 @@ void shouldNotDeleteNamespaceWhenResourcesAreStillLinkedWithIt() {
() -> namespaceController.delete("namespace", false));
verify(namespaceService, never()).delete(any());
}

@Test
void shouldDeleteNamespaces() {
Namespace namespace1 = Namespace.builder()
.metadata(Metadata.builder()
.name("namespace1")
.cluster("local")
.build())
.spec(Namespace.NamespaceSpec.builder()
.kafkaUser("user")
.build())
.build();

Namespace namespace2 = Namespace.builder()
.metadata(Metadata.builder()
.name("namespace2")
.cluster("local")
.build())
.spec(Namespace.NamespaceSpec.builder()
.kafkaUser("user")
.build())
.build();

when(namespaceService.findByWildcardName("namespace*"))
.thenReturn(List.of(namespace1, namespace2));
when(namespaceService.findAllResourcesByNamespace(namespace1))
.thenReturn(List.of());
when(namespaceService.findAllResourcesByNamespace(namespace2))
.thenReturn(List.of());
when(securityService.username())
.thenReturn(Optional.of("test-user"));
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN))
.thenReturn(false);

doNothing().when(applicationEventPublisher).publishEvent(any());
var result = namespaceController.bulkDelete("namespace*", false);
assertEquals(HttpResponse.noContent().getStatus(), result.getStatus());
}

@Test
void shouldDeleteNamespacesInDryRunMode() {
Namespace namespace1 = Namespace.builder()
.metadata(Metadata.builder()
.name("namespace1")
.cluster("local")
.build())
.spec(Namespace.NamespaceSpec.builder()
.kafkaUser("user")
.build())
.build();

Namespace namespace2 = Namespace.builder()
.metadata(Metadata.builder()
.name("namespace2")
.cluster("local")
.build())
.spec(Namespace.NamespaceSpec.builder()
.kafkaUser("user")
.build())
.build();

when(namespaceService.findByWildcardName("namespace*"))
.thenReturn(List.of(namespace1, namespace2));
when(namespaceService.findAllResourcesByNamespace(namespace1))
.thenReturn(List.of());
when(namespaceService.findAllResourcesByNamespace(namespace2))
.thenReturn(List.of());

var result = namespaceController.bulkDelete("namespace*", true);
verify(namespaceService, never()).delete(any());
assertEquals(HttpResponse.noContent().getStatus(), result.getStatus());
}

@Test
void shouldNotDeleteNamespacesWhenResourcesAreStillLinkedWithIt() {
Namespace namespace1 = Namespace.builder()
.metadata(Metadata.builder()
.name("namespace1")
.cluster("local")
.build())
.spec(Namespace.NamespaceSpec.builder()
.kafkaUser("user")
.build())
.build();

Namespace namespace2 = Namespace.builder()
.metadata(Metadata.builder()
.name("namespace2")
.cluster("local")
.build())
.spec(Namespace.NamespaceSpec.builder()
.kafkaUser("user")
.build())
.build();

when(namespaceService.findByWildcardName("namespace*"))
.thenReturn(List.of(namespace1, namespace2));
when(namespaceService.findAllResourcesByNamespace(namespace1))
.thenReturn(List.of("Topic/topic1"));
when(namespaceService.findAllResourcesByNamespace(namespace2))
.thenReturn(List.of());

assertThrows(ResourceValidationException.class,
() -> namespaceController.bulkDelete("namespace*", false));
verify(namespaceService, never()).delete(any());
}

@Test
void shouldNotDeleteNamespacesWhenPatternMatchesNothing() {
when(namespaceService.findByWildcardName("namespace*")).thenReturn(List.of());
var result = namespaceController.bulkDelete("namespace*", false);
assertEquals(HttpResponse.notFound().getStatus(), result.getStatus());
}
}
Loading