variantStatsWriter(DatabaseParameters databaseParameters, MongoTemplate mongoTemplate) {
+ return new VariantStatsWriter(databaseParameters, mongoTemplate);
+ }
+}
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/CalculateAndLoadStatisticsJobConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/CalculateAndLoadStatisticsJobConfiguration.java
new file mode 100644
index 000000000..ea85cefca
--- /dev/null
+++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/CalculateAndLoadStatisticsJobConfiguration.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2024 EMBL - European Bioinformatics Institute
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package uk.ac.ebi.eva.pipeline.configuration.jobs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.Step;
+import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
+import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.Scope;
+import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.CalculateAndLoadStatisticsStepConfiguration;
+import uk.ac.ebi.eva.pipeline.parameters.NewJobIncrementer;
+
+import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.CALCULATE_AND_LOAD_STATISTICS_JOB;
+import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.CALCULATE_AND_LOAD_STATISTICS_STEP;
+
+/**
+ * Configuration for the full statistics job, a single step that both calculates variant
+ * statistics and loads them back into the variants collection (statsCreate --> statsLoad).
+ *
+ * TODO add a new PopulationStatisticsJobParametersValidator
+ */
+@Configuration
+@EnableBatchProcessing
+@Import({CalculateAndLoadStatisticsStepConfiguration.class})
+public class CalculateAndLoadStatisticsJobConfiguration {
+
+    private static final Logger logger = LoggerFactory.getLogger(CalculateAndLoadStatisticsJobConfiguration.class);
+
+    @Autowired
+    @Qualifier(CALCULATE_AND_LOAD_STATISTICS_STEP)
+    private Step calculateAndLoadStatisticsStep;
+
+    /**
+     * Builds the single-step statistics job. Prototype-scoped so each launch gets a fresh Job
+     * instance, consistent with the other job configurations in this package.
+     */
+    @Bean(CALCULATE_AND_LOAD_STATISTICS_JOB)
+    @Scope("prototype")
+    public Job calculateAndLoadStatisticsJob(JobBuilderFactory jobBuilderFactory) {
+        // Parameterized logging avoids building the message string when debug is disabled
+        logger.debug("Building '{}'", CALCULATE_AND_LOAD_STATISTICS_JOB);
+
+        return jobBuilderFactory
+                .get(CALCULATE_AND_LOAD_STATISTICS_JOB)
+                .incrementer(new NewJobIncrementer())
+                .start(calculateAndLoadStatisticsStep)
+                .build();
+    }
+
+}
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepConfiguration.java
new file mode 100644
index 000000000..3aaa813e7
--- /dev/null
+++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepConfiguration.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2024 EMBL - European Bioinformatics Institute
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package uk.ac.ebi.eva.pipeline.configuration.jobs.steps;
+
+import org.springframework.batch.core.Step;
+import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
+import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
+import org.springframework.batch.core.step.tasklet.TaskletStep;
+import org.springframework.batch.item.ItemProcessor;
+import org.springframework.batch.item.ItemStreamReader;
+import org.springframework.batch.item.ItemWriter;
+import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument;
+import uk.ac.ebi.eva.pipeline.configuration.ChunkSizeCompletionPolicyConfiguration;
+import uk.ac.ebi.eva.pipeline.configuration.io.readers.VariantStatsReaderConfiguration;
+import uk.ac.ebi.eva.pipeline.configuration.io.writers.VariantStatsWriterConfiguration;
+import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.processors.VariantStatsProcessorConfiguration;
+
+import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.CALCULATE_AND_LOAD_STATISTICS_STEP;
+import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_STATS_PROCESSOR;
+import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_STATS_READER;
+import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_STATS_WRITER;
+
+/**
+ * Configuration of the chunk-oriented step that reads variants of a study, computes their
+ * statistics and writes the statistics back to the variants collection.
+ */
+@Configuration
+@EnableBatchProcessing
+@Import({VariantStatsReaderConfiguration.class, VariantStatsWriterConfiguration.class,
+        VariantStatsProcessorConfiguration.class, ChunkSizeCompletionPolicyConfiguration.class})
+public class CalculateAndLoadStatisticsStepConfiguration {
+
+    @Bean(CALCULATE_AND_LOAD_STATISTICS_STEP)
+    public Step calculateAndLoadStatisticsStep(
+            @Qualifier(VARIANT_STATS_READER) ItemStreamReader<VariantDocument> variantStatsReader,
+            @Qualifier(VARIANT_STATS_PROCESSOR) ItemProcessor<VariantDocument, VariantDocument> variantStatsProcessor,
+            @Qualifier(VARIANT_STATS_WRITER) ItemWriter<VariantDocument> variantStatsWriter,
+            StepBuilderFactory stepBuilderFactory,
+            SimpleCompletionPolicy chunkSizeCompletionPolicy) {
+        // The completion policy drives the chunk size from the 'chunkSize' job parameter
+        TaskletStep step = stepBuilderFactory.get(CALCULATE_AND_LOAD_STATISTICS_STEP)
+                .<VariantDocument, VariantDocument>chunk(chunkSizeCompletionPolicy)
+                .reader(variantStatsReader)
+                .processor(variantStatsProcessor)
+                .writer(variantStatsWriter)
+                .build();
+        return step;
+    }
+}
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/processors/VariantStatsProcessorConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/processors/VariantStatsProcessorConfiguration.java
new file mode 100644
index 000000000..23b1b92fb
--- /dev/null
+++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/processors/VariantStatsProcessorConfiguration.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2024 EMBL - European Bioinformatics Institute
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package uk.ac.ebi.eva.pipeline.configuration.jobs.steps.processors;
+
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.batch.item.ItemProcessor;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument;
+import uk.ac.ebi.eva.pipeline.io.processors.VariantStatsProcessor;
+
+import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_STATS_PROCESSOR;
+
+/**
+ * Step-scoped bean definition of the processor that computes variant statistics.
+ */
+@Configuration
+public class VariantStatsProcessorConfiguration {
+
+    @Bean(VARIANT_STATS_PROCESSOR)
+    @StepScope
+    public ItemProcessor<VariantDocument, VariantDocument> variantStatsProcessor() {
+        return new VariantStatsProcessor();
+    }
+}
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/io/processors/VariantStatsProcessor.java b/src/main/java/uk/ac/ebi/eva/pipeline/io/processors/VariantStatsProcessor.java
new file mode 100644
index 000000000..6e510345e
--- /dev/null
+++ b/src/main/java/uk/ac/ebi/eva/pipeline/io/processors/VariantStatsProcessor.java
@@ -0,0 +1,186 @@
+package uk.ac.ebi.eva.pipeline.io.processors;
+
+import com.mongodb.BasicDBObject;
+import org.opencb.biodata.models.feature.Genotype;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.item.ItemProcessor;
+import uk.ac.ebi.eva.commons.models.data.VariantStats;
+import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument;
+import uk.ac.ebi.eva.commons.models.mongo.entity.subdocuments.VariantSourceEntryMongo;
+import uk.ac.ebi.eva.commons.models.mongo.entity.subdocuments.VariantStatsMongo;
+import uk.ac.ebi.eva.pipeline.io.readers.VariantStatsReader;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Computes per-file "ALL" cohort statistics (genotype counts, MAF/MGF, missing counts) for a
+ * {@link VariantDocument} from the genotype-to-samples map of each of its source entries, and
+ * attaches the resulting {@link VariantStatsMongo} set to the document.
+ */
+public class VariantStatsProcessor implements ItemProcessor<VariantDocument, VariantDocument> {
+    private static final Logger logger = LoggerFactory.getLogger(VariantStatsProcessor.class);
+    private static final String GENOTYPE_COUNTS_MAP = "genotypeCountsMap";
+    private static final String ALLELE_COUNTS_MAP = "alleleCountsMap";
+    private static final String MISSING_GENOTYPE = "missingGenotype";
+    private static final String MISSING_ALLELE = "missingAllele";
+    // Special sampleData key whose value is the genotype of every sample not listed explicitly
+    private static final String DEFAULT_GENOTYPE = "def";
+    private static final List<String> MISSING_GENOTYPE_ALLELE_REPRESENTATIONS = Arrays.asList(".", "-1");
+
+    public VariantStatsProcessor() {
+    }
+
+    /**
+     * Attaches statistics to the given variant. Source entries with no sample data, or whose
+     * file is missing from the files collection, are skipped.
+     */
+    @Override
+    public VariantDocument process(VariantDocument variant) {
+        // Populated by VariantStatsReader.open(); NOTE(review): static coupling to the reader —
+        // assumes the reader of the same step has already been opened.
+        Map<String, Integer> filesIdNumberOfSamplesMap = VariantStatsReader.getFilesIdAndNumberOfSamplesMap();
+
+        String variantRef = variant.getReference();
+        String variantAlt = variant.getAlternate();
+        Set<VariantStatsMongo> variantStatsSet = new HashSet<>();
+
+        Set<VariantSourceEntryMongo> variantSourceEntrySet = variant.getVariantSources();
+        for (VariantSourceEntryMongo variantSourceEntry : variantSourceEntrySet) {
+            String studyId = variantSourceEntry.getStudyId();
+            String fileId = variantSourceEntry.getFileId();
+
+            BasicDBObject sampleData = variantSourceEntry.getSampleData();
+            if (sampleData == null || sampleData.isEmpty()) {
+                continue;
+            }
+
+            Integer totalSamples = filesIdNumberOfSamplesMap.get(fileId);
+            if (totalSamples == null) {
+                // Guard against an unboxing NPE: the files collection had no entry for this file
+                logger.warn("Skipping stats for study {} file {}: number of samples unknown", studyId, fileId);
+                continue;
+            }
+
+            VariantStats variantStats = getVariantStats(variantRef, variantAlt,
+                    variantSourceEntry.getAlternates(), sampleData, totalSamples);
+            VariantStatsMongo variantStatsMongo = new VariantStatsMongo(studyId, fileId, "ALL", variantStats);
+
+            variantStatsSet.add(variantStatsMongo);
+        }
+
+        if (!variantStatsSet.isEmpty()) {
+            variant.setStats(variantStatsSet);
+        }
+
+        return variant;
+    }
+
+    /**
+     * Builds the statistics for one file's sample data.
+     *
+     * @param fileAlternates        secondary alternates of the file; allele index i >= 2 maps to
+     *                              fileAlternates[i - 2]
+     * @param sampleData            map of genotype string (e.g. "0|1") to the list of samples
+     *                              carrying it, plus the optional "def" default-genotype entry
+     * @param totalSamplesForFileId total number of samples in the file, used to expand "def"
+     */
+    public VariantStats getVariantStats(String variantRef, String variantAlt, String[] fileAlternates,
+                                        BasicDBObject sampleData, int totalSamplesForFileId) {
+        Map<String, Map<String, Integer>> countsMap = getGenotypeAndAllelesCounts(sampleData, totalSamplesForFileId);
+        Map<String, Integer> genotypeCountsMap = countsMap.get(GENOTYPE_COUNTS_MAP);
+        Map<String, Integer> alleleCountsMap = countsMap.get(ALLELE_COUNTS_MAP);
+
+        // Calculate Genotype Stats
+        int missingGenotypes = genotypeCountsMap.getOrDefault(MISSING_GENOTYPE, 0);
+        genotypeCountsMap.remove(MISSING_GENOTYPE);
+        Map<Genotype, Integer> genotypeCount = genotypeCountsMap.entrySet().stream()
+                .collect(Collectors.toMap(entry -> new Genotype(entry.getKey(), variantRef, variantAlt),
+                        Map.Entry::getValue));
+        // Minor genotype = second highest entry by count; ties broken by genotype string so the
+        // result does not depend on HashMap iteration order
+        Optional<Map.Entry<String, Integer>> minorGenotypeEntry = genotypeCountsMap.entrySet().stream()
+                .sorted(Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder())
+                        .thenComparing(Map.Entry.comparingByKey()))
+                .skip(1)
+                .findFirst();
+        String minorGenotype = "";
+        float minorGenotypeFrequency = 0.0f;
+        if (minorGenotypeEntry.isPresent()) {
+            minorGenotype = minorGenotypeEntry.get().getKey();
+            int totalGenotypes = genotypeCountsMap.values().stream().reduce(0, Integer::sum);
+            minorGenotypeFrequency = (float) minorGenotypeEntry.get().getValue() / totalGenotypes;
+        }
+
+        // Calculate Allele Stats
+        int missingAlleles = alleleCountsMap.getOrDefault(MISSING_ALLELE, 0);
+        alleleCountsMap.remove(MISSING_ALLELE);
+        // Minor allele = second highest entry by count
+        Optional<Map.Entry<String, Integer>> minorAlleleEntry = alleleCountsMap.entrySet().stream()
+                .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
+                .skip(1)
+                .findFirst();
+        String minorAllele = "";
+        float minorAlleleFrequency = 0.0f;
+        if (minorAlleleEntry.isPresent()) {
+            int minorAlleleEntryCount = minorAlleleEntry.get().getValue();
+            int totalAlleles = alleleCountsMap.values().stream().reduce(0, Integer::sum);
+            minorAlleleFrequency = (float) minorAlleleEntryCount / totalAlleles;
+
+            // Among alleles tied on the minor count, pick the smallest allele index for determinism
+            String minorAlleleKey = alleleCountsMap.entrySet().stream()
+                    .filter(entry -> entry.getValue().equals(minorAlleleEntryCount))
+                    .sorted(Map.Entry.comparingByKey())
+                    .findFirst()
+                    .get()
+                    .getKey();
+
+            // Allele index 0 = reference, 1 = primary alternate, >= 2 = secondary alternates
+            minorAllele = minorAlleleKey.equals("0") ? variantRef
+                    : minorAlleleKey.equals("1") ? variantAlt
+                    : fileAlternates[Integer.parseInt(minorAlleleKey) - 2];
+        }
+
+        VariantStats variantStats = new VariantStats();
+        variantStats.setRefAllele(variantRef);
+        variantStats.setAltAllele(variantAlt);
+        variantStats.setMissingGenotypes(missingGenotypes);
+        variantStats.setMgf(minorGenotypeFrequency);
+        variantStats.setMgfGenotype(minorGenotype);
+        variantStats.setGenotypesCount(genotypeCount);
+        variantStats.setMissingAlleles(missingAlleles);
+        variantStats.setMaf(minorAlleleFrequency);
+        variantStats.setMafAllele(minorAllele);
+
+        return variantStats;
+    }
+
+    /**
+     * Counts genotypes and alleles per sample. The returned map contains two entries:
+     * GENOTYPE_COUNTS_MAP (genotype string -> number of samples) and ALLELE_COUNTS_MAP
+     * (allele index string -> number of allele observations). Missing genotypes/alleles are
+     * accumulated under the MISSING_GENOTYPE / MISSING_ALLELE keys.
+     */
+    private Map<String, Map<String, Integer>> getGenotypeAndAllelesCounts(BasicDBObject sampleData,
+                                                                          int totalSamplesForFileId) {
+        Map<String, Map<String, Integer>> genotypeAndAllelesCountsMap = new HashMap<>();
+        Map<String, Integer> genotypeCountsMap = new HashMap<>();
+        Map<String, Integer> alleleCountsMap = new HashMap<>();
+
+        String defaultGenotype = "";
+        for (Map.Entry<String, Object> entry : sampleData.entrySet()) {
+            String genotype = entry.getKey();
+            if (genotype.equals(DEFAULT_GENOTYPE)) {
+                // Expanded after the loop, once all explicit genotype counts are known
+                defaultGenotype = entry.getValue().toString();
+                continue;
+            }
+
+            int noOfSamples = ((List) entry.getValue()).size();
+            // Split on phased ('|') or unphased ('/') separators
+            String[] genotypeParts = genotype.split("\\||/");
+
+            if (Arrays.stream(genotypeParts).anyMatch(MISSING_GENOTYPE_ALLELE_REPRESENTATIONS::contains)) {
+                // Every sample carrying this genotype is missing, not just one
+                genotypeCountsMap.merge(MISSING_GENOTYPE, noOfSamples, Integer::sum);
+            } else {
+                genotypeCountsMap.put(genotype, noOfSamples);
+            }
+
+            for (String genotypePart : genotypeParts) {
+                if (MISSING_GENOTYPE_ALLELE_REPRESENTATIONS.contains(genotypePart)) {
+                    alleleCountsMap.merge(MISSING_ALLELE, noOfSamples, Integer::sum);
+                } else {
+                    alleleCountsMap.merge(genotypePart, noOfSamples, Integer::sum);
+                }
+            }
+        }
+
+        if (!defaultGenotype.isEmpty()) {
+            // Samples not listed explicitly all carry the default genotype
+            int defaultGenotypeCount = totalSamplesForFileId
+                    - genotypeCountsMap.values().stream().reduce(0, Integer::sum);
+
+            String[] genotypeParts = defaultGenotype.split("\\||/");
+            if (Arrays.stream(genotypeParts).anyMatch(MISSING_GENOTYPE_ALLELE_REPRESENTATIONS::contains)) {
+                // All defaulted samples count as missing
+                genotypeCountsMap.merge(MISSING_GENOTYPE, defaultGenotypeCount, Integer::sum);
+            } else {
+                genotypeCountsMap.put(defaultGenotype, defaultGenotypeCount);
+            }
+
+            for (String genotypePart : genotypeParts) {
+                if (MISSING_GENOTYPE_ALLELE_REPRESENTATIONS.contains(genotypePart)) {
+                    alleleCountsMap.merge(MISSING_ALLELE, defaultGenotypeCount, Integer::sum);
+                } else {
+                    alleleCountsMap.merge(genotypePart, defaultGenotypeCount, Integer::sum);
+                }
+            }
+        }
+
+        genotypeAndAllelesCountsMap.put(GENOTYPE_COUNTS_MAP, genotypeCountsMap);
+        genotypeAndAllelesCountsMap.put(ALLELE_COUNTS_MAP, alleleCountsMap);
+
+        return genotypeAndAllelesCountsMap;
+    }
+
+}
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/io/readers/VariantStatsReader.java b/src/main/java/uk/ac/ebi/eva/pipeline/io/readers/VariantStatsReader.java
new file mode 100644
index 000000000..d845d0bd3
--- /dev/null
+++ b/src/main/java/uk/ac/ebi/eva/pipeline/io/readers/VariantStatsReader.java
@@ -0,0 +1,119 @@
+package uk.ac.ebi.eva.pipeline.io.readers;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Filters;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.ItemStreamException;
+import org.springframework.batch.item.ItemStreamReader;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.data.mongodb.core.convert.MongoConverter;
+import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument;
+import uk.ac.ebi.eva.commons.models.mongo.entity.subdocuments.VariantSourceEntryMongo;
+import uk.ac.ebi.eva.pipeline.parameters.DatabaseParameters;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static com.mongodb.client.model.Aggregates.match;
+import static com.mongodb.client.model.Aggregates.project;
+import static com.mongodb.client.model.Projections.computed;
+import static com.mongodb.client.model.Projections.fields;
+import static java.util.Arrays.asList;
+
+/**
+ * Streams every variant of a given study from the variants collection. On open it also loads
+ * the fileId -> number-of-samples map from the files collection, which
+ * {@code VariantStatsProcessor} reads through the static accessor.
+ */
+public class VariantStatsReader implements ItemStreamReader<VariantDocument> {
+    private static final Logger logger = LoggerFactory.getLogger(VariantStatsReader.class);
+
+    private DatabaseParameters databaseParameters;
+    private MongoTemplate mongoTemplate;
+    private MongoCursor<Document> cursor;
+    private MongoConverter converter;
+    private int chunkSize;
+    private String studyId;
+
+    // fileId -> number of samples, from the files collection. Static so the processor can reach
+    // it without a bean dependency; repopulated on every open() (see initializeReader).
+    private static Map<String, Integer> filesIdNumberOfSamplesMap = new HashMap<>();
+
+    public VariantStatsReader(DatabaseParameters databaseParameters, MongoTemplate mongoTemplate,
+                              String studyId, int chunkSize) {
+        this.databaseParameters = databaseParameters;
+        this.mongoTemplate = mongoTemplate;
+        this.studyId = studyId;
+        this.chunkSize = chunkSize;
+    }
+
+    @Override
+    public VariantDocument read() {
+        // tryNext() returns null when the cursor is exhausted, which ends the step
+        Document nextElement = cursor.tryNext();
+        return (nextElement != null) ? getVariant(nextElement) : null;
+    }
+
+    private VariantDocument getVariant(Document variantDocument) {
+        return converter.read(VariantDocument.class, new BasicDBObject(variantDocument));
+    }
+
+    @Override
+    public void open(ExecutionContext executionContext) throws ItemStreamException {
+        initializeReader();
+    }
+
+    public void initializeReader() {
+        cursor = initializeCursor();
+        converter = mongoTemplate.getConverter();
+        // Repopulate unconditionally: the map is static, so counts left over from a previous
+        // job/study executed in the same JVM would otherwise be served stale.
+        populateFilesIdAndNumberOfSamplesMap();
+    }
+
+    private MongoCursor<Document> initializeCursor() {
+        Bson query = Filters.elemMatch(VariantDocument.FILES_FIELD,
+                Filters.eq(VariantSourceEntryMongo.STUDYID_FIELD, studyId));
+        logger.info("Issuing find: {}", query);
+
+        FindIterable<Document> statsVariantDocuments = getVariants(query);
+        return statsVariantDocuments.iterator();
+    }
+
+    private FindIterable<Document> getVariants(Bson query) {
+        // noCursorTimeout: stats calculation of a chunk may take longer than the server's
+        // default idle-cursor timeout
+        return mongoTemplate.getCollection(databaseParameters.getCollectionVariantsName())
+                .find(query)
+                .noCursorTimeout(true)
+                .batchSize(chunkSize);
+    }
+
+    private void populateFilesIdAndNumberOfSamplesMap() {
+        // "samp" is stored as a document of sampleName -> index, so its number of keys is the
+        // number of samples in the file
+        Bson matchStage = match(Filters.eq("sid", studyId));
+        Bson projectStage = project(fields(
+                computed("fid", "$fid"),
+                computed("numOfSamples", new Document("$size", new Document("$objectToArray", "$samp")))
+        ));
+        filesIdNumberOfSamplesMap = mongoTemplate.getCollection(databaseParameters.getCollectionFilesName())
+                .aggregate(asList(matchStage, projectStage))
+                .into(new ArrayList<>())
+                .stream()
+                .collect(Collectors.toMap(doc -> doc.getString("fid"), doc -> doc.getInteger("numOfSamples")));
+    }
+
+    public static Map<String, Integer> getFilesIdAndNumberOfSamplesMap() {
+        return filesIdNumberOfSamplesMap;
+    }
+
+    @Override
+    public void update(ExecutionContext executionContext) throws ItemStreamException {
+        // Nothing to save: the reader is not restartable mid-stream
+    }
+
+    @Override
+    public void close() throws ItemStreamException {
+        if (cursor != null) {
+            cursor.close();
+        }
+    }
+}
+
+
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/io/writers/VariantStatsWriter.java b/src/main/java/uk/ac/ebi/eva/pipeline/io/writers/VariantStatsWriter.java
new file mode 100644
index 000000000..6752dc6b4
--- /dev/null
+++ b/src/main/java/uk/ac/ebi/eva/pipeline/io/writers/VariantStatsWriter.java
@@ -0,0 +1,46 @@
+package uk.ac.ebi.eva.pipeline.io.writers;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.item.ItemWriter;
+import org.springframework.data.mongodb.core.BulkOperations;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.data.mongodb.core.query.Criteria;
+import org.springframework.data.mongodb.core.query.Query;
+import org.springframework.data.mongodb.core.query.Update;
+import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument;
+import uk.ac.ebi.eva.pipeline.parameters.DatabaseParameters;
+
+import java.util.List;
+
+public class VariantStatsWriter implements ItemWriter {
+ private static final Logger logger = LoggerFactory.getLogger(VariantStatsWriter.class);
+ private DatabaseParameters databaseParameters;
+ private MongoTemplate mongoTemplate;
+
+ public VariantStatsWriter(DatabaseParameters databaseParameters, MongoTemplate mongoTemplate) {
+ this.databaseParameters = databaseParameters;
+ this.mongoTemplate = mongoTemplate;
+ }
+
+ @Override
+ public void write(List extends VariantDocument> variants) {
+ BulkOperations bulkOperations = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, VariantDocument.class,
+ databaseParameters.getCollectionVariantsName());
+ for (VariantDocument variant : variants) {
+ if (variant.getVariantStatsMongo() == null || variant.getVariantStatsMongo().isEmpty()) {
+ continue;
+ }
+ Query query = new Query(Criteria.where("_id").is(variant.getId()));
+ Update update = new Update();
+ update.set("st", variant.getVariantStatsMongo());
+
+ bulkOperations.updateOne(query, update);
+ }
+
+ bulkOperations.execute();
+ }
+
+}
+
+
diff --git a/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/PopulationStatisticsJobTest.java b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/PopulationStatisticsJobTest.java
index d6ce5e36d..8a85d9979 100644
--- a/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/PopulationStatisticsJobTest.java
+++ b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/PopulationStatisticsJobTest.java
@@ -15,6 +15,7 @@
*/
package uk.ac.ebi.eva.pipeline.configuration.jobs;
+import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -32,7 +33,6 @@
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
-
import uk.ac.ebi.eva.test.configuration.BatchTestConfiguration;
import uk.ac.ebi.eva.test.configuration.TemporaryRuleConfiguration;
import uk.ac.ebi.eva.test.rules.PipelineTemporaryFolderRule;
@@ -60,6 +60,8 @@ public class PopulationStatisticsJobTest {
private static final String MONGO_DUMP = "/dump/VariantStatsConfigurationTest_vl";
+ private static final String DATABASE_NAME = "calculate_stats_test_db";
+
@Rule
public PipelineTemporaryFolderRule temporaryFolderRule = new PipelineTemporaryFolderRule();
@@ -73,6 +75,12 @@ public class PopulationStatisticsJobTest {
@Before
public void setUp() throws Exception {
Config.setOpenCGAHome(GenotypedVcfJobTestUtils.getDefaultOpencgaHome());
+ mongoRule.restoreDump(getResourceUrl(MONGO_DUMP), DATABASE_NAME);
+ }
+
+ @After
+ public void cleanUp() {
+ mongoRule.getTemporaryDatabase(DATABASE_NAME).drop();
}
@Test
@@ -80,14 +88,13 @@ public void fullPopulationStatisticsJob() throws Exception {
//Given a valid VCF input file
String input = SMALL_VCF_FILE;
String statsDir = temporaryFolderRule.getRoot().getPath();
- String dbName = mongoRule.restoreDumpInTemporaryDatabase(getResourceUrl(MONGO_DUMP));
String fileId = "1";
String studyId = "1";
JobParameters jobParameters = new EvaJobParameterBuilder()
.collectionFilesName("files")
.collectionVariantsName("variants")
- .databaseName(dbName)
+ .databaseName(DATABASE_NAME)
.inputStudyId(studyId)
.inputVcf(getResource(input).getAbsolutePath())
.inputVcfAggregation("BASIC")
@@ -107,7 +114,7 @@ public void fullPopulationStatisticsJob() throws Exception {
// The DB docs should have the field "st"
VariantStorageManager variantStorageManager = StorageManagerFactory.getVariantStorageManager();
- VariantDBAdaptor variantDBAdaptor = variantStorageManager.getDBAdaptor(dbName, null);
+ VariantDBAdaptor variantDBAdaptor = variantStorageManager.getDBAdaptor(DATABASE_NAME, null);
VariantDBIterator iterator = variantDBAdaptor.iterator(new QueryOptions());
assertEquals(1, iterator.next().getSourceEntries().values().iterator().next().getCohortStats().size());
diff --git a/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepTest.java b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepTest.java
new file mode 100644
index 000000000..f44291f47
--- /dev/null
+++ b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepTest.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2024 EMBL - European Bioinformatics Institute
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package uk.ac.ebi.eva.pipeline.configuration.jobs.steps;
+
+import org.bson.Document;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.test.JobLauncherTestUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit4.SpringRunner;
+import uk.ac.ebi.eva.pipeline.configuration.BeanNames;
+import uk.ac.ebi.eva.pipeline.configuration.MongoConfiguration;
+import uk.ac.ebi.eva.pipeline.configuration.jobs.CalculateAndLoadStatisticsJobConfiguration;
+import uk.ac.ebi.eva.test.configuration.BatchTestConfiguration;
+import uk.ac.ebi.eva.test.configuration.TemporaryRuleConfiguration;
+import uk.ac.ebi.eva.test.rules.PipelineTemporaryFolderRule;
+import uk.ac.ebi.eva.test.rules.TemporaryMongoRule;
+import uk.ac.ebi.eva.utils.EvaJobParameterBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static uk.ac.ebi.eva.test.utils.JobTestUtils.assertCompleted;
+import static uk.ac.ebi.eva.test.utils.TestFileUtils.getResourceUrl;
+
+/**
+ * Test for {@link CalculateAndLoadStatisticsStepConfiguration}
+ */
+@RunWith(SpringRunner.class)
+@TestPropertySource({"classpath:test-stats.properties"})
+@ContextConfiguration(classes = {CalculateAndLoadStatisticsJobConfiguration.class, BatchTestConfiguration.class,
+        TemporaryRuleConfiguration.class, MongoConfiguration.class})
+public class CalculateAndLoadStatisticsStepTest {
+    private static final String MONGO_DUMP = "/dump/VariantStatsConfigurationTest_vl";
+
+    private static final String COLLECTION_VARIANTS_NAME = "variants";
+
+    private static final String COLLECTION_FILES_NAME = "files";
+
+    private static final String DATABASE_NAME = "calculate_load_stats_test_db";
+
+    private static final String STUDY_ID = "1";
+
+    @Autowired
+    @Rule
+    public TemporaryMongoRule mongoRule;
+
+    @Rule
+    public PipelineTemporaryFolderRule temporaryFolderRule = new PipelineTemporaryFolderRule();
+
+    @Autowired
+    private JobLauncherTestUtils jobLauncherTestUtils;
+
+    @Before
+    public void setUp() throws Exception {
+        // Drop first in case a previous run left data behind, then restore a known dump
+        mongoRule.getTemporaryDatabase(DATABASE_NAME).drop();
+        mongoRule.restoreDump(getResourceUrl(MONGO_DUMP), DATABASE_NAME);
+    }
+
+    @After
+    public void cleanUp() {
+        mongoRule.getTemporaryDatabase(DATABASE_NAME).drop();
+    }
+
+    @Test
+    public void calculateAndLoadStatisticsStepShouldCalculateAndLoadStats() {
+        JobParameters jobParameters = new EvaJobParameterBuilder()
+                .collectionFilesName(COLLECTION_FILES_NAME)
+                .collectionVariantsName(COLLECTION_VARIANTS_NAME)
+                .databaseName(DATABASE_NAME)
+                .inputStudyId(STUDY_ID)
+                .chunkSize("100")
+                .toJobParameters();
+
+        JobExecution jobExecution = jobLauncherTestUtils.launchStep(BeanNames.CALCULATE_AND_LOAD_STATISTICS_STEP,
+                jobParameters);
+
+        // check job completed successfully
+        assertCompleted(jobExecution);
+        List<Document> documents = mongoRule.getTemporaryDatabase(DATABASE_NAME)
+                .getCollection(COLLECTION_VARIANTS_NAME)
+                .find().into(new ArrayList<>());
+        // assertEquals reports the actual size on failure, unlike assertTrue(size == 300)
+        assertEquals(300, documents.size());
+        // assert all statistics are calculated for all documents
+        assertTrue(documents.stream().allMatch(doc -> doc.containsKey("st")));
+
+        // assert statistics for the variant with 20_61098_C_T
+        ArrayList variantStatsList = documents.stream().filter(doc -> doc.get("_id").equals("20_61098_C_T"))
+                .findFirst().get().get("st", ArrayList.class);
+        assertEquals(1, variantStatsList.size());
+        Document variantStats = (Document) variantStatsList.get(0);
+        Document numOfGT = (Document) variantStats.get("numGt");
+        assertEquals(1290, numOfGT.get("0|0"));
+        assertEquals(417, numOfGT.get("1|0"));
+        assertEquals(573, numOfGT.get("0|1"));
+        assertEquals(224, numOfGT.get("1|1"));
+        assertEquals(0.2871405780315399, variantStats.get("maf"));
+        assertEquals(0.228833869099617, variantStats.get("mgf"));
+        assertEquals("T", variantStats.get("mafAl"));
+        assertEquals("0|1", variantStats.get("mgfGt"));
+        assertEquals(0, variantStats.get("missAl"));
+        assertEquals(0, variantStats.get("missGt"));
+    }
+
+}
diff --git a/src/test/resources/test-stats.properties b/src/test/resources/test-stats.properties
new file mode 100644
index 000000000..9d40d0c3b
--- /dev/null
+++ b/src/test/resources/test-stats.properties
@@ -0,0 +1,15 @@
+db.collections.variants.name=variants
+db.collections.files.name=files
+db.collections.features.name=features
+db.collections.stats.name=populationStatistics
+
+spring.data.mongodb.host=|eva.mongo.host.test|
+spring.data.mongodb.port=27017
+spring.data.mongodb.password=
+mongodb.read-preference=primary
+spring.data.mongodb.authentication-mechanism=SCRAM-SHA-1
+
+config.db.read-preference=primary
+
+# See https://github.com/spring-projects/spring-boot/wiki/Spring-Boot-2.1-Release-Notes#bean-overriding
+spring.main.allow-bean-definition-overriding=true
\ No newline at end of file