Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasCAI-mlv committed Aug 27, 2024
1 parent b1b17d5 commit 4ee5ac2
Show file tree
Hide file tree
Showing 17 changed files with 48 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public Mono<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connec
}

sendEventLog(connector, status, existingConnector.<Object>map(Connector::getSpec).orElse(null),
connector.getSpec());
connector.getSpec(), "");

return Mono.just(formatHttpResponse(connectorService.createOrUpdate(connector), status));
});
Expand Down Expand Up @@ -180,7 +180,7 @@ public Mono<HttpResponse<Void>> delete(String namespace, String connector,
}

Connector connectorToDelete = optionalConnector.get();
sendEventLog(connectorToDelete, ApplyStatus.deleted, connectorToDelete.getSpec(), null);
sendEventLog(connectorToDelete, ApplyStatus.deleted, connectorToDelete.getSpec(), null, "");

return connectorService
.delete(ns, optionalConnector.get())
Expand Down Expand Up @@ -264,7 +264,7 @@ public Flux<Connector> importResources(String namespace, @QueryValue(defaultValu
return unsynchronizedConnector;
}

sendEventLog(unsynchronizedConnector, ApplyStatus.created, null, unsynchronizedConnector.getSpec());
sendEventLog(unsynchronizedConnector, ApplyStatus.created, null, unsynchronizedConnector.getSpec(), "");

return connectorService.createOrUpdate(unsynchronizedConnector);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public List<ConsumerGroupResetOffsetsResponse> resetOffsets(String namespace, St
consumerGroupResetOffsets.getSpec().getMethod());

if (!dryrun) {
sendEventLog(consumerGroupResetOffsets, ApplyStatus.changed, null, consumerGroupResetOffsets.getSpec());
sendEventLog(consumerGroupResetOffsets, ApplyStatus.changed, null, consumerGroupResetOffsets.getSpec(), "");
consumerGroupService.alterConsumerGroupOffsets(ns, consumerGroup, preparedOffsets);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public HttpResponse<Namespace> apply(@Valid @Body Namespace namespace,
}

sendEventLog(namespace, status, existingNamespace.<Object>map(Namespace::getSpec).orElse(null),
namespace.getSpec());
namespace.getSpec(), "");

return formatHttpResponse(namespaceService.createOrUpdate(namespace), status);
}
Expand Down Expand Up @@ -136,7 +136,7 @@ public HttpResponse<Void> delete(String namespace, @QueryValue(defaultValue = "f
}

var namespaceToDelete = optionalNamespace.get();
sendEventLog(namespaceToDelete, ApplyStatus.deleted, namespaceToDelete.getSpec(), null);
sendEventLog(namespaceToDelete, ApplyStatus.deleted, namespaceToDelete.getSpec(), null, "");
namespaceService.delete(optionalNamespace.get());
return HttpResponse.noContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public HttpResponse<RoleBinding> apply(String namespace, @Valid @Body RoleBindin
}

sendEventLog(roleBinding, status, existingRoleBinding.<Object>map(RoleBinding::getSpec).orElse(null),
roleBinding.getSpec());
roleBinding.getSpec(), "");
roleBindingService.create(roleBinding);
return formatHttpResponse(roleBinding, status);
}
Expand All @@ -116,7 +116,7 @@ public HttpResponse<Void> delete(String namespace, String name,
}

var roleBindingToDelete = roleBinding.get();
sendEventLog(roleBindingToDelete, ApplyStatus.deleted, roleBindingToDelete.getSpec(), null);
sendEventLog(roleBindingToDelete, ApplyStatus.deleted, roleBindingToDelete.getSpec(), null, "");
roleBindingService.delete(roleBindingToDelete);
return HttpResponse.noContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public Mono<HttpResponse<Schema>> apply(String namespace, @Valid @Body Schema sc
oldSchemas.isEmpty() ? null : oldSchemas.stream()
.max(Comparator.comparingInt(
(Schema s) -> s.getSpec().getId())),
schema.getSpec());
schema.getSpec(), "");

return formatHttpResponse(schema, status);
});
Expand Down Expand Up @@ -163,60 +163,29 @@ public Mono<HttpResponse<Void>> delete(String namespace,
return Mono.error(new ResourceValidationException(SCHEMA, subject, invalidOwner(subject)));
}

// delete all versions of the schema
if (version.isEmpty()) {
return schemaService.getLatestSubject(ns, subject)
return version
.map(v -> schemaService.getSubject(ns, subject, v))
.orElseGet(() -> schemaService.getLatestSubject(ns, subject))
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(latestSubjectOptional -> {
if (latestSubjectOptional.isEmpty()) {
.flatMap(subjectOptional -> {
if (subjectOptional.isEmpty()) {
return Mono.just(HttpResponse.notFound());
}

if (dryrun) {
return Mono.just(HttpResponse.noContent());
}

Schema schemaToDelete = latestSubjectOptional.get();
sendEventLog(schemaToDelete, ApplyStatus.deleted, schemaToDelete.getSpec(), null);

return schemaService
.deleteAllVersions(ns, subject)
.map(deletedVersionIds -> HttpResponse.noContent());
return (version.isEmpty() ? schemaService.deleteAllVersions(ns, subject) :
schemaService.deleteVersion(ns, subject, version.get()))
.map(deletedVersionIds -> {
Schema deletedSchema = subjectOptional.get();
sendEventLog(deletedSchema, ApplyStatus.deleted, deletedSchema.getSpec(), null,
version.map(v -> "").orElseGet(() -> String.valueOf(deletedVersionIds)));
return HttpResponse.noContent();
});
});
}

// delete a specific version of the schema
return schemaService.getSubject(ns, subject, version.get())
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(subjectOptional -> {
if (subjectOptional.isEmpty()) {
return Mono.just(HttpResponse.notFound());
}

if (dryrun) {
return Mono.just(HttpResponse.noContent());
}

Schema schemaToDelete = subjectOptional.get();
return schemaService
.deleteVersion(ns, subject, version.get())
.flatMap(optionalNewLatestSubject -> {
// there was only one version & it was deleted, so the schema is deleted in ns4
if (optionalNewLatestSubject.isEmpty()) {
sendEventLog(schemaToDelete, ApplyStatus.deleted,
schemaToDelete.getSpec(), null);
}
// there were multiple versions & latest version was deleted, so the schema is changed in ns4
if (optionalNewLatestSubject.isPresent() && optionalNewLatestSubject.get()
.getSpec().getVersion() < schemaToDelete.getSpec().getVersion()) {
sendEventLog(schemaToDelete, ApplyStatus.changed,
schemaToDelete.getSpec(), optionalNewLatestSubject.get().getSpec());
}
return Mono.just(HttpResponse.noContent());
});
});
}

/**
Expand Down Expand Up @@ -259,7 +228,7 @@ public Mono<HttpResponse<SchemaCompatibilityState>> config(String namespace, @Pa
.updateSubjectCompatibility(ns, latestSubjectOptional.get(), compatibility)
.map(schemaCompatibility -> {
sendEventLog(latestSubjectOptional.get(), ApplyStatus.changed,
latestSubjectOptional.get().getSpec().getCompatibility(), compatibility);
latestSubjectOptional.get().getSpec().getCompatibility(), compatibility, "");

return HttpResponse.ok(state);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ HttpResponse<KafkaStream> apply(String namespace, @Body @Valid KafkaStream strea
}

sendEventLog(stream, status, existingStream.<Object>map(KafkaStream::getMetadata).orElse(null),
stream.getMetadata());
stream.getMetadata(), "");

return formatHttpResponse(streamService.create(stream), status);
}
Expand Down Expand Up @@ -126,7 +126,7 @@ HttpResponse<Void> delete(String namespace, String stream, @QueryValue(defaultVa
}

var streamToDelete = optionalStream.get();
sendEventLog(streamToDelete, ApplyStatus.deleted, streamToDelete.getMetadata(), null);
sendEventLog(streamToDelete, ApplyStatus.deleted, streamToDelete.getMetadata(), null, "");
streamService.delete(ns, optionalStream.get());
return HttpResponse.noContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public HttpResponse<KafkaUserResetPassword> resetPassword(String namespace, Stri
.build())
.build();

sendEventLog(response, ApplyStatus.changed, null, response.getSpec());
sendEventLog(response, ApplyStatus.changed, null, response.getSpec(), "");
return HttpResponse.ok(response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public HttpResponse<AccessControlEntry> apply(Authentication authentication, Str
}

sendEventLog(accessControlEntry, status, existingAcl.<Object>map(AccessControlEntry::getSpec).orElse(null),
accessControlEntry.getSpec());
accessControlEntry.getSpec(), "");

return formatHttpResponse(aclService.create(accessControlEntry), status);
}
Expand Down Expand Up @@ -181,7 +181,7 @@ public HttpResponse<Void> delete(Authentication authentication, String namespace
return HttpResponse.noContent();
}

sendEventLog(accessControlEntry, ApplyStatus.deleted, accessControlEntry.getSpec(), null);
sendEventLog(accessControlEntry, ApplyStatus.deleted, accessControlEntry.getSpec(), null, "");

aclService.delete(accessControlEntry);
return HttpResponse.noContent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public Mono<HttpResponse<ConnectCluster>> apply(String namespace, @Body @Valid C
}

sendEventLog(connectCluster, status,
existingConnectCluster.<Object>map(ConnectCluster::getSpec).orElse(null), connectCluster.getSpec());
existingConnectCluster.<Object>map(ConnectCluster::getSpec).orElse(null),
connectCluster.getSpec(), "");

return Mono.just(formatHttpResponse(connectClusterService.create(connectCluster), status));
});
Expand Down Expand Up @@ -161,7 +162,7 @@ public HttpResponse<Void> delete(String namespace, String connectCluster,
}

ConnectCluster connectClusterToDelete = optionalConnectCluster.get();
sendEventLog(connectClusterToDelete, ApplyStatus.deleted, connectClusterToDelete.getSpec(), null);
sendEventLog(connectClusterToDelete, ApplyStatus.deleted, connectClusterToDelete.getSpec(), null, "");

connectClusterService.delete(connectClusterToDelete);
return HttpResponse.noContent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ public <T> HttpResponse<T> formatHttpResponse(T body, ApplyStatus status) {
* @param after the resource after the operation
*/
public void sendEventLog(MetadataResource resource, ApplyStatus operation, Object before,
Object after) {
Object after, String version) {
AuditLog auditLog = new AuditLog(securityService.username().orElse(""),
securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN), Date.from(Instant.now()),
resource.getKind(), resource.getMetadata(), operation, before, after);
resource.getKind(), resource.getMetadata(), operation, before, after, version);
applicationEventPublisher.publishEvent(auditLog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public HttpResponse<ResourceQuota> apply(String namespace, @Body @Valid Resource
}

sendEventLog(quota, status, resourceQuotaOptional.<Object>map(ResourceQuota::getSpec).orElse(null),
quota.getSpec());
quota.getSpec(), "");

return formatHttpResponse(resourceQuotaService.create(quota), status);
}
Expand All @@ -125,7 +125,7 @@ public HttpResponse<Void> delete(String namespace, String name,
}

ResourceQuota resourceQuotaToDelete = resourceQuota.get();
sendEventLog(resourceQuotaToDelete, ApplyStatus.deleted, resourceQuotaToDelete.getSpec(), null);
sendEventLog(resourceQuotaToDelete, ApplyStatus.deleted, resourceQuotaToDelete.getSpec(), null, "");
resourceQuotaService.delete(resourceQuotaToDelete);
return HttpResponse.noContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public HttpResponse<Topic> apply(String namespace, @Valid @Body Topic topic,
return formatHttpResponse(topic, status);
}

sendEventLog(topic, status, existingTopic.<Object>map(Topic::getSpec).orElse(null), topic.getSpec());
sendEventLog(topic, status, existingTopic.<Object>map(Topic::getSpec).orElse(null), topic.getSpec(), "");

return formatHttpResponse(topicService.create(topic), status);
}
Expand Down Expand Up @@ -178,7 +178,7 @@ public HttpResponse<Void> delete(String namespace, String topic,
}

Topic topicToDelete = optionalTopic.get();
sendEventLog(topicToDelete, ApplyStatus.deleted, topicToDelete.getSpec(), null);
sendEventLog(topicToDelete, ApplyStatus.deleted, topicToDelete.getSpec(), null, "");
topicService.delete(optionalTopic.get());

return HttpResponse.noContent();
Expand Down Expand Up @@ -214,7 +214,7 @@ public List<Topic> importResources(String namespace, @QueryValue(defaultValue =
return unsynchronizedTopics
.stream()
.map(topic -> {
sendEventLog(topic, ApplyStatus.created, null, topic.getSpec());
sendEventLog(topic, ApplyStatus.created, null, topic.getSpec(), "");
return topicService.create(topic);
})
.toList();
Expand Down Expand Up @@ -256,7 +256,7 @@ public List<DeleteRecordsResponse> deleteRecords(String namespace, String topic,
if (dryrun) {
deletedRecords = recordsToDelete;
} else {
sendEventLog(optionalTopic.get(), ApplyStatus.deleted, null, null);
sendEventLog(optionalTopic.get(), ApplyStatus.deleted, null, null, "");
deletedRecords = topicService.deleteRecords(optionalTopic.get(), recordsToDelete);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ public class ConsoleLogListener implements ApplicationEventListener<AuditLog> {

@Override
public void onApplicationEvent(AuditLog event) {
log.info("{} {} {} {} {} in namespace {} on cluster {}.",
log.info("{} {} {} {}{} {} in namespace {} on cluster {}.",
event.isAdmin() ? "Admin" : "User",
event.getUser(),
event.getOperation(),
event.getKind(),
event.getMetadata().getName(),
event.getVersion().isEmpty() ? "" : " version " + event.getVersion(),
event.getMetadata().getNamespace(),
event.getMetadata().getCluster()
);
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/michelin/ns4kafka/model/AuditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ public class AuditLog {
private ApplyStatus operation;
private Object before;
private Object after;
private String version;
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,17 +246,13 @@ public Mono<Integer[]> deleteAllVersions(Namespace namespace, String subject) {
* @param version The version of the schema to delete
* @return The latest subject after deletion
*/
public Mono<Optional<Schema>> deleteVersion(Namespace namespace, String subject, String version) {
public Mono<Integer> deleteVersion(Namespace namespace, String subject, String version) {
return schemaRegistryClient
.deleteSubjectVersion(namespace.getMetadata().getCluster(), subject, version, false)
.flatMap(softDeletedVersionIds -> schemaRegistryClient
.deleteSubjectVersion(namespace.getMetadata().getCluster(),
subject, Integer.toString(softDeletedVersionIds), true)
.flatMap(hardDeletedVersionId -> getLatestSubject(namespace, subject)
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(Mono::just)
));
);
}

/**
Expand Down
Loading

0 comments on commit 4ee5ac2

Please sign in to comment.