
Commit

WIP: aliasing
nastra committed Jan 24, 2024
1 parent 6559f72 commit ffe9fba
Showing 6 changed files with 54 additions and 45 deletions.
@@ -26,14 +26,18 @@ import org.apache.spark.sql.catalyst.plans.logical.View
 import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.SchemaUtils
 
 object CheckViews extends (LogicalPlan => Unit) {
 
   override def apply(plan: LogicalPlan): Unit = {
     plan foreach {
-      case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _, _, _, _, _) =>
+      case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _,
+          queryColumnNames, _, _, _, _, _) =>
         verifyAmountOfColumns(ident, columnAliases, query)
         verifyTemporaryObjectsDontExist(ident, query)
+        SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver)
 
       case _ => // OK
     }
@@ -98,4 +102,4 @@ object CheckViews extends (LogicalPlan => Unit) {
 
     collectTempViews(child)
   }
-}
\ No newline at end of file
+}
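
The duplicate-name check added above reuses Spark's SchemaUtils helper rather than hand-rolling the comparison, and it runs against queryColumnNames — the output names of the underlying query — not against the user-supplied aliases. A minimal sketch of its behavior, runnable in a Spark shell (the column names are illustrative):

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.SchemaUtils

// Under the default spark.sql.caseSensitive=false, the resolver treats
// "id" and "ID" as the same column, so this throws an AnalysisException.
SchemaUtils.checkColumnNameDuplication(Seq("id", "ID"), SQLConf.get.resolver)

// Distinct names pass without error.
SchemaUtils.checkColumnNameDuplication(Seq("id", "name"), SQLConf.get.resolver)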
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
@@ -38,6 +39,7 @@ import org.apache.spark.sql.connector.catalog.LookupCatalog
 import org.apache.spark.sql.connector.catalog.View
 import org.apache.spark.sql.connector.catalog.ViewCatalog
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.MetadataBuilder
 
 case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
 
@@ -59,6 +61,32 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
       loadView(catalog, ident)
         .map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
         .getOrElse(u)
+
+    case c @ CreateIcebergView(ResolvedIdentifier(_, ident), _, query, columnAliases, columnComments, _, _, _, _, _,
+        rewritten)
+        if query.resolved && !rewritten =>
+      val rewrittenQuery = rewriteIdentifiers(query, ident.asMultipartIdentifier)
+      val aliasedPlan = aliasPlan(rewrittenQuery, columnAliases, columnComments)
+      c.copy(query = aliasedPlan, queryColumnNames = query.schema.fieldNames, rewritten = true)
   }
 
+  private def aliasPlan(
+      analyzedPlan: LogicalPlan,
+      columnAliases: Seq[String],
+      columnComments: Seq[Option[String]]): LogicalPlan = {
+    if (columnAliases.isEmpty || columnAliases.length != analyzedPlan.output.length) {
+      analyzedPlan
+    } else {
+      val projectList = analyzedPlan.output.zipWithIndex.map { case (attr, pos) =>
+        if (columnComments(pos).isDefined) {
+          val meta = new MetadataBuilder().putString("comment", columnComments(pos).get).build()
+          Alias(attr, columnAliases(pos))(explicitMetadata = Some(meta))
+        } else {
+          Alias(attr, columnAliases(pos))()
+        }
+      }
+      Project(projectList, analyzedPlan)
+    }
+  }
+
  def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
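
aliasPlan carries a user-supplied column comment in the alias expression's own metadata instead of a side channel. A self-contained sketch of what one projected output column looks like (attribute name, type, and comment are made up for illustration):

import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder}

// An attribute as it would come out of the analyzed view query.
val attr = AttributeReference("id", IntegerType)()

// Rename it to the user-specified alias and attach the column comment
// as expression metadata, mirroring what aliasPlan does per column.
val meta = new MetadataBuilder().putString("comment", "primary key").build()
val aliased = Alias(attr, "customer_id")(explicitMetadata = Some(meta))

println(aliased.sql) // prints something like: id AS `customer_id`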
@@ -45,10 +45,15 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
 
     case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties,
         Some(queryText), query, allowExisting, replace) =>
-      val columnAliases = userSpecifiedColumns.map(_._1)
-      val columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty))
-      CreateIcebergView(resolved, queryText, query, columnAliases, columnComments,
-        comment, properties, allowExisting, replace)
+      CreateIcebergView(child = resolved,
+        queryText = queryText,
+        query = query,
+        columnAliases = userSpecifiedColumns.map(_._1),
+        columnComments = userSpecifiedColumns.map(_._2),
+        comment = comment,
+        properties = properties,
+        allowExisting = allowExisting,
+        replace = replace)
   }
 
   private def isTempView(nameParts: Seq[String]): Boolean = {
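
For reference, userSpecifiedColumns is the parsed (name, optional comment) list from the CREATE VIEW column clause; the rule above just splits it into the two parallel sequences the Iceberg node carries. A small illustration with invented values:

// For a statement like: CREATE VIEW v (id COMMENT 'primary key', name) AS SELECT ...
val userSpecifiedColumns: Seq[(String, Option[String])] =
  Seq(("id", Some("primary key")), ("name", None))

val columnAliases = userSpecifiedColumns.map(_._1)  // Seq("id", "name")
val columnComments = userSpecifiedColumns.map(_._2) // Seq(Some("primary key"), None)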
@@ -28,10 +28,12 @@ case class CreateIcebergView(
     query: LogicalPlan,
     columnAliases: Seq[String],
     columnComments: Seq[Option[String]],
+    queryColumnNames: Seq[String] = Seq.empty,
     comment: Option[String],
     properties: Map[String, String],
     allowExisting: Boolean,
-    replace: Boolean) extends BinaryCommand {
+    replace: Boolean,
+    rewritten: Boolean = false) extends BinaryCommand {
   override def left: LogicalPlan = child
 
   override def right: LogicalPlan = query
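
Both new fields are defaulted, which keeps existing construction sites source-compatible, and rewritten gives the analyzer rule a cheap idempotency guard. The general pattern, sketched with a stand-in case class (ViewCommand is hypothetical):

// Appending defaulted fields to a case class leaves existing
// named-argument construction sites compiling unchanged.
case class ViewCommand(
    name: String,
    replace: Boolean,
    queryColumnNames: Seq[String] = Seq.empty, // new, defaulted
    rewritten: Boolean = false)                // new, defaulted

val v = ViewCommand(name = "v", replace = false) // pre-existing call site still works

// A rule guarded by `if (!v.rewritten)` fires once: copying with
// rewritten = true means the pattern never matches its own output again.
val processed = v.copy(rewritten = true)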
@@ -19,21 +19,13 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.iceberg.spark.Spark3Util
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
-import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.ViewCatalog
-import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.CommandExecutionMode
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.MetadataBuilder
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.SchemaUtils
 import scala.collection.JavaConverters._
 
 
@@ -44,19 +36,16 @@ case class CreateV2ViewExec(
     viewSchema: StructType,
     columnAliases: Seq[String],
     columnComments: Seq[Option[String]],
+    queryColumnNames: Seq[String],
     comment: Option[String],
     properties: Map[String, String],
     allowExisting: Boolean,
-    replace: Boolean) extends LeafV2CommandExec {
+    replace: Boolean,
+    query: LogicalPlan) extends LeafV2CommandExec {
 
   override lazy val output: Seq[Attribute] = Nil
 
   override protected def run(): Seq[InternalRow] = {
-    // val analyzedPlan = session.sessionState.executePlan(query, CommandExecutionMode.SKIP).analyzed
-    //
-    val queryColumnNames = viewSchema.fieldNames
-    SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver)
-
     val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name
     val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null
     val currentNamespace = session.sessionState.catalogManager.currentNamespace
@@ -79,7 +68,7 @@ case class CreateV2ViewExec(
       currentCatalog,
       currentNamespace,
       viewSchema,
-      queryColumnNames,
+      queryColumnNames.toArray,
       columnAliases.toArray,
       columnComments.map(c => c.orNull).toArray,
       newProperties.asJava)
@@ -92,7 +81,7 @@ case class CreateV2ViewExec(
       currentCatalog,
       currentNamespace,
       viewSchema,
-      queryColumnNames,
+      queryColumnNames.toArray,
       columnAliases.toArray,
       columnComments.map(c => c.orNull).toArray,
       newProperties.asJava)
@@ -107,25 +96,4 @@ case class CreateV2ViewExec(
   override def simpleString(maxFields: Int): String = {
     s"CreateV2ViewExec: ${ident}"
   }
-
-  /**
-   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
-   * else return the analyzed plan directly.
-   */
-  private def aliasPlan(
-      analyzedPlan: LogicalPlan,
-      userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = {
-    if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan
-    } else {
-      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, None)) => Alias(attr, colName)()
-        case (attr, (colName, Some(colComment))) =>
-          val meta = new MetadataBuilder().putString("comment", colComment).build()
-          Alias(attr, colName)(explicitMetadata = Some(meta))
-      }
-      val projectedPlan = Project(projectList, analyzedPlan)
-      session.sessionState.executePlan(projectedPlan, CommandExecutionMode.SKIP).analyzed
-    }
-  }
 }
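
The .toArray/.orNull/.asJava conversions above exist because the ViewCatalog API is Java-flavored: arrays and nulls rather than Seq and Option. The bridging in isolation, with illustrative values:

import scala.collection.JavaConverters._

val columnComments: Seq[Option[String]] = Seq(Some("primary key"), None)

// Option.orNull turns the Scala Option into the null-based convention
// the Java-side API expects: Array("primary key", null)
val commentsArray: Array[String] = columnComments.map(_.orNull).toArray

// Likewise, a Scala Map becomes a java.util.Map for the catalog call.
val props: java.util.Map[String, String] = Map("owner" -> "nastra").asJava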
@@ -109,13 +109,15 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy with PredicateHelper {
       DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil
 
     case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), queryText, query,
-        columnAliases, columnComments, comment, properties, allowExisting, replace) =>
+        columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _) =>
       CreateV2ViewExec(
         catalog = viewCatalog,
         ident = ident,
         queryText = queryText,
         columnAliases = columnAliases,
         columnComments = columnComments,
+        queryColumnNames = queryColumnNames,
+        query = query,
         viewSchema = query.schema,
         comment = comment,
         properties = properties,
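
The strategy is the standard Spark planner hook: match a resolved logical command, emit its physical exec node, and return Nil for everything else so later strategies get a chance. The skeleton with the Iceberg-specific case elided (a sketch only; MyViewStrategy is hypothetical):

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

case class MyViewStrategy(spark: SparkSession) extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // case c: CreateIcebergView => CreateV2ViewExec(...) :: Nil
    case _ => Nil // defer to the next strategy
  }
}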
