diff --git a/src/main/java/uk/ac/ebi/eva/commons/models/mongo/entity/subdocuments/VariantSourceEntryMongo.java b/src/main/java/uk/ac/ebi/eva/commons/models/mongo/entity/subdocuments/VariantSourceEntryMongo.java
index 4ddb8a23..a75b653d 100644
--- a/src/main/java/uk/ac/ebi/eva/commons/models/mongo/entity/subdocuments/VariantSourceEntryMongo.java
+++ b/src/main/java/uk/ac/ebi/eva/commons/models/mongo/entity/subdocuments/VariantSourceEntryMongo.java
@@ -189,4 +189,8 @@ public String getFileId() {
     public String[] getAlternates() {
         return alternates;
     }
+
+    public BasicDBObject getAttrs() {
+        return attrs;
+    }
 }
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java
index ed7a2a48..d36a4b86 100644
--- a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java
+++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java
@@ -64,6 +64,7 @@ public class BeanNames {
     public static final String LOAD_ANNOTATION_METADATA_STEP = "annotation-metadata-step";
     public static final String ACCESSION_IMPORT_STEP = "accession-import-step";
     public static final String VARIANT_STATS_STEP = "variant-stats-step";
+    public static final String FILE_STATS_STEP = "file-stats-step";
 
     public static final String AGGREGATED_VCF_JOB = "aggregated-vcf-job";
     public static final String ANNOTATE_VARIANTS_JOB = "annotate-variants-job";
@@ -74,4 +75,5 @@ public class BeanNames {
     public static final String DROP_STUDY_JOB = "drop-study-job";
     public static final String ACCESSION_IMPORT_JOB = "accession-import-job";
     public static final String VARIANT_STATS_JOB = "variant-stats-job";
+    public static final String FILE_STATS_JOB = "file-stats-job";
 }
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/FileStatsJobConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/FileStatsJobConfiguration.java
new file mode 100644
index 00000000..da511736
--- /dev/null
+++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/FileStatsJobConfiguration.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2024 EMBL - European Bioinformatics Institute
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package uk.ac.ebi.eva.pipeline.configuration.jobs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.Step;
+import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
+import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.Scope;
+import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.FileStatsStepConfiguration;
+import uk.ac.ebi.eva.pipeline.parameters.NewJobIncrementer;
+
+import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.FILE_STATS_JOB;
+import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.FILE_STATS_STEP;
+
+@Configuration
+@EnableBatchProcessing
+@Import({FileStatsStepConfiguration.class})
+public class FileStatsJobConfiguration {
+
+    private static final Logger logger = LoggerFactory.getLogger(FileStatsJobConfiguration.class);
+
+    @Autowired
+    @Qualifier(FILE_STATS_STEP)
+    private Step fileStatsStep;
+
+    @Bean(FILE_STATS_JOB)
+    @Scope("prototype")
+    public Job fileStatsJob(JobBuilderFactory jobBuilderFactory) {
+        logger.debug("Building '" + FILE_STATS_JOB + "'");
+
+        return jobBuilderFactory
+                .get(FILE_STATS_JOB)
+                .incrementer(new NewJobIncrementer())
+                .start(fileStatsStep)
+                .build();
+    }
+
+}
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/FileStatsStepConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/FileStatsStepConfiguration.java
new file mode 100644
index 00000000..1ba547f3
--- /dev/null
+++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/FileStatsStepConfiguration.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2024 EMBL - European Bioinformatics Institute
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package uk.ac.ebi.eva.pipeline.configuration.jobs.steps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
+import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.batch.core.step.tasklet.TaskletStep;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import uk.ac.ebi.eva.pipeline.jobs.steps.tasklets.FileStatsTasklet;
+import uk.ac.ebi.eva.pipeline.parameters.ChunkSizeParameters;
+import uk.ac.ebi.eva.pipeline.parameters.DatabaseParameters;
+import uk.ac.ebi.eva.pipeline.parameters.InputParameters;
+import uk.ac.ebi.eva.pipeline.parameters.JobOptions;
+import uk.ac.ebi.eva.utils.TaskletUtils;
+
+import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.FILE_STATS_STEP;
+
+
+@Configuration
+@EnableBatchProcessing
+public class FileStatsStepConfiguration {
+    private static final Logger logger = LoggerFactory.getLogger(FileStatsStepConfiguration.class);
+
+    @Bean
+    @StepScope
+    public FileStatsTasklet fileStatsTasklet(DatabaseParameters databaseParameters,
+                                             MongoTemplate mongoTemplate,
+                                             InputParameters inputParameters,
+                                             ChunkSizeParameters chunkSizeParameters) {
+        return new FileStatsTasklet(databaseParameters, mongoTemplate, inputParameters.getStudyId(),
+                chunkSizeParameters.getChunkSize());
+    }
+
+    @Bean(FILE_STATS_STEP)
+    public TaskletStep fileStatsStep(DatabaseParameters databaseParameters,
+                                     MongoTemplate mongoTemplate,
+                                     InputParameters inputParameters,
+                                     ChunkSizeParameters chunkSizeParameters,
+                                     StepBuilderFactory stepBuilderFactory,
+                                     JobOptions jobOptions) {
+        logger.debug("Building '" + FILE_STATS_STEP + "'");
+
+        return TaskletUtils.generateStep(stepBuilderFactory, FILE_STATS_STEP,
+                fileStatsTasklet(databaseParameters, mongoTemplate, inputParameters, chunkSizeParameters),
+                jobOptions.isAllowStartIfComplete());
+    }
+}
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/io/readers/VariantStatsReader.java b/src/main/java/uk/ac/ebi/eva/pipeline/io/readers/VariantStatsReader.java
index d845d0bd..eda7561a 100644
--- a/src/main/java/uk/ac/ebi/eva/pipeline/io/readers/VariantStatsReader.java
+++ b/src/main/java/uk/ac/ebi/eva/pipeline/io/readers/VariantStatsReader.java
@@ -22,6 +22,8 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static com.mongodb.client.model.Accumulators.sum;
+import static com.mongodb.client.model.Aggregates.group;
 import static com.mongodb.client.model.Aggregates.match;
 import static com.mongodb.client.model.Aggregates.project;
 import static com.mongodb.client.model.Projections.computed;
@@ -92,11 +94,13 @@ private void populateFilesIdAndNumberOfSamplesMap() {
                 computed("fid", "$fid"),
                 computed("numOfSamples", new Document("$size", new Document("$objectToArray", "$samp")))
         ));
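+        // group by file id so the sample counts are summed when the same fid appears in more than one document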
doc.getInteger("totalNumOfSamples"))); } public static Map getFilesIdAndNumberOfSamplesMap() { diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/jobs/steps/tasklets/FileStatsTasklet.java b/src/main/java/uk/ac/ebi/eva/pipeline/jobs/steps/tasklets/FileStatsTasklet.java new file mode 100644 index 00000000..e1a7f5da --- /dev/null +++ b/src/main/java/uk/ac/ebi/eva/pipeline/jobs/steps/tasklets/FileStatsTasklet.java @@ -0,0 +1,207 @@ +package uk.ac.ebi.eva.pipeline.jobs.steps.tasklets; + +import com.mongodb.BasicDBObject; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.model.Filters; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.data.mongodb.core.BulkOperations; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.convert.MongoConverter; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; +import uk.ac.ebi.eva.commons.models.data.Variant; +import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument; +import uk.ac.ebi.eva.commons.models.mongo.entity.subdocuments.VariantSourceEntryMongo; +import uk.ac.ebi.eva.pipeline.parameters.DatabaseParameters; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static com.mongodb.client.model.Accumulators.sum; +import static com.mongodb.client.model.Aggregates.group; +import static com.mongodb.client.model.Aggregates.match; +import static com.mongodb.client.model.Aggregates.project; +import static com.mongodb.client.model.Projections.computed; +import static com.mongodb.client.model.Projections.fields; +import static java.util.Arrays.asList; + +public class FileStatsTasklet implements Tasklet { + private static final Logger logger = LoggerFactory.getLogger(FileStatsTasklet.class); + + private static final String KEY_NO_OF_SAMPLES = "nSamp"; + private static final String KEY_NO_OF_VARIANTS = "nVar"; + private static final String KEY_NO_OF_SNP = "nSnp"; + private static final String KEY_NO_OF_INDEL = "nIndel"; + private static final String KEY_NO_OF_PASS = "nPass"; + private static final String KEY_NO_OF_TRANSITION = "nTi"; + private static final String KEY_NO_OF_TRANSVERSION = "nTv"; + private static final String KEY_NO_OF_MEANQ = "meanQ"; + + // Store the map of files to number of sample from the file_2_0 collection + private static Map fileIdNumberOfSamplesMap; + // Store the map of files to their stats counts + private static Map> fileIdCountsMap = new HashMap<>(); + + private DatabaseParameters databaseParameters; + private MongoTemplate mongoTemplate; + private MongoCursor cursor; + private MongoConverter converter; + private int chunkSize; + private String studyId; + + public FileStatsTasklet(DatabaseParameters databaseParameters, MongoTemplate mongoTemplate, String studyId, int chunkSize) { + this.databaseParameters = databaseParameters; + this.mongoTemplate = mongoTemplate; + this.studyId = studyId; + this.chunkSize = 
+    private void populateFilesIdAndNumberOfSamplesMap() {
+        Bson matchStage = match(Filters.eq("sid", studyId));
+        Bson projectStage = project(fields(
+                computed("fid", "$fid"),
+                computed("numOfSamples", new Document("$size", new Document("$objectToArray", "$samp")))
+        ));
+        Bson groupStage = group("$fid", sum("totalNumOfSamples", "$numOfSamples"));
+
+        fileIdNumberOfSamplesMap = mongoTemplate.getCollection(databaseParameters.getCollectionFilesName())
+                .aggregate(asList(matchStage, projectStage, groupStage))
+                .into(new ArrayList<>())
+                .stream()
+                .collect(Collectors.toMap(doc -> doc.getString("_id"), doc -> doc.getInteger("totalNumOfSamples")));
+    }
+
+    private void initializeFileIdCountsMap() {
+        for (String fileId : fileIdNumberOfSamplesMap.keySet()) {
+            HashMap<String, Integer> countsMap = new HashMap<>();
+
+            countsMap.put(KEY_NO_OF_SAMPLES, fileIdNumberOfSamplesMap.get(fileId));
+            countsMap.put(KEY_NO_OF_VARIANTS, 0);
+            countsMap.put(KEY_NO_OF_SNP, 0);
+            countsMap.put(KEY_NO_OF_INDEL, 0);
+            countsMap.put(KEY_NO_OF_PASS, 0);
+            countsMap.put(KEY_NO_OF_TRANSITION, 0);
+            countsMap.put(KEY_NO_OF_TRANSVERSION, 0);
+            countsMap.put(KEY_NO_OF_MEANQ, 0);
+
+            fileIdCountsMap.put(fileId, countsMap);
+        }
+    }
+
+    private MongoCursor<Document> initializeCursor() {
+        Bson query = Filters.elemMatch(VariantDocument.FILES_FIELD, Filters.eq(VariantSourceEntryMongo.STUDYID_FIELD, studyId));
+        logger.info("Issuing find: {}", query);
+
+        FindIterable<Document> statsVariantDocuments = mongoTemplate.getCollection(databaseParameters.getCollectionVariantsName())
+                .find(query)
+                .noCursorTimeout(true)
+                .batchSize(chunkSize);
+
+        return statsVariantDocuments.iterator();
+    }
+
+    private VariantDocument getVariant(Document variantDocument) {
+        return converter.read(VariantDocument.class, new BasicDBObject(variantDocument));
+    }
+
+    private void processCounts(VariantDocument variantDocument) {
+        // get all fileIds this variant belongs to
+        Set<String> fileIds = variantDocument.getVariantSources().stream()
+                .filter(vse -> vse.getStudyId().equals(studyId))
+                .map(vse -> vse.getFileId())
+                .collect(Collectors.toSet());
+
+        boolean isSNV = variantDocument.getVariantType().equals(Variant.VariantType.SNV);
+        boolean isINDEL = variantDocument.getVariantType().equals(Variant.VariantType.INDEL);
+
+        boolean isTransition = false;
+        boolean isTransversion = false;
+        if (isSNV) {
+            String ref = variantDocument.getReference();
+            String alt = variantDocument.getAlternate();
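+            // A<->G and C<->T substitutions are transitions; any other single-nucleotide change
+            // is counted as a transversion.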
+            Set<String> transitions = new HashSet<>(Arrays.asList("AG", "GA", "CT", "TC"));
+            String refAlt = ref + alt;
+            if (transitions.contains(refAlt)) {
+                isTransition = true;
+            } else {
+                isTransversion = true;
+            }
+        }
+
+        for (String fileId : fileIds) {
+            Map<String, Integer> countsMap = fileIdCountsMap.get(fileId);
+            countsMap.merge(KEY_NO_OF_VARIANTS, 1, Integer::sum);
+
+            if (isSNV) {
+                countsMap.merge(KEY_NO_OF_SNP, 1, Integer::sum);
+            } else if (isINDEL) {
+                countsMap.merge(KEY_NO_OF_INDEL, 1, Integer::sum);
+            }
+
+            if (isTransition) {
+                countsMap.merge(KEY_NO_OF_TRANSITION, 1, Integer::sum);
+            } else if (isTransversion) {
+                countsMap.merge(KEY_NO_OF_TRANSVERSION, 1, Integer::sum);
+            }
+
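+            // A variant is counted as PASS for this file only if every source entry for the
+            // study/file pair has FILTER set to PASS.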
+            boolean hasPass = variantDocument.getVariantSources().stream()
+                    .filter(vse -> vse.getStudyId().equals(studyId) && vse.getFileId().equals(fileId))
+                    .map(vse -> (vse.getAttrs() != null) ? vse.getAttrs().getOrDefault("FILTER", "") : "")
+                    .allMatch(f -> f.equals("PASS"));
+            if (hasPass) {
+                countsMap.merge(KEY_NO_OF_PASS, 1, Integer::sum);
+            }
+        }
+    }
+
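+    // Write the accumulated counts into the 'st' field of the matching documents in the files
+    // collection, one updateMulti per file id, issued as a single unordered bulk operation.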
doc.get("sid").equals("1") && doc.get("fid").equals("1")) + .findFirst().get().get("st", Document.class); + assertEquals(2504, fileStats.get("nSamp")); + assertEquals(300, fileStats.get("nVar")); + assertEquals(281, fileStats.get("nSnp")); + assertEquals(19, fileStats.get("nIndel")); + assertEquals(300, fileStats.get("nPass")); + assertEquals(178, fileStats.get("nTi")); + assertEquals(103, fileStats.get("nTv")); + assertEquals(0, fileStats.get("meanQ")); + } +}