From d65dde7d74b251c80b8fe0d75a701e5441fa1297 Mon Sep 17 00:00:00 2001 From: nscuro Date: Thu, 3 Oct 2024 15:03:13 +0200 Subject: [PATCH] Introduce Kafka topic init task Adds a new init task that creates all required topics using their respective default configuration during startup. This replaces the need for a separate init container (i.e. `redpanda-init` in our `docker-compose.yml`: https://github.com/DependencyTrack/hyades/blob/008c3a1b024969a159de085d5ca6b464147faa9a/docker-compose.yml#L251-L273). The creation of topics is also faster because it can be done in a single batched request, vs. 10s of repetitive CLI invocations. This init task is disabled by default for now, since it's not meant to be used for production deployments. The idea is to build on this work, and allow users to customize the default topic configurations (e.g. via env vars or config files). Relates to https://github.com/DependencyTrack/hyades/issues/1195 Signed-off-by: nscuro --- .../org/dependencytrack/common/ConfigKey.java | 1 + .../dev/DevServicesInitializer.java | 46 +--- .../event/kafka/KafkaEventConverter.java | 42 +-- .../event/kafka/KafkaTopicInitializer.java | 177 +++++++++++++ .../event/kafka/KafkaTopics.java | 248 ++++++++++++++---- .../kafka/processor/ProcessorInitializer.java | 10 +- .../VulnerabilityScanResultProcessor.java | 2 +- src/main/resources/application.properties | 10 + src/main/webapp/WEB-INF/web.xml | 3 + .../event/kafka/KafkaEventDispatcherTest.java | 12 +- .../event/kafka/KafkaTopicsTest.java | 4 +- .../SupportedMetaHandlerTest.java | 12 +- .../UnSupportedMetaHandlerTest.java | 4 +- ...dVulnerabilityScanResultProcessorTest.java | 30 +-- .../VulnerabilityScanResultProcessorTest.java | 80 +++--- .../processor/api/ProcessorManagerTest.java | 14 +- .../resources/v1/AnalysisResourceTest.java | 24 +- .../resources/v1/BomResourceTest.java | 4 +- .../resources/v1/ComponentResourceTest.java | 20 +- .../resources/v1/ProjectResourceTest.java | 2 +- .../v1/UserResourceAuthenticatedTest.java | 10 +- .../v1/ViolationAnalysisResourceTest.java | 16 +- .../tasks/BomUploadProcessingTaskTest.java | 148 +++++------ .../IntegrityMetaInitializerTaskTest.java | 2 +- .../tasks/RepositoryMetaAnalysisTaskTest.java | 34 +-- .../tasks/VulnerabilityAnalysisTaskTest.java | 32 +-- 26 files changed, 642 insertions(+), 345 deletions(-) create mode 100644 src/main/java/org/dependencytrack/event/kafka/KafkaTopicInitializer.java diff --git a/src/main/java/org/dependencytrack/common/ConfigKey.java b/src/main/java/org/dependencytrack/common/ConfigKey.java index 5e42f2412..e32d48cdf 100644 --- a/src/main/java/org/dependencytrack/common/ConfigKey.java +++ b/src/main/java/org/dependencytrack/common/ConfigKey.java @@ -58,6 +58,7 @@ public enum ConfigKey implements Config.Key { DATABASE_RUN_MIGRATIONS("database.run.migrations", true), DATABASE_RUN_MIGRATIONS_ONLY("database.run.migrations.only", false), INIT_TASKS_ENABLED("init.tasks.enabled", true), + INIT_TASKS_KAFKA_TOPICS_ENABLED("init.tasks.kafka.topics.enabled", false), INIT_AND_EXIT("init.and.exit", false), DEV_SERVICES_ENABLED("dev.services.enabled", false), diff --git a/src/main/java/org/dependencytrack/dev/DevServicesInitializer.java b/src/main/java/org/dependencytrack/dev/DevServicesInitializer.java index c9c6e4f4f..93f3dee9d 100644 --- a/src/main/java/org/dependencytrack/dev/DevServicesInitializer.java +++ b/src/main/java/org/dependencytrack/dev/DevServicesInitializer.java @@ -20,33 +20,26 @@ import alpine.Config; import alpine.common.logging.Logger; + import 
jakarta.servlet.ServletContextEvent; import jakarta.servlet.ServletContextListener; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.NewTopic; -import org.dependencytrack.event.kafka.KafkaTopics; - import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.Method; -import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import static alpine.Config.AlpineKey.DATABASE_PASSWORD; import static alpine.Config.AlpineKey.DATABASE_URL; import static alpine.Config.AlpineKey.DATABASE_USERNAME; -import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT; -import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG; import static org.dependencytrack.common.ConfigKey.DEV_SERVICES_ENABLED; import static org.dependencytrack.common.ConfigKey.DEV_SERVICES_IMAGE_FRONTEND; import static org.dependencytrack.common.ConfigKey.DEV_SERVICES_IMAGE_KAFKA; import static org.dependencytrack.common.ConfigKey.DEV_SERVICES_IMAGE_POSTGRES; +import static org.dependencytrack.common.ConfigKey.INIT_TASKS_ENABLED; +import static org.dependencytrack.common.ConfigKey.INIT_TASKS_KAFKA_TOPICS_ENABLED; import static org.dependencytrack.common.ConfigKey.KAFKA_BOOTSTRAP_SERVERS; /** @@ -138,6 +131,8 @@ public void contextInitialized(final ServletContextEvent event) { configOverrides.put(DATABASE_USERNAME.getPropertyName(), postgresUsername); configOverrides.put(DATABASE_PASSWORD.getPropertyName(), postgresPassword); configOverrides.put(KAFKA_BOOTSTRAP_SERVERS.getPropertyName(), kafkaBootstrapServers); + configOverrides.put(INIT_TASKS_ENABLED.getPropertyName(), "true"); + configOverrides.put(INIT_TASKS_KAFKA_TOPICS_ENABLED.getPropertyName(), "true"); try { LOGGER.info("Applying config overrides: %s".formatted(configOverrides)); @@ -150,37 +145,6 @@ public void contextInitialized(final ServletContextEvent event) { throw new RuntimeException("Failed to update configuration", e); } - final var topicsToCreate = new ArrayList<>(List.of( - new NewTopic(KafkaTopics.NEW_EPSS.name(), 1, (short) 1).configs(Map.of(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT)), - new NewTopic(KafkaTopics.NEW_VULNERABILITY.name(), 1, (short) 1).configs(Map.of(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT)), - new NewTopic(KafkaTopics.NOTIFICATION_ANALYZER.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_BOM.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_CONFIGURATION.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_DATASOURCE_MIRRORING.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_FILE_SYSTEM.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_INTEGRATION.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_NEW_VULNERABILITY.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_NEW_VULNERABLE_DEPENDENCY.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_POLICY_VIOLATION.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name(), 1, (short) 1), - new 
NewTopic(KafkaTopics.NOTIFICATION_REPOSITORY.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_VEX.name(), 1, (short) 1), - new NewTopic(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name(), 1, (short) 1), - new NewTopic(KafkaTopics.REPO_META_ANALYSIS_RESULT.name(), 1, (short) 1), - new NewTopic(KafkaTopics.VULN_ANALYSIS_COMMAND.name(), 1, (short) 1), - new NewTopic(KafkaTopics.VULN_ANALYSIS_RESULT.name(), 1, (short) 1), - new NewTopic(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name(), 1, (short) 1) - )); - - try (final var adminClient = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers))) { - LOGGER.info("Creating topics: %s".formatted(topicsToCreate)); - adminClient.createTopics(topicsToCreate).all().get(); - } catch (ExecutionException | InterruptedException e) { - throw new RuntimeException("Failed to create topics", e); - } - LOGGER.info("PostgreSQL is listening at localhost:%d".formatted(postgresPort)); LOGGER.info("Kafka is listening at localhost:%d".formatted(kafkaPort)); LOGGER.info("Frontend is listening at http://localhost:%d".formatted(frontendPort)); diff --git a/src/main/java/org/dependencytrack/event/kafka/KafkaEventConverter.java b/src/main/java/org/dependencytrack/event/kafka/KafkaEventConverter.java index cae504507..30865ce8e 100644 --- a/src/main/java/org/dependencytrack/event/kafka/KafkaEventConverter.java +++ b/src/main/java/org/dependencytrack/event/kafka/KafkaEventConverter.java @@ -123,7 +123,7 @@ static KafkaEvent convert(final ComponentVulnerabilityAnal .build(); return new KafkaEvent<>( - KafkaTopics.VULN_ANALYSIS_COMMAND, + KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND, scanKey, scanCommand, Map.of(KafkaEventHeaders.VULN_ANALYSIS_LEVEL, event.level().name(), KafkaEventHeaders.IS_NEW_COMPONENT, String.valueOf(event.isNewComponent())) @@ -145,46 +145,46 @@ static KafkaEvent convert(final ComponentRepositoryMeta .setFetchMeta(event.fetchMeta()) .build(); - return new KafkaEvent<>(KafkaTopics.REPO_META_ANALYSIS_COMMAND, event.purlCoordinates(), analysisCommand, null); + return new KafkaEvent<>(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, event.purlCoordinates(), analysisCommand, null); } static KafkaEvent convert(final GitHubAdvisoryMirrorEvent ignored) { final String key = Vulnerability.Source.GITHUB.name(); - return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, key, null); + return new KafkaEvent<>(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND, key, null); } static KafkaEvent convert(final NistMirrorEvent ignored) { final String key = Vulnerability.Source.NVD.name(); - return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, key, null); + return new KafkaEvent<>(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND, key, null); } static KafkaEvent convert(final OsvMirrorEvent event) { final String key = Vulnerability.Source.OSV.name(); final String value = event.ecosystem(); - return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, key, value); + return new KafkaEvent<>(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND, key, value); } static KafkaEvent convert(final EpssMirrorEvent ignored) { - return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, "EPSS", null); + return new KafkaEvent<>(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND, "EPSS", null); } private static Topic extractDestinationTopic(final Notification notification) { return switch (notification.getGroup()) { - case GROUP_ANALYZER -> KafkaTopics.NOTIFICATION_ANALYZER; - case GROUP_BOM_CONSUMED, GROUP_BOM_PROCESSED, GROUP_BOM_PROCESSING_FAILED, 
GROUP_BOM_VALIDATION_FAILED -> KafkaTopics.NOTIFICATION_BOM; - case GROUP_CONFIGURATION -> KafkaTopics.NOTIFICATION_CONFIGURATION; - case GROUP_DATASOURCE_MIRRORING -> KafkaTopics.NOTIFICATION_DATASOURCE_MIRRORING; - case GROUP_FILE_SYSTEM -> KafkaTopics.NOTIFICATION_FILE_SYSTEM; - case GROUP_INTEGRATION -> KafkaTopics.NOTIFICATION_INTEGRATION; - case GROUP_NEW_VULNERABILITY -> KafkaTopics.NOTIFICATION_NEW_VULNERABILITY; - case GROUP_NEW_VULNERABLE_DEPENDENCY -> KafkaTopics.NOTIFICATION_NEW_VULNERABLE_DEPENDENCY; - case GROUP_POLICY_VIOLATION -> KafkaTopics.NOTIFICATION_POLICY_VIOLATION; - case GROUP_PROJECT_AUDIT_CHANGE -> KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE; - case GROUP_PROJECT_CREATED -> KafkaTopics.NOTIFICATION_PROJECT_CREATED; - case GROUP_PROJECT_VULN_ANALYSIS_COMPLETE -> KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE; - case GROUP_REPOSITORY -> KafkaTopics.NOTIFICATION_REPOSITORY; - case GROUP_VEX_CONSUMED, GROUP_VEX_PROCESSED -> KafkaTopics.NOTIFICATION_VEX; - case GROUP_USER_CREATED, GROUP_USER_DELETED -> KafkaTopics.NOTIFICATION_USER; + case GROUP_ANALYZER -> KafkaTopics.TOPIC_NOTIFICATION_ANALYZER; + case GROUP_BOM_CONSUMED, GROUP_BOM_PROCESSED, GROUP_BOM_PROCESSING_FAILED, GROUP_BOM_VALIDATION_FAILED -> KafkaTopics.TOPIC_NOTIFICATION_BOM; + case GROUP_CONFIGURATION -> KafkaTopics.TOPIC_NOTIFICATION_CONFIGURATION; + case GROUP_DATASOURCE_MIRRORING -> KafkaTopics.TOPIC_NOTIFICATION_DATASOURCE_MIRRORING; + case GROUP_FILE_SYSTEM -> KafkaTopics.TOPIC_NOTIFICATION_FILE_SYSTEM; + case GROUP_INTEGRATION -> KafkaTopics.TOPIC_NOTIFICATION_INTEGRATION; + case GROUP_NEW_VULNERABILITY -> KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABILITY; + case GROUP_NEW_VULNERABLE_DEPENDENCY -> KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABLE_DEPENDENCY; + case GROUP_POLICY_VIOLATION -> KafkaTopics.TOPIC_NOTIFICATION_POLICY_VIOLATION; + case GROUP_PROJECT_AUDIT_CHANGE -> KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE; + case GROUP_PROJECT_CREATED -> KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED; + case GROUP_PROJECT_VULN_ANALYSIS_COMPLETE -> KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE; + case GROUP_REPOSITORY -> KafkaTopics.TOPIC_NOTIFICATION_REPOSITORY; + case GROUP_VEX_CONSUMED, GROUP_VEX_PROCESSED -> KafkaTopics.TOPIC_NOTIFICATION_VEX; + case GROUP_USER_CREATED, GROUP_USER_DELETED -> KafkaTopics.TOPIC_NOTIFICATION_USER; case GROUP_UNSPECIFIED, UNRECOGNIZED -> throw new IllegalArgumentException(""" Unable to determine destination topic because the notification does not \ specify a notification group: %s""".formatted(notification.getGroup())); diff --git a/src/main/java/org/dependencytrack/event/kafka/KafkaTopicInitializer.java b/src/main/java/org/dependencytrack/event/kafka/KafkaTopicInitializer.java new file mode 100644 index 000000000..04ebe8e17 --- /dev/null +++ b/src/main/java/org/dependencytrack/event/kafka/KafkaTopicInitializer.java @@ -0,0 +1,177 @@ +/* + * This file is part of Dependency-Track. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) OWASP Foundation. All Rights Reserved. + */ +package org.dependencytrack.event.kafka; + +import alpine.Config; +import alpine.common.logging.Logger; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.CreateTopicsOptions; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.DescribeTopicsOptions; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; + +import jakarta.servlet.ServletContextEvent; +import jakarta.servlet.ServletContextListener; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG; +import static org.dependencytrack.common.ConfigKey.INIT_TASKS_ENABLED; +import static org.dependencytrack.common.ConfigKey.INIT_TASKS_KAFKA_TOPICS_ENABLED; +import static org.dependencytrack.common.ConfigKey.KAFKA_BOOTSTRAP_SERVERS; + +/** + * @since 5.6.0 + */ +public class KafkaTopicInitializer implements ServletContextListener { + + private static final Logger LOGGER = Logger.getLogger(KafkaTopicInitializer.class); + + private final Config config = Config.getInstance(); + + @Override + public void contextInitialized(final ServletContextEvent event) { + if (!config.getPropertyAsBoolean(INIT_TASKS_ENABLED)) { + LOGGER.debug("Not initializing Kafka topics because %s is disabled" + .formatted(INIT_TASKS_ENABLED.getPropertyName())); + return; + } + if (!config.getPropertyAsBoolean(INIT_TASKS_KAFKA_TOPICS_ENABLED)) { + LOGGER.debug("Not initializing Kafka topics because %s is disabled" + .formatted(INIT_TASKS_KAFKA_TOPICS_ENABLED.getPropertyName())); + return; + } + + LOGGER.warn("Auto-creating topics with default configuration is not recommended for production deployments"); + + try (final AdminClient adminClient = createAdminClient()) { + final List> topicsToCreate = determineTopicsToCreate(adminClient); + if (topicsToCreate.isEmpty()) { + LOGGER.info("All topics exist already; Nothing to do"); + return; + } + + createTopics(adminClient, topicsToCreate); + LOGGER.info("Successfully created %d topics".formatted(topicsToCreate.size())); + } + } + + private List> determineTopicsToCreate(final AdminClient adminClient) { + final Map> topicByName = KafkaTopics.ALL_TOPICS.stream() + .collect(Collectors.toMap(KafkaTopics.Topic::name, Function.identity())); + + final var topicsToCreate = new ArrayList>(topicByName.size()); + + final var describeTopicsOptions = new DescribeTopicsOptions().timeoutMs(3_000); + final DescribeTopicsResult topicsResult = adminClient.describeTopics(topicByName.keySet(), describeTopicsOptions); + + final var exceptionsByTopicName = new HashMap(); + for (final Map.Entry> entry : topicsResult.topicNameValues().entrySet()) { + final String topicName = entry.getKey(); + try { + entry.getValue().get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof UnknownTopicOrPartitionException) { + LOGGER.debug("Topic %s does not exist and will need 
to be created".formatted(topicName)); + topicsToCreate.add(topicByName.get(topicName)); + } else { + exceptionsByTopicName.put(topicName, e.getCause()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(""" + Thread was interrupted while waiting for broker response. \ + The existence of topic %s can not be determined.""".formatted(topicName), e); + } + } + + if (!exceptionsByTopicName.isEmpty()) { + final String exceptionSummary = exceptionsByTopicName.entrySet().stream() + .map(entry -> "{topic=%s, error=%s}".formatted(entry.getKey(), entry.getValue())) + .collect(Collectors.joining(", ", "[", "]")); + throw new IllegalStateException("Existence of %d topic(s) could not be verified: %s" + .formatted(exceptionsByTopicName.size(), exceptionSummary)); + } + + return topicsToCreate; + } + + private void createTopics(final AdminClient adminClient, final Collection> topics) { + final List newTopics = topics.stream() + .map(topic -> { + final var newTopic = new NewTopic( + topic.name(), + topic.defaultConfig().partitions(), + topic.defaultConfig().replicationFactor()); + if (topic.defaultConfig().configs() != null) { + return newTopic.configs(topic.defaultConfig().configs()); + } + + return newTopic; + }) + .toList(); + + final var createTopicsOptions = new CreateTopicsOptions().timeoutMs(3_000); + final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics, createTopicsOptions); + + final var exceptionsByTopicName = new HashMap(); + for (final Map.Entry> entry : createTopicsResult.values().entrySet()) { + final String topicName = entry.getKey(); + try { + entry.getValue().get(); + } catch (ExecutionException e) { + exceptionsByTopicName.put(topicName, e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(""" + Thread was interrupted while waiting for broker response. 
\ + Successful creation of topic %s can not be verified.""".formatted(topicName), e); + } + } + + if (!exceptionsByTopicName.isEmpty()) { + final String exceptionSummary = exceptionsByTopicName.entrySet().stream() + .map(entry -> "{topic=%s, error=%s}".formatted(entry.getKey(), entry.getValue())) + .collect(Collectors.joining(", ", "[", "]")); + throw new IllegalStateException("Failed to create %d topic(s): %s" + .formatted(exceptionsByTopicName.size(), exceptionSummary)); + } + } + + private AdminClient createAdminClient() { + final var adminClientConfig = new HashMap(); + adminClientConfig.put(BOOTSTRAP_SERVERS_CONFIG, config.getProperty(KAFKA_BOOTSTRAP_SERVERS)); + adminClientConfig.put(CLIENT_ID_CONFIG, "%s-admin-client".formatted("instanceId")); + + LOGGER.debug("Creating admin client with options %s".formatted(adminClientConfig)); + return AdminClient.create(adminClientConfig); + } + +} diff --git a/src/main/java/org/dependencytrack/event/kafka/KafkaTopics.java b/src/main/java/org/dependencytrack/event/kafka/KafkaTopics.java index cc5260445..1cd56ab80 100644 --- a/src/main/java/org/dependencytrack/event/kafka/KafkaTopics.java +++ b/src/main/java/org/dependencytrack/event/kafka/KafkaTopics.java @@ -18,7 +18,6 @@ */ package org.dependencytrack.event.kafka; -import alpine.Config; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.cyclonedx.proto.v1_6.Bom; @@ -32,67 +31,210 @@ import org.dependencytrack.proto.vulnanalysis.v1.ScanKey; import org.dependencytrack.proto.vulnanalysis.v1.ScanResult; -public final class KafkaTopics { +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; - public static final Topic NOTIFICATION_ANALYZER; - public static final Topic NOTIFICATION_BOM; - public static final Topic NOTIFICATION_CONFIGURATION; - public static final Topic NOTIFICATION_DATASOURCE_MIRRORING; - public static final Topic NOTIFICATION_FILE_SYSTEM; - public static final Topic NOTIFICATION_INTEGRATION; - public static final Topic NOTIFICATION_NEW_VULNERABILITY; - public static final Topic NOTIFICATION_NEW_VULNERABLE_DEPENDENCY; - public static final Topic NOTIFICATION_POLICY_VIOLATION; - public static final Topic NOTIFICATION_PROJECT_AUDIT_CHANGE; - public static final Topic NOTIFICATION_PROJECT_CREATED; - public static final Topic NOTIFICATION_REPOSITORY; - public static final Topic NOTIFICATION_VEX; - public static final Topic NOTIFICATION_USER; - public static final Topic VULNERABILITY_MIRROR_COMMAND; - public static final Topic NEW_VULNERABILITY; - public static final Topic REPO_META_ANALYSIS_COMMAND; - public static final Topic REPO_META_ANALYSIS_RESULT; - public static final Topic VULN_ANALYSIS_COMMAND; - public static final Topic VULN_ANALYSIS_RESULT; - public static final Topic VULN_ANALYSIS_RESULT_PROCESSED; +import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT; +import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG; +import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG; - public static final Topic NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE; - public static final Topic NEW_EPSS; - private static final Serde NOTIFICATION_SERDE = new KafkaProtobufSerde<>(Notification.parser()); +public final class KafkaTopics { - static { - NOTIFICATION_ANALYZER = new Topic<>("dtrack.notification.analyzer", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_BOM = new Topic<>("dtrack.notification.bom", Serdes.String(), 
NOTIFICATION_SERDE); - NOTIFICATION_CONFIGURATION = new Topic<>("dtrack.notification.configuration", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_DATASOURCE_MIRRORING = new Topic<>("dtrack.notification.datasource-mirroring", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_FILE_SYSTEM = new Topic<>("dtrack.notification.file-system", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_INTEGRATION = new Topic<>("dtrack.notification.integration", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_NEW_VULNERABILITY = new Topic<>("dtrack.notification.new-vulnerability", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_NEW_VULNERABLE_DEPENDENCY = new Topic<>("dtrack.notification.new-vulnerable-dependency", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_POLICY_VIOLATION = new Topic<>("dtrack.notification.policy-violation", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_PROJECT_AUDIT_CHANGE = new Topic<>("dtrack.notification.project-audit-change", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_PROJECT_CREATED = new Topic<>("dtrack.notification.project-created", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_REPOSITORY = new Topic<>("dtrack.notification.repository", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_VEX = new Topic<>("dtrack.notification.vex", Serdes.String(), NOTIFICATION_SERDE); - VULNERABILITY_MIRROR_COMMAND = new Topic<>("dtrack.vulnerability.mirror.command", Serdes.String(), Serdes.String()); - NEW_VULNERABILITY = new Topic<>("dtrack.vulnerability", Serdes.String(), new KafkaProtobufSerde<>(Bom.parser())); - REPO_META_ANALYSIS_COMMAND = new Topic<>("dtrack.repo-meta-analysis.component", Serdes.String(), new KafkaProtobufSerde<>(AnalysisCommand.parser())); - REPO_META_ANALYSIS_RESULT = new Topic<>("dtrack.repo-meta-analysis.result", Serdes.String(), new KafkaProtobufSerde<>(AnalysisResult.parser())); - VULN_ANALYSIS_COMMAND = new Topic<>("dtrack.vuln-analysis.component", new KafkaProtobufSerde<>(ScanKey.parser()), new KafkaProtobufSerde<>(ScanCommand.parser())); - VULN_ANALYSIS_RESULT = new Topic<>("dtrack.vuln-analysis.result", new KafkaProtobufSerde<>(ScanKey.parser()), new KafkaProtobufSerde<>(ScanResult.parser())); - VULN_ANALYSIS_RESULT_PROCESSED = new Topic<>("dtrack.vuln-analysis.result.processed", Serdes.String(), new KafkaProtobufSerde<>(ScanResult.parser())); - NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE = new Topic<>("dtrack.notification.project-vuln-analysis-complete", Serdes.String(), NOTIFICATION_SERDE); - NEW_EPSS = new Topic<>("dtrack.epss", Serdes.String(), new KafkaProtobufSerde<>(EpssItem.parser())); - NOTIFICATION_USER = new Topic<>("dtrack.notification.user", Serdes.String(), NOTIFICATION_SERDE); - } + public record Topic( + String name, + Serde keySerde, + Serde valueSerde, + Config defaultConfig) { - public record Topic(String name, Serde keySerde, Serde valueSerde) { + /** + * @since 5.6.0 + */ + public record Config( + int partitions, + short replicationFactor, + Map configs) { + } @Override public String name() { - return Config.getInstance().getProperty(ConfigKey.DT_KAFKA_TOPIC_PREFIX) + name; + return alpine.Config.getInstance().getProperty(ConfigKey.DT_KAFKA_TOPIC_PREFIX) + name; } } + public static final Topic TOPIC_EPSS; + public static final Topic TOPIC_NOTIFICATION_ANALYZER; + public static final Topic TOPIC_NOTIFICATION_BOM; + public static final Topic TOPIC_NOTIFICATION_CONFIGURATION; + public static final Topic TOPIC_NOTIFICATION_DATASOURCE_MIRRORING; + public static final Topic 
TOPIC_NOTIFICATION_FILE_SYSTEM; + public static final Topic TOPIC_NOTIFICATION_INTEGRATION; + public static final Topic TOPIC_NOTIFICATION_NEW_VULNERABILITY; + public static final Topic TOPIC_NOTIFICATION_NEW_VULNERABLE_DEPENDENCY; + public static final Topic TOPIC_NOTIFICATION_POLICY_VIOLATION; + public static final Topic TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE; + public static final Topic TOPIC_NOTIFICATION_PROJECT_CREATED; + public static final Topic TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE; + public static final Topic TOPIC_NOTIFICATION_REPOSITORY; + public static final Topic TOPIC_NOTIFICATION_VEX; + public static final Topic TOPIC_NOTIFICATION_USER; + public static final Topic TOPIC_REPO_META_ANALYSIS_COMMAND; + public static final Topic TOPIC_REPO_META_ANALYSIS_RESULT; + public static final Topic TOPIC_VULN_ANALYSIS_COMMAND; + public static final Topic TOPIC_VULN_ANALYSIS_RESULT; + public static final Topic TOPIC_VULN_ANALYSIS_RESULT_PROCESSED; + public static final Topic TOPIC_VULNERABILITY; + public static final Topic TOPIC_VULNERABILITY_MIRROR_COMMAND; + public static final List> ALL_TOPICS; + + private static final String DEFAULT_RETENTION_MS = String.valueOf(TimeUnit.HOURS.toMillis(12)); + private static final Serde NOTIFICATION_PROTO_SERDE = new KafkaProtobufSerde<>(Notification.parser()); + + + static { + // TODO: Provide a way to (partially) overwrite the default configs. + + TOPIC_EPSS = new Topic<>( + "dtrack.epss", + Serdes.String(), + new KafkaProtobufSerde<>(EpssItem.parser()), + new Topic.Config(3, (short) 1, Map.of(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT))); + TOPIC_NOTIFICATION_ANALYZER = new Topic<>( + "dtrack.notification.analyzer", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_BOM = new Topic<>( + "dtrack.notification.bom", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_CONFIGURATION = new Topic<>( + "dtrack.notification.configuration", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_DATASOURCE_MIRRORING = new Topic<>( + "dtrack.notification.datasource-mirroring", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_FILE_SYSTEM = new Topic<>( + "dtrack.notification.file-system", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_INTEGRATION = new Topic<>( + "dtrack.notification.integration", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_NEW_VULNERABILITY = new Topic<>( + "dtrack.notification.new-vulnerability", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_NEW_VULNERABLE_DEPENDENCY = new Topic<>( + "dtrack.notification.new-vulnerable-dependency", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_POLICY_VIOLATION = new Topic<>( + "dtrack.notification.policy-violation", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new 
Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE = new Topic<>( + "dtrack.notification.project-audit-change", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_PROJECT_CREATED = new Topic<>( + "dtrack.notification.project-created", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE = new Topic<>( + "dtrack.notification.project-vuln-analysis-complete", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_REPOSITORY = new Topic<>( + "dtrack.notification.repository", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_USER = new Topic<>( + "dtrack.notification.user", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_VEX = new Topic<>( + "dtrack.notification.vex", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_REPO_META_ANALYSIS_COMMAND = new Topic<>( + "dtrack.repo-meta-analysis.component", + Serdes.String(), + new KafkaProtobufSerde<>(AnalysisCommand.parser()), + new Topic.Config(3, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_REPO_META_ANALYSIS_RESULT = new Topic<>( + "dtrack.repo-meta-analysis.result", + Serdes.String(), + new KafkaProtobufSerde<>(AnalysisResult.parser()), + new Topic.Config(3, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_VULN_ANALYSIS_COMMAND = new Topic<>( + "dtrack.vuln-analysis.component", + new KafkaProtobufSerde<>(ScanKey.parser()), + new KafkaProtobufSerde<>(ScanCommand.parser()), + new Topic.Config(3, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_VULN_ANALYSIS_RESULT = new Topic<>( + "dtrack.vuln-analysis.result", + new KafkaProtobufSerde<>(ScanKey.parser()), + new KafkaProtobufSerde<>(ScanResult.parser()), + new Topic.Config(3, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_VULN_ANALYSIS_RESULT_PROCESSED = new Topic<>( + "dtrack.vuln-analysis.result.processed", + Serdes.String(), + new KafkaProtobufSerde<>(ScanResult.parser()), + new Topic.Config(3, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_VULNERABILITY = new Topic<>( + "dtrack.vulnerability", + Serdes.String(), + new KafkaProtobufSerde<>(Bom.parser()), + new Topic.Config(1, (short) 1, Map.of(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT))); + TOPIC_VULNERABILITY_MIRROR_COMMAND = new Topic<>( + "dtrack.vulnerability.mirror.command", + Serdes.String(), + Serdes.String(), + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + + ALL_TOPICS = List.of( + TOPIC_EPSS, + TOPIC_NOTIFICATION_ANALYZER, + TOPIC_NOTIFICATION_BOM, + TOPIC_NOTIFICATION_CONFIGURATION, + TOPIC_NOTIFICATION_DATASOURCE_MIRRORING, + TOPIC_NOTIFICATION_FILE_SYSTEM, + TOPIC_NOTIFICATION_INTEGRATION, + TOPIC_NOTIFICATION_NEW_VULNERABILITY, + TOPIC_NOTIFICATION_NEW_VULNERABLE_DEPENDENCY, + TOPIC_NOTIFICATION_POLICY_VIOLATION, + TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, + 
TOPIC_NOTIFICATION_PROJECT_CREATED, + TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, + TOPIC_NOTIFICATION_REPOSITORY, + TOPIC_NOTIFICATION_VEX, + TOPIC_NOTIFICATION_USER, + TOPIC_REPO_META_ANALYSIS_COMMAND, + TOPIC_REPO_META_ANALYSIS_RESULT, + TOPIC_VULN_ANALYSIS_COMMAND, + TOPIC_VULN_ANALYSIS_RESULT, + TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, + TOPIC_VULNERABILITY, + TOPIC_VULNERABILITY_MIRROR_COMMAND); + } + } diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java b/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java index f96819fad..e84b45a44 100644 --- a/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java +++ b/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java @@ -36,15 +36,15 @@ public void contextInitialized(final ServletContextEvent event) { LOGGER.info("Initializing processors"); PROCESSOR_MANAGER.registerProcessor(VulnerabilityMirrorProcessor.PROCESSOR_NAME, - KafkaTopics.NEW_VULNERABILITY, new VulnerabilityMirrorProcessor()); + KafkaTopics.TOPIC_VULNERABILITY, new VulnerabilityMirrorProcessor()); PROCESSOR_MANAGER.registerProcessor(RepositoryMetaResultProcessor.PROCESSOR_NAME, - KafkaTopics.REPO_META_ANALYSIS_RESULT, new RepositoryMetaResultProcessor()); + KafkaTopics.TOPIC_REPO_META_ANALYSIS_RESULT, new RepositoryMetaResultProcessor()); PROCESSOR_MANAGER.registerBatchProcessor(EpssMirrorProcessor.PROCESSOR_NAME, - KafkaTopics.NEW_EPSS, new EpssMirrorProcessor()); + KafkaTopics.TOPIC_EPSS, new EpssMirrorProcessor()); PROCESSOR_MANAGER.registerProcessor(VulnerabilityScanResultProcessor.PROCESSOR_NAME, - KafkaTopics.VULN_ANALYSIS_RESULT, new VulnerabilityScanResultProcessor()); + KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT, new VulnerabilityScanResultProcessor()); PROCESSOR_MANAGER.registerBatchProcessor(ProcessedVulnerabilityScanResultProcessor.PROCESSOR_NAME, - KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, new ProcessedVulnerabilityScanResultProcessor()); + KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, new ProcessedVulnerabilityScanResultProcessor()); PROCESSOR_MANAGER.startAll(); } diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/VulnerabilityScanResultProcessor.java b/src/main/java/org/dependencytrack/event/kafka/processor/VulnerabilityScanResultProcessor.java index e0a17ec63..573b2c4e7 100644 --- a/src/main/java/org/dependencytrack/event/kafka/processor/VulnerabilityScanResultProcessor.java +++ b/src/main/java/org/dependencytrack/event/kafka/processor/VulnerabilityScanResultProcessor.java @@ -686,7 +686,7 @@ private void maybeQueueResultProcessedEvent(final ScanKey scanKey, final ScanRes .toList()) .build(); - final var event = new KafkaEvent<>(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, scanKey.getScanToken(), strippedScanResult); + final var event = new KafkaEvent<>(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, scanKey.getScanToken(), strippedScanResult); eventsToDispatch.get().add(event); } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index a45e9cf85..1cb446856 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1277,12 +1277,22 @@ vulnerability.policy.s3.region= #
#
# <ul>
#   <li>Execution of database migrations</li>
#   <li>Populating the database with default objects (permissions, users, licenses, etc.)</li>
+#   <li>Creating Kafka topics with default configuration</li>
# </ul>
# # @category: General # @type: boolean init.tasks.enabled=true +# Whether to initialize (i.e. create, with default configuration) Kafka topics on startup. +# This option is intended for testing and should not be used for production deployments, +# since production-grade topic configurations will differ vastly. +# Has no effect unless init.tasks.enabled is `true`. +# +# @category: General +# @type: boolean +init.tasks.kafka.topics.enabled=false + # Whether to only execute initialization tasks and exit. # # @category: General diff --git a/src/main/webapp/WEB-INF/web.xml b/src/main/webapp/WEB-INF/web.xml index 17e64e4bf..96d73b50b 100644 --- a/src/main/webapp/WEB-INF/web.xml +++ b/src/main/webapp/WEB-INF/web.xml @@ -35,6 +35,9 @@ org.dependencytrack.persistence.migration.MigrationInitializer + + org.dependencytrack.event.kafka.KafkaTopicInitializer + alpine.server.persistence.PersistenceManagerFactory diff --git a/src/test/java/org/dependencytrack/event/kafka/KafkaEventDispatcherTest.java b/src/test/java/org/dependencytrack/event/kafka/KafkaEventDispatcherTest.java index 0c2b3bf6b..167f8c39e 100644 --- a/src/test/java/org/dependencytrack/event/kafka/KafkaEventDispatcherTest.java +++ b/src/test/java/org/dependencytrack/event/kafka/KafkaEventDispatcherTest.java @@ -72,7 +72,7 @@ public void testDispatchEventWithComponentRepositoryMetaAnalysisEvent() { assertThat(future).isCompletedWithValueMatching(Objects::nonNull); assertThat(mockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()); assertThat(record.key()).asString().isEqualTo("pkg:maven/foo/bar@1.2.3"); assertThat(record.value()).isNotNull(); assertThat(record.headers()).isEmpty(); @@ -89,7 +89,7 @@ public void testDispatchEventWithComponentVulnerabilityAnalysisEvent() { assertThat(future).isCompletedWithValueMatching(Objects::nonNull); assertThat(mockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()); assertThat(record.key()).isNotNull(); assertThat(record.value()).isNotNull(); assertThat(record.headers()).satisfiesExactlyInAnyOrder( @@ -112,7 +112,7 @@ public void testDispatchEventWithGitHubAdvisoryMirrorEvent() { assertThat(future).isCompletedWithValueMatching(Objects::nonNull); assertThat(mockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULNERABILITY_MIRROR_COMMAND.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND.name()); assertThat(record.key()).asString().isEqualTo("GITHUB"); assertThat(record.value()).isNull(); assertThat(record.headers()).isEmpty(); @@ -126,7 +126,7 @@ public void testDispatchEventWithNistMirrorEvent() { assertThat(future).isCompletedWithValueMatching(Objects::nonNull); assertThat(mockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULNERABILITY_MIRROR_COMMAND.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND.name()); assertThat(record.key()).asString().isEqualTo("NVD"); assertThat(record.value()).isNull(); assertThat(record.headers()).isEmpty(); @@ -140,7 +140,7 @@ public void testDispatchEventWithOsvMirrorEvent() { assertThat(future).isCompletedWithValueMatching(Objects::nonNull); 
assertThat(mockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULNERABILITY_MIRROR_COMMAND.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND.name()); assertThat(record.key()).asString().isEqualTo("OSV"); assertThat(record.value()).asString().isEqualTo("Maven"); assertThat(record.headers()).isEmpty(); @@ -179,7 +179,7 @@ public void testDispatchNotification() { assertThat(future).isCompletedWithValueMatching(Objects::nonNull); assertThat(mockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_ANALYZER.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_ANALYZER.name()); assertThat(record.key()).isNull(); assertThat(record.value()).isNotNull(); assertThat(record.headers()).isEmpty(); diff --git a/src/test/java/org/dependencytrack/event/kafka/KafkaTopicsTest.java b/src/test/java/org/dependencytrack/event/kafka/KafkaTopicsTest.java index 3d211df55..85b5db0a5 100644 --- a/src/test/java/org/dependencytrack/event/kafka/KafkaTopicsTest.java +++ b/src/test/java/org/dependencytrack/event/kafka/KafkaTopicsTest.java @@ -32,12 +32,12 @@ public class KafkaTopicsTest { @Test public void testTopicNameWithPrefix() { environmentVariables.set("DT_KAFKA_TOPIC_PREFIX", "foo-bar.baz."); - assertThat(KafkaTopics.VULN_ANALYSIS_RESULT.name()).isEqualTo("foo-bar.baz.dtrack.vuln-analysis.result"); + assertThat(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT.name()).isEqualTo("foo-bar.baz.dtrack.vuln-analysis.result"); } @Test public void testTopicNameWithoutPrefix() { - assertThat(KafkaTopics.VULN_ANALYSIS_RESULT.name()).isEqualTo("dtrack.vuln-analysis.result"); + assertThat(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT.name()).isEqualTo("dtrack.vuln-analysis.result"); } } \ No newline at end of file diff --git a/src/test/java/org/dependencytrack/event/kafka/componentmeta/SupportedMetaHandlerTest.java b/src/test/java/org/dependencytrack/event/kafka/componentmeta/SupportedMetaHandlerTest.java index c888bfab5..d27b31612 100644 --- a/src/test/java/org/dependencytrack/event/kafka/componentmeta/SupportedMetaHandlerTest.java +++ b/src/test/java/org/dependencytrack/event/kafka/componentmeta/SupportedMetaHandlerTest.java @@ -53,8 +53,8 @@ public void testHandleIntegrityComponentNotInDB() throws MalformedPackageURLExce IntegrityMetaComponent result = handler.handle(); assertThat(kafkaMockProducer.history()).satisfiesExactly( record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getPurl()).isEqualTo("pkg:maven/org.http4s/blaze-core_2.12"); assertThat(command.getComponent().getUuid()).isEqualTo(uuid.toString()); assertThat(command.getComponent().getInternal()).isFalse(); @@ -81,8 +81,8 @@ public void testHandleIntegrityComponentInDBForMoreThanAnHour() throws Malformed IntegrityMetaComponent integrityMetaComponent = handler.handle(); assertThat(kafkaMockProducer.history()).satisfiesExactly( record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record); + 
assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getPurl()).isEqualTo("pkg:maven/org.http4s/blaze-core_2.12"); assertThat(command.getComponent().getUuid()).isEqualTo(uuid.toString()); assertThat(command.getComponent().getInternal()).isFalse(); @@ -111,8 +111,8 @@ public void testHandleIntegrityWhenMetadataExists() throws MalformedPackageURLEx IntegrityMetaComponent integrityMetaComponent = handler.handle(); assertThat(kafkaMockProducer.history()).satisfiesExactly( record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getPurl()).isEqualTo("pkg:maven/org.http4s/blaze-core_2.12"); assertThat(command.getComponent().getUuid()).isEqualTo(uuid.toString()); assertThat(command.getComponent().getInternal()).isFalse(); diff --git a/src/test/java/org/dependencytrack/event/kafka/componentmeta/UnSupportedMetaHandlerTest.java b/src/test/java/org/dependencytrack/event/kafka/componentmeta/UnSupportedMetaHandlerTest.java index 8d54704e3..9b1f94a86 100644 --- a/src/test/java/org/dependencytrack/event/kafka/componentmeta/UnSupportedMetaHandlerTest.java +++ b/src/test/java/org/dependencytrack/event/kafka/componentmeta/UnSupportedMetaHandlerTest.java @@ -46,8 +46,8 @@ public void testHandleComponentInDb() throws MalformedPackageURLException { handler.handle(); assertThat(kafkaMockProducer.history()).satisfiesExactly( record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getPurl()).isEqualTo("pkg:golang/foo/bar@baz"); assertThat(command.getComponent().getInternal()).isFalse(); assertThat(command.getFetchMeta()).isEqualTo(FetchMeta.FETCH_META_LATEST_VERSION); diff --git a/src/test/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessorTest.java b/src/test/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessorTest.java index d92e7a97c..44440157d 100644 --- a/src/test/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessorTest.java +++ b/src/test/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessorTest.java @@ -153,12 +153,12 @@ public void testProcessWithFailureThresholdExceeded() throws Exception { ); assertThat(kafkaMockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()); - final String recordKey = deserializeKey(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, record); + final String recordKey = deserializeKey(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, record); 
assertThat(recordKey).isEqualTo(project.getUuid().toString()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, record); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, record); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_VULN_ANALYSIS_COMPLETE); assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL); @@ -225,12 +225,12 @@ public void testProcessWithResultWithoutScannerResults() throws Exception { ); assertThat(kafkaMockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()); - final String recordKey = deserializeKey(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, record); + final String recordKey = deserializeKey(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, record); assertThat(recordKey).isEqualTo(project.getUuid().toString()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, record); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, record); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_VULN_ANALYSIS_COMPLETE); assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL); @@ -295,14 +295,14 @@ public void testProcessWithDelayedBomProcessedNotification() throws Exception { processor.process(List.of(aConsumerRecord(vulnScan.getToken().toString(), scanResult).build())); assertThat(kafkaMockProducer.history()).satisfiesExactly( - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()), + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()), record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()); - final String recordKey = deserializeKey(KafkaTopics.NOTIFICATION_BOM, record); + final String recordKey = deserializeKey(KafkaTopics.TOPIC_NOTIFICATION_BOM, record); assertThat(recordKey).isEqualTo(project.getUuid().toString()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_BOM_PROCESSED); assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL); @@ -358,14 +358,14 @@ public void testProcessWithDelayedBomProcessedNotificationWhenVulnerabilityScanF processor.process(List.of(aConsumerRecord(vulnScan.getToken().toString(), scanResult).build())); assertThat(kafkaMockProducer.history()).satisfiesExactly( - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()), + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()), record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()); + 
assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name());
-            final String recordKey = deserializeKey(KafkaTopics.NOTIFICATION_BOM, record);
+            final String recordKey = deserializeKey(KafkaTopics.TOPIC_NOTIFICATION_BOM, record);
             assertThat(recordKey).isEqualTo(project.getUuid().toString());
-            final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record);
+            final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record);
             assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
             assertThat(notification.getGroup()).isEqualTo(GROUP_BOM_PROCESSED);
             assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL);
@@ -412,7 +412,7 @@ public void testProcessWithDelayedBomProcessedNotificationWithoutCompletedBomPro
         processor.process(List.of(aConsumerRecord(vulnScan.getToken().toString(), scanResult).build()));
 
         assertThat(kafkaMockProducer.history()).satisfiesExactly(record ->
-                assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()));
+                assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()));
 
         await("Internal event publish")
                 .atMost(Duration.ofSeconds(1))
diff --git a/src/test/java/org/dependencytrack/event/kafka/processor/VulnerabilityScanResultProcessorTest.java b/src/test/java/org/dependencytrack/event/kafka/processor/VulnerabilityScanResultProcessorTest.java
index 72d619691..c7b29abc7 100644
--- a/src/test/java/org/dependencytrack/event/kafka/processor/VulnerabilityScanResultProcessorTest.java
+++ b/src/test/java/org/dependencytrack/event/kafka/processor/VulnerabilityScanResultProcessorTest.java
@@ -146,8 +146,8 @@ public void dropFailedScanResultTest() {
 
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_ANALYZER.name());
-                    final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_ANALYZER, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_ANALYZER.name());
+                    final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_ANALYZER, record);
                     assertThat(notification.getScope()).isEqualTo(SCOPE_SYSTEM);
                     assertThat(notification.getLevel()).isEqualTo(LEVEL_ERROR);
                     assertThat(notification.getGroup()).isEqualTo(GROUP_ANALYZER);
@@ -157,10 +157,10 @@ record -> {
                             component.getUuid(), SCANNER_INTERNAL, scanToken + "/" + component.getUuid());
                 },
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name());
-                    final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name());
+                    final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
                     assertThat(recordKey).isEqualTo(scanToken);
-                    final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+                    final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
                     assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom);
                 }
         );
@@ -191,10 +191,10 @@ public void dropPendingScanResultTest() {
         processor.process(aConsumerRecord(scanKey, scanResult).build());
 
         assertThat(kafkaMockProducer.history()).satisfiesExactly(record -> {
-            assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name());
-            final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+            assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name());
+            final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
             assertThat(recordKey).isEqualTo(scanToken);
-            final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+            final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
             assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom);
         });
     }
@@ -218,10 +218,10 @@ public void processSuccessfulScanResultWhenComponentDoesNotExistTest() {
         processor.process(aConsumerRecord(scanKey, scanResult).build());
 
         assertThat(kafkaMockProducer.history()).satisfiesExactly(record -> {
-            assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name());
-            final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+            assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name());
+            final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
             assertThat(recordKey).isEqualTo(scanToken);
-            final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+            final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
             assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom);
         });
     }
@@ -300,15 +300,15 @@ public void processSuccessfulScanResult() {
 
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name());
-                    final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name());
+                    final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
                     assertThat(recordKey).isEqualTo(scanToken);
-                    final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+                    final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
                     assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom);
                 },
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_NEW_VULNERABLE_DEPENDENCY.name());
-                    final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_NEW_VULNERABLE_DEPENDENCY, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABLE_DEPENDENCY.name());
+                    final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABLE_DEPENDENCY, record);
                     assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
                     assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL);
                     assertThat(notification.getGroup()).isEqualTo(GROUP_NEW_VULNERABLE_DEPENDENCY);
@@ -329,8 +329,8 @@ record -> {
                     );
                 },
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_NEW_VULNERABILITY.name());
-                    final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_NEW_VULNERABILITY, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABILITY.name());
+                    final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABILITY, record);
                     assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
                     assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL);
                     assertThat(notification.getGroup()).isEqualTo(GROUP_NEW_VULNERABILITY);
@@ -340,8 +340,8 @@ record -> {
                     assertThat(subject.getVulnerabilityAnalysisLevel()).isEqualTo("BOM_UPLOAD_ANALYSIS");
                 },
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_NEW_VULNERABILITY.name());
-                    final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_NEW_VULNERABILITY, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABILITY.name());
+                    final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABILITY, record);
                     assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
                     assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL);
                     assertThat(notification.getGroup()).isEqualTo(GROUP_NEW_VULNERABILITY);
@@ -400,10 +400,10 @@ public void processSuccessfulScanResultWithExistingFindingTest() {
 
         // Because the vulnerability was reported already, no notification must be sent.
         assertThat(kafkaMockProducer.history()).satisfiesExactly(record -> {
-            assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name());
-            final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+            assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name());
+            final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
             assertThat(recordKey).isEqualTo(scanToken);
-            final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+            final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
             assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom);
         });
     }
@@ -712,15 +712,15 @@ public void analysisThroughPolicyNewAnalysisTest() {
         // TODO: There should be PROJECT_AUDIT_CHANGE notifications.
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name());
-                    final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name());
+                    final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
                     assertThat(recordKey).isEqualTo(scanToken);
-                    final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+                    final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
                     assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom);
                 },
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_NEW_VULNERABILITY.name());
-                    final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_NEW_VULNERABILITY, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABILITY.name());
+                    final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABILITY, record);
                     assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
                     assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL);
                     assertThat(notification.getGroup()).isEqualTo(GROUP_NEW_VULNERABILITY);
@@ -810,10 +810,10 @@ public void analysisThroughPolicyNewAnalysisSuppressionTest() {
         // The vulnerability was suppressed, so no notifications to be expected.
         // TODO: There should be PROJECT_AUDIT_CHANGE notifications.
        assertThat(kafkaMockProducer.history()).satisfiesExactly(record -> {
-            assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name());
-            final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+            assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name());
+            final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
             assertThat(recordKey).isEqualTo(scanToken);
-            final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+            final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
             assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom);
         });
     }
@@ -925,15 +925,15 @@ public void analysisThroughPolicyExistingDifferentAnalysisTest() {
         // There should be PROJECT_AUDIT_CHANGE notification.
        assertThat(kafkaMockProducer.history()).satisfiesExactly(
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name());
-                    final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name());
+                    final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
                     assertThat(recordKey).isEqualTo(scanToken);
-                    final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+                    final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
                     assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom);
                 },
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE.name());
-                    final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE.name());
+                    final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, record);
                     assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
                     assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL);
                     assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE);
@@ -1030,10 +1030,10 @@ public void analysisThroughPolicyExistingEqualAnalysisTest() {
 
         // The vulnerability already existed, so no notifications to be expected.
         assertThat(kafkaMockProducer.history()).satisfiesExactly(record -> {
-            assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name());
-            final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+            assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name());
+            final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
             assertThat(recordKey).isEqualTo(scanToken);
-            final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record);
+            final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record);
             assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom);
         });
     }
@@ -1388,7 +1388,7 @@ public void analysisThroughPolicyWithAnalysisUpdateNotOnStateOrSuppressionTest()
         assertThat(analysis.getAnalysisDetails()).isEqualTo("newDetails");
 
         assertThat(kafkaMockProducer.history()).noneSatisfy(
-                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE.name()));
+                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE.name()));
     }
 
     @Test
diff --git a/src/test/java/org/dependencytrack/event/kafka/processor/api/ProcessorManagerTest.java b/src/test/java/org/dependencytrack/event/kafka/processor/api/ProcessorManagerTest.java
index c1fa0eb76..347117ee5 100644
--- a/src/test/java/org/dependencytrack/event/kafka/processor/api/ProcessorManagerTest.java
+++ b/src/test/java/org/dependencytrack/event/kafka/processor/api/ProcessorManagerTest.java
@@ -87,7 +87,7 @@ public void tearDown() {
 
     @Test
     public void testSingleRecordProcessor() throws Exception {
-        final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String());
+        final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String(), /* defaultConfig */ null);
         adminClient.createTopics(List.of(new NewTopic(inputTopic.name(), 3, (short) 1))).all().get();
 
         final var recordsProcessed = new AtomicInteger(0);
@@ -116,7 +116,7 @@ record -> recordsProcessed.incrementAndGet();
 
     @Test
     public void testSingleRecordProcessorRetry() throws Exception {
-        final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String());
+        final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String(), /* defaultConfig */ null);
         adminClient.createTopics(List.of(new NewTopic(inputTopic.name(), 3, (short) 1))).all().get();
 
         final var attemptsCounter = new AtomicInteger(0);
@@ -153,7 +153,7 @@ public void testSingleRecordProcessorRetry() throws Exception {
 
     @Test
     public void testBatchProcessor() throws Exception {
-        final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String());
+        final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String(), /* defaultConfig */ null);
         adminClient.createTopics(List.of(new NewTopic(inputTopic.name(), 3, (short) 1))).all().get();
 
         final var recordsProcessed = new AtomicInteger(0);
@@ -187,7 +187,7 @@ public void testBatchProcessor() throws Exception {
 
     @Test
     public void testWithMaxConcurrencyMatchingPartitionCount() throws Exception {
-        final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String());
+        final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String(), /* defaultConfig */ null);
         adminClient.createTopics(List.of(new NewTopic(inputTopic.name(), 12, (short) 1))).all().get();
 
         environmentVariables.set("KAFKA_PROCESSOR_FOO_PROCESSING_ORDER", "partition");
@@ -218,8 +218,8 @@ public void testWithMaxConcurrencyMatchingPartitionCount() throws Exception {
 
     @Test
     public void testStartAllWithMissingTopics() throws Exception {
-        final var inputTopicA = new Topic<>("input-a", Serdes.String(), Serdes.String());
-        final var inputTopicB = new Topic<>("input-b", Serdes.String(), Serdes.String());
+        final var inputTopicA = new Topic<>("input-a", Serdes.String(), Serdes.String(), /* defaultConfig */ null);
+        final var inputTopicB = new Topic<>("input-b", Serdes.String(), Serdes.String(), /* defaultConfig */ null);
         adminClient.createTopics(List.of(new NewTopic(inputTopicA.name(), 3, (short) 1))).all().get();
 
         final Processor<String, String> processor = record -> {
@@ -240,7 +240,7 @@ public void testStartAllWithMissingTopics() throws Exception {
 
     @Test
     public void testProbeHealth() throws Exception {
-        final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String());
+        final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String(), /* defaultConfig */ null);
         adminClient.createTopics(List.of(new NewTopic(inputTopic.name(), 3, (short) 1))).all().get();
 
         final Processor<String, String> processor = record -> {
diff --git a/src/test/java/org/dependencytrack/resources/v1/AnalysisResourceTest.java b/src/test/java/org/dependencytrack/resources/v1/AnalysisResourceTest.java
index 112e18421..52f8b4f60 100644
--- a/src/test/java/org/dependencytrack/resources/v1/AnalysisResourceTest.java
+++ b/src/test/java/org/dependencytrack/resources/v1/AnalysisResourceTest.java
@@ -335,9 +335,9 @@ public void updateAnalysisCreateNewTest() throws Exception {
         assertThat(responseJson.getBoolean("isSuppressed")).isTrue();
 
         assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5));
-        final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
+        final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
         assertThat(projectNotification).isNotNull();
-        final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
+        final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
         assertThat(notification).isNotNull();
         assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
         assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE);
@@ -396,9 +396,9 @@ public void updateAnalysisCreateNewWithUserTest() throws Exception {
         assertThat(responseJson.getBoolean("isSuppressed")).isTrue();
 
         assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5));
-        final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
+        final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
         assertThat(projectNotification).isNotNull();
-        final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
+        final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
         assertThat(notification).isNotNull();
         assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
         assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE);
@@ -446,9 +446,9 @@ public void updateAnalysisCreateNewWithEmptyRequestTest() throws Exception {
         assertThat(responseJson.getBoolean("isSuppressed")).isFalse();
 
         assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5));
-        final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
+        final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
         assertThat(projectNotification).isNotNull();
-        final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
+        final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
         assertThat(notification).isNotNull();
         assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
         assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE);
@@ -524,9 +524,9 @@ public void updateAnalysisUpdateExistingTest() throws Exception {
         assertThat(responseJson.getBoolean("isSuppressed")).isFalse();
 
         assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5));
-        final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
+        final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
         assertThat(projectNotification).isNotNull();
-        final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
+        final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
         assertThat(notification).isNotNull();
         assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
         assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE);
@@ -641,9 +641,9 @@ public void updateAnalysisUpdateExistingWithEmptyRequestTest() throws Exception
                 .hasFieldOrPropertyWithValue("commenter", Json.createValue("Test Users"));
 
         assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5));
-        final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
+        final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
         assertThat(projectNotification).isNotNull();
-        final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
+        final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
         assertThat(notification).isNotNull();
         assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
         assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE);
@@ -811,9 +811,9 @@ public void updateAnalysisIssue1409Test() throws InterruptedException {
         assertThat(responseJson.getBoolean("isSuppressed")).isFalse();
 
         assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5));
-        final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
+        final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
         assertThat(projectNotification).isNotNull();
-        final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
+        final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
         assertThat(notification).isNotNull();
         assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
         assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE);
diff --git a/src/test/java/org/dependencytrack/resources/v1/BomResourceTest.java b/src/test/java/org/dependencytrack/resources/v1/BomResourceTest.java
index d5f323e90..0f1501647 100644
--- a/src/test/java/org/dependencytrack/resources/v1/BomResourceTest.java
+++ b/src/test/java/org/dependencytrack/resources/v1/BomResourceTest.java
@@ -1196,7 +1196,7 @@ public void uploadBomInvalidJsonTest() throws InterruptedException {
                 """);
 
         assertThat(kafkaMockProducer.history()).hasSize(1);
-        final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.NOTIFICATION_USER, kafkaMockProducer.history().get(0));
+        final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_USER, kafkaMockProducer.history().get(0));
         AssertionsForClassTypes.assertThat(userNotification).isNotNull();
         AssertionsForClassTypes.assertThat(userNotification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
         AssertionsForClassTypes.assertThat(userNotification.getGroup()).isEqualTo(GROUP_BOM_VALIDATION_FAILED);
@@ -1250,7 +1250,7 @@ public void uploadBomInvalidXmlTest() throws InterruptedException {
                 """);
 
         assertThat(kafkaMockProducer.history()).hasSize(1);
-        final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.NOTIFICATION_USER, kafkaMockProducer.history().get(0));
+        final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_USER, kafkaMockProducer.history().get(0));
         AssertionsForClassTypes.assertThat(userNotification).isNotNull();
         AssertionsForClassTypes.assertThat(userNotification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
         AssertionsForClassTypes.assertThat(userNotification.getGroup()).isEqualTo(GROUP_BOM_VALIDATION_FAILED);
diff --git a/src/test/java/org/dependencytrack/resources/v1/ComponentResourceTest.java b/src/test/java/org/dependencytrack/resources/v1/ComponentResourceTest.java
index d0863a1b9..aec0e110d 100644
--- a/src/test/java/org/dependencytrack/resources/v1/ComponentResourceTest.java
+++ b/src/test/java/org/dependencytrack/resources/v1/ComponentResourceTest.java
@@ -644,15 +644,15 @@ public void createComponentTest() {
         Assert.assertEquals("SampleAuthor" ,json.getJsonArray("authors").getJsonObject(0).getString("name"));
         Assert.assertTrue(UuidUtil.isValidUUID(json.getString("uuid")));
         assertThat(kafkaMockProducer.history()).satisfiesExactlyInAnyOrder(
-                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
+                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name());
-                    final var command = KafkaTestUtil.deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name());
+                    final var command = KafkaTestUtil.deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record);
                     assertThat(command.getComponent().getPurl()).isEqualTo(json.getString("purl"));
                 },
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name());
-                    final var command = KafkaTestUtil.deserializeValue(KafkaTopics.VULN_ANALYSIS_COMMAND, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name());
+                    final var command = KafkaTestUtil.deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND, record);
                     assertThat(command.getComponent().getUuid()).isEqualTo(json.getString("uuid"));
                 }
         );
@@ -725,15 +725,15 @@ public void updateComponentTest() {
         Assert.assertEquals("Test component", json.getString("description"));
         Assert.assertEquals(1, json.getJsonArray("externalReferences").size());
         assertThat(kafkaMockProducer.history()).satisfiesExactlyInAnyOrder(
-                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
+                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name());
-                    final var command = KafkaTestUtil.deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name());
+                    final var command = KafkaTestUtil.deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record);
                     assertThat(command.getComponent().getPurl()).isEqualTo(json.getString("purl"));
                 },
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name());
-                    final var command = KafkaTestUtil.deserializeValue(KafkaTopics.VULN_ANALYSIS_COMMAND, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name());
+                    final var command = KafkaTestUtil.deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND, record);
                     assertThat(command.getComponent().getUuid()).isEqualTo(json.getString("uuid"));
                 }
         );
diff --git a/src/test/java/org/dependencytrack/resources/v1/ProjectResourceTest.java b/src/test/java/org/dependencytrack/resources/v1/ProjectResourceTest.java
index 6f57aaa49..04378a646 100644
--- a/src/test/java/org/dependencytrack/resources/v1/ProjectResourceTest.java
+++ b/src/test/java/org/dependencytrack/resources/v1/ProjectResourceTest.java
@@ -1477,7 +1477,7 @@ public void createProjectTest() throws Exception {
         Assert.assertTrue(UuidUtil.isValidUUID(json.getString("uuid")));
 
         assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 1, Duration.ofSeconds(5));
-        final org.dependencytrack.proto.notification.v1.Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
+        final org.dependencytrack.proto.notification.v1.Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
         assertThat(projectNotification).isNotNull();
         assertThat(projectNotification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
         assertThat(projectNotification.getGroup()).isEqualTo(GROUP_PROJECT_CREATED);
diff --git a/src/test/java/org/dependencytrack/resources/v1/UserResourceAuthenticatedTest.java b/src/test/java/org/dependencytrack/resources/v1/UserResourceAuthenticatedTest.java
index 19b844be2..271b28a3c 100644
--- a/src/test/java/org/dependencytrack/resources/v1/UserResourceAuthenticatedTest.java
+++ b/src/test/java/org/dependencytrack/resources/v1/UserResourceAuthenticatedTest.java
@@ -229,7 +229,7 @@ public void createLdapUserTest() throws InterruptedException {
         Assert.assertEquals("blackbeard", json.getString("username"));
 
         assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 1, Duration.ofSeconds(5));
-        final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.NOTIFICATION_USER, kafkaMockProducer.history().get(0));
+        final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_USER, kafkaMockProducer.history().get(0));
         assertThat(userNotification).isNotNull();
         assertThat(userNotification.getScope()).isEqualTo(SCOPE_SYSTEM);
         assertThat(userNotification.getGroup()).isEqualTo(GROUP_USER_CREATED);
@@ -277,7 +277,7 @@ public void deleteLdapUserTest() throws InterruptedException {
         Assert.assertEquals(204, response.getStatus(), 0);
 
         assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 1, Duration.ofSeconds(5));
-        final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.NOTIFICATION_USER, kafkaMockProducer.history().get(0));
+        final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_USER, kafkaMockProducer.history().get(0));
         assertThat(userNotification).isNotNull();
         assertThat(userNotification.getScope()).isEqualTo(SCOPE_SYSTEM);
         assertThat(userNotification.getGroup()).isEqualTo(GROUP_USER_DELETED);
@@ -306,7 +306,7 @@ public void createManagedUserTest() throws InterruptedException {
         Assert.assertEquals("blackbeard", json.getString("username"));
 
         assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 1, Duration.ofSeconds(5));
-        final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.NOTIFICATION_USER, kafkaMockProducer.history().get(0));
+        final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_USER, kafkaMockProducer.history().get(0));
         assertThat(userNotification).isNotNull();
         assertThat(userNotification.getScope()).isEqualTo(SCOPE_SYSTEM);
         assertThat(userNotification.getGroup()).isEqualTo(GROUP_USER_CREATED);
@@ -512,7 +512,7 @@ public void deleteManagedUserTest() throws InterruptedException {
         Assert.assertEquals(204, response.getStatus(), 0);
 
         assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 1, Duration.ofSeconds(5));
-        final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.NOTIFICATION_USER, kafkaMockProducer.history().get(0));
+        final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_USER, kafkaMockProducer.history().get(0));
         assertThat(userNotification).isNotNull();
         assertThat(userNotification.getScope()).isEqualTo(SCOPE_SYSTEM);
         assertThat(userNotification.getGroup()).isEqualTo(GROUP_USER_DELETED);
@@ -535,7 +535,7 @@ public void createOidcUserTest() throws InterruptedException {
         Assert.assertEquals("blackbeard", json.getString("username"));
 
         assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 1, Duration.ofSeconds(5));
-        final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.NOTIFICATION_USER, kafkaMockProducer.history().get(0));
+        final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_USER, kafkaMockProducer.history().get(0));
         assertThat(userNotification).isNotNull();
         assertThat(userNotification.getScope()).isEqualTo(SCOPE_SYSTEM);
         assertThat(userNotification.getGroup()).isEqualTo(GROUP_USER_CREATED);
diff --git a/src/test/java/org/dependencytrack/resources/v1/ViolationAnalysisResourceTest.java b/src/test/java/org/dependencytrack/resources/v1/ViolationAnalysisResourceTest.java
index 2c35255d5..c5d759bd5 100644
--- a/src/test/java/org/dependencytrack/resources/v1/ViolationAnalysisResourceTest.java
+++ b/src/test/java/org/dependencytrack/resources/v1/ViolationAnalysisResourceTest.java
@@ -215,9 +215,9 @@ public void updateAnalysisCreateNewTest() throws Exception {
                 .doesNotContainKey("commenter"); // Not set when authenticating via API key;
 
         assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5));
-        final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
+        final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
         assertThat(projectNotification).isNotNull();
-        final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
+        final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
         assertThat(notification).isNotNull();
         assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
         assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE);
@@ -265,9 +265,9 @@ public void updateAnalysisCreateNewWithEmptyRequestTest() throws Exception {
         assertThat(jsonObject.getJsonArray("analysisComments")).isEmpty();
 
         assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5));
-        final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
+        final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
         assertThat(projectNotification).isNotNull();
-        final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
+        final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
         assertThat(notification).isNotNull();
         assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
         assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE);
@@ -333,9 +333,9 @@ public void updateAnalysisUpdateExistingTest() throws Exception {
                 .doesNotContainKey("commenter"); // Not set when authenticating via API key
 
         assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5));
-        final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
+        final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
         assertThat(projectNotification).isNotNull();
-        final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
+        final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
         assertThat(notification).isNotNull();
         assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
         assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE);
@@ -441,9 +441,9 @@ public void updateAnalysisUpdateExistingWithEmptyRequestTest() throws Exception
                 .doesNotContainKey("commenter"); // Not set when authenticating via API key
 
         assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5));
-        final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
+        final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0));
         assertThat(projectNotification).isNotNull();
-        final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
+        final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1));
         assertThat(notification).isNotNull();
         assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
         assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE);
diff --git a/src/test/java/org/dependencytrack/tasks/BomUploadProcessingTaskTest.java b/src/test/java/org/dependencytrack/tasks/BomUploadProcessingTaskTest.java
index b02edd317..990fb9643 100644
--- a/src/test/java/org/dependencytrack/tasks/BomUploadProcessingTaskTest.java
+++ b/src/test/java/org/dependencytrack/tasks/BomUploadProcessingTaskTest.java
@@ -113,11 +113,11 @@ public void informTest() throws Exception {
         new BomUploadProcessingTask().inform(bomUploadEvent);
         assertBomProcessedNotification();
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name())
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name())
         );
         qm.getPersistenceManager().refresh(project);
         qm.getPersistenceManager().refreshAll(qm.getAllWorkflowStatesForAToken(bomUploadEvent.getChainIdentifier()));
@@ -271,11 +271,11 @@ public void informTestWithComponentAlreadyExistsForIntegrityCheck() throws Excep
         new BomUploadProcessingTask().inform(bomUploadEvent);
         assertBomProcessedNotification();
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name())
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name())
         );
         qm.getPersistenceManager().refresh(project);
         qm.getPersistenceManager().refreshAll(qm.getAllWorkflowStatesForAToken(bomUploadEvent.getChainIdentifier()));
@@ -354,9 +354,9 @@ public void informWithEmptyBomTest() throws Exception {
         new BomUploadProcessingTask().inform(bomUploadEvent);
         assertBomProcessedNotification();
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name())
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name())
         );
         qm.getPersistenceManager().refreshAll(qm.getAllWorkflowStatesForAToken(bomUploadEvent.getChainIdentifier()));
         qm.getPersistenceManager().refresh(project);
@@ -412,10 +412,10 @@ public void informWithInvalidBomTest() throws Exception {
         new BomUploadProcessingTask().inform(bomUploadEvent);
 
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
                 event -> {
-                    assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name());
-                    final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, event);
+                    assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name());
+                    final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, event);
                     assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO);
                     assertThat(notification.getGroup()).isEqualTo(GROUP_BOM_PROCESSING_FAILED);
                     assertThat(notification.getLevel()).isEqualTo(LEVEL_ERROR);
@@ -537,20 +537,20 @@ public void informWithBloatedBomTest() throws Exception {
 
         assertThat(kafkaMockProducer.history())
                 .anySatisfy(record -> {
-                    assertThat(deserializeKey(KafkaTopics.NOTIFICATION_BOM, record)).isEqualTo(project.getUuid().toString());
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name());
-                    final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record);
+                    assertThat(deserializeKey(KafkaTopics.TOPIC_NOTIFICATION_BOM, record)).isEqualTo(project.getUuid().toString());
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name());
+                    final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record);
                     assertThat(notification.getGroup()).isEqualTo(Group.GROUP_BOM_CONSUMED);
                 })
                 .anySatisfy(record -> {
-                    assertThat(deserializeKey(KafkaTopics.NOTIFICATION_BOM, record)).isEqualTo(project.getUuid().toString());
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name());
-                    final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record);
+                    assertThat(deserializeKey(KafkaTopics.TOPIC_NOTIFICATION_BOM, record)).isEqualTo(project.getUuid().toString());
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name());
+                    final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record);
                     assertThat(notification.getGroup()).isEqualTo(Group.GROUP_BOM_PROCESSED);
                 })
                 .noneSatisfy(record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name());
-                    final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name());
+                    final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record);
                     assertThat(notification.getGroup()).isEqualTo(GROUP_BOM_PROCESSING_FAILED);
                 });
 
@@ -606,14 +606,14 @@ public void informWithBloatedBomTest() throws Exception {
         // Verify that all vulnerability analysis commands have been sent.
         final long vulnAnalysisCommandsSent = kafkaMockProducer.history().stream()
                 .map(ProducerRecord::topic)
-                .filter(KafkaTopics.VULN_ANALYSIS_COMMAND.name()::equals)
+                .filter(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()::equals)
                 .count();
         assertThat(vulnAnalysisCommandsSent).isEqualTo(9056);
 
         // Verify that all repository meta analysis commands have been sent.
         final long repoMetaAnalysisCommandsSent = kafkaMockProducer.history().stream()
                 .map(ProducerRecord::topic)
-                .filter(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()::equals)
+                .filter(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()::equals)
                 .count();
         assertThat(repoMetaAnalysisCommandsSent).isEqualTo(9056);
     }
@@ -764,20 +764,20 @@ public void informWithComponentsUnderMetadataBomTest() throws Exception {
 
         assertThat(kafkaMockProducer.history())
                 .anySatisfy(record -> {
-                    assertThat(deserializeKey(KafkaTopics.NOTIFICATION_BOM, record)).isEqualTo(project.getUuid().toString());
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name());
-                    final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record);
+                    assertThat(deserializeKey(KafkaTopics.TOPIC_NOTIFICATION_BOM, record)).isEqualTo(project.getUuid().toString());
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name());
+                    final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record);
                     assertThat(notification.getGroup()).isEqualTo(Group.GROUP_BOM_CONSUMED);
                 })
                 .anySatisfy(record -> {
-                    assertThat(deserializeKey(KafkaTopics.NOTIFICATION_BOM, record)).isEqualTo(project.getUuid().toString());
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name());
-                    final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record);
+                    assertThat(deserializeKey(KafkaTopics.TOPIC_NOTIFICATION_BOM, record)).isEqualTo(project.getUuid().toString());
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name());
+                    final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record);
                     assertThat(notification.getGroup()).isEqualTo(Group.GROUP_BOM_PROCESSED);
                 })
                 .noneSatisfy(record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name());
-                    final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name());
+                    final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record);
                     assertThat(notification.getGroup()).isEqualTo(GROUP_BOM_PROCESSING_FAILED);
                 });
 
@@ -812,14 +812,14 @@ public void informWithDelayedBomProcessedNotification() throws Exception {
         new BomUploadProcessingTask(new KafkaEventDispatcher(), /* delayBomProcessedNotification */ true).inform(bomUploadEvent);
 
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
                 event -> {
-                    assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name());
-                    final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, event);
+                    assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name());
+                    final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, event);
                     assertThat(notification.getGroup()).isEqualTo(Group.GROUP_BOM_CONSUMED);
                 },
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name())
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name())
                 // BOM_PROCESSED notification should not have been sent.
         );
     }
@@ -834,15 +834,15 @@ public void informWithDelayedBomProcessedNotificationAndNoComponents() throws Ex
         new BomUploadProcessingTask(new KafkaEventDispatcher(), /* delayBomProcessedNotification */ true).inform(bomUploadEvent);
         assertBomProcessedNotification();
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
                 event -> {
-                    assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name());
-                    final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, event);
+                    assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name());
+                    final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, event);
                     assertThat(notification.getGroup()).isEqualTo(Group.GROUP_BOM_CONSUMED);
                 },
                 event -> {
-                    assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name());
-                    final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, event);
+                    assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name());
+                    final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, event);
                     assertThat(notification.getGroup()).isEqualTo(Group.GROUP_BOM_PROCESSED);
                 }
         );
@@ -858,10 +858,10 @@ public void informWithComponentWithoutPurl() throws Exception {
         assertBomProcessedNotification();
 
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name())
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name())
                 // (No REPO_META_ANALYSIS_COMMAND event because the component doesn't have a PURL)
         );
 
@@ -883,12 +883,12 @@ public void informWithCustomLicenseResolutionTest() throws Exception {
         assertBomProcessedNotification();
 
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name())
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name())
         );
 
         assertThat(qm.getAllComponents(project)).satisfiesExactly(
@@ -921,10 +921,10 @@ public void informWithBomContainingLicenseExpressionTest() throws Exception {
         assertBomProcessedNotification();
 
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name())
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name())
         );
 
         assertThat(qm.getAllComponents(project)).satisfiesExactly(component -> {
@@ -949,10 +949,10 @@ public void informWithBomContainingLicenseExpressionWithSingleIdTest() throws Ex
         assertBomProcessedNotification();
 
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name())
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name())
        );
 
        assertThat(qm.getAllComponents(project)).satisfiesExactly(component -> {
@@ -973,10 +973,10 @@ public void informWithBomContainingInvalidLicenseExpressionTest() throws Excepti
         assertBomProcessedNotification();
 
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name())
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name())
         );
 
         assertThat(qm.getAllComponents(project)).satisfiesExactly(component -> {
@@ -1210,10 +1210,10 @@ public void informWithBomContainingServiceTest() throws Exception {
         assertBomProcessedNotification();
 
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
-                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name())
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()),
+                event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name())
         );
 
         assertThat(qm.getAllComponents(project)).isNotEmpty();
@@ -1669,14 +1669,14 @@ public void informWithEmptyComponentAndServiceNameTest() throws Exception {
     private void assertBomProcessedNotification() throws Exception {
         try {
             assertThat(kafkaMockProducer.history()).anySatisfy(record -> {
-                assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name());
-                final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record);
+                assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name());
+                final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record);
                 assertThat(notification.getGroup()).isEqualTo(GROUP_BOM_PROCESSED);
             });
         } catch (AssertionError e) {
            final Optional<Notification> optionalNotification = kafkaMockProducer.history().stream()
-                    .filter(record -> record.topic().equals(KafkaTopics.NOTIFICATION_BOM.name()))
-                    .map(record -> deserializeValue(KafkaTopics.NOTIFICATION_BOM, record))
+                    .filter(record -> record.topic().equals(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()))
+                    .map(record -> deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record))
                     .filter(notification -> notification.getGroup() == GROUP_BOM_PROCESSING_FAILED)
                     .findAny();
             if (optionalNotification.isEmpty()) {
diff --git a/src/test/java/org/dependencytrack/tasks/IntegrityMetaInitializerTaskTest.java b/src/test/java/org/dependencytrack/tasks/IntegrityMetaInitializerTaskTest.java
index a9da28052..35814551e 100644
--- a/src/test/java/org/dependencytrack/tasks/IntegrityMetaInitializerTaskTest.java
+++ b/src/test/java/org/dependencytrack/tasks/IntegrityMetaInitializerTaskTest.java
@@ -64,7 +64,7 @@ public void testIntegrityMetaInitializer() {
         new IntegrityMetaInitializerTask().inform(new IntegrityMetaInitializerEvent());
         assertThat(qm.getIntegrityMetaComponentCount()).isEqualTo(1);
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
-                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name())
+                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name())
         );
     }
 
diff --git a/src/test/java/org/dependencytrack/tasks/RepositoryMetaAnalysisTaskTest.java b/src/test/java/org/dependencytrack/tasks/RepositoryMetaAnalysisTaskTest.java
index cd52caaac..3b567877a 100644
--- a/src/test/java/org/dependencytrack/tasks/RepositoryMetaAnalysisTaskTest.java
+++ b/src/test/java/org/dependencytrack/tasks/RepositoryMetaAnalysisTaskTest.java
@@ -88,14 +88,14 @@ public void testPortfolioRepositoryMetaAnalysis() {
         new RepositoryMetaAnalysisTask().inform(new PortfolioRepositoryMetaAnalysisEvent());
 
         assertThat(kafkaMockProducer.history()).satisfiesExactlyInAnyOrder(
-                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), // projectA
-                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), // projectB
-                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), // projectC
-                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), // projectD
-                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), // projectE
+                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), // projectA
+                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), // projectB
+                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), // projectC
+                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), // projectD
+                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), // projectE
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name());
-                    final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name());
+                    final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record);
                     assertThat(command.getComponent().getPurl()).isEqualTo("pkg:maven/acme/acme-lib-a@1.0.1");
                     assertThat(command.getComponent().getInternal()).isFalse();
                 },
@@ -103,8 +103,8 @@ record -> {
                 // componentProjectC must not have been submitted, because it belongs to an inactive project
                 // componentProjectD has the same PURL coordinates as componentProjectA and is not submitted again
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name());
-                    final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name());
+                    final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record);
                     assertThat(command.getComponent().getPurl()).isEqualTo("pkg:maven/acme/acme-lib-a@1.0.1");
                     assertThat(command.getComponent().getInternal()).isTrue();
                 }
@@ -159,23 +159,23 @@ public void testProjectRepositoryMetaAnalysis() {
         new RepositoryMetaAnalysisTask().inform(new ProjectRepositoryMetaAnalysisEvent(project.getUuid()));
 
         assertThat(kafkaMockProducer.history()).satisfiesExactlyInAnyOrder(
-                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
+                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name());
-                    final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name());
+                    final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record);
                     assertThat(command.getComponent().getPurl()).isEqualTo("pkg:maven/acme/acme-lib-a@1.0.1");
                     assertThat(command.getComponent().getInternal()).isFalse();
                 },
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name());
-                    final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name());
+                    final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record);
                     assertThat(command.getComponent().getPurl()).isEqualTo("pkg:maven/acme/acme-lib-a@1.0.1");
                     assertThat(command.getComponent().getInternal()).isTrue();
                 },
                 // componentB must not have been submitted, because it does not have a PURL
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name());
-                    final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name());
+                    final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record);
                     assertThat(command.getComponent().getPurl()).isEqualTo("pkg:maven/acme/acme-lib-c@3.0.1");
                     assertThat(command.getComponent().getInternal()).isFalse();
                 }
@@ -195,7 +195,7 @@ public void testProjectRepositoryMetaAnalysisWithInactiveProject() {
         new RepositoryMetaAnalysisTask().inform(new ProjectRepositoryMetaAnalysisEvent(project.getUuid()));
 
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
-                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name())
+                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name())
                 // Component of inactive project must not have been submitted for analysis
         );
     }
diff --git a/src/test/java/org/dependencytrack/tasks/VulnerabilityAnalysisTaskTest.java b/src/test/java/org/dependencytrack/tasks/VulnerabilityAnalysisTaskTest.java
index 97995e419..98ec39618 100644
--- a/src/test/java/org/dependencytrack/tasks/VulnerabilityAnalysisTaskTest.java
+++ b/src/test/java/org/dependencytrack/tasks/VulnerabilityAnalysisTaskTest.java
@@ -69,12 +69,12 @@ public void testPortfolioVulnerabilityAnalysis() {
         new VulnerabilityAnalysisTask().inform(new PortfolioVulnerabilityAnalysisEvent());
 
         assertThat(kafkaMockProducer.history()).satisfiesExactlyInAnyOrder(
-                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
-                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
-                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
+                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
+                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
+                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name());
-                    final var command = deserializeValue(KafkaTopics.VULN_ANALYSIS_COMMAND, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name());
+                    final var command = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND, record);
                     assertThat(command.getComponent().getUuid()).isEqualTo(componentProjectB.getUuid().toString());
                     assertThat(command.getComponent().getCpe()).isEqualTo(componentProjectB.getCpe());
                     assertThat(command.getComponent().hasPurl()).isFalse();
@@ -82,8 +82,8 @@ record -> {
                     assertThat(command.getComponent().getInternal()).isTrue();
                 },
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name());
-                    final var command = deserializeValue(KafkaTopics.VULN_ANALYSIS_COMMAND, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name());
+                    final var command = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND, record);
                     assertThat(command.getComponent().getUuid()).isEqualTo(componentProjectA.getUuid().toString());
                     assertThat(command.getComponent().hasCpe()).isFalse();
                     assertThat(command.getComponent().getPurl()).isEqualTo(componentProjectA.getPurl().toString());
@@ -134,10 +134,10 @@ public void testProjectVulnerabilityAnalysis() {
         assertThat(scan.getUpdatedAt()).isNotNull();
 
         assertThat(kafkaMockProducer.history()).satisfiesExactlyInAnyOrder(
-                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()),
+                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()),
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name());
-                    final var command = deserializeValue(KafkaTopics.VULN_ANALYSIS_COMMAND, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name());
+                    final var command = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND, record);
                     assertThat(command.getComponent().getUuid()).isEqualTo(componentC.getUuid().toString());
                     assertThat(command.getComponent().getCpe()).isEqualTo(componentC.getCpe());
                     assertThat(command.getComponent().getPurl()).isEqualTo(componentC.getPurl().toString());
@@ -145,8 +145,8 @@ record -> {
                     assertThat(command.getComponent().getInternal()).isFalse();
                 },
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name());
-                    final var command = deserializeValue(KafkaTopics.VULN_ANALYSIS_COMMAND, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name());
+                    final var command = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND, record);
                     assertThat(command.getComponent().getUuid()).isEqualTo(componentB.getUuid().toString());
                     assertThat(command.getComponent().getCpe()).isEqualTo(componentB.getCpe());
                     assertThat(command.getComponent().hasPurl()).isFalse();
@@ -154,8 +154,8 @@ record -> {
                     assertThat(command.getComponent().getInternal()).isFalse();
                 },
                 record -> {
-                    assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name());
-                    final var command = deserializeValue(KafkaTopics.VULN_ANALYSIS_COMMAND, record);
+                    assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name());
+                    final var command = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND, record);
                     assertThat(command.getComponent().getUuid()).isEqualTo(componentA.getUuid().toString());
                     assertThat(command.getComponent().hasCpe()).isFalse();
                     assertThat(command.getComponent().getPurl()).isEqualTo(componentA.getPurl().toString());
@@ -176,7 +176,7 @@ public void testProjectVulnerabilityAnalysisWithNoComponents() {
         assertThat(scan).isNull();
 
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
-                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name())
+                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name())
                 // Project does not have any components so nothing should've been submitted for analysis
         );
     }
@@ -193,7 +193,7 @@ public void testProjectVulnerabilityAnalysisWithInactiveProject() {
         new VulnerabilityAnalysisTask().inform(new ProjectVulnerabilityAnalysisEvent(project.getUuid()));
 
         assertThat(kafkaMockProducer.history()).satisfiesExactly(
-                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name())
+                record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name())
                 // Component of inactive project must not have
been submitted for analysis ); }
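
All of the test changes above follow the same typed-topic pattern: assert on record.topic() against KafkaTopics.TOPIC_*.name(), then decode the captured record with deserializeValue(KafkaTopics.TOPIC_*, record). Below is a minimal, self-contained sketch of that pattern against Kafka's MockProducer; the Topic record, the deserializeValue helper, and the topic name are illustrative stand-ins, not the project's actual KafkaTopics API.

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

public class TypedTopicSketch {

    // Illustrative stand-in for a typed topic constant: a topic name paired with its value Serde.
    record Topic<V>(String name, Serde<V> valueSerde) {}

    // Illustrative stand-in for the tests' deserializeValue() helper: decodes a captured
    // record's value using the Serde carried by the topic constant.
    static <V> V deserializeValue(final Topic<V> topic, final ProducerRecord<byte[], byte[]> record) {
        return topic.valueSerde().deserializer().deserialize(topic.name(), record.value());
    }

    public static void main(final String[] args) {
        // MockProducer captures every send in memory; history() is what the tests assert on.
        final var mockProducer = new MockProducer<>(
                true, Serdes.ByteArray().serializer(), Serdes.ByteArray().serializer());

        // Hypothetical topic name, chosen for illustration only.
        final var topic = new Topic<>("dtrack.notification.bom", Serdes.String());
        mockProducer.send(new ProducerRecord<>(topic.name(),
                topic.valueSerde().serializer().serialize(topic.name(), "BOM_PROCESSED")));

        for (final ProducerRecord<byte[], byte[]> record : mockProducer.history()) {
            // Same shape as the assertions in the diff: check the topic, then decode the value.
            System.out.println(record.topic() + " -> " + deserializeValue(topic, record));
        }
    }
}

Because MockProducer records sends in memory rather than contacting a broker, history() gives the tests a deterministic view of what was produced, and carrying the Serde on the topic constant keeps serialization details out of individual assertions.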