Commit
code review
southeo committed Oct 28, 2024
1 parent 525d8cf commit 6e9f517
Showing 9 changed files with 94 additions and 61 deletions.
@@ -2,6 +2,8 @@

import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -16,4 +18,10 @@ public ObjectMapper objectMapper() {
mapper.setSerializationInclusion(Include.NON_NULL);
return mapper;
}

@Bean
public DateTimeFormatter formatter() {
return DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
}

}
@@ -1,8 +1,8 @@
package eu.dissco.exportjob.repository;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.CountRequest;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.search.Hit;
import com.fasterxml.jackson.databind.JsonNode;
@@ -22,36 +22,34 @@
@Repository
@RequiredArgsConstructor
public class ElasticSearchRepository {

private final ElasticsearchClient client;
private final ElasticSearchProperties properties;

public Long getTotalHits(List<SearchParam> searchParams, TargetType targetType)
throws IOException {
var query = generateQuery(searchParams);
var index = getIndex(targetType);
var countRequest = new CountRequest.Builder()
.index(index)
.query(
q -> q.bool(b -> b.must(query)))
.build();
return client.count(countRequest).count();
}
private static final String SORT_BY = "dcterms:identifier.keyword";

public List<JsonNode> getTargetObjects(List<SearchParam> searchParams, TargetType targetType,
int pageNumber)
String lastId, List<String> targetFields)
throws IOException {
var query = generateQuery(searchParams);
var index = getIndex(targetType);

var searchRequest = new SearchRequest.Builder()
var searchRequestBuilder = new SearchRequest.Builder()
.index(index)
.query(
q -> q.bool(b -> b.must(query)))
.trackTotalHits(t -> t.enabled(Boolean.TRUE))
.from(getOffset(pageNumber, properties.getPageSize()))
.size(properties.getPageSize())
.build();
var searchResult = client.search(searchRequest, ObjectNode.class);
.sort(s -> s.field(f -> f.field(SORT_BY).order(SortOrder.Desc)));
if (lastId != null) {
searchRequestBuilder
.searchAfter(sa -> sa.stringValue(lastId));
}
if (targetFields != null) {
searchRequestBuilder
.source(sourceConfig -> sourceConfig
.filter(filter -> filter.includes(targetFields)));
}
var searchResult = client.search(searchRequestBuilder.build(), ObjectNode.class);

return searchResult.hits().hits().stream()
.map(Hit::source)
@@ -65,7 +63,6 @@ private String getIndex(TargetType targetType) {
: properties.getDigitalMediaObjectIndex();
}


private static List<Query> generateQuery(List<SearchParam> searchParams) {
var qList = new ArrayList<Query>();
for (var searchParam : searchParams) {
@@ -88,12 +85,4 @@ private static List<Query> generateQuery(List<SearchParam> searchParams) {
return qList;
}

private static int getOffset(int pageNumber, int pageSize) {
int offset = 0;
if (pageNumber > 1) {
offset = offset + (pageSize * (pageNumber - 1));
}
return offset;
}

}
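For context on the repository change above: the offset-based paging (getTotalHits plus a from/size offset) is replaced by keyset pagination with search_after, sorted on dcterms:identifier.keyword and with an optional source filter on the requested fields. Below is a minimal, self-contained sketch of that pattern with the Elasticsearch Java client; the class name, method name, index name, and page size are illustrative assumptions, not code from this repository.

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.search.Hit;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: pages through an index with search_after instead of from/size.
public class SearchAfterSketch {

  private static final String SORT_BY = "dcterms:identifier.keyword";
  private static final String ID_FIELD = "dcterms:identifier";

  static List<ObjectNode> fetchAll(ElasticsearchClient client, String index, int pageSize)
      throws IOException {
    var results = new ArrayList<ObjectNode>();
    String lastId = null;
    while (true) {
      var builder = new SearchRequest.Builder()
          .index(index)
          .size(pageSize)
          // search_after needs a deterministic sort key to resume from.
          .sort(s -> s.field(f -> f.field(SORT_BY).order(SortOrder.Desc)));
      if (lastId != null) {
        // Resume strictly after the last identifier returned by the previous page.
        builder.searchAfter(sa -> sa.stringValue(lastId));
      }
      var page = client.search(builder.build(), ObjectNode.class)
          .hits().hits().stream()
          .map(Hit::source)
          .toList();
      if (page.isEmpty()) {
        return results;
      }
      lastId = page.get(page.size() - 1).get(ID_FIELD).asText();
      results.addAll(page);
    }
  }
}

Unlike from/size, each request stays cheap no matter how far the export has progressed, which is why the loop no longer needs a separate getTotalHits count to know when to stop.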
@@ -1,6 +1,8 @@
package eu.dissco.exportjob.repository;

import java.io.File;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository;
@@ -12,6 +14,7 @@
public class S3Repository {

private final S3AsyncClient s3Client;
private final DateTimeFormatter formatter;
private static final String BUCKET_NAME = "dissco-data-export";

public String uploadResults(File file, UUID jobId) {
@@ -20,7 +23,7 @@ public String uploadResults(File file, UUID jobId) {
.uploadFile(uploadFileRequest -> uploadFileRequest
.putObjectRequest(putObjectRequest -> putObjectRequest
.bucket(BUCKET_NAME)
.key(jobId.toString()))
.key(getDate() + "/" + jobId.toString()))
.source(file));
upload.completionFuture().join();
return s3Client.utilities().getUrl(
@@ -31,5 +34,8 @@ public String uploadResults(File file, UUID jobId) {
}
}

private String getDate() {
return formatter.format(Instant.now());
}

}
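The key change above groups exports in the bucket by upload date. A small sketch of the resulting key layout, assuming the yyyy-MM-dd UTC formatter bean added in the first file of this commit; the class name is illustrative and not part of the repository.

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.UUID;

// Illustrative sketch only: shows the "<date>/<jobId>" key produced by getDate() + "/" + jobId.
public class S3KeySketch {

  public static void main(String[] args) {
    var formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
    var jobId = UUID.randomUUID();
    String key = formatter.format(Instant.now()) + "/" + jobId;
    System.out.println(key); // e.g. "2024-10-28/<job UUID>", depending on the current day
  }
}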
@@ -1,5 +1,8 @@
package eu.dissco.exportjob.service;

import static eu.dissco.exportjob.service.DoiListService.ID_FIELD;

import co.elastic.clients.elasticsearch._types.query_dsl.FieldAndFormat;
import com.fasterxml.jackson.databind.JsonNode;
import eu.dissco.exportjob.domain.JobRequest;
import eu.dissco.exportjob.domain.JobStateEndpoint;
@@ -23,13 +26,15 @@ public abstract class AbstractExportJobService {
private final ExporterBackendClient exporterBackendClient;
private final S3Repository s3Repository;
protected static final String TEMP_FILE_NAME = "src/main/resources/tmp.csv.gz";
protected static final String ID_FIELD = "dcterms:identifier";
protected static final String PHYSICAL_ID_FIELD = "ods:physicalSpecimenID";


public void handleMessage(JobRequest jobRequest) throws FailedProcessingException {
exporterBackendClient.updateJobState(jobRequest.jobId(), JobStateEndpoint.RUNNING);
try {
var uploadData = processSearchResults(jobRequest);
if (uploadData){
if (uploadData) {
var url = s3Repository.uploadResults(new File(TEMP_FILE_NAME), jobRequest.jobId());
exporterBackendClient.markJobAsComplete(jobRequest.jobId(), url);
} else {
@@ -42,26 +47,29 @@ public void handleMessage(JobRequest jobRequest) throws FailedProcessingException
}

private boolean processSearchResults(JobRequest jobRequest) throws IOException {
var totalHits = elasticSearchRepository.getTotalHits(jobRequest.searchParams(),
jobRequest.targetType());
if (totalHits > 0){
int pageNum = 1;
var hitsProcessed = 0;
writeHeaderToFile();
while (hitsProcessed < totalHits) {
var searchResult = elasticSearchRepository.getTargetObjects(jobRequest.searchParams(),
jobRequest.targetType(), pageNum);
String lastId = null;
writeHeaderToFile();
boolean keepSearching = true;
boolean resultsProcessed = false;
var targetFields = targetFields();
while (keepSearching) {
var searchResult = elasticSearchRepository.getTargetObjects(jobRequest.searchParams(),
jobRequest.targetType(), lastId, targetFields);
if (searchResult.isEmpty()){
keepSearching = false;
} else {
writeResultsToFile(searchResult);
hitsProcessed = hitsProcessed + searchResult.size();
pageNum = pageNum + 1;
lastId = searchResult.getLast().get(ID_FIELD).asText();
resultsProcessed = true;
}
return true;
}
return false;
return resultsProcessed;
}

protected abstract void writeHeaderToFile() throws IOException;

protected abstract void writeResultsToFile(List<JsonNode> searchResult) throws IOException;

protected abstract List<String> targetFields();

}
12 changes: 8 additions & 4 deletions src/main/java/eu/dissco/exportjob/service/DoiListService.java
@@ -17,9 +17,7 @@
@Profile(Profiles.DOI_LIST)
public class DoiListService extends AbstractExportJobService {

private static final String ODS_ID = "ods:ID";
private static final String PHYSICAL_ID = "ods:physicalSpecimenID";
private static final byte[] HEADER = (ODS_ID + "," + PHYSICAL_ID).getBytes(
private static final byte[] HEADER = (ID_FIELD + "," + PHYSICAL_ID_FIELD).getBytes(
StandardCharsets.UTF_8);

public DoiListService(
@@ -41,10 +39,16 @@ protected void writeResultsToFile(List<JsonNode> searchResults) throws IOException
var byteOutputStream = new FileOutputStream(TEMP_FILE_NAME, true);
var gzip = new GZIPOutputStream(byteOutputStream)) {
for (var result : searchResults) {
var col = ("\n" + result.get(ODS_ID).asText() + "," + result.get(PHYSICAL_ID).asText())
var col = ("\n" + result.get(ID_FIELD).asText() + "," + result.get(PHYSICAL_ID_FIELD).asText())
.getBytes(StandardCharsets.UTF_8);
gzip.write(col, 0, col.length);
}
}
}

protected List<String> targetFields(){
return List.of(ID_FIELD, PHYSICAL_ID_FIELD);
}


}
Binary file modified src/main/resources/tmp.csv.gz
@@ -2,10 +2,13 @@

import static eu.dissco.exportjob.utils.TestUtils.DOI_2;
import static eu.dissco.exportjob.utils.TestUtils.MAPPER;
import static eu.dissco.exportjob.utils.TestUtils.ORG_1;
import static eu.dissco.exportjob.utils.TestUtils.ORG_2;
import static eu.dissco.exportjob.utils.TestUtils.PHYS_ID_2;
import static eu.dissco.exportjob.utils.TestUtils.givenDigitalSpecimen;
import static eu.dissco.exportjob.utils.TestUtils.givenDigitalSpecimenReducedDoiList;
import static eu.dissco.exportjob.utils.TestUtils.givenSearchParams;
import static eu.dissco.exportjob.utils.TestUtils.givenTargetFields;
import static org.assertj.core.api.Assertions.assertThat;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
@@ -99,30 +102,31 @@ void clearIndex() throws IOException {
}

@Test
void testGetTotalHits() throws IOException {
void testGetTargetObject() throws IOException {
// Given
postDigitalSpecimens(
List.of(givenDigitalSpecimen(), givenDigitalSpecimen(DOI_2, ORG_2, PHYS_ID_2)));

// When
var result = elasticRepository.getTotalHits(givenSearchParams(), TargetType.DIGITAL_SPECIMEN);
var result = elasticRepository.getTargetObjects(givenSearchParams(),
TargetType.DIGITAL_SPECIMEN, null, givenTargetFields());

// Then
assertThat(result).isEqualTo(1L);
assertThat(result).isEqualTo(List.of(givenDigitalSpecimenReducedDoiList()));
}

@Test
void testGetTargetObject() throws IOException {
void testGetTargetObjectSecondPage() throws IOException {
// Given
postDigitalSpecimens(
List.of(givenDigitalSpecimen(), givenDigitalSpecimen(DOI_2, ORG_2, PHYS_ID_2)));
List.of(givenDigitalSpecimen(), givenDigitalSpecimen(DOI_2, ORG_1, PHYS_ID_2)));

// When
var result = elasticRepository.getTargetObjects(givenSearchParams(),
TargetType.DIGITAL_SPECIMEN, 1);
TargetType.DIGITAL_SPECIMEN, DOI_2, givenTargetFields());

// Then
assertThat(result).isEqualTo(List.of(givenDigitalSpecimen()));
assertThat(result).isEqualTo(List.of(givenDigitalSpecimenReducedDoiList()));
}

@Test
@@ -135,7 +139,7 @@ void testGetTargetObjectEmptyOrg() throws IOException {
var searchParam = List.of(new SearchParam("$['ods:organisationID']", null));

// When
var result = elasticRepository.getTargetObjects(searchParam, TargetType.DIGITAL_SPECIMEN, 1);
var result = elasticRepository.getTargetObjects(searchParam, TargetType.DIGITAL_SPECIMEN, null, null);

// Then
assertThat(result).isEqualTo(List.of(expected));
@@ -6,7 +6,6 @@
import static eu.dissco.exportjob.utils.TestUtils.givenDigitalSpecimen;
import static eu.dissco.exportjob.utils.TestUtils.givenJobRequest;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
@@ -43,8 +42,8 @@
@Test
void testHandleMessageNoResultsFound() throws Exception {
// Given
given(elasticSearchRepository.getTotalHits(any(), any())).willReturn(0L);

given(elasticSearchRepository.getTargetObjects(any(), any(), eq(null), any())).willReturn(
List.of());
// When
service.handleMessage(givenJobRequest());

@@ -57,8 +56,7 @@ void testHandleMessageNoResultsFound() throws Exception {
@Test
void testHandleMessage() throws Exception {
// Given
given(elasticSearchRepository.getTotalHits(any(), any())).willReturn(1L);
given(elasticSearchRepository.getTargetObjects(any(), any(), anyInt())).willReturn(
given(elasticSearchRepository.getTargetObjects(any(), any(), eq(null), any())).willReturn(
List.of(givenDigitalSpecimen()));
given(s3Repository.uploadResults(any(), eq(JOB_ID))).willReturn(DOWNLOAD_LINK);

20 changes: 18 additions & 2 deletions src/test/java/eu/dissco/exportjob/utils/TestUtils.java
@@ -25,6 +25,8 @@ private TestUtils() {
.setSerializationInclusion(Include.NON_NULL);
public static final String DOWNLOAD_LINK = "https://aws.download/s3";
public static final String ORG_FIELD_NAME = "$['ods:organisationID']";
public static final String ID_FIELD = "dcterms:identifier";
public static final String PHYS_ID_FIELD = "ods:physicalSpecimenID";

public static JobRequest givenJobRequest() {
return new JobRequest(
@@ -45,9 +47,23 @@ public static JsonNode givenDigitalSpecimen(){

public static JsonNode givenDigitalSpecimen(String doi, String org, String physId){
return MAPPER.createObjectNode()
.put("ods:ID", doi)
.put(ID_FIELD, doi)
.put("@id", doi)
.put("ods:organisationID", org)
.put("ods:physicalSpecimenID", physId);
.put(PHYS_ID_FIELD, physId);
}

public static JsonNode givenDigitalSpecimenReducedDoiList(){
return givenDigitalSpecimenReducedDoiList(DOI_1, PHYS_ID_1);
}

public static JsonNode givenDigitalSpecimenReducedDoiList(String doi, String physId){
return MAPPER.createObjectNode()
.put(ID_FIELD, doi)
.put(PHYS_ID_FIELD, physId);
}

public static List<String> givenTargetFields(){
return List.of(ID_FIELD, PHYS_ID_FIELD);
}
}
