diff --git a/src/main/java/org/dependencytrack/event/kafka/KafkaStreamsTopologyFactory.java b/src/main/java/org/dependencytrack/event/kafka/KafkaStreamsTopologyFactory.java index 273ac7d0a..0b78b3f0d 100644 --- a/src/main/java/org/dependencytrack/event/kafka/KafkaStreamsTopologyFactory.java +++ b/src/main/java/org/dependencytrack/event/kafka/KafkaStreamsTopologyFactory.java @@ -97,21 +97,9 @@ Topology createTopology() { .repartition(Repartitioned .with(Serdes.String(), KafkaTopics.VULN_ANALYSIS_RESULT.valueSerde()) .withName("processed-vuln-scan-result-by-scan-token")) - .mapValues((scanToken, value) -> { - try (final var qm = new QueryManager().withL2CacheDisabled()) { - // Detach VulnerabilityScan objects when committing changes. Without this, - // all fields except the ID field will be unloaded on commit (the object will become HOLLOW), - // causing the call to getStatus() to trigger a database query behind the scenes. - qm.getPersistenceManager().setProperty(PropertyNames.PROPERTY_DETACH_ALL_ON_COMMIT, "true"); - - final VulnerabilityScan vulnScan = qm.recordVulnerabilityScanResult(scanToken, value); - if (vulnScan == null || vulnScan.getStatus() != VulnerabilityScan.Status.COMPLETED) { - // When the vulnerability scan is not completed, we don't care about it. - // We'll filter out nulls in the next filter step. - return null; - } - - return vulnScan; + .mapValues((scanToken, scanResult) -> { + try (final var qm = new QueryManager()) { + return qm.recordVulnerabilityScanResult(scanToken, scanResult); } }, Named.as("record_processed_vuln_scan_result")) .filter((scanToken, vulnScan) -> vulnScan != null, diff --git a/src/main/java/org/dependencytrack/persistence/QueryManager.java b/src/main/java/org/dependencytrack/persistence/QueryManager.java index aff236543..1df482fb2 100644 --- a/src/main/java/org/dependencytrack/persistence/QueryManager.java +++ b/src/main/java/org/dependencytrack/persistence/QueryManager.java @@ -88,7 +88,12 @@ import javax.jdo.PersistenceManager; import javax.jdo.Query; import javax.jdo.Transaction; +import javax.jdo.datastore.JDOConnection; import java.security.Principal; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.Collection; import java.util.Date; import java.util.HashSet; @@ -1498,46 +1503,78 @@ public VulnerabilityScan getVulnerabilityScan(final String token) { } /** - * Record the successful receipt of a {@link ScanStatus #SCAN_STATUS_COMPLETE} event for a given {@link VulnerabilityScan}. - *
- * This method expects that access to the {@link VulnerabilityScan} table is serialized
- * through Kafka events, keyed by the scan's token. This assumption allows for optimistic
- * locking to be used.
+ * Record the successful processing of a {@link ScanResult} event for a given {@link VulnerabilityScan}.
*
- * @param scanToken The token that uniquely identifies the scan for clients
- * @param value
- * @return The updated {@link VulnerabilityScan}, or {@code null} when no {@link VulnerabilityScan} was found
+ * @param scanToken The token that uniquely identifies the scan for clients
+ * @param scanResult The {@link ScanResult} to record
+ * @return The {@link VulnerabilityScan} when its status transitioned to {@link VulnerabilityScan.Status#COMPLETED}
+ * as a result of recording the given {@link ScanResult}, otherwise {@code null}
*/
- public VulnerabilityScan recordVulnerabilityScanResult(final String scanToken, ScanResult value) {
- final Transaction trx = pm.currentTransaction();
- trx.setOptimistic(true);
- try {
- trx.begin();
- final Query