Skip to content

Commit

Permalink
Use MongoDbCursorItemReader from variation-commons
Browse files (browse the repository at this point in the history)
  • Loading branch information
sundarvenkata-EBI committed Jul 29, 2023
1 parent 38a0976 commit fc82354
Show file tree
Hide file tree
Showing 18 changed files with 85 additions and 257 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<biodata.version>0.4.9</biodata.version>
<cellbase.version>3.1.3</cellbase.version>
<postgresql.version>42.4.0</postgresql.version>
<variation-commons-version>0.8.3</variation-commons-version>
<variation-commons-version>0.8.4-SNAPSHOT</variation-commons-version>
<eva.mongo.host.test>localhost</eva.mongo.host.test>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoDbFactory;
import org.springframework.data.mongodb.core.convert.DbRefResolver;
Expand Down Expand Up @@ -60,9 +59,9 @@ public MongoMappingContext mongoMappingContext() {

@Bean
@StepScope
public MongoOperations mongoTemplate(DatabaseParameters databaseParameters, MongoMappingContext mongoMappingContext)
public MongoTemplate mongoTemplate(DatabaseParameters databaseParameters, MongoMappingContext mongoMappingContext)
throws UnknownHostException, UnsupportedEncodingException {
return getMongoOperations(databaseParameters.getDatabaseName(), databaseParameters.getMongoConnectionDetails(),
return getMongoTemplate(databaseParameters.getDatabaseName(), databaseParameters.getMongoConnectionDetails(),
mongoMappingContext);
}

Expand All @@ -75,8 +74,8 @@ public MongoClient mongoClient(DatabaseParameters databaseParameters) throws Uns
return client;
}

public static MongoOperations getMongoOperations(String databaseName, MongoConnectionDetails mongoConnectionDetails,
MongoMappingContext mongoMappingContext)
public static MongoTemplate getMongoTemplate(String databaseName, MongoConnectionDetails mongoConnectionDetails,
MongoMappingContext mongoMappingContext)
throws UnknownHostException, UnsupportedEncodingException {
MongoClientURI uri = constructMongoClientURI(databaseName, mongoConnectionDetails);
MongoDbFactory mongoFactory = new SimpleMongoDbFactory(uri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoOperations;

import org.springframework.data.mongodb.core.MongoTemplate;
import uk.ac.ebi.eva.pipeline.io.readers.VariantsMongoReader;
import uk.ac.ebi.eva.pipeline.model.EnsemblVariant;
import uk.ac.ebi.eva.pipeline.parameters.AnnotationParameters;
Expand All @@ -40,7 +41,7 @@ public class VariantsMongoReaderConfiguration {

@Bean(VARIANTS_READER)
@StepScope
public ItemStreamReader<List<EnsemblVariant>> variantsMongoReader(MongoOperations mongoOperations,
public ItemStreamReader<List<EnsemblVariant>> variantsMongoReader(MongoTemplate mongoTemplate,
DatabaseParameters databaseParameters,
InputParameters inputParameters,
AnnotationParameters annotationParameters,
Expand All @@ -49,7 +50,7 @@ public ItemStreamReader<List<EnsemblVariant>> variantsMongoReader(MongoOperation
boolean excludeAnnotated = !annotationParameters.getOverwriteAnnotation();

VariantsMongoReader variantsMongoReader = new VariantsMongoReader(
mongoOperations,
mongoTemplate,
databaseParameters.getCollectionVariantsName(),
annotationParameters.getVepVersion(),
annotationParameters.getVepCacheVersion(),
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,23 @@
*/
package uk.ac.ebi.eva.pipeline.io.readers;

import org.bson.Document;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Meta;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.util.ClassUtils;
import uk.ac.ebi.eva.commons.models.mongo.entity.Annotation;
import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument;
import uk.ac.ebi.eva.commons.models.mongo.entity.projections.SimplifiedVariant;
import uk.ac.ebi.eva.commons.models.mongo.entity.subdocuments.VariantAnnotation;
import uk.ac.ebi.eva.commons.models.mongo.entity.subdocuments.VariantSourceEntryMongo;
import uk.ac.ebi.eva.commons.mongodb.readers.MongoDbCursorItemReader;
import uk.ac.ebi.eva.pipeline.model.EnsemblVariant;

import javax.annotation.PostConstruct;
Expand All @@ -38,6 +40,7 @@
import java.util.ArrayList;
import java.util.List;

import static org.springframework.data.mongodb.core.query.Criteria.where;
import static uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument.ALTERNATE_FIELD;
import static uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument.CHROMOSOME_FIELD;
import static uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument.END_FIELD;
Expand All @@ -59,7 +62,7 @@ public class VariantsMongoReader

private static final String LAST_READ_TIMESTAMP_KEY = "last_read_timestamp";

private MongoDbCursorItemReader delegateReader;
private MongoDbCursorItemReader<VariantDocument> delegateReader;

private MongoConverter converter;

Expand All @@ -83,43 +86,47 @@ public class VariantsMongoReader
* vepVersion and vepCacheVersion parameters)
* @param chunkSize size of the list returned by the "read" method.
*/
public VariantsMongoReader(MongoOperations mongoOperations, String collectionVariantsName, String vepVersion,
public VariantsMongoReader(MongoTemplate mongoTemplate, String collectionVariantsName, String vepVersion,
String vepCacheVersion, String studyId, String fileId, boolean excludeAnnotated,
Integer chunkSize) {
setName(ClassUtils.getShortName(VariantsMongoReader.class));
delegateReader = new MongoDbCursorItemReader();
delegateReader.setTemplate(mongoOperations);
delegateReader = new MongoDbCursorItemReader<>();
delegateReader.setMongoTemplate(mongoTemplate);
delegateReader.setTargetType(VariantDocument.class);
delegateReader.setCollection(collectionVariantsName);

// the query excludes processed variants automatically, so a new query has to start from the beginning
delegateReader.setSaveState(false);

Document query = new Document();
Query query = new Query();
if (studyId != null && !studyId.isEmpty()) {
query.append(STUDY_KEY, studyId);
query.addCriteria(where(STUDY_KEY).is(studyId));

if (fileId != null && !fileId.isEmpty()) {
query.append(FILE_KEY, fileId);
query.addCriteria(where(FILE_KEY).is(fileId));
}
}

if (excludeAnnotated) {
Document exists = new Document("$exists", 1);
Document annotationSubdocument = new Document(VariantAnnotation.SO_ACCESSION_FIELD, exists)
.append(Annotation.VEP_VERSION_FIELD, vepVersion)
.append(Annotation.VEP_CACHE_VERSION_FIELD, vepCacheVersion);
Document noElementMatchesOurVersion =
new Document("$not", new Document("$elemMatch", annotationSubdocument));
query.append(VariantDocument.ANNOTATION_FIELD, noElementMatchesOurVersion);
Criteria annotationCriteria =
where(VariantAnnotation.SO_ACCESSION_FIELD).exists(true)
.and(Annotation.VEP_VERSION_FIELD).is(vepVersion)
.and(Annotation.VEP_CACHE_VERSION_FIELD).is(vepCacheVersion);
query.addCriteria(where(VariantDocument.ANNOTATION_FIELD).not().elemMatch(annotationCriteria));
}
delegateReader.setQuery(query);

String[] fields = {CHROMOSOME_FIELD, START_FIELD, END_FIELD, REFERENCE_FIELD, ALTERNATE_FIELD};
delegateReader.setFields(fields);
for (String field: fields) {
query.fields().include(field);
}

Meta meta = new Meta();
meta.addFlag(Meta.CursorOption.NO_TIMEOUT);
// Make batch size at least 2, as batch size of 1 is analogous to using limit
delegateReader.setBatchSize(Math.max(chunkSize, 2));
meta.setCursorBatchSize(Math.max(chunkSize, 2));
query.setMeta(meta);
delegateReader.setQuery(query);

this.converter = mongoOperations.getConverter();
this.converter = mongoTemplate.getConverter();
this.chunkSize = chunkSize;
this.lastRead = ZonedDateTime.now();
}
Expand Down Expand Up @@ -147,9 +154,8 @@ public List<EnsemblVariant> read() throws Exception, UnexpectedInputException, P

private List<EnsemblVariant> readBatch(Integer chunkSize) throws Exception {
List<EnsemblVariant> variants = new ArrayList<>();
Document document;
while ((document = delegateDoRead()) != null) {
SimplifiedVariant variant = converter.read(SimplifiedVariant.class, document);
VariantDocument variant;
while ((variant = delegateDoRead()) != null) {
variants.add(buildVariantWrapper(variant));
if (variants.size() == chunkSize) {
break;
Expand All @@ -158,12 +164,12 @@ private List<EnsemblVariant> readBatch(Integer chunkSize) throws Exception {
return variants;
}

private Document delegateDoRead() throws Exception {
private VariantDocument delegateDoRead() throws Exception {
lastRead = ZonedDateTime.now();
return delegateReader.doRead();
return delegateReader.read();
}

private EnsemblVariant buildVariantWrapper(SimplifiedVariant variant) {
private EnsemblVariant buildVariantWrapper(VariantDocument variant) {
return new EnsemblVariant(variant.getChromosome(),
variant.getStart(),
variant.getEnd(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ private void logBatch(List<EnsemblVariant> ensemblVariants) {
private String getVariantInVepInputFormat(EnsemblVariant ensemblVariant) {
return String.join("\t",
ensemblVariant.getChr(),
Integer.toString(ensemblVariant.getStart()),
Integer.toString(ensemblVariant.getEnd()),
Long.toString(ensemblVariant.getStart()),
Long.toString(ensemblVariant.getEnd()),
ensemblVariant.getRefAlt(),
ensemblVariant.getStrand());
}
Expand Down
Loading

0 comments on commit fc82354

Please sign in to comment.