Skip to content

Commit

Permalink
Introduce Kafka topic init task
Browse files Browse the repository at this point in the history
Adds a new init task that creates all required topics using their respective default configuration during startup.

This replaces the need for a separate init container (i.e. `redpanda-init` in our `docker-compose.yml`: https://github.com/DependencyTrack/hyades/blob/008c3a1b024969a159de085d5ca6b464147faa9a/docker-compose.yml#L251-L273).

The creation of topics is also faster because it can be done in a single batched request, rather than tens of repetitive CLI invocations.

This init task is disabled by default for now, since it's not meant to be used for production deployments. The idea is to build on this work and allow users to customize the default topic configurations (e.g. via env vars or config files).

Relates to DependencyTrack/hyades#1195

Signed-off-by: nscuro <[email protected]>
  • Loading branch information
nscuro committed Oct 3, 2024
1 parent 6232195 commit d65dde7
Show file tree
Hide file tree
Showing 26 changed files with 642 additions and 345 deletions.
1 change: 1 addition & 0 deletions src/main/java/org/dependencytrack/common/ConfigKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public enum ConfigKey implements Config.Key {
DATABASE_RUN_MIGRATIONS("database.run.migrations", true),
DATABASE_RUN_MIGRATIONS_ONLY("database.run.migrations.only", false),
INIT_TASKS_ENABLED("init.tasks.enabled", true),
INIT_TASKS_KAFKA_TOPICS_ENABLED("init.tasks.kafka.topics.enabled", false),
INIT_AND_EXIT("init.and.exit", false),

DEV_SERVICES_ENABLED("dev.services.enabled", false),
Expand Down
46 changes: 5 additions & 41 deletions src/main/java/org/dependencytrack/dev/DevServicesInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,26 @@

import alpine.Config;
import alpine.common.logging.Logger;

import jakarta.servlet.ServletContextEvent;
import jakarta.servlet.ServletContextListener;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.dependencytrack.event.kafka.KafkaTopics;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static alpine.Config.AlpineKey.DATABASE_PASSWORD;
import static alpine.Config.AlpineKey.DATABASE_URL;
import static alpine.Config.AlpineKey.DATABASE_USERNAME;
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT;
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
import static org.dependencytrack.common.ConfigKey.DEV_SERVICES_ENABLED;
import static org.dependencytrack.common.ConfigKey.DEV_SERVICES_IMAGE_FRONTEND;
import static org.dependencytrack.common.ConfigKey.DEV_SERVICES_IMAGE_KAFKA;
import static org.dependencytrack.common.ConfigKey.DEV_SERVICES_IMAGE_POSTGRES;
import static org.dependencytrack.common.ConfigKey.INIT_TASKS_ENABLED;
import static org.dependencytrack.common.ConfigKey.INIT_TASKS_KAFKA_TOPICS_ENABLED;
import static org.dependencytrack.common.ConfigKey.KAFKA_BOOTSTRAP_SERVERS;

/**
Expand Down Expand Up @@ -138,6 +131,8 @@ public void contextInitialized(final ServletContextEvent event) {
configOverrides.put(DATABASE_USERNAME.getPropertyName(), postgresUsername);
configOverrides.put(DATABASE_PASSWORD.getPropertyName(), postgresPassword);
configOverrides.put(KAFKA_BOOTSTRAP_SERVERS.getPropertyName(), kafkaBootstrapServers);
configOverrides.put(INIT_TASKS_ENABLED.getPropertyName(), "true");
configOverrides.put(INIT_TASKS_KAFKA_TOPICS_ENABLED.getPropertyName(), "true");

try {
LOGGER.info("Applying config overrides: %s".formatted(configOverrides));
Expand All @@ -150,37 +145,6 @@ public void contextInitialized(final ServletContextEvent event) {
throw new RuntimeException("Failed to update configuration", e);
}

final var topicsToCreate = new ArrayList<>(List.of(
new NewTopic(KafkaTopics.NEW_EPSS.name(), 1, (short) 1).configs(Map.of(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT)),
new NewTopic(KafkaTopics.NEW_VULNERABILITY.name(), 1, (short) 1).configs(Map.of(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT)),
new NewTopic(KafkaTopics.NOTIFICATION_ANALYZER.name(), 1, (short) 1),
new NewTopic(KafkaTopics.NOTIFICATION_BOM.name(), 1, (short) 1),
new NewTopic(KafkaTopics.NOTIFICATION_CONFIGURATION.name(), 1, (short) 1),
new NewTopic(KafkaTopics.NOTIFICATION_DATASOURCE_MIRRORING.name(), 1, (short) 1),
new NewTopic(KafkaTopics.NOTIFICATION_FILE_SYSTEM.name(), 1, (short) 1),
new NewTopic(KafkaTopics.NOTIFICATION_INTEGRATION.name(), 1, (short) 1),
new NewTopic(KafkaTopics.NOTIFICATION_NEW_VULNERABILITY.name(), 1, (short) 1),
new NewTopic(KafkaTopics.NOTIFICATION_NEW_VULNERABLE_DEPENDENCY.name(), 1, (short) 1),
new NewTopic(KafkaTopics.NOTIFICATION_POLICY_VIOLATION.name(), 1, (short) 1),
new NewTopic(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE.name(), 1, (short) 1),
new NewTopic(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name(), 1, (short) 1),
new NewTopic(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name(), 1, (short) 1),
new NewTopic(KafkaTopics.NOTIFICATION_REPOSITORY.name(), 1, (short) 1),
new NewTopic(KafkaTopics.NOTIFICATION_VEX.name(), 1, (short) 1),
new NewTopic(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name(), 1, (short) 1),
new NewTopic(KafkaTopics.REPO_META_ANALYSIS_RESULT.name(), 1, (short) 1),
new NewTopic(KafkaTopics.VULN_ANALYSIS_COMMAND.name(), 1, (short) 1),
new NewTopic(KafkaTopics.VULN_ANALYSIS_RESULT.name(), 1, (short) 1),
new NewTopic(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name(), 1, (short) 1)
));

try (final var adminClient = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers))) {
LOGGER.info("Creating topics: %s".formatted(topicsToCreate));
adminClient.createTopics(topicsToCreate).all().get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Failed to create topics", e);
}

LOGGER.info("PostgreSQL is listening at localhost:%d".formatted(postgresPort));
LOGGER.info("Kafka is listening at localhost:%d".formatted(kafkaPort));
LOGGER.info("Frontend is listening at http://localhost:%d".formatted(frontendPort));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ static KafkaEvent<ScanKey, ScanCommand> convert(final ComponentVulnerabilityAnal
.build();

return new KafkaEvent<>(
KafkaTopics.VULN_ANALYSIS_COMMAND,
KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND,
scanKey, scanCommand,
Map.of(KafkaEventHeaders.VULN_ANALYSIS_LEVEL, event.level().name(),
KafkaEventHeaders.IS_NEW_COMPONENT, String.valueOf(event.isNewComponent()))
Expand All @@ -145,46 +145,46 @@ static KafkaEvent<String, AnalysisCommand> convert(final ComponentRepositoryMeta
.setFetchMeta(event.fetchMeta())
.build();

return new KafkaEvent<>(KafkaTopics.REPO_META_ANALYSIS_COMMAND, event.purlCoordinates(), analysisCommand, null);
return new KafkaEvent<>(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, event.purlCoordinates(), analysisCommand, null);
}

static KafkaEvent<String, String> convert(final GitHubAdvisoryMirrorEvent ignored) {
final String key = Vulnerability.Source.GITHUB.name();
return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, key, null);
return new KafkaEvent<>(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND, key, null);
}

static KafkaEvent<String, String> convert(final NistMirrorEvent ignored) {
final String key = Vulnerability.Source.NVD.name();
return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, key, null);
return new KafkaEvent<>(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND, key, null);
}

static KafkaEvent<String, String> convert(final OsvMirrorEvent event) {
final String key = Vulnerability.Source.OSV.name();
final String value = event.ecosystem();
return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, key, value);
return new KafkaEvent<>(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND, key, value);
}

static KafkaEvent<String, String> convert(final EpssMirrorEvent ignored) {
return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, "EPSS", null);
return new KafkaEvent<>(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND, "EPSS", null);
}

private static Topic<String, Notification> extractDestinationTopic(final Notification notification) {
return switch (notification.getGroup()) {
case GROUP_ANALYZER -> KafkaTopics.NOTIFICATION_ANALYZER;
case GROUP_BOM_CONSUMED, GROUP_BOM_PROCESSED, GROUP_BOM_PROCESSING_FAILED, GROUP_BOM_VALIDATION_FAILED -> KafkaTopics.NOTIFICATION_BOM;
case GROUP_CONFIGURATION -> KafkaTopics.NOTIFICATION_CONFIGURATION;
case GROUP_DATASOURCE_MIRRORING -> KafkaTopics.NOTIFICATION_DATASOURCE_MIRRORING;
case GROUP_FILE_SYSTEM -> KafkaTopics.NOTIFICATION_FILE_SYSTEM;
case GROUP_INTEGRATION -> KafkaTopics.NOTIFICATION_INTEGRATION;
case GROUP_NEW_VULNERABILITY -> KafkaTopics.NOTIFICATION_NEW_VULNERABILITY;
case GROUP_NEW_VULNERABLE_DEPENDENCY -> KafkaTopics.NOTIFICATION_NEW_VULNERABLE_DEPENDENCY;
case GROUP_POLICY_VIOLATION -> KafkaTopics.NOTIFICATION_POLICY_VIOLATION;
case GROUP_PROJECT_AUDIT_CHANGE -> KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE;
case GROUP_PROJECT_CREATED -> KafkaTopics.NOTIFICATION_PROJECT_CREATED;
case GROUP_PROJECT_VULN_ANALYSIS_COMPLETE -> KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE;
case GROUP_REPOSITORY -> KafkaTopics.NOTIFICATION_REPOSITORY;
case GROUP_VEX_CONSUMED, GROUP_VEX_PROCESSED -> KafkaTopics.NOTIFICATION_VEX;
case GROUP_USER_CREATED, GROUP_USER_DELETED -> KafkaTopics.NOTIFICATION_USER;
case GROUP_ANALYZER -> KafkaTopics.TOPIC_NOTIFICATION_ANALYZER;
case GROUP_BOM_CONSUMED, GROUP_BOM_PROCESSED, GROUP_BOM_PROCESSING_FAILED, GROUP_BOM_VALIDATION_FAILED -> KafkaTopics.TOPIC_NOTIFICATION_BOM;
case GROUP_CONFIGURATION -> KafkaTopics.TOPIC_NOTIFICATION_CONFIGURATION;
case GROUP_DATASOURCE_MIRRORING -> KafkaTopics.TOPIC_NOTIFICATION_DATASOURCE_MIRRORING;
case GROUP_FILE_SYSTEM -> KafkaTopics.TOPIC_NOTIFICATION_FILE_SYSTEM;
case GROUP_INTEGRATION -> KafkaTopics.TOPIC_NOTIFICATION_INTEGRATION;
case GROUP_NEW_VULNERABILITY -> KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABILITY;
case GROUP_NEW_VULNERABLE_DEPENDENCY -> KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABLE_DEPENDENCY;
case GROUP_POLICY_VIOLATION -> KafkaTopics.TOPIC_NOTIFICATION_POLICY_VIOLATION;
case GROUP_PROJECT_AUDIT_CHANGE -> KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE;
case GROUP_PROJECT_CREATED -> KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED;
case GROUP_PROJECT_VULN_ANALYSIS_COMPLETE -> KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE;
case GROUP_REPOSITORY -> KafkaTopics.TOPIC_NOTIFICATION_REPOSITORY;
case GROUP_VEX_CONSUMED, GROUP_VEX_PROCESSED -> KafkaTopics.TOPIC_NOTIFICATION_VEX;
case GROUP_USER_CREATED, GROUP_USER_DELETED -> KafkaTopics.TOPIC_NOTIFICATION_USER;
case GROUP_UNSPECIFIED, UNRECOGNIZED -> throw new IllegalArgumentException("""
Unable to determine destination topic because the notification does not \
specify a notification group: %s""".formatted(notification.getGroup()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* This file is part of Dependency-Track.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* Copyright (c) OWASP Foundation. All Rights Reserved.
*/
package org.dependencytrack.event.kafka;

import alpine.Config;
import alpine.common.logging.Logger;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

import jakarta.servlet.ServletContextEvent;
import jakarta.servlet.ServletContextListener;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.dependencytrack.common.ConfigKey.INIT_TASKS_ENABLED;
import static org.dependencytrack.common.ConfigKey.INIT_TASKS_KAFKA_TOPICS_ENABLED;
import static org.dependencytrack.common.ConfigKey.KAFKA_BOOTSTRAP_SERVERS;

/**
* @since 5.6.0
*/
/**
 * A {@link ServletContextListener} that creates all Kafka topics declared in
 * {@link KafkaTopics#ALL_TOPICS} during application startup, using each topic's
 * default configuration.
 * <p>
 * The task is gated behind both {@code init.tasks.enabled} and
 * {@code init.tasks.kafka.topics.enabled}; it is a no-op unless both are enabled.
 * Topics that already exist are left untouched.
 *
 * @since 5.6.0
 */
public class KafkaTopicInitializer implements ServletContextListener {

    private static final Logger LOGGER = Logger.getLogger(KafkaTopicInitializer.class);

    // Upper bound per admin request (describe / create) before giving up on the broker.
    private static final int REQUEST_TIMEOUT_MS = 3_000;

    private final Config config = Config.getInstance();

    /**
     * Determines which of the known topics are missing on the broker and creates them.
     *
     * @param event the servlet context event (unused)
     * @throws IllegalStateException if topic existence cannot be verified,
     *                               or if creation of one or more topics fails
     */
    @Override
    public void contextInitialized(final ServletContextEvent event) {
        if (!config.getPropertyAsBoolean(INIT_TASKS_ENABLED)) {
            LOGGER.debug("Not initializing Kafka topics because %s is disabled"
                    .formatted(INIT_TASKS_ENABLED.getPropertyName()));
            return;
        }
        if (!config.getPropertyAsBoolean(INIT_TASKS_KAFKA_TOPICS_ENABLED)) {
            LOGGER.debug("Not initializing Kafka topics because %s is disabled"
                    .formatted(INIT_TASKS_KAFKA_TOPICS_ENABLED.getPropertyName()));
            return;
        }

        LOGGER.warn("Auto-creating topics with default configuration is not recommended for production deployments");

        try (final AdminClient adminClient = createAdminClient()) {
            final List<KafkaTopics.Topic<?, ?>> topicsToCreate = determineTopicsToCreate(adminClient);
            if (topicsToCreate.isEmpty()) {
                LOGGER.info("All topics exist already; Nothing to do");
                return;
            }

            createTopics(adminClient, topicsToCreate);
            LOGGER.info("Successfully created %d topics".formatted(topicsToCreate.size()));
        }
    }

    /**
     * Describes all known topics and returns those the broker reports as unknown.
     * <p>
     * Any describe failure other than {@link UnknownTopicOrPartitionException} is
     * collected and reported as a single {@link IllegalStateException}, so one
     * flaky topic does not mask errors on the others.
     *
     * @param adminClient the admin client to query the broker with
     * @return the topics that do not exist yet and must be created
     * @throws IllegalStateException if existence of any topic could not be verified
     */
    private List<KafkaTopics.Topic<?, ?>> determineTopicsToCreate(final AdminClient adminClient) {
        final Map<String, KafkaTopics.Topic<?, ?>> topicByName = KafkaTopics.ALL_TOPICS.stream()
                .collect(Collectors.toMap(KafkaTopics.Topic::name, Function.identity()));

        final var topicsToCreate = new ArrayList<KafkaTopics.Topic<?, ?>>(topicByName.size());

        final var describeTopicsOptions = new DescribeTopicsOptions().timeoutMs(REQUEST_TIMEOUT_MS);
        final DescribeTopicsResult topicsResult = adminClient.describeTopics(topicByName.keySet(), describeTopicsOptions);

        final var exceptionsByTopicName = new HashMap<String, Throwable>();
        for (final Map.Entry<String, KafkaFuture<TopicDescription>> entry : topicsResult.topicNameValues().entrySet()) {
            final String topicName = entry.getKey();
            try {
                entry.getValue().get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                    // Expected for missing topics; mark for creation.
                    LOGGER.debug("Topic %s does not exist and will need to be created".formatted(topicName));
                    topicsToCreate.add(topicByName.get(topicName));
                } else {
                    exceptionsByTopicName.put(topicName, e.getCause());
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag before bailing out.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("""
                        Thread was interrupted while waiting for broker response. \
                        The existence of topic %s can not be determined.""".formatted(topicName), e);
            }
        }

        if (!exceptionsByTopicName.isEmpty()) {
            throw new IllegalStateException("Existence of %d topic(s) could not be verified: %s"
                    .formatted(exceptionsByTopicName.size(), summarizeExceptions(exceptionsByTopicName)));
        }

        return topicsToCreate;
    }

    /**
     * Creates the given topics in a single batched request, applying each topic's
     * default partition count, replication factor, and (if present) configs.
     *
     * @param adminClient the admin client to issue the create request with
     * @param topics      the topics to create
     * @throws IllegalStateException if creation of one or more topics fails
     */
    private void createTopics(final AdminClient adminClient, final Collection<KafkaTopics.Topic<?, ?>> topics) {
        final List<NewTopic> newTopics = topics.stream()
                .map(topic -> {
                    final var newTopic = new NewTopic(
                            topic.name(),
                            topic.defaultConfig().partitions(),
                            topic.defaultConfig().replicationFactor());
                    if (topic.defaultConfig().configs() != null) {
                        return newTopic.configs(topic.defaultConfig().configs());
                    }

                    return newTopic;
                })
                .toList();

        final var createTopicsOptions = new CreateTopicsOptions().timeoutMs(REQUEST_TIMEOUT_MS);
        final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics, createTopicsOptions);

        final var exceptionsByTopicName = new HashMap<String, Throwable>();
        for (final Map.Entry<String, KafkaFuture<Void>> entry : createTopicsResult.values().entrySet()) {
            final String topicName = entry.getKey();
            try {
                entry.getValue().get();
            } catch (ExecutionException e) {
                exceptionsByTopicName.put(topicName, e.getCause());
            } catch (InterruptedException e) {
                // Restore the interrupt flag before bailing out.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("""
                        Thread was interrupted while waiting for broker response. \
                        Successful creation of topic %s can not be verified.""".formatted(topicName), e);
            }
        }

        if (!exceptionsByTopicName.isEmpty()) {
            throw new IllegalStateException("Failed to create %d topic(s): %s"
                    .formatted(exceptionsByTopicName.size(), summarizeExceptions(exceptionsByTopicName)));
        }
    }

    /**
     * Renders per-topic failures as {@code [{topic=..., error=...}, ...]} for error messages.
     *
     * @param exceptionsByTopicName failures keyed by topic name
     * @return a single-line summary of all failures
     */
    private static String summarizeExceptions(final Map<String, Throwable> exceptionsByTopicName) {
        return exceptionsByTopicName.entrySet().stream()
                .map(entry -> "{topic=%s, error=%s}".formatted(entry.getKey(), entry.getValue()))
                .collect(Collectors.joining(", ", "[", "]"));
    }

    /**
     * Builds an {@link AdminClient} pointed at the configured bootstrap servers.
     *
     * @return a new admin client; callers are responsible for closing it
     */
    private AdminClient createAdminClient() {
        final var adminClientConfig = new HashMap<String, Object>();
        adminClientConfig.put(BOOTSTRAP_SERVERS_CONFIG, config.getProperty(KAFKA_BOOTSTRAP_SERVERS));
        // NOTE(review): the original "%s-admin-client".formatted("instanceId") templated a
        // literal, yielding exactly this constant; presumably a real instance ID was meant
        // to be interpolated here — TODO confirm and wire in the actual instance ID.
        adminClientConfig.put(CLIENT_ID_CONFIG, "instanceId-admin-client");

        LOGGER.debug("Creating admin client with options %s".formatted(adminClientConfig));
        return AdminClient.create(adminClientConfig);
    }

}
Loading

0 comments on commit d65dde7

Please sign in to comment.