Skip to content

Commit

Permalink
Avoid exponential planning time for LocalExchanges
Browse files Browse the repository at this point in the history
Adds a special case path through StreamPropertyDerivations that avoids
calling back into PropertyDerivations to get otherActualProperties. This
special case is specifically for PropertyDerivations to check whether a
local exchange's only input source is single stream distributed and
avoids what would otherwise be a mutually recursive exponential time
traversal.
  • Loading branch information
pettyjamesm committed Nov 8, 2023
1 parent df3a531 commit b8e3bac
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import static io.trino.sql.planner.optimizations.ActualProperties.Global.coordinatorSinglePartition;
import static io.trino.sql.planner.optimizations.ActualProperties.Global.partitionedOn;
import static io.trino.sql.planner.optimizations.ActualProperties.Global.singlePartition;
import static io.trino.sql.planner.optimizations.StreamPropertyDerivations.isLocalExchangesSourceSingleStreamDistributed;
import static io.trino.sql.planner.plan.ExchangeNode.Scope.LOCAL;
import static io.trino.sql.planner.plan.ExchangeNode.Scope.REMOTE;
import static io.trino.sql.tree.PatternRecognitionRelation.RowsPerMatch.ONE;
Expand Down Expand Up @@ -696,18 +697,19 @@ public ActualProperties visitExchange(ExchangeNode node, List<ActualProperties>
if (node.getScope() == LOCAL) {
if (inputProperties.size() == 1) {
ActualProperties inputProperty = inputProperties.get(0);
if (inputProperty.isEffectivelySinglePartition() && node.getOrderingScheme().isEmpty()) {
if (inputProperty.isEffectivelySinglePartition() && node.getOrderingScheme().isEmpty() && !inputProperty.getLocalProperties().isEmpty()) {
verify(node.getInputs().size() == 1);
verify(node.getSources().size() == 1);
PlanNode source = node.getSources().get(0);
StreamPropertyDerivations.StreamProperties streamProperties = StreamPropertyDerivations.derivePropertiesRecursively(source, plannerContext, session, types, typeAnalyzer);
if (streamProperties.isSingleStream()) {
Map<Symbol, Symbol> inputToOutput = exchangeInputToOutput(node, 0);
Map<Symbol, Symbol> inputToOutput = exchangeInputToOutput(node, 0);
List<LocalProperty<Symbol>> inputLocalProperties = LocalProperties.translate(inputProperty.getLocalProperties(), symbol -> Optional.ofNullable(inputToOutput.get(symbol)));
// If no local properties are present to propagate, then we can skip recursive stream properties derivation
// which traverses all child plan nodes again and is therefore expensive to check
@SuppressWarnings("deprecation")
boolean propagateLocalProperties = !inputLocalProperties.isEmpty() && isLocalExchangesSourceSingleStreamDistributed(node, plannerContext.getMetadata(), session);
if (propagateLocalProperties) {
// Single stream input's local sorting and grouping properties are preserved
// In case of merging exchange, it's orderingScheme takes precedence
localProperties.addAll(LocalProperties.translate(
inputProperty.getLocalProperties(),
symbol -> Optional.ofNullable(inputToOutput.get(symbol))));
localProperties.addAll(inputLocalProperties);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import static io.trino.sql.planner.optimizations.StreamPropertyDerivations.StreamProperties.StreamDistribution.FIXED;
import static io.trino.sql.planner.optimizations.StreamPropertyDerivations.StreamProperties.StreamDistribution.MULTIPLE;
import static io.trino.sql.planner.optimizations.StreamPropertyDerivations.StreamProperties.StreamDistribution.SINGLE;
import static io.trino.sql.planner.plan.ExchangeNode.Scope.LOCAL;
import static io.trino.sql.planner.plan.ExchangeNode.Scope.REMOTE;
import static io.trino.sql.tree.SkipTo.Position.PAST_LAST;
import static java.lang.String.format;
Expand Down Expand Up @@ -161,12 +162,9 @@ public static StreamProperties deriveProperties(
types,
typeAnalyzer);

StreamProperties result = node.accept(new Visitor(plannerContext.getMetadata(), session), inputProperties)
StreamProperties result = deriveStreamPropertiesWithoutActualProperties(node, inputProperties, plannerContext.getMetadata(), session)
.withOtherActualProperties(otherProperties);

result.getPartitioningColumns().ifPresent(columns ->
verify(node.getOutputSymbols().containsAll(columns), "Stream-level partitioning properties contain columns not present in node's output"));

Set<Symbol> localPropertyColumns = result.getLocalProperties().stream()
.flatMap(property -> property.getColumns().stream())
.collect(Collectors.toSet());
Expand All @@ -176,6 +174,50 @@ public static StreamProperties deriveProperties(
return result;
}

/**
 * Reports whether the one and only source feeding a local exchange is single-stream distributed.
 * <p>
 * This check is expensive: every call re-derives stream properties for the whole sub-plan beneath the
 * exchange's source. Avoid calling it whenever possible, and do not add new usages.
 *
 * @param exchangeNode a local exchange with exactly one input source
 * @param metadata metadata handed to the stream property visitor
 * @param session session handed to the stream property visitor
 * @return {@code true} if the stream properties derived for the exchange's source report a single stream
 * @throws IllegalArgumentException if the exchange is not a local exchange or does not have exactly one source
 * @deprecated Only for use by {@link PropertyDerivations}
 */
@Deprecated
static boolean isLocalExchangesSourceSingleStreamDistributed(ExchangeNode exchangeNode, Metadata metadata, Session session)
{
    checkArgument(exchangeNode.getScope() == LOCAL, "exchangeNode must be a local exchange");
    List<PlanNode> exchangeSources = exchangeNode.getSources();
    checkArgument(exchangeSources.size() == 1, "exchangeNode must have a single source");

    // Derive stream properties only (no ActualProperties) for the sub-plan rooted at the single source
    PlanNode onlySource = exchangeSources.get(0);
    StreamProperties sourceStreamProperties = deriveStreamPropertiesWithoutActualPropertiesRecursively(onlySource, metadata, session);
    return sourceStreamProperties.isSingleStream();
}

/**
 * Bottom-up derivation of {@link StreamProperties} for the sub-plan rooted at {@code node}, leaving
 * {@link StreamProperties#otherActualProperties} unpopulated. Skipping that field is what prevents the
 * exponential-time, mutually recursive sub-plan traversal that would otherwise occur when
 * {@link PropertyDerivations} checks a local exchange's input source for single stream distribution.
 *
 * @param node root of the sub-plan to derive stream properties for
 * @param metadata metadata handed to the stream property visitor
 * @param session session handed to the stream property visitor
 * @return the stream properties derived for {@code node}
 * @deprecated For internal use only by {@link StreamPropertyDerivations#isLocalExchangesSourceSingleStreamDistributed(ExchangeNode, Metadata, Session)}
 */
@Deprecated
private static StreamProperties deriveStreamPropertiesWithoutActualPropertiesRecursively(PlanNode node, Metadata metadata, Session session)
{
    // Children first: each source's stream properties become an input to this node's derivation
    List<StreamProperties> childStreamProperties = node.getSources().stream()
            .map(child -> deriveStreamPropertiesWithoutActualPropertiesRecursively(child, metadata, session))
            .collect(toImmutableList());
    return deriveStreamPropertiesWithoutActualProperties(node, childStreamProperties, metadata, session);
}

/**
 * Derives the {@link StreamProperties} of a single node from the already-derived properties of its inputs
 * by dispatching to {@link Visitor}. This method does not attach any other actual properties to the
 * result; a caller that needs them adds them afterwards.
 *
 * @param node the plan node to derive stream properties for
 * @param inputProperties stream properties of the node's sources, passed to the visitor as context
 * @param metadata metadata used to construct the visitor
 * @param session session used to construct the visitor
 * @return the stream properties produced by the visitor for {@code node}
 */
private static StreamProperties deriveStreamPropertiesWithoutActualProperties(PlanNode node, List<StreamProperties> inputProperties, Metadata metadata, Session session)
{
    Visitor visitor = new Visitor(metadata, session);
    StreamProperties derived = node.accept(visitor, inputProperties);

    // Sanity check: partitioning columns, when present, must all be outputs of this node
    Optional<List<Symbol>> partitioningColumns = derived.getPartitioningColumns();
    if (partitioningColumns.isPresent()) {
        verify(node.getOutputSymbols().containsAll(partitioningColumns.get()), "Stream-level partitioning properties contain columns not present in node's output");
    }

    return derived;
}

private static class Visitor
extends PlanVisitor<StreamProperties, List<StreamProperties>>
{
Expand Down Expand Up @@ -833,7 +875,8 @@ public StreamProperties translate(Function<Symbol, Optional<Symbol>> translator)
}
return Optional.of(newPartitioningColumns.build());
}),
ordered, otherActualProperties.translate(translator));
ordered,
otherActualProperties == null ? null : otherActualProperties.translate(translator));
}

public Optional<List<Symbol>> getPartitioningColumns()
Expand Down

0 comments on commit b8e3bac

Please sign in to comment.