-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Assign user a given password and check connection status for a given password #430
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
|
||
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidKafkaUser; | ||
import static com.michelin.ns4kafka.util.enumation.Kind.KAFKA_USER_RESET_PASSWORD; | ||
import static com.michelin.ns4kafka.util.enumation.Kind.KAFKA_USER_CHECK_PASSWORD; | ||
|
||
import com.michelin.ns4kafka.controller.generic.NamespacedResourceController; | ||
import com.michelin.ns4kafka.model.KafkaUserResetPassword; | ||
|
@@ -12,17 +13,21 @@ | |
import com.michelin.ns4kafka.util.exception.ResourceValidationException; | ||
import io.micronaut.context.ApplicationContext; | ||
import io.micronaut.http.HttpResponse; | ||
import io.micronaut.http.annotation.Body; | ||
import io.micronaut.http.annotation.Controller; | ||
import io.micronaut.http.annotation.Post; | ||
import io.micronaut.http.annotation.Patch; | ||
import io.micronaut.inject.qualifiers.Qualifiers; | ||
import io.swagger.v3.oas.annotations.tags.Tag; | ||
import lombok.extern.slf4j.Slf4j; | ||
import jakarta.inject.Inject; | ||
import java.time.Instant; | ||
import java.util.Date; | ||
|
||
/** | ||
* Controller to manage users. | ||
*/ | ||
@Slf4j | ||
@Tag(name = "Users", description = "Manage the users.") | ||
@Controller(value = "/api/namespaces/{namespace}/users") | ||
public class UserController extends NamespacedResourceController { | ||
|
@@ -34,7 +39,7 @@ public class UserController extends NamespacedResourceController { | |
* | ||
* @param namespace The namespace | ||
* @param user The user | ||
* @return The new password | ||
* @return The new randomly generated password | ||
*/ | ||
@Post("/{user}/reset-password") | ||
public HttpResponse<KafkaUserResetPassword> resetPassword(String namespace, String user) { | ||
|
@@ -49,10 +54,39 @@ public HttpResponse<KafkaUserResetPassword> resetPassword(String namespace, Stri | |
|
||
String password = userAsyncExecutor.resetPassword(ns.getSpec().getKafkaUser()); | ||
|
||
return setPasswordResponse(ns, password); | ||
} | ||
|
||
/** | ||
* Set a password. | ||
* | ||
* @param namespace The namespace | ||
* @param user The user | ||
* @param password The new password | ||
* @return void | ||
*/ | ||
@Patch("/{user}/set-password") | ||
public HttpResponse<KafkaUserResetPassword> setPassword(String namespace, String user, @Body String password) { | ||
log.debug("asked to setPassword [{}] for user [{}]", password, user); | ||
Namespace ns = getNamespace(namespace); | ||
|
||
if (!ns.getSpec().getKafkaUser().equals(user)) { | ||
throw new ResourceValidationException(KAFKA_USER_RESET_PASSWORD, user, invalidKafkaUser(user)); | ||
} | ||
|
||
UserAsyncExecutor userAsyncExecutor = | ||
applicationContext.getBean(UserAsyncExecutor.class, Qualifiers.byName(ns.getMetadata().getCluster())); | ||
|
||
userAsyncExecutor.setPassword(ns.getSpec().getKafkaUser(), password); | ||
|
||
return setPasswordResponse(ns, password); | ||
} | ||
|
||
protected HttpResponse<KafkaUserResetPassword> setPasswordResponse(Namespace ns, String password) { | ||
KafkaUserResetPassword response = KafkaUserResetPassword.builder() | ||
.metadata(Metadata.builder() | ||
.name(ns.getSpec().getKafkaUser()) | ||
.namespace(namespace) | ||
.namespace(ns.getMetadata().getName()) | ||
.cluster(ns.getMetadata().getCluster()) | ||
.creationTimestamp(Date.from(Instant.now())) | ||
.build()) | ||
|
@@ -64,4 +98,30 @@ public HttpResponse<KafkaUserResetPassword> resetPassword(String namespace, Stri | |
sendEventLog(response, ApplyStatus.changed, null, response.getSpec()); | ||
return HttpResponse.ok(response); | ||
} | ||
|
||
/** | ||
* Check against broker if given password matches actual one | ||
* | ||
* @param namespace The namespace | ||
* @param user The user | ||
* @param password The current password | ||
* @return http status 200 (correct) or 401 (wrong password, thus unauthorized) | ||
*/ | ||
@Post("/{user}/check-password") | ||
public HttpResponse<Void> checkPassword(String namespace, String user, @Body String password) { | ||
log.debug("asked to checkPassword '{}' for user '{}'", password, user); | ||
Namespace ns = getNamespace(namespace); | ||
|
||
if (!ns.getSpec().getKafkaUser().equals(user)) { | ||
throw new ResourceValidationException(KAFKA_USER_CHECK_PASSWORD, user, invalidKafkaUser(user)); | ||
} | ||
|
||
UserAsyncExecutor userAsyncExecutor = | ||
applicationContext.getBean(UserAsyncExecutor.class, Qualifiers.byName(ns.getMetadata().getCluster())); | ||
|
||
if (userAsyncExecutor.checkPassword(ns.getSpec().getKafkaUser(), password)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As suggested in michelin/kafkactl#91, a status that specifies if the connection with the given password ends with |
||
return HttpResponse.ok(); | ||
} | ||
return HttpResponse.unauthorized(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,10 +25,15 @@ | |
import org.apache.kafka.clients.admin.ScramCredentialInfo; | ||
import org.apache.kafka.clients.admin.ScramMechanism; | ||
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion; | ||
import org.apache.kafka.clients.CommonClientConfigs; | ||
import org.apache.kafka.clients.producer.KafkaProducer; | ||
import org.apache.kafka.clients.producer.ProducerConfig; | ||
import org.apache.kafka.common.config.SaslConfigs; | ||
import org.apache.kafka.common.quota.ClientQuotaAlteration; | ||
import org.apache.kafka.common.quota.ClientQuotaEntity; | ||
import org.apache.kafka.common.quota.ClientQuotaFilter; | ||
import org.apache.kafka.common.quota.ClientQuotaFilterComponent; | ||
import java.util.Properties; | ||
|
||
/** | ||
* User executor. | ||
|
@@ -126,6 +131,32 @@ public String resetPassword(String user) { | |
} | ||
} | ||
|
||
/** | ||
* Set the password of a given user. | ||
* | ||
* @param user The user | ||
* @param password The new password | ||
*/ | ||
public void setPassword(String user, String password) { | ||
if (userExecutor.canResetPassword()) { | ||
userExecutor.setPassword(user, password); | ||
} else { | ||
throw new ResourceValidationException(KAFKA_USER_RESET_PASSWORD, user, | ||
invalidResetPasswordProvider(managedClusterProperties.getProvider())); | ||
} | ||
} | ||
|
||
/** | ||
* Check if the password matches for given user. | ||
* | ||
* @param user The user | ||
* @param password The password to test | ||
* @return true if given password matches current one | ||
*/ | ||
public boolean checkPassword(String user, String password) { | ||
return userExecutor.checkPassword(user, password, managedClusterProperties.getConfig()); | ||
} | ||
|
||
private Map<String, Map<String, Double>> collectNs4kafkaQuotas() { | ||
return namespaceRepository.findAllForCluster(managedClusterProperties.getName()) | ||
.stream() | ||
|
@@ -155,6 +186,8 @@ interface AbstractUserSynchronizer { | |
boolean canResetPassword(); | ||
|
||
String resetPassword(String user); | ||
void setPassword(String user, String password); | ||
boolean checkPassword(String user, String password, Properties config); | ||
|
||
void applyQuotas(String user, Map<String, Double> quotas); | ||
|
||
|
@@ -200,6 +233,66 @@ public String resetPassword(String user) { | |
return password; | ||
} | ||
|
||
@Override | ||
public void setPassword(String user, String password) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See suggestions from above: ➡️ I think
|
||
UserScramCredentialUpsertion update = new UserScramCredentialUpsertion(user, info, password); | ||
try { | ||
admin.alterUserScramCredentials(List.of(update)).all().get(10, TimeUnit.SECONDS); | ||
log.info("Success setting password for user {}", user); | ||
} catch (InterruptedException e) { | ||
log.error("Error", e); | ||
Thread.currentThread().interrupt(); | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
@Override | ||
public boolean checkPassword(String user, String password, Properties config) { | ||
Properties props = new Properties(); | ||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get("bootstrap.servers")); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using an |
||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); | ||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); | ||
|
||
// Configure SCRAM-SHA256 authentication | ||
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.get("security.protocol")); | ||
// TODO : what if it's not SCRAM ?? | ||
props.put(SaslConfigs.SASL_MECHANISM, config.get("sasl.mechanism")); | ||
props.put(SaslConfigs.SASL_JAAS_CONFIG, | ||
"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" | ||
+ user + "\" password=\"" + password + "\";"); | ||
// props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000); // Reduce the delivery timeout to avoid blocking caller too long | ||
|
||
log.debug("Configuring connection to " + config.get("bootstrap.servers")); | ||
KafkaProducer<String, String> producer = new KafkaProducer<>(props); | ||
|
||
try { | ||
// Create a Kafka producer with the configured properties | ||
// List<PartitionInfo> partitionInfoList = | ||
log.debug("Connecting ..."); | ||
Object dummy = producer.partitionsFor("__consumer_offsets"); // this topic should always exist | ||
|
||
// If the authentication is successful, the password is correct | ||
log.debug("Access to topic -> Password is correct"); | ||
return true; | ||
|
||
} catch (org.apache.kafka.common.errors.TopicAuthorizationException e) { | ||
// not allowed to access to topic, but if broker says this, it means authent itself was successful | ||
log.debug("Topic forbidden, but password is correct"); | ||
return true; | ||
} catch (org.apache.kafka.common.errors.SaslAuthenticationException e) { | ||
log.debug("Authent failed -> Password is incorrect"); | ||
return false; | ||
} catch (Exception e) { | ||
// consider any other exception as a failed authent. | ||
log.debug("other exception {} -> Password is incorrect", e); | ||
return false; | ||
} finally { | ||
// Close the producer | ||
producer.close(); | ||
} | ||
} | ||
|
||
@Override | ||
public Map<String, Map<String, Double>> listQuotas() { | ||
ClientQuotaFilter filter = ClientQuotaFilter.containsOnly( | ||
|
@@ -262,6 +355,14 @@ public boolean canResetPassword() { | |
public String resetPassword(String user) { | ||
throw exception; | ||
} | ||
@Override | ||
public void setPassword(String user, String password) { | ||
throw exception; | ||
} | ||
@Override | ||
public boolean checkPassword(String user, String password, Properties config) { | ||
throw exception; | ||
} | ||
|
||
@Override | ||
public void applyQuotas(String user, Map<String, Double> quotas) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As suggested in michelin/kafkactl#91, I think we can use a single endpoint for that, following this logic:
userAsyncExecutor#resetPassword
with the given username/passworduserAsyncExecutor#resetPassword
with the given username/random password.➡️ I think
userAsyncExecutor#resetPassword
anduserAsyncExecutor#setPassword
can be a single method as well:password
parameter touserAsyncExecutor#resetPassword
userAsyncExecutor#resetPassword
in order to generate it if the @Body is empty