diff --git a/src/main/java/org/dependencytrack/event/kafka/KafkaStreamsTopologyFactory.java b/src/main/java/org/dependencytrack/event/kafka/KafkaStreamsTopologyFactory.java index 273ac7d0a..0b78b3f0d 100644 --- a/src/main/java/org/dependencytrack/event/kafka/KafkaStreamsTopologyFactory.java +++ b/src/main/java/org/dependencytrack/event/kafka/KafkaStreamsTopologyFactory.java @@ -97,21 +97,9 @@ Topology createTopology() { .repartition(Repartitioned .with(Serdes.String(), KafkaTopics.VULN_ANALYSIS_RESULT.valueSerde()) .withName("processed-vuln-scan-result-by-scan-token")) - .mapValues((scanToken, value) -> { - try (final var qm = new QueryManager().withL2CacheDisabled()) { - // Detach VulnerabilityScan objects when committing changes. Without this, - // all fields except the ID field will be unloaded on commit (the object will become HOLLOW), - // causing the call to getStatus() to trigger a database query behind the scenes. - qm.getPersistenceManager().setProperty(PropertyNames.PROPERTY_DETACH_ALL_ON_COMMIT, "true"); - - final VulnerabilityScan vulnScan = qm.recordVulnerabilityScanResult(scanToken, value); - if (vulnScan == null || vulnScan.getStatus() != VulnerabilityScan.Status.COMPLETED) { - // When the vulnerability scan is not completed, we don't care about it. - // We'll filter out nulls in the next filter step. - return null; - } - - return vulnScan; + .mapValues((scanToken, scanResult) -> { + try (final var qm = new QueryManager()) { + return qm.recordVulnerabilityScanResult(scanToken, scanResult); } }, Named.as("record_processed_vuln_scan_result")) .filter((scanToken, vulnScan) -> vulnScan != null, diff --git a/src/main/java/org/dependencytrack/persistence/QueryManager.java b/src/main/java/org/dependencytrack/persistence/QueryManager.java index aff236543..1df482fb2 100644 --- a/src/main/java/org/dependencytrack/persistence/QueryManager.java +++ b/src/main/java/org/dependencytrack/persistence/QueryManager.java @@ -88,7 +88,12 @@ import javax.jdo.PersistenceManager; import javax.jdo.Query; import javax.jdo.Transaction; +import javax.jdo.datastore.JDOConnection; import java.security.Principal; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.Collection; import java.util.Date; import java.util.HashSet; @@ -1498,46 +1503,78 @@ public VulnerabilityScan getVulnerabilityScan(final String token) { } /** - * Record the successful receipt of a {@link ScanStatus #SCAN_STATUS_COMPLETE} event for a given {@link VulnerabilityScan}. - *

- * This method expects that access to the {@link VulnerabilityScan} table is serialized - * through Kafka events, keyed by the scan's token. This assumption allows for optimistic - * locking to be used. + * Record the successful processing of a {@link ScanResult} event for a given {@link VulnerabilityScan}. * - * @param scanToken The token that uniquely identifies the scan for clients - * @param value - * @return The updated {@link VulnerabilityScan}, or {@code null} when no {@link VulnerabilityScan} was found + * @param scanToken The token that uniquely identifies the scan for clients + * @param scanResult The {@link ScanResult} to record + * @return The {@link VulnerabilityScan} when its status transitioned to {@link VulnerabilityScan.Status#COMPLETED} + * as a result of recording the given {@link ScanResult}, otherwise {@code null} */ - public VulnerabilityScan recordVulnerabilityScanResult(final String scanToken, ScanResult value) { - final Transaction trx = pm.currentTransaction(); - trx.setOptimistic(true); - try { - trx.begin(); - final Query scanQuery = pm.newQuery(VulnerabilityScan.class); - scanQuery.setFilter("token == :token"); - scanQuery.setParameters(scanToken); - final VulnerabilityScan scan = scanQuery.executeUnique(); - if (scan == null) { + public VulnerabilityScan recordVulnerabilityScanResult(final String scanToken, final ScanResult scanResult) { + final int totalScannerResults = scanResult.getScannerResultsCount(); + final int failedScannerResults = Math.toIntExact(scanResult.getScannerResultsList().stream() + .map(ScannerResult::getStatus) + .filter(SCAN_STATUS_FAILED::equals) + .count()); + + // Because this method will be called VERY frequently (once for each processed ScanResult), + // use raw SQL instead of any ORM abstractions. All we need to do is to increment some counters. + // Using a single SQL statement also removes the need for (optimistic) locking. + final JDOConnection jdoConnection = pm.getDataStoreConnection(); + final var nativeConnection = (Connection) jdoConnection.getNativeConnection(); + try (final PreparedStatement ps = nativeConnection.prepareStatement(""" + WITH "RES" AS ( + UPDATE "VULNERABILITYSCAN" + SET + "RECEIVED_RESULTS" = "RECEIVED_RESULTS" + 1, + "SCAN_TOTAL" = "SCAN_TOTAL" + ?, + "SCAN_FAILED" = "SCAN_FAILED" + ?, + "STATUS" = ( + CASE WHEN "EXPECTED_RESULTS" = ("RECEIVED_RESULTS" + 1) + THEN 'COMPLETED' + ELSE 'IN_PROGRESS' + END + ), + "UPDATED_AT" = NOW() + WHERE + "TOKEN" = ? + RETURNING + "SCAN_FAILED", + "SCAN_TOTAL", + "STATUS", + "TARGET_TYPE", + "TARGET_IDENTIFIER" + ) + SELECT * + FROM "RES" + WHERE + -- No point in fetching any data from DB if the + -- record is not in the desired final state yet. + "STATUS" = 'COMPLETED' + """)) { + ps.setInt(1, totalScannerResults); + ps.setInt(2, failedScannerResults); + ps.setString(3, scanToken); + + final ResultSet rs = ps.executeQuery(); + if (rs.next()) { + final var vs = new VulnerabilityScan(); + vs.setToken(scanToken); + vs.setTargetType(VulnerabilityScan.TargetType.valueOf(rs.getString("TARGET_TYPE"))); + vs.setTargetIdentifier(UUID.fromString(rs.getString("TARGET_IDENTIFIER"))); + vs.setScanFailed(rs.getInt("SCAN_FAILED")); + vs.setScanTotal(rs.getInt("SCAN_TOTAL")); + vs.setStatus(VulnerabilityScan.Status.valueOf(rs.getString("STATUS"))); + return vs; + } else { return null; } - final int received = scan.getReceivedResults() + 1; - scan.setReceivedResults(received); - final long failedScanCount = value.getScannerResultsList().stream() - .map(ScannerResult::getStatus) - .filter(SCAN_STATUS_FAILED::equals) - .count(); - scan.setScanFailed(scan.getScanFailed() + failedScanCount); - scan.setScanTotal(value.getScannerResultsCount() + scan.getScanTotal()); - scan.setStatus(scan.getExpectedResults() - received == 0 - ? VulnerabilityScan.Status.COMPLETED - : VulnerabilityScan.Status.IN_PROGRESS); - scan.setUpdatedAt(new Date()); - trx.commit(); - return scan; + } catch (SQLException e) { + throw new RuntimeException(""" + Failed to record successful processing of scan result (token=%s, component=%s)\ + """.formatted(scanToken, scanResult.getKey().getComponentUuid()), e); } finally { - if (trx.isActive()) { - trx.rollback(); - } + jdoConnection.close(); } }