Skip to content

Commit

Permalink
Migrate RepositoryMetaResultProcessor from Kafka Streams to Parallel Consumer
Browse files Browse the repository at this point in the history

Depends on #552

Relates to DependencyTrack/hyades#346
Relates to DependencyTrack/hyades#901
Relates to DependencyTrack/hyades#907

Signed-off-by: nscuro <[email protected]>
  • Loading branch information
nscuro committed Feb 26, 2024
1 parent 55f9850 commit 3e655f7
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 131 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.dependencytrack.event.kafka.processor;

import alpine.common.logging.Logger;
import org.dependencytrack.event.kafka.KafkaTopics;
import org.dependencytrack.event.kafka.processor.api.ProcessorManager;

import javax.servlet.ServletContextEvent;
Expand All @@ -16,7 +17,8 @@ public class ProcessorInitializer implements ServletContextListener {
public void contextInitialized(final ServletContextEvent event) {
LOGGER.info("Initializing processors");

// TODO: Register processor here!
PROCESSOR_MANAGER.registerProcessor(RepositoryMetaResultProcessor.PROCESSOR_NAME,
KafkaTopics.REPO_META_ANALYSIS_RESULT, new RepositoryMetaResultProcessor());

PROCESSOR_MANAGER.startAll();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package org.dependencytrack.event.kafka.streams.processor;
package org.dependencytrack.event.kafka.processor;

import alpine.common.logging.Logger;
import alpine.common.metrics.Metrics;
import com.github.packageurl.MalformedPackageURLException;
import com.github.packageurl.PackageURL;
import io.micrometer.core.instrument.Timer;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.dependencytrack.event.kafka.processor.api.Processor;
import org.dependencytrack.event.kafka.processor.exception.ProcessingException;
import org.dependencytrack.model.FetchStatus;
import org.dependencytrack.model.IntegrityMetaComponent;
import org.dependencytrack.model.RepositoryMetaComponent;
Expand All @@ -29,16 +28,14 @@
/**
* A {@link Processor} responsible for processing result of component repository meta analyses.
*/
public class RepositoryMetaResultProcessor implements Processor<String, AnalysisResult, Void, Void> {
public class RepositoryMetaResultProcessor implements Processor<String, AnalysisResult> {

static final String PROCESSOR_NAME = "repo.meta.analysis.result";

private static final Logger LOGGER = Logger.getLogger(RepositoryMetaResultProcessor.class);
private static final Timer TIMER = Timer.builder("repo_meta_result_processing")
.description("Time taken to process repository meta analysis results")
.register(Metrics.getRegistry());

@Override
public void process(final Record<String, AnalysisResult> record) {
final Timer.Sample timerSample = Timer.start();
public void process(final ConsumerRecord<String, AnalysisResult> record) throws ProcessingException {
if (!isRecordValid(record)) {
return;
}
Expand All @@ -49,13 +46,11 @@ public void process(final Record<String, AnalysisResult> record) {
performIntegrityCheck(integrityMetaComponent, record.value(), qm);
}
} catch (Exception e) {
LOGGER.error("An unexpected error occurred while processing record %s".formatted(record), e);
} finally {
timerSample.stop(TIMER);
throw new ProcessingException(e);
}
}

private IntegrityMetaComponent synchronizeIntegrityMetadata(final QueryManager queryManager, final Record<String, AnalysisResult> record) throws MalformedPackageURLException {
private IntegrityMetaComponent synchronizeIntegrityMetadata(final QueryManager queryManager, final ConsumerRecord<String, AnalysisResult> record) throws MalformedPackageURLException {
final AnalysisResult result = record.value();
PackageURL purl = new PackageURL(result.getComponent().getPurl());
if (result.hasIntegrityMeta()) {
Expand All @@ -66,7 +61,7 @@ private IntegrityMetaComponent synchronizeIntegrityMetadata(final QueryManager q
}
}

private void synchronizeRepositoryMetadata(final QueryManager queryManager, final Record<String, AnalysisResult> record) throws Exception {
private void synchronizeRepositoryMetadata(final QueryManager queryManager, final ConsumerRecord<String, AnalysisResult> record) throws Exception {
PersistenceManager pm = queryManager.getPersistenceManager();
final AnalysisResult result = record.value();
PackageURL purl = new PackageURL(result.getComponent().getPurl());
Expand Down Expand Up @@ -104,7 +99,7 @@ private void synchronizeRepositoryMetadata(final QueryManager queryManager, fina
}
}

private RepositoryMetaComponent createRepositoryMetaResult(Record<String, AnalysisResult> incomingAnalysisResultRecord, PersistenceManager pm, PackageURL purl) throws Exception {
private RepositoryMetaComponent createRepositoryMetaResult(ConsumerRecord<String, AnalysisResult> incomingAnalysisResultRecord, PersistenceManager pm, PackageURL purl) throws Exception {
final AnalysisResult result = incomingAnalysisResultRecord.value();
if (result.hasLatestVersion()) {
try (final Query<RepositoryMetaComponent> query = pm.newQuery(RepositoryMetaComponent.class)) {
Expand Down Expand Up @@ -145,7 +140,7 @@ private RepositoryMetaComponent createRepositoryMetaResult(Record<String, Analys
}
}

private IntegrityMetaComponent synchronizeIntegrityMetaResult(final Record<String, AnalysisResult> incomingAnalysisResultRecord, QueryManager queryManager, PackageURL purl) {
private IntegrityMetaComponent synchronizeIntegrityMetaResult(final ConsumerRecord<String, AnalysisResult> incomingAnalysisResultRecord, QueryManager queryManager, PackageURL purl) {
final AnalysisResult result = incomingAnalysisResultRecord.value();
IntegrityMetaComponent persistentIntegrityMetaComponent = queryManager.getIntegrityMetaComponent(purl.toString());
if (persistentIntegrityMetaComponent != null && persistentIntegrityMetaComponent.getStatus() != null && persistentIntegrityMetaComponent.getStatus().equals(FetchStatus.PROCESSED)) {
Expand Down Expand Up @@ -180,7 +175,7 @@ private IntegrityMetaComponent synchronizeIntegrityMetaResult(final Record<Strin
return queryManager.updateIntegrityMetaComponent(persistentIntegrityMetaComponent);
}

private static boolean isRecordValid(final Record<String, AnalysisResult> record) {
private static boolean isRecordValid(final ConsumerRecord<String, AnalysisResult> record) {
final AnalysisResult result = record.value();
if (!result.hasComponent()) {
LOGGER.warn("""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.dependencytrack.event.kafka.KafkaTopics;
import org.dependencytrack.event.kafka.streams.processor.DelayedBomProcessedNotificationProcessor;
import org.dependencytrack.event.kafka.streams.processor.MirrorVulnerabilityProcessor;
import org.dependencytrack.event.kafka.streams.processor.RepositoryMetaResultProcessor;
import org.dependencytrack.event.kafka.streams.processor.VulnerabilityScanResultProcessor;
import org.dependencytrack.model.VulnerabilityScan;
import org.dependencytrack.model.WorkflowState;
Expand Down Expand Up @@ -218,12 +217,6 @@ Topology createTopology() {
Event.dispatch(policyEvaluationEvent);
}, Named.as("trigger_policy_evaluation"));

streamsBuilder
.stream(KafkaTopics.REPO_META_ANALYSIS_RESULT.name(),
Consumed.with(KafkaTopics.REPO_META_ANALYSIS_RESULT.keySerde(), KafkaTopics.REPO_META_ANALYSIS_RESULT.valueSerde())
.withName("consume_from_%s_topic".formatted(KafkaTopics.REPO_META_ANALYSIS_RESULT.name())))
.process(RepositoryMetaResultProcessor::new, Named.as("process_repo_meta_analysis_result"));

streamsBuilder
.stream(KafkaTopics.NEW_VULNERABILITY.name(),
Consumed.with(KafkaTopics.NEW_VULNERABILITY.keySerde(), KafkaTopics.NEW_VULNERABILITY.valueSerde())
Expand Down
8 changes: 8 additions & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,14 @@ kafka.streams.transient.processing.exception.threshold.interval=PT30M
# Refer to https://kafka.apache.org/documentation/#consumerconfigs for available options.
# alpine.kafka.processor.<name>.consumer.<consumer.config.name>=

alpine.kafka.processor.repo.meta.analysis.result.max.concurrency=-1
alpine.kafka.processor.repo.meta.analysis.result.processing.order=key
alpine.kafka.processor.repo.meta.analysis.result.retry.initial.delay.ms=1000
alpine.kafka.processor.repo.meta.analysis.result.retry.multiplier=2
alpine.kafka.processor.repo.meta.analysis.result.retry.randomization.factor=0.3
alpine.kafka.processor.repo.meta.analysis.result.retry.max.delay.ms=180000
alpine.kafka.processor.repo.meta.analysis.result.consumer.group.id=dtrack-apiserver-processor

# Scheduling tasks after 3 minutes (3*60*1000) of starting application
task.scheduler.initial.delay=180000

Expand Down
Loading

0 comments on commit 3e655f7

Please sign in to comment.