diff --git a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
index 6256f9624f650..a0448697dd1ed 100644
--- a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.dag;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.ResourceSpec;
@@ -603,20 +602,6 @@ public String toString() {
                 + '}';
     }
 
-    @VisibleForTesting
-    public String toStringWithoutId() {
-        return getClass().getSimpleName()
-                + "{"
-                + "name='"
-                + name
-                + '\''
-                + ", outputType="
-                + outputType
-                + ", parallelism="
-                + parallelism
-                + '}';
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index cea10f7bb8124..bb4c1b75a288e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -84,7 +84,7 @@ class BatchPlanner(
     processors
   }
 
-  override def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
+  override protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
     beforeTranslation()
     val planner = createDummyPlanner()
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index b36edaa21d7a2..45788e6278ea8 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -367,8 +367,7 @@ abstract class PlannerBase(
    * @return
    *   The [[Transformation]] DAG that corresponds to the node DAG.
    */
-  @VisibleForTesting
-  def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
+  protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
 
   def addExtraTransformation(transformation: Transformation[_]): Unit = {
     if (!extraTransformations.contains(transformation)) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 894a37c8cf904..fb32326f11787 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -78,7 +78,7 @@ class StreamPlanner(
 
   override protected def getExecNodeGraphProcessors: Seq[ExecNodeGraphProcessor] = Seq()
 
-  override def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
+  override protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
     beforeTranslation()
     val planner = createDummyPlanner()
     val transformations = execGraph.getRootNodes.map {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 8fe6835213c0f..465c4b0c67ce5 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -729,20 +729,25 @@ Calc(select=[ts, a, b], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
   <TestCase name="testSetParallelismForSource">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM src LEFT JOIN changelog_src on src.id = changelog_src.id WHERE src.c > 1]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM src LEFT JOIN changelog_src
+ON src.id = changelog_src.id WHERE src.c > 1]]>
+    </Resource>
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
 LogicalProject(id=[$0], b=[$1], c=[$2], id0=[$3], a=[$4])
 +- LogicalFilter(condition=[>($2, 1)])
    +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
       :- LogicalTableScan(table=[[default_catalog, default_database, src]])
       +- LogicalTableScan(table=[[default_catalog, default_database, changelog_src]])
-]]>
-    </Resource>
-    <Resource name="optimized exec plan">
-      <![CDATA[
+
+== Optimized Physical Plan ==
+Join(joinType=[LeftOuterJoin], where=[=(id, id0)], select=[id, b, c, id0, a], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
+:- Exchange(distribution=[hash[id]])
+:  +- Calc(select=[id, b, c], where=[>(c, 1)])
+:     +- TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[id, b, c])
++- Exchange(distribution=[hash[id]])
+   +- ChangelogNormalize(key=[id])
+      +- Exchange(distribution=[hash[id]])
+         +- TableSourceScan(table=[[default_catalog, default_database, changelog_src]], fields=[id, a])
+
+== Optimized Execution Plan ==
 Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, b, c, id0, a], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
 :- Exchange(distribution=[hash[id]])
 :  +- Calc(select=[id, b, c], where=[(c > 1)])
@@ -751,22 +756,60 @@
    +- ChangelogNormalize(key=[id])
       +- Exchange(distribution=[hash[id]])
          +- TableSourceScan(table=[[default_catalog, default_database, changelog_src]], fields=[id, a])
-]]>
-    </Resource>
-    <Resource name="transformation">
-      <![CDATA[
-TwoInputTransformation{name='Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, b, c, id0, a], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])', outputType=ROW<`id` INT, `b` STRING, `c` INT, `id0` INT, `a` STRING>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-	+- PartitionTransformation{name='Exchange(distribution=[hash[id]])', outputType=ROW<`id` INT, `b` STRING, `c` INT>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-		+- OneInputTransformation{name='Calc(select=[id, b, c], where=[(c > 1)])', outputType=ROW<`id` INT, `b` STRING, `c` INT>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-			+- SourceTransformationWrapper{name='ChangeToDefaultParallel', outputType=ROW<`id` INT, `b` STRING, `c` INT>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-				+- LegacySourceTransformation{name='TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[id, b, c])', outputType=ROW<`id` INT, `b` STRING, `c` INT>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=3}
-	+- PartitionTransformation{name='Exchange(distribution=[hash[id]])', outputType=ROW<`id` INT NOT NULL, `a` STRING>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-		+- OneInputTransformation{name='ChangelogNormalize(key=[id])', outputType=ROW<`id` INT NOT NULL, `a` STRING>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-			+- PartitionTransformation{name='Exchange(distribution=[hash[id]])', outputType=ROW<`id` INT NOT NULL, `a` STRING>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-				+- PartitionTransformation{name='Partitioner', outputType=ROW<`id` INT NOT NULL, `a` STRING>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-					+- SourceTransformationWrapper{name='ChangeToDefaultParallel', outputType=ROW<`id` INT NOT NULL, `a` STRING>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-						+- LegacySourceTransformation{name='TableSourceScan(table=[[default_catalog, default_database, changelog_src]], fields=[id, a])', outputType=ROW<`id` INT NOT NULL, `a` STRING>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=5}
-]]>
-    </Resource>
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: src[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[id, b, c])",
+    "parallelism" : 3
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[id, b, c], where=[(c > 1)])",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "REBALANCE",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Source: changelog_src[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, changelog_src]], fields=[id, a])",
+    "parallelism" : 5
+  }, {
+    "id" : ,
+    "type" : "ChangelogNormalize[]",
+    "pact" : "Operator",
+    "contents" : "[]:ChangelogNormalize(key=[id])",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "HASH",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Join[]",
+    "pact" : "Operator",
+    "contents" : "[]:Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, b, c, id0, a], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "HASH",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "HASH",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index be1ae70d3fa78..e55cf64199115 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -779,7 +779,7 @@ class TableScanTest extends TableTestBase {
 
   @Test
   def testSetParallelismForSource(): Unit = {
    val config = TableConfig.getDefault
-    config.set(ExecutionConfigOptions.TABLE_EXEC_SIMPLIFY_OPERATOR_NAME_ENABLED, Boolean.box(false))
+    config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(10))
     val util = streamTestUtil(config)
     util.addTable("""
@@ -809,8 +809,12 @@ class TableScanTest extends TableTestBase {
         | 'enable-projection-push-down' = 'false'
         |)
       """.stripMargin)
-    util.verifyTransformation(
-      "SELECT * FROM src LEFT JOIN changelog_src " +
-        "on src.id = changelog_src.id WHERE src.c > 1")
+    val query =
+      """
+        |SELECT *
+        |FROM src LEFT JOIN changelog_src
+        |ON src.id = changelog_src.id WHERE src.c > 1
+        |""".stripMargin
+    util.verifyExplain(query, ExplainDetail.JSON_EXECUTION_PLAN)
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index e5b418365be68..1e006f3d94bd6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -19,7 +19,6 @@ package org.apache.flink.table.planner.utils
 
 import org.apache.flink.FlinkVersion
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.dag.Transformation
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.configuration.BatchExecutionOptions
@@ -88,7 +87,7 @@ import org.junit.jupiter.api.extension.{BeforeEachCallback, ExtendWith, Extensio
 import org.junit.jupiter.api.io.TempDir
 import org.junit.platform.commons.support.AnnotationSupport
 
-import java.io.{File, IOException, PrintWriter, StringWriter}
+import java.io.{File, IOException}
 import java.net.URL
 import java.nio.file.{Files, Path, Paths}
 import java.time.Duration
@@ -703,20 +702,6 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
       withQueryBlockAlias = false)
   }
 
-  /**
-   * Verify the AST (abstract syntax tree), the optimized exec plan and tranformation for the given
-   * SELECT query. Note: An exception will be thrown if the given sql can't be translated to exec
-   * plan and transformation result is wrong.
-   */
-  def verifyTransformation(query: String): Unit = {
-    doVerifyPlan(
-      query,
-      Array.empty[ExplainDetail],
-      withRowType = false,
-      Array(PlanKind.AST, PlanKind.OPT_EXEC, PlanKind.TRANSFORM),
-      withQueryBlockAlias = false)
-  }
-
   /** Verify the explain result for the given SELECT query. See more about [[Table#explain()]]. */
   def verifyExplain(query: String): Unit = verifyExplain(getTableEnv.sqlQuery(query))
 
@@ -1055,14 +1040,6 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
       ""
     }
 
-    // build transformation graph if `expectedPlans` contains TRANSFORM
-    val transformation = if (expectedPlans.contains(PlanKind.TRANSFORM)) {
-      val optimizedNodes = getPlanner.translateToExecNodeGraph(optimizedRels, true)
-      System.lineSeparator + getTransformations(getPlanner.translateToPlan(optimizedNodes))
-    } else {
-      ""
-    }
-
     // check whether the sql equals to the expected if the `relNodes` are translated from sql
     assertSqlEqualsOrExpandFunc()
     // check ast plan
@@ -1081,10 +1058,6 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
     if (expectedPlans.contains(PlanKind.OPT_EXEC)) {
       assertEqualsOrExpand("optimized exec plan", optimizedExecPlan, expand = false)
     }
-    // check transformation graph
-    if (expectedPlans.contains(PlanKind.TRANSFORM)) {
-      assertEqualsOrExpand("transformation", transformation, expand = false)
-    }
   }
 
   private def doVerifyExplain(explainResult: String, extraDetails: ExplainDetail*): Unit = {
@@ -1144,25 +1117,6 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
     replaceEstimatedCost(optimizedPlan)
   }
 
-  private def getTransformations(transformations: java.util.List[Transformation[_]]): String = {
-    val stringWriter = new StringWriter()
-    val printWriter = new PrintWriter(stringWriter)
-    transformations.foreach(transformation => getTransformation(printWriter, transformation, 0))
-    stringWriter.toString
-  }
-
-  private def getTransformation(
-      printWriter: PrintWriter,
-      transformation: Transformation[_],
-      level: Int): Unit = {
-    if (level == 0) {
-      printWriter.println(transformation.toStringWithoutId)
-    } else {
-      printWriter.println(("\t" * level) + "+- " + transformation.toStringWithoutId)
-    }
-    transformation.getInputs.foreach(child => getTransformation(printWriter, child, level + 1))
-  }
-
   /** Replace the estimated costs for the given plan, because it may be unstable. */
   protected def replaceEstimatedCost(s: String): String = {
     var str = s.replaceAll("\\r\\n", "\n")
@@ -1670,9 +1624,6 @@ object PlanKind extends Enumeration {
 
   /** Optimized Execution Plan */
   val OPT_EXEC: Value = Value("OPT_EXEC")
-
-  /** Transformation */
-  val TRANSFORM: Value = Value("TRANSFORM")
 }
 
 object TableTestUtil {
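
Note for reviewers: with verifyTransformation and PlanKind.TRANSFORM removed, tests that used to assert on the Transformation tree follow the pattern sketched below. This is a minimal sketch distilled from the testSetParallelismForSource change above; the CREATE TABLE statements for the two sources (scan parallelism 3 and 5) are elided, and the snippet assumes the surrounding TableTestBase/streamTestUtil context shown in the patch.

    // Pin the default parallelism so non-source operators get a stable value (10)
    // in the expected plan, then assert on the explain output instead of the removed
    // Transformation dump. ExplainDetail.JSON_EXECUTION_PLAN appends the
    // "== Physical Execution Plan ==" JSON section, whose per-node "parallelism"
    // field covers what the old TRANSFORM plan kind verified.
    val config = TableConfig.getDefault
    config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(10))
    val util = streamTestUtil(config)
    // ... register `src` (scan parallelism 3) and `changelog_src` (scan parallelism 5) ...
    val query =
      """
        |SELECT *
        |FROM src LEFT JOIN changelog_src
        |ON src.id = changelog_src.id WHERE src.c > 1
        |""".stripMargin
    util.verifyExplain(query, ExplainDetail.JSON_EXECUTION_PLAN)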