Assign user a given password and check connection status for a given password #430

Open · wants to merge 4 commits into base: master
64 changes: 62 additions & 2 deletions src/main/java/com/michelin/ns4kafka/controller/UserController.java
@@ -2,6 +2,7 @@

import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidKafkaUser;
import static com.michelin.ns4kafka.util.enumation.Kind.KAFKA_USER_RESET_PASSWORD;
import static com.michelin.ns4kafka.util.enumation.Kind.KAFKA_USER_CHECK_PASSWORD;

import com.michelin.ns4kafka.controller.generic.NamespacedResourceController;
import com.michelin.ns4kafka.model.KafkaUserResetPassword;
@@ -12,17 +13,21 @@
import com.michelin.ns4kafka.util.exception.ResourceValidationException;
import io.micronaut.context.ApplicationContext;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.micronaut.http.annotation.Patch;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import jakarta.inject.Inject;
import java.time.Instant;
import java.util.Date;

/**
* Controller to manage users.
*/
@Slf4j
@Tag(name = "Users", description = "Manage the users.")
@Controller(value = "/api/namespaces/{namespace}/users")
public class UserController extends NamespacedResourceController {
@@ -34,7 +39,7 @@ public class UserController extends NamespacedResourceController {
*
* @param namespace The namespace
* @param user The user
* @return The new randomly generated password
*/
@Post("/{user}/reset-password")
public HttpResponse<KafkaUserResetPassword> resetPassword(String namespace, String user) {
@@ -49,10 +54,39 @@ public HttpResponse<KafkaUserResetPassword> resetPassword(String namespace, Stri

String password = userAsyncExecutor.resetPassword(ns.getSpec().getKafkaUser());

return setPasswordResponse(ns, password);
}

/**
* Set the password of a user to a given value.
*
* @param namespace The namespace
* @param user The user
* @param password The new password
* @return The password that was set
*/
@Patch("/{user}/set-password")
public HttpResponse<KafkaUserResetPassword> setPassword(String namespace, String user, @Body String password) {
Collaborator comment:
As suggested in michelin/kafkactl#91, I think we can use a single endpoint for that, following this logic:

  • if @Body is not empty, call userAsyncExecutor#resetPassword with the given username/password
  • if @Body is empty, generate a random password, then call userAsyncExecutor#resetPassword with the given username/random password.

➡️ I think userAsyncExecutor#resetPassword and userAsyncExecutor#setPassword can be a single method as well:

  • Add a password parameter to userAsyncExecutor#resetPassword
  • Extract the generation of the random password outside of userAsyncExecutor#resetPassword in order to generate it if the @Body is empty
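The merged-endpoint logic this comment describes could be sketched as follows; `PasswordHelper` and `providedOrRandom` are hypothetical names for illustration, not part of the PR:

```java
import java.security.SecureRandom;
import java.util.Base64;

// Hypothetical helper: use the caller-provided password when the body is
// non-empty, otherwise fall back to a random one (as resetPassword does today).
class PasswordHelper {
    static String providedOrRandom(String provided) {
        if (provided != null && !provided.isBlank()) {
            return provided;
        }
        byte[] bytes = new byte[24];
        new SecureRandom().nextBytes(bytes);
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
    }
}
```

With this, a single `resetPassword(user, password)` taking the resolved password would cover both endpoints.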

log.debug("Asked to set password for user [{}]", user);
Namespace ns = getNamespace(namespace);

if (!ns.getSpec().getKafkaUser().equals(user)) {
throw new ResourceValidationException(KAFKA_USER_RESET_PASSWORD, user, invalidKafkaUser(user));
}

UserAsyncExecutor userAsyncExecutor =
applicationContext.getBean(UserAsyncExecutor.class, Qualifiers.byName(ns.getMetadata().getCluster()));

userAsyncExecutor.setPassword(ns.getSpec().getKafkaUser(), password);

return setPasswordResponse(ns, password);
}

protected HttpResponse<KafkaUserResetPassword> setPasswordResponse(Namespace ns, String password) {
KafkaUserResetPassword response = KafkaUserResetPassword.builder()
.metadata(Metadata.builder()
.name(ns.getSpec().getKafkaUser())
.namespace(ns.getMetadata().getName())
.cluster(ns.getMetadata().getCluster())
.creationTimestamp(Date.from(Instant.now()))
.build())
@@ -64,4 +98,30 @@ public HttpResponse<KafkaUserResetPassword> resetPassword(String namespace, Stri
sendEventLog(response, ApplyStatus.changed, null, response.getSpec());
return HttpResponse.ok(response);
}

/**
* Check against the broker whether the given password matches the current one.
*
* @param namespace The namespace
* @param user The user
* @param password The password to check
* @return HTTP status 200 if the password is correct, 401 (unauthorized) otherwise
*/
@Post("/{user}/check-password")
public HttpResponse<Void> checkPassword(String namespace, String user, @Body String password) {
log.debug("Asked to check password for user '{}'", user);
Namespace ns = getNamespace(namespace);

if (!ns.getSpec().getKafkaUser().equals(user)) {
throw new ResourceValidationException(KAFKA_USER_CHECK_PASSWORD, user, invalidKafkaUser(user));
}

UserAsyncExecutor userAsyncExecutor =
applicationContext.getBean(UserAsyncExecutor.class, Qualifiers.byName(ns.getMetadata().getCluster()));

if (userAsyncExecutor.checkPassword(ns.getSpec().getKafkaUser(), password)) {
Collaborator comment:
As suggested in michelin/kafkactl#91, a status that specifies whether the connection with the given password ended in SUCCESS or FAIL may be more meaningful.
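A sketch of what such a response body could look like; `KafkaUserCheckPasswordSpec` and `ConnectionStatus` are hypothetical names illustrating the suggestion, not part of the PR:

```java
// Hypothetical response shape: always return 200 with an explicit connection
// status, instead of encoding the check result in the HTTP status code.
enum ConnectionStatus { SUCCESS, FAIL }

class KafkaUserCheckPasswordSpec {
    final ConnectionStatus status;

    KafkaUserCheckPasswordSpec(boolean passwordMatches) {
        this.status = passwordMatches ? ConnectionStatus.SUCCESS : ConnectionStatus.FAIL;
    }
}
```

The controller would then return `HttpResponse.ok(new KafkaUserCheckPasswordSpec(matches))` in both cases.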

return HttpResponse.ok();
}
return HttpResponse.unauthorized();
}
}
@@ -25,10 +25,15 @@
import org.apache.kafka.clients.admin.ScramCredentialInfo;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
import java.util.Properties;

/**
* User executor.
@@ -126,6 +131,32 @@ public String resetPassword(String user) {
}
}

/**
* Set the password of a given user.
*
* @param user The user
* @param password The new password
*/
public void setPassword(String user, String password) {
if (userExecutor.canResetPassword()) {
userExecutor.setPassword(user, password);
} else {
throw new ResourceValidationException(KAFKA_USER_RESET_PASSWORD, user,
invalidResetPasswordProvider(managedClusterProperties.getProvider()));
}
}

/**
* Check if the password matches for given user.
*
* @param user The user
* @param password The password to test
* @return true if given password matches current one
*/
public boolean checkPassword(String user, String password) {
return userExecutor.checkPassword(user, password, managedClusterProperties.getConfig());
}

private Map<String, Map<String, Double>> collectNs4kafkaQuotas() {
return namespaceRepository.findAllForCluster(managedClusterProperties.getName())
.stream()
@@ -155,6 +186,8 @@ interface AbstractUserSynchronizer {
boolean canResetPassword();

String resetPassword(String user);
void setPassword(String user, String password);
boolean checkPassword(String user, String password, Properties config);

void applyQuotas(String user, Map<String, Double> quotas);

@@ -200,6 +233,66 @@ public String resetPassword(String user) {
return password;
}

@Override
public void setPassword(String user, String password) {
Collaborator comment:
See suggestions from above:

➡️ I think userAsyncExecutor#resetPassword and userAsyncExecutor#setPassword can be a single method as well:

  • Add a password parameter to userAsyncExecutor#resetPassword
  • Extract the generation of the random password outside of userAsyncExecutor#resetPassword in order to generate it if the @Body is empty

UserScramCredentialUpsertion update = new UserScramCredentialUpsertion(user, info, password);
try {
admin.alterUserScramCredentials(List.of(update)).all().get(10, TimeUnit.SECONDS);
log.info("Success setting password for user {}", user);
} catch (InterruptedException e) {
log.error("Error while setting password for user {}", user, e);
Thread.currentThread().interrupt();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public boolean checkPassword(String user, String password, Properties config) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get("bootstrap.servers"));
Collaborator comment:
Using an AdminClient to verify the connection may be more elegant than instantiating a dedicated producer; let me sleep on that
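A rough sketch of the AdminClient alternative the comment suggests, assuming `kafka-clients` on the classpath; the method name is illustrative, and `props` stands for the same bootstrap/SASL properties built in `checkPassword` below:

```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.errors.SaslAuthenticationException;

class AdminClientPasswordCheck {
    // describeCluster() forces a SASL handshake without needing serializers
    // or access to any topic, so no dedicated producer is required.
    static boolean checkPasswordWithAdminClient(Properties props) {
        try (Admin admin = Admin.create(props)) {
            admin.describeCluster().nodes().get(10, TimeUnit.SECONDS);
            return true; // authentication succeeded, password is correct
        } catch (ExecutionException e) {
            // A SaslAuthenticationException cause means the password is wrong;
            // mirror the producer version and treat other failures as incorrect too
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (TimeoutException e) {
            return false;
        }
    }
}
```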

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

// Configure SASL authentication with the candidate credentials
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.get("security.protocol"));
// TODO: what if the mechanism is not SCRAM?
props.put(SaslConfigs.SASL_MECHANISM, config.get("sasl.mechanism"));
props.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.scram.ScramLoginModule required username=\""
+ user + "\" password=\"" + password + "\";");
// Consider setting ProducerConfig.MAX_BLOCK_MS_CONFIG (e.g. 3000) to avoid blocking the caller for too long

log.debug("Configuring connection to {}", config.get("bootstrap.servers"));

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
// Request metadata for a topic that should always exist; this forces authentication
log.debug("Connecting...");
producer.partitionsFor("__consumer_offsets");

// If the metadata request succeeds, authentication succeeded, so the password is correct
log.debug("Access to topic granted, password is correct");
return true;
} catch (org.apache.kafka.common.errors.TopicAuthorizationException e) {
// The broker denied access to the topic, which means authentication itself succeeded
log.debug("Topic forbidden, but password is correct");
return true;
} catch (org.apache.kafka.common.errors.SaslAuthenticationException e) {
log.debug("Authentication failed, password is incorrect");
return false;
} catch (Exception e) {
// Treat any other failure as a failed authentication
log.debug("Unexpected exception, treating password as incorrect", e);
return false;
}
}

@Override
public Map<String, Map<String, Double>> listQuotas() {
ClientQuotaFilter filter = ClientQuotaFilter.containsOnly(
@@ -262,6 +355,14 @@ public boolean canResetPassword() {
public String resetPassword(String user) {
throw exception;
}

@Override
public void setPassword(String user, String password) {
throw exception;
}

@Override
public boolean checkPassword(String user, String password, Properties config) {
throw exception;
}

@Override
public void applyQuotas(String user, Map<String, Double> quotas) {
@@ -14,6 +14,7 @@ public enum Kind {
CONSUMER_GROUP_RESET_OFFSET_RESPONSE("ConsumerGroupResetOffsetsResponse"),
DELETE_RECORDS_RESPONSE("DeleteRecordsResponse"),
KAFKA_USER_RESET_PASSWORD("KafkaUserResetPassword"),
KAFKA_USER_CHECK_PASSWORD("KafkaUserCheckPassword"),
KAFKA_STREAM("KafkaStream"),
NAMESPACE("Namespace"),
RESOURCE_QUOTA("ResourceQuota"),
@@ -234,4 +234,42 @@ void shouldUpdateUserFailWhenItDoesNotBelongToNamespace() {
assertEquals("Invalid value \"user2\" for field \"user\": user does not belong to namespace.",
exception.getResponse().getBody(Status.class).get().getDetails().getCauses().getFirst());
}

@Test
void shouldSetPassword() throws ExecutionException, InterruptedException {
KafkaUserResetPassword response = ns4KafkaClient
.toBlocking()
.retrieve(HttpRequest
.PATCH("/api/namespaces/ns1/users/user1/set-password", "password1")
.bearerAuth(token), KafkaUserResetPassword.class);

Map<String, UserScramCredentialsDescription> mapUser = getAdminClient()
.describeUserScramCredentials(List.of("user1")).all().get();

assertTrue(mapUser.containsKey("user1"));

String newPassword = response.getSpec().getNewPassword();
assertNotNull(newPassword);
assertEquals("password1", newPassword);
}

@Test
void shouldCheckRightPassword() {
HttpResponse<Void> response = ns4KafkaClient
.toBlocking()
.exchange(HttpRequest
.POST("/api/namespaces/ns1/users/user1/check-password", "password1")
.bearerAuth(token), Void.class);

assertEquals(200, response.code());
}

@Test
void shouldCheckWrongPassword() {
// The blocking client throws HttpClientResponseException for error statuses
HttpClientResponseException exception = assertThrows(HttpClientResponseException.class,
() -> ns4KafkaClient
.toBlocking()
.exchange(HttpRequest
.POST("/api/namespaces/ns1/users/user1/check-password", "wrong_password")
.bearerAuth(token), Void.class));

assertEquals(401, exception.getStatus().getCode());
}
}