Skip to content

Commit

Permalink
Fixing join resolution for multiple resolves (to one) to the same reso…
Browse files Browse the repository at this point in the history
…urce.
  • Loading branch information
piotrszul committed Dec 19, 2024
1 parent 00a9cc4 commit 50dfe36
Showing 1 changed file with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Value;
import org.apache.spark.sql.Column;
Expand Down Expand Up @@ -170,14 +170,15 @@ private Dataset<Row> computeResolveJoin(@Nonnull final Dataset<Row> parentDatase
// TODO: replace with the map_combine function
.map(functions::col);

final String uniqueValueTag = typedRoot.getValueTag() + "_unique";
// create a single key-value pair map as the value
final Stream<Column> keyValuesColumns = Stream.of(
childDataset.col("key").alias(typedRoot.getChildKeyTag()),
functions.map_from_arrays(
functions.array(childDataset.col("key")),
// may need to be wrapped in another array
functions.array(childResource.getColumnValue())
).alias(typedRoot.getValueTag())
).alias(uniqueValueTag)
);

final Dataset<Row> childResult = childDataset.select(
Expand All @@ -189,10 +190,19 @@ private Dataset<Row> computeResolveJoin(@Nonnull final Dataset<Row> parentDatase
final Collection parentRegKey = parentExecutor.evaluate(new Traversal("reference"),
referenceCollection);

final boolean needsMerging = List.of(parentDataset.columns())
.contains(typedRoot.getValueTag());
final Dataset<Row> joinedDataset = parentDataset.join(childResult,
parentRegKey.getColumnValue().equalTo(functions.col(typedRoot.getChildKeyTag())),
"left_outer")
.drop(typedRoot.getChildKeyTag());
.withColumn(typedRoot.getValueTag(),
needsMerging
? functions.map_concat(
functions.col(typedRoot.getValueTag()),
functions.col(uniqueValueTag))
: functions.col(uniqueValueTag)
)
.drop(typedRoot.getChildKeyTag(), uniqueValueTag);

System.out.println("Joined dataset:");
joinedDataset.show();
Expand Down Expand Up @@ -346,12 +356,13 @@ private Dataset<Row> computeReverseJoin(@Nonnull final Dataset<Row> parentDatase
@Nonnull
private Dataset<Row> resolveJoinsEx(@Nonnull final JoinSet joinSet,
@Nonnull final Dataset<Row> parentDataset) {

// now just reduce current children
return joinSet.getChildren().stream()
.reduce(parentDataset, (dataset, subset) ->
// the parent dataset for subjoin should be different
computeJoin(dataset, resolveJoinsEx(subset, resourceDataset(subset.getMaster().getResourceType())),
computeJoin(dataset,
resolveJoinsEx(subset, resourceDataset(subset.getMaster().getResourceType())),
(JoinRoot) subset.getMaster()), (dataset1, dataset2) -> dataset1);
}

Expand Down

0 comments on commit 50dfe36

Please sign in to comment.