diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/io/processors/VariantStatsProcessor.java b/src/main/java/uk/ac/ebi/eva/pipeline/io/processors/VariantStatsProcessor.java index 41ec500b..62da70d7 100644 --- a/src/main/java/uk/ac/ebi/eva/pipeline/io/processors/VariantStatsProcessor.java +++ b/src/main/java/uk/ac/ebi/eva/pipeline/io/processors/VariantStatsProcessor.java @@ -39,6 +39,10 @@ public VariantStatsProcessor(String studyId) { @Override public VariantDocument process(VariantDocument variant) { Map filesIdNumberOfSamplesMap = VariantStatsReader.getFilesIdAndNumberOfSamplesMap(); + if (filesIdNumberOfSamplesMap.isEmpty()) { + // No new stats can be calculated, no processing required + return variant; + } Set fidSet = filesIdNumberOfSamplesMap.keySet(); String variantRef = variant.getReference(); diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/io/writers/VariantStatsWriter.java b/src/main/java/uk/ac/ebi/eva/pipeline/io/writers/VariantStatsWriter.java index 6752dc6b..cf55d663 100644 --- a/src/main/java/uk/ac/ebi/eva/pipeline/io/writers/VariantStatsWriter.java +++ b/src/main/java/uk/ac/ebi/eva/pipeline/io/writers/VariantStatsWriter.java @@ -9,9 +9,12 @@ import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument; +import uk.ac.ebi.eva.pipeline.io.readers.VariantStatsReader; import uk.ac.ebi.eva.pipeline.parameters.DatabaseParameters; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; public class VariantStatsWriter implements ItemWriter { private static final Logger logger = LoggerFactory.getLogger(VariantStatsWriter.class); @@ -25,20 +28,30 @@ public VariantStatsWriter(DatabaseParameters databaseParameters, MongoTemplate m @Override public void write(List variants) { - BulkOperations bulkOperations = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, VariantDocument.class, - databaseParameters.getCollectionVariantsName()); - for (VariantDocument variant : variants) { - if (variant.getVariantStatsMongo() == null || variant.getVariantStatsMongo().isEmpty()) { - continue; + Map filesIdNumberOfSamplesMap = VariantStatsReader.getFilesIdAndNumberOfSamplesMap(); + if (filesIdNumberOfSamplesMap.isEmpty()) { + // No new stats would have been calculated, no need to write anything + return; + } + + variants = variants.stream() + .filter(v -> v.getVariantStatsMongo() != null && !v.getVariantStatsMongo().isEmpty()) + .collect(Collectors.toList()); + + if (!variants.isEmpty()) { + BulkOperations bulkOperations = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, VariantDocument.class, + databaseParameters.getCollectionVariantsName()); + + for (VariantDocument variant : variants) { + Query query = new Query(Criteria.where("_id").is(variant.getId())); + Update update = new Update(); + update.set("st", variant.getVariantStatsMongo()); + + bulkOperations.updateOne(query, update); } - Query query = new Query(Criteria.where("_id").is(variant.getId())); - Update update = new Update(); - update.set("st", variant.getVariantStatsMongo()); - bulkOperations.updateOne(query, update); + bulkOperations.execute(); } - - bulkOperations.execute(); } } diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/jobs/steps/tasklets/FileStatsTasklet.java b/src/main/java/uk/ac/ebi/eva/pipeline/jobs/steps/tasklets/FileStatsTasklet.java index 8a93ebe9..e01c8b70 100644 --- a/src/main/java/uk/ac/ebi/eva/pipeline/jobs/steps/tasklets/FileStatsTasklet.java +++ b/src/main/java/uk/ac/ebi/eva/pipeline/jobs/steps/tasklets/FileStatsTasklet.java @@ -75,18 +75,21 @@ public FileStatsTasklet(DatabaseParameters databaseParameters, MongoTemplate mon @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { populateFilesIdAndNumberOfSamplesMap(); - initializeFileIdCountsMap(); - cursor = initializeCursor(); - try { - while (cursor.hasNext()) { - VariantDocument variantDocument = getVariant(cursor.next()); - processCounts(variantDocument); + + if (!fileIdNumberOfSamplesMap.isEmpty()) { + initializeFileIdCountsMap(); + cursor = initializeCursor(); + try { + while (cursor.hasNext()) { + VariantDocument variantDocument = getVariant(cursor.next()); + processCounts(variantDocument); + } + } finally { + cursor.close(); } - } finally { - cursor.close(); - } - writeStatsInTheDB(); + writeStatsInTheDB(); + } return RepeatStatus.FINISHED; } diff --git a/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/variantstats/VariantStatsStepTestNoNewStatsCalculated.java b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/variantstats/VariantStatsStepTestNoNewStatsCalculated.java new file mode 100644 index 00000000..76aed57b --- /dev/null +++ b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/variantstats/VariantStatsStepTestNoNewStatsCalculated.java @@ -0,0 +1,163 @@ +/* + * Copyright 2024 EMBL - European Bioinformatics Institute + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package uk.ac.ebi.eva.pipeline.configuration.jobs.steps.variantstats; + +import com.mongodb.client.MongoCollection; +import org.bson.Document; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.test.JobLauncherTestUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import uk.ac.ebi.eva.pipeline.configuration.BeanNames; +import uk.ac.ebi.eva.pipeline.configuration.MongoConfiguration; +import uk.ac.ebi.eva.pipeline.configuration.jobs.VariantStatsJobConfiguration; +import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.VariantStatsStepConfiguration; +import uk.ac.ebi.eva.test.configuration.BatchTestConfiguration; +import uk.ac.ebi.eva.test.configuration.TemporaryRuleConfiguration; +import uk.ac.ebi.eva.test.rules.PipelineTemporaryFolderRule; +import uk.ac.ebi.eva.test.rules.TemporaryMongoRule; +import uk.ac.ebi.eva.utils.EvaJobParameterBuilder; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static uk.ac.ebi.eva.test.utils.JobTestUtils.assertCompleted; + +/** + * Test for {@link VariantStatsStepConfiguration} + */ +@RunWith(SpringRunner.class) +@TestPropertySource({"classpath:test-stats.properties"}) +@ContextConfiguration(classes = {VariantStatsJobConfiguration.class, BatchTestConfiguration.class, + TemporaryRuleConfiguration.class, MongoConfiguration.class}) +public class VariantStatsStepTestNoNewStatsCalculated { + private static final String COLLECTION_VARIANTS_NAME = "variants"; + + private static final String COLLECTION_FILES_NAME = "files"; + + private static final String DATABASE_NAME = "variant_stats_test_db"; + + @Autowired + @Rule + public TemporaryMongoRule mongoRule; + + @Rule + public PipelineTemporaryFolderRule temporaryFolderRule = new PipelineTemporaryFolderRule(); + + @Autowired + private JobLauncherTestUtils jobLauncherTestUtils; + + @Before + public void setUp() throws Exception { + mongoRule.getTemporaryDatabase(DATABASE_NAME).drop(); + } + + @After + public void cleanUp() { + mongoRule.getTemporaryDatabase(DATABASE_NAME).drop(); + } + + @Test + public void variantStatsStepShouldCalculateAndLoadStats_NoNewStatsCanbeCalculated() throws Exception { + MongoCollection filesCollection = mongoRule.getCollection(DATABASE_NAME, COLLECTION_FILES_NAME); + MongoCollection variantsCollection = mongoRule.getCollection(DATABASE_NAME, COLLECTION_VARIANTS_NAME); + + filesCollection.insertMany(Arrays.asList( + // multiple entries for fid2 in the files collection + new Document("sid", "sid1").append("fid", "fid2").append("fname", "fname21") + .append("samp", new Document("samp21", 0).append("samp22", 1)).append("samp23", 2), + new Document("sid", "sid1").append("fid", "fid2").append("fname", "fname22") + .append("samp", new Document("samp31", 0).append("samp32", 1)).append("samp33", 2) + )); + + variantsCollection.insertMany(Arrays.asList( + new Document("_id", "chr1_11111111_A_G").append("ref", "A").append("alt", "G") + .append("files", Arrays.asList( + // should not calculate stats - fid2 has more than one entry in the files collection + new Document("sid", "sid1").append("fid", "fid2") + .append("samp", new Document("def", "0|0").append("0|1", Arrays.asList(1))), + // should not calculate stats - no entry for fid3 in files collection + new Document("sid", "sid1").append("fid", "fid3") + .append("samp", new Document("def", "0|0").append("0|1", Arrays.asList(1))), + // should not calculate stats - different study + new Document("sid", "sid2").append("fid", "fid1") + .append("samp", new Document("def", "0|0").append("0|1", Arrays.asList(1))) + )) + .append("st", Arrays.asList( + // should not calculate or add anything for sid1 and fid2 since fid2 has more than one file + + // should not change as it belongs to different study id + new Document("sid", "sid2").append("fid", "fid1") + .append("maf", 0.20000000298023224).append("mgf", 0.20000000298023224) + .append("mafAl", "A").append("mgfGt", "0|0"), + // should not change as it belongs to different fid + new Document("sid", "sid1").append("fid", "fid3") + .append("maf", 0.30000001192092896).append("mgf", 0.30000001192092896) + .append("mafAl", "A").append("mgfGt", "0|0") + + )) + )); + + JobParameters jobParameters = new EvaJobParameterBuilder() + .collectionFilesName(COLLECTION_FILES_NAME) + .collectionVariantsName(COLLECTION_VARIANTS_NAME) + .databaseName(DATABASE_NAME) + .inputStudyId("sid1") + .chunkSize("100") + .toJobParameters(); + + JobExecution jobExecution = jobLauncherTestUtils.launchStep(BeanNames.VARIANT_STATS_STEP, jobParameters); + + // check job completed successfully + assertCompleted(jobExecution); + List documents = mongoRule.getTemporaryDatabase(DATABASE_NAME).getCollection(COLLECTION_VARIANTS_NAME) + .find().into(new ArrayList<>()); + Assert.assertTrue(documents.size() == 1); + + // assert data + ArrayList variantStatsList = documents.stream().filter(doc -> doc.get("_id").equals("chr1_11111111_A_G")) + .findFirst().get().get("st", ArrayList.class); + assertEquals(2, variantStatsList.size()); + + // assert remained unchanged + Document variantStatsForSid2Fid1 = variantStatsList.stream() + .filter(st -> st.get("sid").equals("sid2") && st.get("fid").equals("fid1")).findFirst().get(); + assertEquals(0.20000000298023224, variantStatsForSid2Fid1.get("maf")); + assertEquals(0.20000000298023224, variantStatsForSid2Fid1.get("mgf")); + assertEquals("A", variantStatsForSid2Fid1.get("mafAl")); + assertEquals("0|0", variantStatsForSid2Fid1.get("mgfGt")); + + // assert remained unchanged + Document variantStatsForSid1Fid3 = variantStatsList.stream() + .filter(st -> st.get("sid").equals("sid1") && st.get("fid").equals("fid3")).findFirst().get(); + assertEquals(0.30000001192092896, variantStatsForSid1Fid3.get("maf")); + assertEquals(0.30000001192092896, variantStatsForSid1Fid3.get("mgf")); + assertEquals("A", variantStatsForSid1Fid3.get("mafAl")); + assertEquals("0|0", variantStatsForSid1Fid3.get("mgfGt")); + } + +}