Skip to content

Commit

Permalink
Replace manual transaction commits with callInTransaction (#815)
Browse files Browse the repository at this point in the history
  • Loading branch information
nscuro authored Jul 31, 2024
1 parent 6738a4a commit 72217d6
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,17 +192,10 @@ public List<DependencyMetrics> getDependencyMetricsSince(Component component, Da
* Synchronizes VulnerabilityMetrics.
*/
public void synchronizeVulnerabilityMetrics(List<VulnerabilityMetrics> metrics) {
pm.currentTransaction().begin();
// No need for complex updating, just replace the existing ~400 rows with new ones
// Unless we have a contract with clients that the ID of metric records cannot change?

final Query<VulnerabilityMetrics> delete = pm.newQuery("DELETE FROM org.dependencytrack.model.VulnerabilityMetrics");
delete.execute();

// This still does ~400 queries, probably because not all databases can do bulk insert with autogenerated PKs
// Or because Datanucleus is trying to be smart as it wants to cache all these instances
pm.makePersistentAll(metrics);
pm.currentTransaction().commit();
runInTransaction(() -> {
pm.newQuery("DELETE FROM org.dependencytrack.model.VulnerabilityMetrics").execute();
pm.makePersistentAll(metrics);
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,18 +171,16 @@ public NotificationPublisher getDefaultNotificationPublisherByName(final String
public NotificationPublisher createNotificationPublisher(final String name, final String description,
final String publisherClass, final String templateContent,
final String templateMimeType, final boolean defaultPublisher) {
pm.currentTransaction().begin();
final NotificationPublisher publisher = new NotificationPublisher();
publisher.setName(name);
publisher.setDescription(description);
publisher.setPublisherClass(publisherClass);
publisher.setTemplate(templateContent);
publisher.setTemplateMimeType(templateMimeType);
publisher.setDefaultPublisher(defaultPublisher);
pm.makePersistent(publisher);
pm.currentTransaction().commit();
pm.getFetchPlan().addGroup(NotificationPublisher.FetchGroup.ALL.name());
return getObjectById(NotificationPublisher.class, publisher.getId());
return callInTransaction(() -> {
final NotificationPublisher publisher = new NotificationPublisher();
publisher.setName(name);
publisher.setDescription(description);
publisher.setPublisherClass(publisherClass);
publisher.setTemplate(templateContent);
publisher.setTemplateMimeType(templateMimeType);
publisher.setDefaultPublisher(defaultPublisher);
return pm.makePersistent(publisher);
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,25 +775,26 @@ public List<ProjectProperty> getProjectProperties(final Project project) {
* @param project a Project object
* @param tags a List of Tag objects
*/
@SuppressWarnings("unchecked")
@Override
public void bind(Project project, List<Tag> tags) {
final Query<Tag> query = pm.newQuery(Tag.class, "projects.contains(:project)");
final List<Tag> currentProjectTags = (List<Tag>) query.execute(project);
pm.currentTransaction().begin();
for (final Tag tag : currentProjectTags) {
if (!tags.contains(tag)) {
tag.getProjects().remove(project);
runInTransaction(() -> {
final Query<Tag> query = pm.newQuery(Tag.class, "projects.contains(:project)");
query.setParameters(project);
final List<Tag> currentProjectTags = executeAndCloseList(query);

for (final Tag tag : currentProjectTags) {
if (!tags.contains(tag)) {
tag.getProjects().remove(project);
}
}
}
project.setTags(tags);
for (final Tag tag : tags) {
final List<Project> projects = tag.getProjects();
if (!projects.contains(project)) {
projects.add(project);
project.setTags(tags);
for (final Tag tag : tags) {
final List<Project> projects = tag.getProjects();
if (!projects.contains(project)) {
projects.add(project);
}
}
}
pm.currentTransaction().commit();
});
}

/**
Expand Down
72 changes: 16 additions & 56 deletions src/main/java/org/dependencytrack/persistence/QueryManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import alpine.persistence.AlpineQueryManager;
import alpine.persistence.OrderDirection;
import alpine.persistence.PaginatedResult;
import alpine.persistence.ScopedCustomization;
import alpine.resources.AlpineRequest;
import alpine.server.util.DbUtil;
import com.github.packageurl.PackageURL;
Expand Down Expand Up @@ -117,9 +118,10 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static org.datanucleus.PropertyNames.PROPERTY_QUERY_SQL_ALLOWALL;
import static org.dependencytrack.proto.vulnanalysis.v1.ScanStatus.SCAN_STATUS_FAILED;

/**
Expand Down Expand Up @@ -1483,70 +1485,28 @@ public <T> T getObjectByUuid(final Class<T> clazz, final UUID uuid, final List<S
}
}

/**
* Convenience method to execute a given {@link Runnable} within the context of a {@link Transaction}.
* <p>
* Eventually, this may be moved to {@link alpine.persistence.AbstractAlpineQueryManager}.
*
* @param runnable The {@link Runnable} to execute
* @since 4.6.0
*/
public void runInTransaction(final Runnable runnable) {
final Transaction trx = pm.currentTransaction();
try {
trx.begin();
runnable.run();
trx.commit();
} finally {
if (trx.isActive()) {
trx.rollback();
}
}
}

/**
* Convenience method to execute a given {@link Supplier} within the context of a {@link Transaction}.
*
* @param supplier The {@link Supplier} to execute
* @param <T> Type of the result of {@code supplier}
* @return The result of the execution of {@code supplier}
*/
public <T> T runInTransaction(final Supplier<T> supplier) {
final Transaction trx = pm.currentTransaction();
try {
trx.begin();
final T result = supplier.get();
trx.commit();
return result;
} finally {
if (trx.isActive()) {
trx.rollback();
}
}
}

public <T> T runInRetryableTransaction(final Supplier<T> supplier, final Predicate<Throwable> retryOn) {
public <T> T runInRetryableTransaction(final Callable<T> supplier, final Predicate<Throwable> retryOn) {
final var retryConfig = RetryConfig.custom()
.retryOnException(retryOn)
.maxAttempts(3)
.build();

return Retry.of("runInRetryableTransaction", retryConfig)
.executeSupplier(() -> runInTransaction(supplier));
.executeSupplier(() -> callInTransaction(supplier));
}

public void recursivelyDeleteTeam(Team team) {
pm.setProperty("datanucleus.query.sql.allowAll", true);
final Transaction trx = pm.currentTransaction();
pm.currentTransaction().begin();
pm.deletePersistentAll(team.getApiKeys());
String aclDeleteQuery = """
DELETE FROM "PROJECT_ACCESS_TEAMS" WHERE "TEAM_ID" = ?
""";
final Query<?> query = pm.newQuery(JDOQuery.SQL_QUERY_LANGUAGE, aclDeleteQuery);
query.executeWithArray(team.getId());
pm.deletePersistent(team);
pm.currentTransaction().commit();
runInTransaction(() -> {
pm.deletePersistentAll(team.getApiKeys());

try (var ignored = new ScopedCustomization(pm).withProperty(PROPERTY_QUERY_SQL_ALLOWALL, "true")) {
final Query<?> aclDeleteQuery = pm.newQuery(JDOQuery.SQL_QUERY_LANGUAGE, """
DELETE FROM "PROJECT_ACCESS_TEAMS" WHERE "PROJECT_ACCESS_TEAMS"."TEAM_ID" = ?""");
executeAndCloseWithArray(aclDeleteQuery, team.getId());
}

pm.deletePersistent(team);
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,15 +246,16 @@ public void addVulnerability(Vulnerability vulnerability, Component component, A
* @param component the component unaffected by the vulnerabiity
*/
public void removeVulnerability(Vulnerability vulnerability, Component component) {
if (contains(vulnerability, component)) {
pm.currentTransaction().begin();
component.removeVulnerability(vulnerability);
pm.currentTransaction().commit();
}
final FindingAttribution fa = getFindingAttribution(vulnerability, component);
if (fa != null) {
delete(fa);
}
runInTransaction(() -> {
if (contains(vulnerability, component)) {
component.removeVulnerability(vulnerability);
}

final FindingAttribution fa = getFindingAttribution(vulnerability, component);
if (fa != null) {
delete(fa);
}
});
}

/**
Expand Down Expand Up @@ -640,7 +641,7 @@ SELECT COUNT(DISTINCT this.project.id)
}

public synchronized VulnerabilityAlias synchronizeVulnerabilityAlias(final VulnerabilityAlias alias) {
return runInTransaction(() -> {
return callInTransaction(() -> {
// Query existing aliases that match AT LEAST ONE identifier of the given alias.
//
// For each data source, we want to know the existing aliases where the respective identifier either:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ public Response cloneProject(CloneProjectRequest jsonRequest) {
}
LOGGER.info("Project " + sourceProject + " is being cloned by " + super.getPrincipal().getName());
CloneProjectEvent event = new CloneProjectEvent(jsonRequest);
final Response response = qm.runInTransaction(() -> {
final Response response = qm.callInTransaction(() -> {
WorkflowState workflowState = qm.getWorkflowStateByTokenAndStep(event.getChainIdentifier(), WorkflowStep.PROJECT_CLONE);
if (workflowState != null) {
if (isEventBeingProcessed(event.getChainIdentifier()) || !workflowState.getStatus().isTerminal()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public Response updateApiKeyComment(@PathParam("apikey") final String apikey,
try (final var qm = new QueryManager()) {
qm.getPersistenceManager().setProperty(PROPERTY_RETAIN_VALUES, "true");

return qm.runInTransaction(() -> {
return qm.callInTransaction(() -> {
final ApiKey apiKey = qm.getApiKey(apikey);
if (apiKey == null) {
return Response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public Response triggerVulnerabilityPolicyBundleSync() {
qm.getPersistenceManager().currentTransaction().setSerializeRead(true);

final UUID token = VulnerabilityPolicyFetchEvent.CHAIN_IDENTIFIER;
final Response response = qm.runInTransaction(() -> {
final Response response = qm.callInTransaction(() -> {
WorkflowState workflowState = qm.getWorkflowStateByTokenAndStep(token, WorkflowStep.POLICY_BUNDLE_SYNC);
if (workflowState != null) {
if (isEventBeingProcessed(token) || !workflowState.getStatus().isTerminal()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ private ProcessedBom processBom(final Context ctx, final ConsumedBom bom) {
// See https://www.datanucleus.org/products/accessplatform_6_0/jdo/persistence.html#lifecycle
qm.getPersistenceManager().setProperty(PROPERTY_RETAIN_VALUES, "true");

return qm.runInTransaction(() -> {
return qm.callInTransaction(() -> {
final Project persistentProject = processProject(ctx, qm, bom.project(), bom.projectMetadata());

LOGGER.info("Processing %d components".formatted(bom.components().size()));
Expand Down Expand Up @@ -1031,7 +1031,7 @@ private static List<ComponentRepositoryMetaAnalysisEvent> createRepoMetaAnalysis
continue;
}

final boolean shouldFetchIntegrityData = qm.runInTransaction(() -> prepareIntegrityMetaComponent(qm, component));
final boolean shouldFetchIntegrityData = qm.callInTransaction(() -> prepareIntegrityMetaComponent(qm, component));
if (shouldFetchIntegrityData) {
events.add(new ComponentRepositoryMetaAnalysisEvent(
component.getUuid(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private static void transitionTimedOutStepsToFailed(final QueryManager qm, final
int stepsCancelled = 0;
try {
for (final WorkflowState state : failedQuery.executeList()) {
stepsCancelled += qm.runInTransaction(() -> {
stepsCancelled += qm.callInTransaction(() -> {
final Date now = new Date();
state.setStatus(WorkflowStatus.FAILED);
state.setFailureReason("Timed out");
Expand Down Expand Up @@ -207,7 +207,7 @@ HAVING max(updatedAt) < :cutoff
).isEmpty()
""");
try {
stepsDeleted += qm.runInTransaction(() -> (long) workflowDeleteQuery.executeWithMap(Map.of(
stepsDeleted += qm.callInTransaction(() -> (long) workflowDeleteQuery.executeWithMap(Map.of(
"tokens", tokenBatch,
"nonTerminalStatuses", Set.of(WorkflowStatus.PENDING, WorkflowStatus.TIMED_OUT)
)));
Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true">
<statusListener class="ch.qos.logback.core.status.NopStatusListener"/>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date %level [%logger{0}] %msg%replace( [%mdc{}]){' \[\]', ''}%n</pattern>
Expand Down

0 comments on commit 72217d6

Please sign in to comment.