From ab8656f7d1034abf1de51cd2e4444513df1a3fe2 Mon Sep 17 00:00:00 2001
From: nscuro <nscuro@protonmail.com>
Date: Sun, 10 Sep 2023 01:47:34 +0200
Subject: [PATCH] Improve performance of `ScanResult` recording

Instead of...

* fetching the entire record from the database for every incoming `ScanResult`
* modifying it in Java, and
* updating it in DB

... use a single SQL statement, and only fetch data when the modified record is in the desired state.

Signed-off-by: nscuro <nscuro@protonmail.com>
---
 .../kafka/KafkaStreamsTopologyFactory.java    |  18 +--
 .../persistence/QueryManager.java             | 107 ++++++++++++------
 2 files changed, 75 insertions(+), 50 deletions(-)

diff --git a/src/main/java/org/dependencytrack/event/kafka/KafkaStreamsTopologyFactory.java b/src/main/java/org/dependencytrack/event/kafka/KafkaStreamsTopologyFactory.java
index 273ac7d0a..0b78b3f0d 100644
--- a/src/main/java/org/dependencytrack/event/kafka/KafkaStreamsTopologyFactory.java
+++ b/src/main/java/org/dependencytrack/event/kafka/KafkaStreamsTopologyFactory.java
@@ -97,21 +97,9 @@ Topology createTopology() {
                 .repartition(Repartitioned
                         .with(Serdes.String(), KafkaTopics.VULN_ANALYSIS_RESULT.valueSerde())
                         .withName("processed-vuln-scan-result-by-scan-token"))
-                .mapValues((scanToken, value) -> {
-                    try (final var qm = new QueryManager().withL2CacheDisabled()) {
-                        // Detach VulnerabilityScan objects when committing changes. Without this,
-                        // all fields except the ID field will be unloaded on commit (the object will become HOLLOW),
-                        // causing the call to getStatus() to trigger a database query behind the scenes.
-                        qm.getPersistenceManager().setProperty(PropertyNames.PROPERTY_DETACH_ALL_ON_COMMIT, "true");
-
-                        final VulnerabilityScan vulnScan = qm.recordVulnerabilityScanResult(scanToken, value);
-                        if (vulnScan == null || vulnScan.getStatus() != VulnerabilityScan.Status.COMPLETED) {
-                            // When the vulnerability scan is not completed, we don't care about it.
-                            // We'll filter out nulls in the next filter step.
-                            return null;
-                        }
-
-                        return vulnScan;
+                .mapValues((scanToken, scanResult) -> {
+                    try (final var qm = new QueryManager()) {
+                        return qm.recordVulnerabilityScanResult(scanToken, scanResult);
                     }
                 }, Named.as("record_processed_vuln_scan_result"))
                 .filter((scanToken, vulnScan) -> vulnScan != null,
diff --git a/src/main/java/org/dependencytrack/persistence/QueryManager.java b/src/main/java/org/dependencytrack/persistence/QueryManager.java
index aff236543..1df482fb2 100644
--- a/src/main/java/org/dependencytrack/persistence/QueryManager.java
+++ b/src/main/java/org/dependencytrack/persistence/QueryManager.java
@@ -88,7 +88,12 @@
 import javax.jdo.PersistenceManager;
 import javax.jdo.Query;
 import javax.jdo.Transaction;
+import javax.jdo.datastore.JDOConnection;
 import java.security.Principal;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashSet;
@@ -1498,46 +1503,78 @@ public VulnerabilityScan getVulnerabilityScan(final String token) {
     }
 
     /**
-     * Record the successful receipt of a {@link ScanStatus #SCAN_STATUS_COMPLETE} event for a given {@link VulnerabilityScan}.
-     * <p>
-     * This method expects that access to the {@link VulnerabilityScan} table is serialized
-     * through Kafka events, keyed by the scan's token. This assumption allows for optimistic
-     * locking to be used.
+     * Record the successful processing of a {@link ScanResult} event for a given {@link VulnerabilityScan}.
      *
-     * @param scanToken The token that uniquely identifies the scan for clients
-     * @param value
-     * @return The updated {@link VulnerabilityScan}, or {@code null} when no {@link VulnerabilityScan} was found
+     * @param scanToken  The token that uniquely identifies the scan for clients
+     * @param scanResult The {@link ScanResult} to record
+     * @return The {@link VulnerabilityScan} when its status transitioned to {@link VulnerabilityScan.Status#COMPLETED}
+     * as a result of recording the given {@link ScanResult}, otherwise {@code null}
      */
-    public VulnerabilityScan recordVulnerabilityScanResult(final String scanToken, ScanResult value) {
-        final Transaction trx = pm.currentTransaction();
-        trx.setOptimistic(true);
-        try {
-            trx.begin();
-            final Query<VulnerabilityScan> scanQuery = pm.newQuery(VulnerabilityScan.class);
-            scanQuery.setFilter("token == :token");
-            scanQuery.setParameters(scanToken);
-            final VulnerabilityScan scan = scanQuery.executeUnique();
-            if (scan == null) {
+    public VulnerabilityScan recordVulnerabilityScanResult(final String scanToken, final ScanResult scanResult) {
+        final int totalScannerResults = scanResult.getScannerResultsCount();
+        final int failedScannerResults = Math.toIntExact(scanResult.getScannerResultsList().stream()
+                .map(ScannerResult::getStatus)
+                .filter(SCAN_STATUS_FAILED::equals)
+                .count());
+
+        // Because this method will be called VERY frequently (once for each processed ScanResult),
+        // use raw SQL instead of any ORM abstractions. All we need to do is to increment some counters.
+        // Using a single SQL statement also removes the need for (optimistic) locking.
+        final JDOConnection jdoConnection = pm.getDataStoreConnection();
+        final var nativeConnection = (Connection) jdoConnection.getNativeConnection();
+        try (final PreparedStatement ps = nativeConnection.prepareStatement("""
+                WITH "RES" AS (
+                  UPDATE "VULNERABILITYSCAN"
+                  SET
+                    "RECEIVED_RESULTS" = "RECEIVED_RESULTS" + 1,
+                    "SCAN_TOTAL" = "SCAN_TOTAL" + ?,
+                    "SCAN_FAILED" = "SCAN_FAILED" + ?,
+                    "STATUS" = (
+                      CASE WHEN "EXPECTED_RESULTS" = ("RECEIVED_RESULTS" + 1)
+                      THEN 'COMPLETED'
+                      ELSE 'IN_PROGRESS'
+                      END
+                    ),
+                    "UPDATED_AT" = NOW()
+                  WHERE
+                    "TOKEN" = ?
+                  RETURNING
+                    "SCAN_FAILED",
+                    "SCAN_TOTAL",
+                    "STATUS",
+                    "TARGET_TYPE",
+                    "TARGET_IDENTIFIER"
+                )
+                SELECT *
+                FROM "RES"
+                WHERE
+                  -- No point in fetching any data from DB if the
+                  -- record is not in the desired final state yet.
+                  "STATUS" = 'COMPLETED'
+                """)) {
+            ps.setInt(1, totalScannerResults);
+            ps.setInt(2, failedScannerResults);
+            ps.setString(3, scanToken);
+
+            final ResultSet rs = ps.executeQuery();
+            if (rs.next()) {
+                final var vs = new VulnerabilityScan();
+                vs.setToken(scanToken);
+                vs.setTargetType(VulnerabilityScan.TargetType.valueOf(rs.getString("TARGET_TYPE")));
+                vs.setTargetIdentifier(UUID.fromString(rs.getString("TARGET_IDENTIFIER")));
+                vs.setScanFailed(rs.getInt("SCAN_FAILED"));
+                vs.setScanTotal(rs.getInt("SCAN_TOTAL"));
+                vs.setStatus(VulnerabilityScan.Status.valueOf(rs.getString("STATUS")));
+                return vs;
+            } else {
                 return null;
             }
-            final int received = scan.getReceivedResults() + 1;
-            scan.setReceivedResults(received);
-            final long failedScanCount = value.getScannerResultsList().stream()
-                    .map(ScannerResult::getStatus)
-                    .filter(SCAN_STATUS_FAILED::equals)
-                    .count();
-            scan.setScanFailed(scan.getScanFailed() + failedScanCount);
-            scan.setScanTotal(value.getScannerResultsCount() + scan.getScanTotal());
-            scan.setStatus(scan.getExpectedResults() - received == 0
-                    ? VulnerabilityScan.Status.COMPLETED
-                    : VulnerabilityScan.Status.IN_PROGRESS);
-            scan.setUpdatedAt(new Date());
-            trx.commit();
-            return scan;
+        } catch (SQLException e) {
+            throw new RuntimeException("""
+                    Failed to record successful processing of scan result (token=%s, component=%s)\
+                    """.formatted(scanToken, scanResult.getKey().getComponentUuid()), e);
         } finally {
-            if (trx.isActive()) {
-                trx.rollback();
-            }
+            jdoConnection.close();
         }
     }