Skip to content

Commit

Permalink
Merge pull request #304 from DependencyTrack/with-update-select-for-s…
Browse files Browse the repository at this point in the history
…canresult-recording

Improve performance of `ScanResult` recording
  • Loading branch information
VithikaS authored Sep 13, 2023
2 parents 05ce74a + ab8656f commit b115c70
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,9 @@ Topology createTopology() {
.repartition(Repartitioned
.with(Serdes.String(), KafkaTopics.VULN_ANALYSIS_RESULT.valueSerde())
.withName("processed-vuln-scan-result-by-scan-token"))
.mapValues((scanToken, value) -> {
try (final var qm = new QueryManager().withL2CacheDisabled()) {
// Detach VulnerabilityScan objects when committing changes. Without this,
// all fields except the ID field will be unloaded on commit (the object will become HOLLOW),
// causing the call to getStatus() to trigger a database query behind the scenes.
qm.getPersistenceManager().setProperty(PropertyNames.PROPERTY_DETACH_ALL_ON_COMMIT, "true");

final VulnerabilityScan vulnScan = qm.recordVulnerabilityScanResult(scanToken, value);
if (vulnScan == null || vulnScan.getStatus() != VulnerabilityScan.Status.COMPLETED) {
// When the vulnerability scan is not completed, we don't care about it.
// We'll filter out nulls in the next filter step.
return null;
}

return vulnScan;
.mapValues((scanToken, scanResult) -> {
try (final var qm = new QueryManager()) {
return qm.recordVulnerabilityScanResult(scanToken, scanResult);
}
}, Named.as("record_processed_vuln_scan_result"))
.filter((scanToken, vulnScan) -> vulnScan != null,
Expand Down
107 changes: 72 additions & 35 deletions src/main/java/org/dependencytrack/persistence/QueryManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@
import javax.jdo.PersistenceManager;
import javax.jdo.Query;
import javax.jdo.Transaction;
import javax.jdo.datastore.JDOConnection;
import java.security.Principal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
Expand Down Expand Up @@ -1498,46 +1503,78 @@ public VulnerabilityScan getVulnerabilityScan(final String token) {
}

/**
* Record the successful receipt of a {@link ScanStatus #SCAN_STATUS_COMPLETE} event for a given {@link VulnerabilityScan}.
* <p>
* This method expects that access to the {@link VulnerabilityScan} table is serialized
* through Kafka events, keyed by the scan's token. This assumption allows for optimistic
* locking to be used.
* Record the successful processing of a {@link ScanResult} event for a given {@link VulnerabilityScan}.
*
* @param scanToken The token that uniquely identifies the scan for clients
* @param value
* @return The updated {@link VulnerabilityScan}, or {@code null} when no {@link VulnerabilityScan} was found
* @param scanToken The token that uniquely identifies the scan for clients
* @param scanResult The {@link ScanResult} to record
* @return The {@link VulnerabilityScan} when its status transitioned to {@link VulnerabilityScan.Status#COMPLETED}
* as a result of recording the given {@link ScanResult}, otherwise {@code null}
*/
public VulnerabilityScan recordVulnerabilityScanResult(final String scanToken, ScanResult value) {
final Transaction trx = pm.currentTransaction();
trx.setOptimistic(true);
try {
trx.begin();
final Query<VulnerabilityScan> scanQuery = pm.newQuery(VulnerabilityScan.class);
scanQuery.setFilter("token == :token");
scanQuery.setParameters(scanToken);
final VulnerabilityScan scan = scanQuery.executeUnique();
if (scan == null) {
public VulnerabilityScan recordVulnerabilityScanResult(final String scanToken, final ScanResult scanResult) {
final int totalScannerResults = scanResult.getScannerResultsCount();
final int failedScannerResults = Math.toIntExact(scanResult.getScannerResultsList().stream()
.map(ScannerResult::getStatus)
.filter(SCAN_STATUS_FAILED::equals)
.count());

// Because this method will be called VERY frequently (once for each processed ScanResult),
// use raw SQL instead of any ORM abstractions. All we need to do is to increment some counters.
// Using a single SQL statement also removes the need for (optimistic) locking.
final JDOConnection jdoConnection = pm.getDataStoreConnection();
final var nativeConnection = (Connection) jdoConnection.getNativeConnection();
try (final PreparedStatement ps = nativeConnection.prepareStatement("""
WITH "RES" AS (
UPDATE "VULNERABILITYSCAN"
SET
"RECEIVED_RESULTS" = "RECEIVED_RESULTS" + 1,
"SCAN_TOTAL" = "SCAN_TOTAL" + ?,
"SCAN_FAILED" = "SCAN_FAILED" + ?,
"STATUS" = (
CASE WHEN "EXPECTED_RESULTS" = ("RECEIVED_RESULTS" + 1)
THEN 'COMPLETED'
ELSE 'IN_PROGRESS'
END
),
"UPDATED_AT" = NOW()
WHERE
"TOKEN" = ?
RETURNING
"SCAN_FAILED",
"SCAN_TOTAL",
"STATUS",
"TARGET_TYPE",
"TARGET_IDENTIFIER"
)
SELECT *
FROM "RES"
WHERE
-- No point in fetching any data from DB if the
-- record is not in the desired final state yet.
"STATUS" = 'COMPLETED'
""")) {
ps.setInt(1, totalScannerResults);
ps.setInt(2, failedScannerResults);
ps.setString(3, scanToken);

final ResultSet rs = ps.executeQuery();
if (rs.next()) {
final var vs = new VulnerabilityScan();
vs.setToken(scanToken);
vs.setTargetType(VulnerabilityScan.TargetType.valueOf(rs.getString("TARGET_TYPE")));
vs.setTargetIdentifier(UUID.fromString(rs.getString("TARGET_IDENTIFIER")));
vs.setScanFailed(rs.getInt("SCAN_FAILED"));
vs.setScanTotal(rs.getInt("SCAN_TOTAL"));
vs.setStatus(VulnerabilityScan.Status.valueOf(rs.getString("STATUS")));
return vs;
} else {
return null;
}
final int received = scan.getReceivedResults() + 1;
scan.setReceivedResults(received);
final long failedScanCount = value.getScannerResultsList().stream()
.map(ScannerResult::getStatus)
.filter(SCAN_STATUS_FAILED::equals)
.count();
scan.setScanFailed(scan.getScanFailed() + failedScanCount);
scan.setScanTotal(value.getScannerResultsCount() + scan.getScanTotal());
scan.setStatus(scan.getExpectedResults() - received == 0
? VulnerabilityScan.Status.COMPLETED
: VulnerabilityScan.Status.IN_PROGRESS);
scan.setUpdatedAt(new Date());
trx.commit();
return scan;
} catch (SQLException e) {
throw new RuntimeException("""
Failed to record successful processing of scan result (token=%s, component=%s)\
""".formatted(scanToken, scanResult.getKey().getComponentUuid()), e);
} finally {
if (trx.isActive()) {
trx.rollback();
}
jdoConnection.close();
}
}

Expand Down

0 comments on commit b115c70

Please sign in to comment.