From 50dfe369cd89bf7f99b5237623f3325003a467be Mon Sep 17 00:00:00 2001 From: Piotr Szul Date: Fri, 20 Dec 2024 06:43:39 +1000 Subject: [PATCH] Fixing join resolution for multiple resolves (to one) to the same resource. --- .../execution/MultiFhirPathEvaluator.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/fhirpath/src/main/java/au/csiro/pathling/fhirpath/execution/MultiFhirPathEvaluator.java b/fhirpath/src/main/java/au/csiro/pathling/fhirpath/execution/MultiFhirPathEvaluator.java index f5ab5faaeb..9c4cb7af77 100644 --- a/fhirpath/src/main/java/au/csiro/pathling/fhirpath/execution/MultiFhirPathEvaluator.java +++ b/fhirpath/src/main/java/au/csiro/pathling/fhirpath/execution/MultiFhirPathEvaluator.java @@ -40,8 +40,8 @@ import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.util.Collections; +import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.Value; import org.apache.spark.sql.Column; @@ -170,6 +170,7 @@ private Dataset computeResolveJoin(@Nonnull final Dataset parentDatase // TODO: replace with the map_combine function .map(functions::col); + final String uniqueValueTag = typedRoot.getValueTag() + "_unique"; // create one key-value pair map a the value final Stream keyValuesColumns = Stream.of( childDataset.col("key").alias(typedRoot.getChildKeyTag()), @@ -177,7 +178,7 @@ private Dataset computeResolveJoin(@Nonnull final Dataset parentDatase functions.array(childDataset.col("key")), // maybe need to be wrapped in another array functions.array(childResource.getColumnValue()) - ).alias(typedRoot.getValueTag()) + ).alias(uniqueValueTag) ); final Dataset childResult = childDataset.select( @@ -189,10 +190,19 @@ private Dataset computeResolveJoin(@Nonnull final Dataset parentDatase final Collection parentRegKey = parentExecutor.evaluate(new Traversal("reference"), referenceCollection); + final boolean needsMerging = 
List.of(parentDataset.columns()) + .contains(typedRoot.getValueTag()); final Dataset joinedDataset = parentDataset.join(childResult, parentRegKey.getColumnValue().equalTo(functions.col(typedRoot.getChildKeyTag())), "left_outer") - .drop(typedRoot.getChildKeyTag()); + .withColumn(typedRoot.getValueTag(), + needsMerging + ? functions.map_concat( + functions.col(typedRoot.getValueTag()), + functions.col(uniqueValueTag)) + : functions.col(uniqueValueTag) + ) + .drop(typedRoot.getChildKeyTag(), uniqueValueTag); System.out.println("Joined dataset:"); joinedDataset.show(); @@ -346,12 +356,13 @@ private Dataset computeReverseJoin(@Nonnull final Dataset parentDatase @Nonnull private Dataset resolveJoinsEx(@Nonnull final JoinSet joinSet, @Nonnull final Dataset parentDataset) { - + // now just reduce current children return joinSet.getChildren().stream() .reduce(parentDataset, (dataset, subset) -> // the parent dataset for subjoin should be different - computeJoin(dataset, resolveJoinsEx(subset, resourceDataset(subset.getMaster().getResourceType())), + computeJoin(dataset, + resolveJoinsEx(subset, resourceDataset(subset.getMaster().getResourceType())), (JoinRoot) subset.getMaster()), (dataset1, dataset2) -> dataset1); }