Skip to content

Commit

Permalink
Refactor KafkaEventDispatcher
Browse files Browse the repository at this point in the history
* Remove differentiation between `*Async` and `*Blocking` methods of dispatch, make everything asynchronous per default
* Always return one or more `CompletableFuture`, giving callers more control over what to do when the send operation completes successfully, or fails
* When a `dispatch` call is supposed to block until the record has been acknowledged, callers can simply call `CompletableFuture.join()`
* Move all event conversion logic to `KafkaEventConverter`, there should be none of it in `KafkaEventDispatcher`
* Instead of expecting a dedicated key/project UUID parameter when dispatching `Notification`s, infer the key from the `Notification`'s subject
* Dispatches via `NotificationUtil` are replaced with direct calls to `KafkaEventDispatcher`
* In `BomUploadProcessingTask`, dispatch repo meta analysis and vuln analysis events asynchronously, but also wait for all of them to be successfully delivered.

Signed-off-by: nscuro <[email protected]>
  • Loading branch information
nscuro committed Mar 21, 2024
1 parent 771827e commit fe4e680
Show file tree
Hide file tree
Showing 22 changed files with 449 additions and 337 deletions.
7 changes: 2 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@
</ciManagement>

<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>

<!-- Dependency Versions -->
<lib.alpine.version>${project.parent.version}</lib.alpine.version>
<lib.awaitility.version>4.2.0</lib.awaitility.version>
Expand Down Expand Up @@ -568,8 +565,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>17</source>
<target>17</target>
<source>21</source>
<target>21</target>
<compilerArgs>
<arg>-Xlint:all</arg>
<arg>-Xlint:-processing</arg>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,10 @@

import java.util.Map;

record KafkaEvent<K, V>(Topic<K, V> topic, K key, V value, Map<String, String> headers) {
public record KafkaEvent<K, V>(Topic<K, V> topic, K key, V value, Map<String, String> headers) {

public KafkaEvent(final Topic<K, V> topic, final K key, final V value) {
this(topic, key, value, null);
}

}
155 changes: 135 additions & 20 deletions src/main/java/org/dependencytrack/event/kafka/KafkaEventConverter.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,39 @@
package org.dependencytrack.event.kafka;

import alpine.event.framework.Event;
import com.google.protobuf.InvalidProtocolBufferException;
import org.dependencytrack.event.ComponentRepositoryMetaAnalysisEvent;
import org.dependencytrack.event.ComponentVulnerabilityAnalysisEvent;
import org.dependencytrack.event.GitHubAdvisoryMirrorEvent;
import org.dependencytrack.event.NistMirrorEvent;
import org.dependencytrack.event.OsvMirrorEvent;
import org.dependencytrack.event.kafka.KafkaTopics.Topic;
import org.dependencytrack.model.Vulnerability;
import org.dependencytrack.parser.dependencytrack.NotificationModelConverter;
import org.dependencytrack.proto.notification.v1.BomConsumedOrProcessedSubject;
import org.dependencytrack.proto.notification.v1.BomProcessingFailedSubject;
import org.dependencytrack.proto.notification.v1.NewVulnerabilitySubject;
import org.dependencytrack.proto.notification.v1.NewVulnerableDependencySubject;
import org.dependencytrack.proto.notification.v1.Notification;
import org.dependencytrack.proto.notification.v1.PolicyViolationAnalysisDecisionChangeSubject;
import org.dependencytrack.proto.notification.v1.PolicyViolationSubject;
import org.dependencytrack.proto.notification.v1.Project;
import org.dependencytrack.proto.notification.v1.ProjectVulnAnalysisCompleteSubject;
import org.dependencytrack.proto.notification.v1.VexConsumedOrProcessedSubject;
import org.dependencytrack.proto.notification.v1.VulnerabilityAnalysisDecisionChangeSubject;
import org.dependencytrack.proto.repometaanalysis.v1.AnalysisCommand;
import org.dependencytrack.proto.vulnanalysis.v1.ScanCommand;
import org.dependencytrack.proto.vulnanalysis.v1.ScanKey;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static org.apache.commons.lang3.ObjectUtils.requireNonEmpty;

/**
* Utility class to convert {@link alpine.event.framework.Event}s and {@link alpine.notification.Notification}s
* to {@link KafkaEvent}s.
Expand All @@ -22,6 +43,44 @@ final class KafkaEventConverter {
private KafkaEventConverter() {
}

static KafkaEvent<?, ?> convert(final Event event) {
return switch (event) {
case ComponentRepositoryMetaAnalysisEvent e -> convert(e);
case ComponentVulnerabilityAnalysisEvent e -> convert(e);
case GitHubAdvisoryMirrorEvent e -> convert(e);
case NistMirrorEvent e -> convert(e);
case OsvMirrorEvent e -> convert(e);
default -> throw new IllegalArgumentException("Unable to convert event " + event);
};
}

static KafkaEvent<?, ?> convert(final alpine.notification.Notification notification) {
final Notification protoNotification = NotificationModelConverter.convert(notification);
return convert(protoNotification);
}

static KafkaEvent<?, ?> convert(final Notification notification) {
final Topic<String, Notification> topic = extractDestinationTopic(notification);

final String recordKey;
try {
recordKey = extractEventKey(notification);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}

return new KafkaEvent<>(topic, recordKey, notification);
}

static List<KafkaEvent<?, ?>> convertAllNotificationProtos(final Collection<Notification> notifications) {
final var kafkaEvents = new ArrayList<KafkaEvent<?, ?>>(notifications.size());
for (final Notification notification : notifications) {
kafkaEvents.add(convert(notification));
}

return kafkaEvents;
}

static KafkaEvent<ScanKey, ScanCommand> convert(final ComponentVulnerabilityAnalysisEvent event) {
final var componentBuilder = org.dependencytrack.proto.vulnanalysis.v1.Component.newBuilder()
.setUuid(event.uuid().toString());
Expand Down Expand Up @@ -55,7 +114,7 @@ static KafkaEvent<String, AnalysisCommand> convert(final ComponentRepositoryMeta
final var componentBuilder = org.dependencytrack.proto.repometaanalysis.v1.Component.newBuilder()
.setPurl(event.purlCoordinates());
Optional.ofNullable(event.internal()).ifPresent(componentBuilder::setInternal);
Optional.ofNullable(event.componentUuid()).map(uuid -> uuid.toString()).ifPresent(componentBuilder::setUuid);
Optional.ofNullable(event.componentUuid()).map(UUID::toString).ifPresent(componentBuilder::setUuid);

final var analysisCommand = AnalysisCommand.newBuilder()
.setComponent(componentBuilder)
Expand All @@ -65,37 +124,93 @@ static KafkaEvent<String, AnalysisCommand> convert(final ComponentRepositoryMeta
return new KafkaEvent<>(KafkaTopics.REPO_META_ANALYSIS_COMMAND, event.purlCoordinates(), analysisCommand, null);
}

static KafkaEvent<String, Notification> convert(final String key, final Notification notification) {
final Topic<String, Notification> topic = switch (notification.getGroup()) {
static KafkaEvent<String, String> convert(final GitHubAdvisoryMirrorEvent ignored) {
final String key = Vulnerability.Source.GITHUB.name();
return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, key, null);
}

static KafkaEvent<String, String> convert(final NistMirrorEvent ignored) {
final String key = Vulnerability.Source.NVD.name();
return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, key, null);
}

static KafkaEvent<String, String> convert(final OsvMirrorEvent event) {
final String key = Vulnerability.Source.OSV.name();
final String value = event.ecosystem();
return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, key, value);
}

private static Topic<String, Notification> extractDestinationTopic(final Notification notification) {
return switch (notification.getGroup()) {
case GROUP_ANALYZER -> KafkaTopics.NOTIFICATION_ANALYZER;
case GROUP_BOM_CONSUMED, GROUP_BOM_PROCESSED, GROUP_BOM_PROCESSING_FAILED -> KafkaTopics.NOTIFICATION_BOM;
case GROUP_CONFIGURATION -> KafkaTopics.NOTIFICATION_CONFIGURATION;
case GROUP_DATASOURCE_MIRRORING -> KafkaTopics.NOTIFICATION_DATASOURCE_MIRRORING;
case GROUP_REPOSITORY -> KafkaTopics.NOTIFICATION_REPOSITORY;
case GROUP_INTEGRATION -> KafkaTopics.NOTIFICATION_INTEGRATION;
case GROUP_ANALYZER -> KafkaTopics.NOTIFICATION_ANALYZER;
case GROUP_BOM_CONSUMED -> KafkaTopics.NOTIFICATION_BOM;
case GROUP_BOM_PROCESSED -> KafkaTopics.NOTIFICATION_BOM;
case GROUP_FILE_SYSTEM -> KafkaTopics.NOTIFICATION_FILE_SYSTEM;
case GROUP_INTEGRATION -> KafkaTopics.NOTIFICATION_INTEGRATION;
case GROUP_NEW_VULNERABILITY -> KafkaTopics.NOTIFICATION_NEW_VULNERABILITY;
case GROUP_NEW_VULNERABLE_DEPENDENCY -> KafkaTopics.NOTIFICATION_NEW_VULNERABLE_DEPENDENCY;
case GROUP_POLICY_VIOLATION -> KafkaTopics.NOTIFICATION_POLICY_VIOLATION;
case GROUP_PROJECT_AUDIT_CHANGE -> KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE;
case GROUP_PROJECT_CREATED -> KafkaTopics.NOTIFICATION_PROJECT_CREATED;
case GROUP_VEX_CONSUMED -> KafkaTopics.NOTIFICATION_VEX;
case GROUP_VEX_PROCESSED -> KafkaTopics.NOTIFICATION_VEX;
case GROUP_BOM_PROCESSING_FAILED -> KafkaTopics.NOTIFICATION_BOM;
case GROUP_PROJECT_VULN_ANALYSIS_COMPLETE -> KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE;
default -> null;
case GROUP_REPOSITORY -> KafkaTopics.NOTIFICATION_REPOSITORY;
case GROUP_VEX_CONSUMED, GROUP_VEX_PROCESSED -> KafkaTopics.NOTIFICATION_VEX;
case GROUP_UNSPECIFIED, UNRECOGNIZED -> throw new IllegalArgumentException("""
Unable to determine destination topic because the notification does not \
specify a notification group: %s""".formatted(notification.getGroup()));
};
if (topic == null) {
return null;
}

return new KafkaEvent<>(topic, key, notification, null);
}

static KafkaEvent<String, Notification> convert(final UUID projectUuid, final alpine.notification.Notification alpineNotification) {
final Notification notification = NotificationModelConverter.convert(alpineNotification);
return convert(projectUuid != null ? projectUuid.toString() : null, notification);
private static String extractEventKey(final Notification notification) throws InvalidProtocolBufferException {
return switch (notification.getGroup()) {
case GROUP_BOM_CONSUMED, GROUP_BOM_PROCESSED -> {
final var subject = notification.getSubject().unpack(BomConsumedOrProcessedSubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
}
case GROUP_BOM_PROCESSING_FAILED -> {
final var subject = notification.getSubject().unpack(BomProcessingFailedSubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
}
case GROUP_NEW_VULNERABILITY -> {
final var subject = notification.getSubject().unpack(NewVulnerabilitySubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
}
case GROUP_NEW_VULNERABLE_DEPENDENCY -> {
final var subject = notification.getSubject().unpack(NewVulnerableDependencySubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
}
case GROUP_POLICY_VIOLATION -> {
final var subject = notification.getSubject().unpack(PolicyViolationSubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
}
case GROUP_PROJECT_AUDIT_CHANGE -> {
if (notification.getSubject().is(PolicyViolationAnalysisDecisionChangeSubject.class)) {
final var subject = notification.getSubject().unpack(PolicyViolationAnalysisDecisionChangeSubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
} else if (notification.getSubject().is(VulnerabilityAnalysisDecisionChangeSubject.class)) {
final var subject = notification.getSubject().unpack(VulnerabilityAnalysisDecisionChangeSubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
}

throw new IllegalArgumentException("Unexpected notification subject of type %s"
.formatted(notification.getSubject().getTypeUrl()));
}
case GROUP_PROJECT_CREATED -> requireNonEmpty(notification.getSubject().unpack(Project.class).getUuid());
case GROUP_PROJECT_VULN_ANALYSIS_COMPLETE -> {
final var subject = notification.getSubject().unpack(ProjectVulnAnalysisCompleteSubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
}
case GROUP_VEX_CONSUMED, GROUP_VEX_PROCESSED -> {
final var subject = notification.getSubject().unpack(VexConsumedOrProcessedSubject.class);
yield requireNonEmpty(subject.getProject().getUuid());
}
case GROUP_ANALYZER, GROUP_CONFIGURATION, GROUP_DATASOURCE_MIRRORING,
GROUP_FILE_SYSTEM, GROUP_INTEGRATION, GROUP_REPOSITORY -> null;
case GROUP_UNSPECIFIED, UNRECOGNIZED -> throw new IllegalArgumentException("""
Unable to determine record key because the notification does not \
specify a notification group: %s""".formatted(notification.getGroup()));
};
}

}
Loading

0 comments on commit fe4e680

Please sign in to comment.