Skip to content

Commit

Permalink
fixed batching
Browse files Browse the repository at this point in the history
  • Loading branch information
Lucas0T committed Sep 11, 2024
1 parent 8bb439c commit 3c96416
Show file tree
Hide file tree
Showing 9 changed files with 550 additions and 27 deletions.
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ services:
TORCH_FLARE_URL: http://torch-flare:8080
TORCH_RESULTS_DIR: /app/output
TORCH_RESULTS_PERSISTENCE: PT12H30M5S
LOG_LEVEL: info
LOG_LEVEL: debug
NGINX_SERVERNAME: localhost
NGINX_FILELOCATION: http://localhost:80
TORCH_BATCHSIZE: 100
volumes:
- "torch-data-store:/app/output" # Shared volume with torch-nginx
- ./structureDefinitions:/app/StructureDefinitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public Flux<Resource> getResources(String resourceType,String parameters) {
.bodyValue(parameters)
.header("Content-Type", "application/x-www-form-urlencoded")
.retrieve()
.bodyToFlux(String.class)
//.doOnNext(response -> logger.debug("Response: {}", response))
.map(response -> parser.parseResource(Bundle.class, response))
.bodyToMono(String.class)
.doOnNext(response -> logger.debug("getResources Response: {}", response))
.flatMap(response -> Mono.just(parser.parseResource(Bundle.class, response)))
.expand(bundle -> Optional.ofNullable(bundle.getLink("next"))
.map(link -> fetchPage(client, link.getUrl()))
.orElse(Mono.empty()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ public ResourceTransformer(DataStore dataStore, CdsStructureDefinitionHandler cd
public Flux<Resource> transformResources(String parameters, AttributeGroup group) {
String resourceType = group.getResourceType();
Flux<Resource> resources = dataStore.getResources(resourceType, parameters);

return resources.map(resource -> {
try {

return transform((DomainResource) resource, group);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException |
InstantiationException e) {
Expand All @@ -79,8 +81,13 @@ public Flux<Resource> transformResources(String parameters, AttributeGroup group
public Resource transform(DomainResource resourcesrc, AttributeGroup group) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, MustHaveViolatedException {
Class<? extends DomainResource> resourceClass = resourcesrc.getClass().asSubclass(DomainResource.class);
DomainResource tgt = resourceClass.getDeclaredConstructor().newInstance();
for (Attribute attribute : group.getAttributes()) {
copier.copy(resourcesrc, tgt, attribute);

try {
logger.debug("Handling resource {}",ResourceUtils.getPatientId(resourcesrc));
for (Attribute attribute : group.getAttributes()) {

copier.copy(resourcesrc, tgt, attribute);

}
//TODO define technically required in all Ressources
copier.copy(resourcesrc, tgt, new Attribute("meta.profile", true));
Expand All @@ -93,6 +100,9 @@ public Resource transform(DomainResource resourcesrc, AttributeGroup group) thro
copier.copy(resourcesrc, tgt, new Attribute("subject.reference", true));
}
redaction.redact(tgt, "", 1);
} catch (PatientIdNotFoundException e) {
throw new RuntimeException(e);
}
return tgt;
}

Expand Down Expand Up @@ -131,9 +141,9 @@ public Mono<Map<String, Collection<Resource>>> collectResourcesByPatientReferenc
}
})
.doOnNext(map -> {
logger.debug("Collected resources for group {}", group.getGroupReference());
logger.debug("Collected resources for group {} {}", group.getGroupReference(),map.size());
safeSet.retainAll(safeGroup); // Retain only the patients that are present in both sets
logger.debug("SafeGroup after diff with SafeSet: {}", safeGroup);
logger.debug("SafeGroup after diff with SafeSet: {} {}", safeSet.size(), safeSet);
});
})
.collect(Collectors.toList());
Expand All @@ -156,7 +166,7 @@ public Mono<Map<String, Collection<Resource>>> collectResourcesByPatientReferenc
})
.block(); // Blocking call within a Callable to get the final result
})
.doOnSuccess(result -> logger.debug("Successfully collected resources"))
.doOnSuccess(result -> logger.debug("Successfully collected resources {}",result.entrySet()))
.doOnError(error -> logger.error("Error collecting resources: {}", error.getMessage()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,14 @@ private Mono<Void> processCrtdl(Crtdl crtdl, String jobId) {
logger.debug("Map {}", resourceMap.keySet());
Map<String, Bundle> bundles = bundleCreator.createBundles(resourceMap);
logger.debug("Bundles Size {}", bundles.size());

UUID uuid = UUID.randomUUID();
return Flux.fromIterable(bundles.values())
.flatMap(bundle -> {
UUID uuid = UUID.randomUUID();

// Save each serialized bundle (as an individual line in NDJSON) to the file system
return resultFileManager.saveBundleToNDJSON(jobId, uuid.toString(), bundle)
.doOnSuccess(unused -> {
logger.debug("Bundle appended: {}", batch.hashCode());
logger.debug("Bundle appended: {}", parser.setPrettyPrint(true).encodeResourceToString(bundle));
});
})
.then(); // Ensure the Mono completes after all bundles are appended
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import ca.uhn.fhir.util.TerserUtilHelper;
import de.medizininformatikinitiative.torch.CdsStructureDefinitionHandler;
import de.medizininformatikinitiative.torch.exceptions.MustHaveViolatedException;
import de.medizininformatikinitiative.torch.exceptions.PatientIdNotFoundException;
import de.medizininformatikinitiative.torch.model.Attribute;
import org.hl7.fhir.exceptions.FHIRException;
import org.hl7.fhir.r4.model.*;
Expand Down Expand Up @@ -50,7 +51,8 @@ public ElementCopier(CdsStructureDefinitionHandler handler) {
* @param attribute Attribute to copy containing ElementID and if it is a mandatory element.
* @throws MustHaveViolatedException if mandatory element is missing
*/
public void copy(DomainResource src, DomainResource tgt, Attribute attribute) throws MustHaveViolatedException {
public void copy(DomainResource src, DomainResource tgt, Attribute attribute) throws MustHaveViolatedException, PatientIdNotFoundException {
String id = ResourceUtils.getPatientId(src);
List<CanonicalType> profileurl = src.getMeta().getProfile();
StructureDefinition structureDefinition = handler.getDefinition(profileurl);
logger.debug("Empty Structuredefinition? {} {}", structureDefinition.isEmpty(),profileurl.getFirst().getValue());
Expand All @@ -63,18 +65,18 @@ public void copy(DomainResource src, DomainResource tgt, Attribute attribute) th
ElementDefinition elementDefinition =snapshot.getElementById(attribute.getAttributeRef());

TerserUtilHelper helper = TerserUtilHelper.newHelper(ctx, tgt);
logger.debug("TGT set {}", tgt.getClass());
logger.debug("Attribute FHIR PATH {}", attribute.getAttributeRef());
logger.debug("{} TGT set {}",id, tgt.getClass());
logger.debug("{} Attribute FHIR PATH {}", id, attribute.getAttributeRef());


try {
//logger.debug("Attribute Path {}", attribute.getAttributeRef());
logger.debug("Attribute Path {}", attribute.getAttributeRef());

String fhirPath = pathBuilder.handleSlicingForFhirPath(attribute.getAttributeRef(), factory, snapshot);
logger.debug("FHIR PATH {}", fhirPath);

List<Base> elements = ctx.newFhirPath().evaluate(src, fhirPath, Base.class);
//logger.debug("Elements received {}", fhirPath);
logger.debug("Elements received {}", fhirPath);
if (elements.isEmpty()) {
if (attribute.isMustHave()) {
throw new MustHaveViolatedException("Attribute " + attribute.getAttributeRef() + " must have a value");
Expand All @@ -85,21 +87,21 @@ public void copy(DomainResource src, DomainResource tgt, Attribute attribute) th
if (elements.size() == 1) {

if (terserFHIRPATH.endsWith("[x]")) {
//logger.debug("Tersertobehandled {}",terserFHIRPATH);
logger.debug("Tersertobehandled {}",terserFHIRPATH);
String type = capitalizeFirstLetter(elements.getFirst().fhirType());
terserFHIRPATH = terserFHIRPATH.replace("[x]", type);
}
//logger.debug("Setting {} {}",terserFHIRPATH,elements.getFirst().fhirType());
logger.debug("Setting {} {}",terserFHIRPATH,elements.getFirst().fhirType());
try {
TerserUtil.setFieldByFhirPath(ctx.newTerser(), terserFHIRPATH, tgt, elements.getFirst());
}catch(Exception e){
if(elementDefinition.hasType()) {
elementDefinition.getType().getFirst().getWorkingCode();
//TODO
//logger.debug("Element not recognized {} {}",terserFHIRPATH,elementDefinition.getType().getFirst().getWorkingCode());
logger.debug("Element not recognized {} {}",terserFHIRPATH,elementDefinition.getType().getFirst().getWorkingCode());
try {
Base casted = factory.stringtoPrimitive(elements.getFirst().toString(),elementDefinition.getType().getFirst().getWorkingCode());
//logger.debug("Casted {}",casted.fhirType());
logger.debug("Casted {}",casted.fhirType());
TerserUtil.setFieldByFhirPath(ctx.newTerser(), terserFHIRPATH, tgt, casted);
}catch (Exception casterException){
logger.warn("Element not recognized and cast unsupported currently {} {} ",terserFHIRPATH,elementDefinition.getType().getFirst().getWorkingCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void testFlare() throws IOException {
// Count the number of patient IDs
int patientCount = patientIds.size();
logger.info(String.valueOf(patientIds));
Assertions.assertEquals(3, patientCount);
Assertions.assertEquals(4, patientCount);
}


Expand Down
Loading

0 comments on commit 3c96416

Please sign in to comment.