Add temporary feature to delay BOM_PROCESSED notification until vulnerability analysis completes #299

Merged 1 commit on Sep 6, 2023
3 changes: 2 additions & 1 deletion src/main/java/org/dependencytrack/common/ConfigKey.java
@@ -70,7 +70,8 @@ public enum ConfigKey implements Config.Key {
TASK_PORTFOLIO_VULN_ANALYSIS_LOCK_AT_LEAST_FOR("task.portfolio.vulnAnalysis.lockAtLeastForInMillis", String.valueOf(Duration.ofMinutes(5).toMillis())),
BOM_UPLOAD_PROCESSING_TRX_FLUSH_THRESHOLD("bom.upload.processing.trx.flush.threshold", "10000"),
WORKFLOW_RETENTION_DURATION("workflow.retention.duration", "P3D"),
WORKFLOW_STEP_TIMEOUT_DURATION("workflow.step.timeout.duration", "PT1H");
WORKFLOW_STEP_TIMEOUT_DURATION("workflow.step.timeout.duration", "PT1H"),
TMP_DELAY_BOM_PROCESSED_NOTIFICATION("tmp.delay.bom.processed.notification", "false");

private final String propertyName;
private final Object defaultValue;
src/main/java/org/dependencytrack/event/kafka/KafkaStreamsTopologyFactory.java
@@ -1,5 +1,6 @@
package org.dependencytrack.event.kafka;

import alpine.Config;
import alpine.common.logging.Logger;
import alpine.event.framework.ChainableEvent;
import alpine.event.framework.Event;
@@ -14,11 +15,13 @@
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.datanucleus.PropertyNames;
import org.dependencytrack.common.ConfigKey;
import org.dependencytrack.event.ComponentMetricsUpdateEvent;
import org.dependencytrack.event.ComponentPolicyEvaluationEvent;
import org.dependencytrack.event.PortfolioVulnerabilityAnalysisEvent;
import org.dependencytrack.event.ProjectMetricsUpdateEvent;
import org.dependencytrack.event.ProjectPolicyEvaluationEvent;
import org.dependencytrack.event.kafka.processor.DelayedBomProcessedNotificationProcessor;
import org.dependencytrack.event.kafka.processor.MirrorVulnerabilityProcessor;
import org.dependencytrack.event.kafka.processor.RepositoryMetaResultProcessor;
import org.dependencytrack.event.kafka.processor.VulnerabilityScanResultProcessor;
Expand All @@ -44,6 +47,16 @@ class KafkaStreamsTopologyFactory {

private static final Logger LOGGER = Logger.getLogger(KafkaStreamsTopologyFactory.class);

private final boolean delayBomProcessedNotification;

public KafkaStreamsTopologyFactory() {
this(Config.getInstance().getPropertyAsBoolean(ConfigKey.TMP_DELAY_BOM_PROCESSED_NOTIFICATION));
}

KafkaStreamsTopologyFactory(final boolean delayBomProcessedNotification) {
this.delayBomProcessedNotification = delayBomProcessedNotification;
}

Topology createTopology() {
final var streamsBuilder = new StreamsBuilder();

@@ -113,6 +126,8 @@ Topology createTopology() {
qm.getPersistenceManager().setProperty(PropertyNames.PROPERTY_DETACH_ALL_ON_COMMIT, "true");
vulnScan = qm.updateVulnerabilityScanStatus(vulnScan.getToken(), VulnerabilityScan.Status.FAILED);
vulnScan.setFailureReason("Failure threshold of " + vulnScan.getFailureThreshold() + "% exceeded: " + failureRate + "% of scans failed");
LOGGER.warn("Detected failure of vulnerability scan (token=%s, targetType=%s, targetIdentifier=%s): %s"
.formatted(vulnScan.getToken(), vulnScan.getTargetType(), vulnScan.getTargetIdentifier(), vulnScan.getFailureReason()));
}
}

@@ -143,9 +158,13 @@ Topology createTopology() {
}
}, Named.as("update_vuln_analysis_workflow_status"));

completedVulnScanStream
final KStream<String, VulnerabilityScan> completedVulnScanWithProjectTargetStream = completedVulnScanStream
.filter((scanToken, vulnScan) -> vulnScan.getTargetType() == VulnerabilityScan.TargetType.PROJECT,
Named.as("filter_vuln_scans_with_project_target"))
Named.as("filter_vuln_scans_with_project_target"));

// For each completed vulnerability scan that targeted a project (as opposed to individual components),
// determine its overall status, gather all findings, and emit a PROJECT_VULN_ANALYSIS_COMPLETE notification.
completedVulnScanWithProjectTargetStream
.map((scanToken, vulnScan) -> {
final alpine.notification.Notification alpineNotification;
try {
@@ -170,6 +189,20 @@
KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.valueSerde())
.withName("produce_to_%s_topic".formatted(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name())));

// When delaying BOM_PROCESSED notifications is enabled, emit a BOM_PROCESSED notification
// for each completed vulnerability scan that targeted a project. But only do so when the scan is
// part of a workflow that includes a BOM_PROCESSING step with status COMPLETED.
if (delayBomProcessedNotification) {
completedVulnScanStream
.process(DelayedBomProcessedNotificationProcessor::new,
Named.as("tmp_delay_bom_processed_notification_process_completed_vuln_scan"))
.to(KafkaTopics.NOTIFICATION_BOM.name(), Produced
.with(KafkaTopics.NOTIFICATION_BOM.keySerde(), KafkaTopics.NOTIFICATION_BOM.valueSerde())
.withName("tmp_delay_bom_processed_notification_produce_to_%s_topic".formatted(KafkaTopics.NOTIFICATION_BOM.name())));
}

// For each successfully completed vulnerability scan, trigger a policy evaluation and metrics update
// for the targeted entity (project or individual component).
completedVulnScanStream
.filter((scanToken, vulnScan) -> vulnScan.getStatus() != VulnerabilityScan.Status.FAILED,
Named.as("filter_failed_vuln_scans"))
src/main/java/org/dependencytrack/event/kafka/processor/DelayedBomProcessedNotificationProcessor.java
@@ -0,0 +1,88 @@
package org.dependencytrack.event.kafka.processor;

import alpine.common.logging.Logger;
import alpine.notification.NotificationLevel;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.dependencytrack.model.Bom;
import org.dependencytrack.model.Project;
import org.dependencytrack.model.VulnerabilityScan;
import org.dependencytrack.model.WorkflowStatus;
import org.dependencytrack.model.WorkflowStep;
import org.dependencytrack.notification.NotificationConstants;
import org.dependencytrack.notification.NotificationGroup;
import org.dependencytrack.notification.NotificationScope;
import org.dependencytrack.notification.vo.BomConsumedOrProcessed;
import org.dependencytrack.persistence.QueryManager;
import org.hyades.proto.notification.v1.Notification;

import javax.jdo.Query;
import java.util.UUID;

import static org.dependencytrack.parser.hyades.NotificationModelConverter.convert;

/**
* A {@link Processor} responsible for dispatching {@link NotificationGroup#BOM_PROCESSED} notifications
* upon detection of a completed {@link VulnerabilityScan}.
*/
public class DelayedBomProcessedNotificationProcessor extends ContextualProcessor<String, VulnerabilityScan, String, Notification> {

private static final Logger LOGGER = Logger.getLogger(DelayedBomProcessedNotificationProcessor.class);

@Override
public void process(final Record<String, VulnerabilityScan> record) {
final VulnerabilityScan vulnScan = record.value();

if (vulnScan.getStatus() != VulnerabilityScan.Status.COMPLETED
&& vulnScan.getStatus() != VulnerabilityScan.Status.FAILED) {
LOGGER.warn("Received vulnerability scan with non-terminal status %s; Dropping (token=%s, project=%s)"
.formatted(vulnScan.getStatus(), vulnScan.getToken(), vulnScan.getTargetIdentifier()));
return;
}

final Project project;
try (final var qm = new QueryManager()) {
if (!qm.hasWorkflowStepWithStatus(UUID.fromString(vulnScan.getToken()), WorkflowStep.BOM_PROCESSING, WorkflowStatus.COMPLETED)) {
LOGGER.debug("Received completed vulnerability scan, but no %s step exists in this workflow; Dropping (token=%s, project=%s)"
.formatted(WorkflowStep.BOM_PROCESSING, vulnScan.getToken(), vulnScan.getTargetIdentifier()));
return;
}

project = getProject(qm, vulnScan.getTargetIdentifier());
if (project == null) {
LOGGER.warn("Received completed vulnerability scan, but the target project does not exist; Dropping (token=%s, project=%s)"
.formatted(vulnScan.getToken(), vulnScan.getTargetIdentifier()));
return;
}
}

final var alpineNotification = new alpine.notification.Notification()
.scope(NotificationScope.PORTFOLIO)
.group(NotificationGroup.BOM_PROCESSED)
.level(NotificationLevel.INFORMATIONAL)
.title(NotificationConstants.Title.BOM_PROCESSED)
// BOM format and spec version are hardcoded because we don't have this information at this point.
// DT currently only accepts CycloneDX anyway.
.content("A %s BOM was processed".formatted(Bom.Format.CYCLONEDX.getFormatShortName()))
.subject(new BomConsumedOrProcessed(project, /* bom */ "(Omitted)", Bom.Format.CYCLONEDX, "Unknown"));

context().forward(record.withKey(project.getUuid().toString()).withValue(convert(alpineNotification)));
LOGGER.info("Dispatched delayed %s notification (token=%s, project=%s)"
.formatted(NotificationGroup.BOM_PROCESSED, vulnScan.getToken(), vulnScan.getTargetIdentifier()));
}

private static Project getProject(final QueryManager qm, final UUID uuid) {
final Query<Project> projectQuery = qm.getPersistenceManager().newQuery(Project.class);
projectQuery.setFilter("uuid == :uuid");
projectQuery.setParameters(uuid);
projectQuery.getFetchPlan().clearGroups(); // Ensure we're not loading too much bloat.
projectQuery.getFetchPlan().setGroup(Project.FetchGroup.NOTIFICATION.name());
try {
return qm.getPersistenceManager().detachCopy(projectQuery.executeResultUnique(Project.class));
} finally {
projectQuery.closeAll();
}
}

}
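
A quick way to sanity-check the processor in isolation — a sketch only, assuming a test harness with an initialized database (such as Dependency-Track's PersistenceCapableTest base class), kafka-streams-test-utils on the classpath, and AssertJ. The VulnerabilityScan setters and the pre-persisted project/workflow fixtures are assumptions, not part of this diff:

import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.hyades.proto.notification.v1.Notification;
import static org.assertj.core.api.Assertions.assertThat;

final var processor = new DelayedBomProcessedNotificationProcessor();
final var context = new MockProcessorContext<String, Notification>();
processor.init(context);

// Assumes a project and a workflow with a COMPLETED BOM_PROCESSING step,
// keyed by the same token, have been persisted beforehand.
final var vulnScan = new VulnerabilityScan();
vulnScan.setToken(workflowToken.toString());
vulnScan.setTargetType(VulnerabilityScan.TargetType.PROJECT);
vulnScan.setTargetIdentifier(project.getUuid());
vulnScan.setStatus(VulnerabilityScan.Status.COMPLETED);

processor.process(new Record<>(vulnScan.getToken(), vulnScan, System.currentTimeMillis()));

// Exactly one BOM_PROCESSED notification should be forwarded, keyed by the project UUID.
assertThat(context.forwarded()).hasSize(1);
assertThat(context.forwarded().get(0).record().key()).isEqualTo(project.getUuid().toString());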
9 changes: 9 additions & 0 deletions src/main/java/org/dependencytrack/model/Project.java
@@ -100,6 +100,14 @@
@Persistent(name = "lastInheritedRiskScore"),
@Persistent(name = "uuid")
}),
@FetchGroup(name = "NOTIFICATION", members = {
@Persistent(name = "uuid"),
@Persistent(name = "name"),
@Persistent(name = "version"),
@Persistent(name = "description"),
@Persistent(name = "purl"),
@Persistent(name = "tags")
}),
@FetchGroup(name = "PARENT", members = {
@Persistent(name = "parent")
})
@@ -116,6 +124,7 @@ public enum FetchGroup {
ALL,
IDENTIFIERS,
METRICS_UPDATE,
NOTIFICATION,
PARENT
}

src/main/java/org/dependencytrack/persistence/QueryManager.java
@@ -1637,4 +1637,9 @@ public void updateWorkflowStateToComplete(WorkflowState workflowState) {
public void updateWorkflowStateToFailed(WorkflowState workflowState, String failureReason) {
getWorkflowStateQueryManager().updateWorkflowStateToFailed(workflowState, failureReason);
}

public boolean hasWorkflowStepWithStatus(final UUID token, final WorkflowStep step, final WorkflowStatus status) {
return getWorkflowStateQueryManager().hasWorkflowStepWithStatus(token, step, status);
}

}
src/main/java/org/dependencytrack/persistence/WorkflowStateQueryManager.java
@@ -17,6 +17,7 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class WorkflowStateQueryManager extends QueryManager implements IQueryManager {
@@ -311,4 +312,21 @@ public void createWorkflowSteps(UUID token) {
}
}
}

public boolean hasWorkflowStepWithStatus(final UUID token, final WorkflowStep step, final WorkflowStatus status) {
final Query<WorkflowState> stateQuery = pm.newQuery(WorkflowState.class);
stateQuery.setFilter("token == :token && step == :step && status == :status");
stateQuery.setNamedParameters(Map.of(
"token", token,
"step", step,
"status", status
));
stateQuery.setResult("count(this)");
try {
return stateQuery.executeResultUnique(Long.class) > 0;
} finally {
stateQuery.closeAll();
}
}

}
src/main/java/org/dependencytrack/tasks/BomUploadProcessingTask.java
@@ -34,6 +34,7 @@
import org.cyclonedx.model.Dependency;
import org.cyclonedx.parsers.Parser;
import org.datanucleus.flush.FlushMode;
import org.dependencytrack.common.ConfigKey;
import org.dependencytrack.event.BomUploadEvent;
import org.dependencytrack.event.ComponentRepositoryMetaAnalysisEvent;
import org.dependencytrack.event.ComponentVulnerabilityAnalysisEvent;
@@ -106,13 +107,15 @@ public class BomUploadProcessingTask implements Subscriber {
private static final int FLUSH_THRESHOLD = Config.getInstance().getPropertyAsInt(BOM_UPLOAD_PROCESSING_TRX_FLUSH_THRESHOLD);

private final KafkaEventDispatcher kafkaEventDispatcher;
private final boolean delayBomProcessedNotification;

public BomUploadProcessingTask() {
this(new KafkaEventDispatcher());
this(new KafkaEventDispatcher(), Config.getInstance().getPropertyAsBoolean(ConfigKey.TMP_DELAY_BOM_PROCESSED_NOTIFICATION));
}

BomUploadProcessingTask(final KafkaEventDispatcher kafkaEventDispatcher) {
BomUploadProcessingTask(final KafkaEventDispatcher kafkaEventDispatcher, final boolean delayBomProcessedNotification) {
this.kafkaEventDispatcher = kafkaEventDispatcher;
this.delayBomProcessedNotification = delayBomProcessedNotification;
}

/**
@@ -126,14 +129,14 @@ public void inform(final Event e) {
processBom(ctx, event.getFile());
LOGGER.info("BOM processed successfully (%s)".formatted(ctx));
updateState(ctx, WorkflowStep.BOM_PROCESSING, WorkflowStatus.COMPLETED);
kafkaEventDispatcher.dispatchAsync(ctx.project.getUuid(), new Notification()
.scope(NotificationScope.PORTFOLIO)
.group(NotificationGroup.BOM_PROCESSED)
.level(NotificationLevel.INFORMATIONAL)
.title(NotificationConstants.Title.BOM_PROCESSED)
.content("A %s BOM was processed".formatted(ctx.bomFormat.getFormatShortName()))
// FIXME: Add reference to BOM after we have dedicated BOM server
.subject(new BomConsumedOrProcessed(ctx.project, /* bom */ "(Omitted)", ctx.bomFormat, ctx.bomSpecVersion)));
if (!delayBomProcessedNotification) {
dispatchBomProcessedNotification(ctx);
} else {
// The notification will be dispatched by the Kafka Streams topology,
// when it detects that the vulnerability scan completed.
LOGGER.warn("Not dispatching %s notification, because %s is enabled (%s)"
.formatted(NotificationGroup.BOM_PROCESSED, ConfigKey.TMP_DELAY_BOM_PROCESSED_NOTIFICATION.getPropertyName(), ctx));
}
} catch (BomConsumptionException ex) {
LOGGER.error("BOM consumption failed (%s)".formatted(ex.ctx), ex);
updateStateAndCancelDescendants(ctx, WorkflowStep.BOM_CONSUMPTION, WorkflowStatus.FAILED, ex.getMessage());
@@ -341,6 +344,12 @@ private void processBom(final Context ctx, final File bomFile) throws BomConsump
qm.persist(vulnAnalysisState);
}
} else {
// No components to be sent for vulnerability analysis.
// If the BOM_PROCESSED notification was delayed, dispatch it now.
if (delayBomProcessedNotification) {
dispatchBomProcessedNotification(ctx);
}

if (vulnAnalysisState != null) {
vulnAnalysisState.setStatus(WorkflowStatus.NOT_APPLICABLE);
vulnAnalysisState.setUpdatedAt(Date.from(Instant.now()));
@@ -875,6 +884,17 @@ private static Predicate<ServiceComponent> distinctServicesByIdentity(final Map<
};
}

private void dispatchBomProcessedNotification(final Context ctx) {
kafkaEventDispatcher.dispatchAsync(ctx.project.getUuid(), new Notification()
.scope(NotificationScope.PORTFOLIO)
.group(NotificationGroup.BOM_PROCESSED)
.level(NotificationLevel.INFORMATIONAL)
.title(NotificationConstants.Title.BOM_PROCESSED)
.content("A %s BOM was processed".formatted(ctx.bomFormat.getFormatShortName()))
// FIXME: Add reference to BOM after we have dedicated BOM server
.subject(new BomConsumedOrProcessed(ctx.project, /* bom */ "(Omitted)", ctx.bomFormat, ctx.bomSpecVersion)));
}

/**
* An {@link Exception} that signals failures during BOM processing.
*/
7 changes: 7 additions & 0 deletions src/main/resources/application.properties
@@ -456,3 +456,10 @@ workflow.step.timeout.duration=PT1H
# state (CANCELLED, COMPLETED, FAILED, NOT_APPLICABLE).
# The duration must be specified in ISO8601 notation (https://en.wikipedia.org/wiki/ISO_8601#Durations).
workflow.retention.duration=P3D

# Optional
# Delays the BOM_PROCESSED notification until the vulnerability analysis associated with a given BOM upload
# has completed, so that it is then "safe" to query the API for any identified vulnerabilities.
# This is intended specifically for cases where polling the /api/v1/bom/token/<TOKEN> endpoint is not feasible.
# THIS IS TEMPORARY FUNCTIONALITY AND MAY BE REMOVED IN FUTURE RELEASES WITHOUT FURTHER NOTICE.
tmp.delay.bom.processed.notification=false
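
To try the feature, the flag can be flipped in application.properties; the environment-variable form below assumes the usual mapping of property names to upper-cased, underscore-separated variables and is not shown in this diff:

tmp.delay.bom.processed.notification=true
# or via the container environment (assumed mapping):
# TMP_DELAY_BOM_PROCESSED_NOTIFICATION=true

With the flag enabled, a client that uploads a BOM can treat the (now delayed) BOM_PROCESSED notification as the signal that vulnerability analysis has finished and findings can be queried, instead of polling /api/v1/bom/token/<TOKEN>.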