Commit: review feedback

nastra committed Jan 19, 2024
1 parent 2e5667b commit 2639053

Showing 8 changed files with 40 additions and 58 deletions.
@@ -23,7 +23,6 @@ import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.analysis.ProcedureArgumentCoercion
 import org.apache.spark.sql.catalyst.analysis.ResolveProcedures
 import org.apache.spark.sql.catalyst.analysis.ResolveViews
-import org.apache.spark.sql.catalyst.analysis.ViewCheck
 import org.apache.spark.sql.catalyst.optimizer.ReplaceStaticInvoke
 import org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser
 import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy
@@ -38,7 +37,6 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
     extensions.injectResolutionRule { spark => ResolveProcedures(spark) }
     extensions.injectResolutionRule { spark => ResolveViews(spark) }
     extensions.injectResolutionRule { _ => ProcedureArgumentCoercion }
-    extensions.injectCheckRule(_ => ViewCheck)

     // optimizer extensions
     extensions.injectOptimizerRule { _ => ReplaceStaticInvoke }
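The deleted ViewCheck rule was registered via injectCheckRule; after this commit its temporary-object validation runs inside the ResolveViews resolution rule instead (next file). For readers unfamiliar with the two extension points, here is a minimal, self-contained sketch: a resolution rule runs during analysis and may rewrite the plan, while a check rule runs after analysis and may only validate it. MyResolutionRule and MyCheck are hypothetical names for illustration, not part of Iceberg.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical no-op resolution rule: runs during analysis, may rewrite the plan.
case class MyResolutionRule(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Hypothetical check rule: runs after analysis, may only validate (throw), not rewrite.
object MyCheck extends (LogicalPlan => Unit) {
  override def apply(plan: LogicalPlan): Unit = ()
}

class ExampleExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectResolutionRule { spark => MyResolutionRule(spark) }
    extensions.injectCheckRule { _ => MyCheck }
  }
}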
@@ -19,6 +19,7 @@

 package org.apache.spark.sql.catalyst.analysis

+import org.apache.iceberg.spark.Spark3Util
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.expressions.Alias
@@ -27,6 +28,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
@@ -38,6 +40,7 @@ import org.apache.spark.sql.connector.catalog.LookupCatalog
 import org.apache.spark.sql.connector.catalog.View
 import org.apache.spark.sql.connector.catalog.ViewCatalog
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.command.ViewHelper

 case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
@@ -59,6 +62,10 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
       loadView(catalog, ident)
         .map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
         .getOrElse(u)
+
+    case c@CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, _, _, _, query, _, _) =>
+      ViewHelper.verifyTemporaryObjectsNotExists(false, Spark3Util.toV1TableIdentifier(ident), query, Seq.empty)
+      c
   }

   def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
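The new CreateIcebergView case takes over the job of the deleted ViewCheck rule: before an Iceberg view is created, Spark's ViewHelper.verifyTemporaryObjectsNotExists rejects persistent view definitions that reference temporary views or functions, so the failure now surfaces during analysis rather than in a post-analysis check (this is the behavior exercised by createViewReferencingTempView in the test changes below). A hedged sketch of the user-visible effect; it assumes a session built with the Iceberg extensions and a view-enabled Iceberg catalog named demo (setup omitted; catalog and namespace names are illustrative):

import org.apache.spark.sql.SparkSession

// Assumes `spark` was configured with IcebergSparkSessionExtensions and a
// view-enabled Iceberg catalog named `demo` (names are illustrative).
def demonstrateTempViewGuard(spark: SparkSession): Unit = {
  spark.sql("CREATE TEMPORARY VIEW tmp AS SELECT 1 AS id")

  // Expected to fail during analysis after this commit: a persistent
  // view must not reference a temporary object.
  spark.sql("CREATE VIEW demo.db.v AS SELECT id FROM tmp")
}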
@@ -151,7 +158,7 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
   }


-  implicit class ViewHelper(plugin: CatalogPlugin) {
+  implicit class IcebergViewHelper(plugin: CatalogPlugin) {
     def asViewCatalog: ViewCatalog = plugin match {
       case viewCatalog: ViewCatalog =>
         viewCatalog
@@ -43,9 +43,9 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
     case DropView(ResolvedView(resolved), ifExists) =>
       DropIcebergView(resolved, ifExists)

-    case CreateView(r@ResolvedView(_), userSpecifiedColumns, comment, properties,
+    case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties,
         originalText, query, allowExisting, replace) =>
-      CreateIcebergView(r, userSpecifiedColumns, comment, properties, originalText,
+      CreateIcebergView(resolved, userSpecifiedColumns, comment, properties, originalText,
         query, allowExisting, replace)
   }

This file was deleted. (ViewCheck, whose import and injectCheckRule registration are removed above.)

@@ -50,10 +50,7 @@ case class CreateV2ViewExec(
   override lazy val output: Seq[Attribute] = Nil

   override protected def run(): Seq[InternalRow] = {
-    val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP)
-    qe.assertAnalyzed()
-    val analyzedPlan = qe.analyzed
-
+    val analyzedPlan = session.sessionState.executePlan(query, CommandExecutionMode.SKIP).analyzed
     val identifier = Spark3Util.toV1TableIdentifier(ident)

     if (userSpecifiedColumns.nonEmpty) {
@@ -71,17 +68,17 @@

     val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema
     val columnAliases = userSpecifiedColumns.map(_._1).toArray
-    val columnComments = userSpecifiedColumns.map(_._2.getOrElse("")).toArray
+    val columnComments = userSpecifiedColumns.map(_._2.orNull).toArray

-    val currentCatalog = session.sessionState.catalogManager.currentCatalog.name
+    val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name
+    val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null
     val currentNamespace = session.sessionState.catalogManager.currentNamespace

     val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION
-    val createEngineVersion = Some(engineVersion)
     val newProperties = properties ++
-      comment.map(ViewCatalog.PROP_COMMENT -> _) ++
-      createEngineVersion.map(ViewCatalog.PROP_CREATE_ENGINE_VERSION -> _) +
-      (ViewCatalog.PROP_ENGINE_VERSION -> engineVersion)
+      comment.map(ViewCatalog.PROP_COMMENT -> _) +
+      (ViewCatalog.PROP_CREATE_ENGINE_VERSION -> engineVersion,
+        ViewCatalog.PROP_ENGINE_VERSION -> engineVersion)

     if (replace) {
       // CREATE OR REPLACE VIEW
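Two behavioral tweaks in this hunk: a missing column comment is now stored as null rather than an empty string (orNull instead of getOrElse("")), and the current catalog is only recorded when it differs from the view's own catalog. The property map itself is built with plain Scala Map operators; here is a self-contained sketch of that construction with stand-in key names (the real keys live on ViewCatalog; the strings below are assumptions for illustration):

object NewPropertiesSketch extends App {
  // Stand-ins for ViewCatalog.PROP_COMMENT etc.; the actual key strings are assumptions.
  val PROP_COMMENT = "comment"
  val PROP_CREATE_ENGINE_VERSION = "create-engine-version"
  val PROP_ENGINE_VERSION = "engine-version"

  val properties = Map("owner" -> "alice")
  val comment: Option[String] = Some("a view")
  val engineVersion = "Spark 3.5.0" // stand-in for org.apache.spark.SPARK_VERSION

  // ++ appends zero or one entries from the Option; + appends the two
  // engine-version entries unconditionally.
  val newProperties = properties ++
    comment.map(PROP_COMMENT -> _) +
    (PROP_CREATE_ENGINE_VERSION -> engineVersion,
      PROP_ENGINE_VERSION -> engineVersion)

  println(newProperties)
  // Map(owner -> alice, comment -> a view,
  //     create-engine-version -> Spark 3.5.0, engine-version -> Spark 3.5.0)
}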
@@ -339,7 +339,6 @@ public void createViewReferencingTempView() throws NoSuchTableException {
     insertRows(10);
     String tempView = "tempViewBeingReferencedInAnotherView";
     String viewReferencingTempView = "viewReferencingTempView";
-    String sql = String.format("SELECT id FROM %s", tempView);

     sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", tempView, tableName);
@@ -893,6 +892,9 @@ public void createViewWithColumnAliases() throws NoSuchTableException {
         "CREATE VIEW %s (new_id COMMENT 'ID', new_data COMMENT 'DATA') AS SELECT id, data FROM %s WHERE id <= 3",
         viewName, tableName);

+    assertThat(viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName)).properties())
+        .containsEntry("queryColumnNames", "id, data");
+
     assertThat(sql("SELECT new_id FROM %s", viewName))
         .hasSize(3)
         .containsExactlyInAnyOrder(row(1), row(2), row(3));
@@ -908,6 +910,19 @@
         .containsExactlyInAnyOrder(row(1), row(2), row(3));
   }

+  @Test
+  public void createViewWithDuplicateQueryColumnNames() {
+    String viewName = "viewWithDuplicateQueryColumnNames";
+
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE VIEW %s (new_id , new_data) AS SELECT id, id FROM %s WHERE id <= 3",
+                    viewName, tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The column `id` already exists");
+  }
+
   private void insertRows(int numRows) throws NoSuchTableException {
     List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
     for (int i = 1; i <= numRows; i++) {
@@ -26,6 +26,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.StringJoiner;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
@@ -571,11 +572,14 @@ public View createView(
     if (null != asViewCatalog) {
       Schema icebergSchema = SparkSchemaUtil.convert(schema);

+      StringJoiner joiner = new StringJoiner(", ");
+      Arrays.stream(queryColumnNames).forEach(joiner::add);
+
       try {
         Map<String, String> props =
             ImmutableMap.<String, String>builder()
                 .putAll(Spark3Util.rebuildCreateProperties(properties))
-                .put("queryColumnNames", Arrays.toString(queryColumnNames))
+                .put("queryColumnNames", joiner.toString())
                 .build();
         org.apache.iceberg.view.View view =
             asViewCatalog
@@ -87,8 +87,8 @@ public StructType schema() {

   @Override
   public String[] queryColumnNames() {
-    return properties().containsKey(QUERY_COLUMN_NAMES)
-        ? properties().get(QUERY_COLUMN_NAMES).split(", ")
+    return icebergView.properties().containsKey(QUERY_COLUMN_NAMES)
+        ? icebergView.properties().get(QUERY_COLUMN_NAMES).split(", ")
         : new String[0];
   }
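These last two hunks are one fix: the writer previously stored the column names with Arrays.toString, which wraps the list in brackets ("[id, data]"), while the reader splits the property value on ", ". Joining with ", " (and reading the raw Iceberg view properties rather than the possibly filtered Spark-side properties()) makes the round trip lossless, which is exactly what the new assertion in createViewWithColumnAliases checks. A small sketch of the round trip, with mkString standing in for the Java StringJoiner:

object QueryColumnNamesRoundTrip extends App {
  val queryColumnNames = Array("id", "data")

  // Old writer: brackets leak into the stored property value.
  println(java.util.Arrays.toString(queryColumnNames)) // [id, data]

  // New writer: plain ", " join, equivalent to the StringJoiner above.
  val stored = queryColumnNames.mkString(", ")
  println(stored) // id, data

  // Reader side: splitting on ", " recovers the original names.
  println(stored.split(", ").sameElements(queryColumnNames)) // true
}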
