Commit

review feedback
nastra committed Jan 26, 2024
1 parent 71a00a4 commit f348f8b
Showing 6 changed files with 89 additions and 24 deletions.
@@ -32,9 +32,9 @@ object CheckViews extends (LogicalPlan => Unit) {
   override def apply(plan: LogicalPlan): Unit = {
     plan foreach {
       case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _,
-        queryColumnNames, _, _, _, _, _) =>
+        _, _, _, _, _, _) =>
         verifyColumnCount(ident, columnAliases, query)
-        SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver)
+        SchemaUtils.checkColumnNameDuplication(query.schema.fieldNames, SQLConf.get.resolver)
 
       case _ => // OK
     }
@@ -62,31 +62,28 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
         .map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
         .getOrElse(u)
 
-    case c@CreateIcebergView(ResolvedIdentifier(_, ident), _, query, columnAliases, columnComments, _, _, _, _, _, _)
+    case c@CreateIcebergView(ResolvedIdentifier(_, _), _, query, columnAliases, columnComments, _, _, _, _, _, _)
       if query.resolved && !c.rewritten =>
-      val rewritten = rewriteIdentifiers(query, ident.asMultipartIdentifier)
-      val aliasedPlan = aliasPlan(rewritten, columnAliases, columnComments)
-      c.copy(query = aliasedPlan, queryColumnNames = query.schema.fieldNames, rewritten = true)
+      val aliased = aliasColumns(query, columnAliases, columnComments)
+      c.copy(query = aliased, queryColumnNames = query.schema.fieldNames, rewritten = true)
   }
 
-  private def aliasPlan(
-      analyzedPlan: LogicalPlan,
+  private def aliasColumns(
+      plan: LogicalPlan,
       columnAliases: Seq[String],
       columnComments: Seq[Option[String]]): LogicalPlan = {
-    if (columnAliases.isEmpty || columnAliases.length != analyzedPlan.output.length) {
-      analyzedPlan
+    if (columnAliases.isEmpty || columnAliases.length != plan.output.length) {
+      plan
     } else {
-      val projectList = analyzedPlan.output.zipWithIndex.map { case (_, pos) =>
-        val column = GetColumnByOrdinal(pos, analyzedPlan.schema.fields.apply(pos).dataType)
-
+      val projectList = plan.output.zipWithIndex.map { case (attr, pos) =>
         if (columnComments.apply(pos).isDefined) {
           val meta = new MetadataBuilder().putString("comment", columnComments.apply(pos).get).build()
-          Alias(column, columnAliases.apply(pos))(explicitMetadata = Some(meta))
+          Alias(attr, columnAliases.apply(pos))(explicitMetadata = Some(meta))
         } else {
-          Alias(column, columnAliases.apply(pos))()
+          Alias(attr, columnAliases.apply(pos))()
         }
       }
-      Project(projectList, analyzedPlan)
+      Project(projectList, plan)
     }
   }
 
@@ -49,10 +49,11 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
 
     case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties,
       Some(queryText), query, allowExisting, replace) =>
-      verifyTemporaryObjectsDontExist(resolved.identifier, query)
+      val q = CTESubstitution.apply(query)
+      verifyTemporaryObjectsDontExist(resolved.identifier, q)
       CreateIcebergView(child = resolved,
         queryText = queryText,
-        query = query,
+        query = q,
         columnAliases = userSpecifiedColumns.map(_._1),
         columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty)),
         comment = comment,
@@ -40,6 +40,7 @@
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.view.View;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -989,8 +990,17 @@ public void createViewWithColumnAliases() throws NoSuchTableException {
         "CREATE VIEW %s (new_id COMMENT 'ID', new_data COMMENT 'DATA') AS SELECT id, data FROM %s WHERE id <= 3",
         viewName, tableName);
 
-    assertThat(viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName)).properties())
-        .containsEntry("queryColumnNames", "id, data");
+    View view = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName));
+    assertThat(view.properties()).containsEntry("queryColumnNames", "id,data");
+
+    assertThat(view.schema().columns()).hasSize(2);
+    Types.NestedField first = view.schema().columns().get(0);
+    assertThat(first.name()).isEqualTo("new_id");
+    assertThat(first.doc()).isEqualTo("ID");
+
+    Types.NestedField second = view.schema().columns().get(1);
+    assertThat(second.name()).isEqualTo("new_data");
+    assertThat(second.doc()).isEqualTo("DATA");
 
     assertThat(sql("SELECT new_id FROM %s", viewName))
         .hasSize(3)
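For orientation, the schema those new assertions describe can be written out directly with Iceberg's Types API. This is an illustrative sketch only: the class name and field IDs are arbitrary, and the int/string column types are assumed from an (id INT, data STRING) source table; none of it is taken from the commit.

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

class ExpectedViewSchemaSketch {
  // Illustrative only: field IDs are arbitrary and the types assume an (id INT, data STRING)
  // source table. The point is that the view schema carries the aliased column names and the
  // COMMENT text as the field doc, which is what the assertions above check.
  static Schema expected() {
    return new Schema(
        Types.NestedField.optional(1, "new_id", Types.IntegerType.get(), "ID"),
        Types.NestedField.optional(2, "new_data", Types.StringType.get(), "DATA"));
  }
}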
@@ -1008,14 +1018,32 @@ public void createViewWithColumnAliases() throws NoSuchTableException {
   }
 
   @Test
-  public void createViewWithDuplicateQueryColumnNames() {
+  public void createViewWithDuplicateColumnNames() {
     assertThatThrownBy(
             () ->
                 sql(
-                    "CREATE VIEW viewWithDuplicateQueryColumnNames (new_id , new_data) AS SELECT id, id FROM %s WHERE id <= 3",
+                    "CREATE VIEW viewWithDuplicateColumnNames (new_id, new_id) AS SELECT id, id FROM %s WHERE id <= 3",
                     tableName))
         .isInstanceOf(AnalysisException.class)
         .hasMessageContaining("The column `new_id` already exists");
   }
 
+  @Test
+  public void createViewWithDuplicateQueryColumnNames() throws NoSuchTableException {
+    insertRows(3);
+    String viewName = "viewWithDuplicateQueryColumnNames";
+    String sql = String.format("SELECT id, id FROM %s WHERE id <= 3", tableName);
+
+    // not specifying column aliases in the view should fail
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The column `id` already exists");
+
+    sql("CREATE VIEW %s (id_one, id_two) AS %s", viewName, sql);
+
+    assertThat(sql("SELECT * FROM %s", viewName))
+        .hasSize(3)
+        .containsExactlyInAnyOrder(row(1, 1), row(2, 2), row(3, 3));
+  }
+
   @Test
@@ -1033,6 +1061,45 @@ public void createViewWithCTE() throws NoSuchTableException {
     assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L));
   }
 
+  @Test
+  public void createViewWithConflictingNamesForCTEAndTempView() throws NoSuchTableException {
+    insertRows(10);
+    String viewName = "viewWithConflictingNamesForCTEAndTempView";
+    String cteName = "cteName";
+    String sql =
+        String.format(
+            "WITH %s AS (SELECT max(id) as max FROM %s) "
+                + "(SELECT max, count(1) AS count FROM %s GROUP BY max)",
+            cteName, tableName, cteName);
+
+    // create a CTE and a TEMP VIEW with the same name
+    sql("CREATE TEMPORARY VIEW %s AS SELECT * from %s", cteName, tableName);
+    sql("CREATE VIEW %s AS %s", viewName, sql);
+
+    // CTE should take precedence over the TEMP VIEW when data is read
+    assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L));
+  }
+
+  @Test
+  public void createViewWithCTEReferencingTempView() {
+    String viewName = "viewWithCTEReferencingTempView";
+    String tempViewInCTE = "tempViewInCTE";
+    String sql =
+        String.format(
+            "WITH max_by_data AS (SELECT max(id) as max FROM %s) "
+                + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max",
+            tempViewInCTE);
+
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE ID <= 5", tempViewInCTE, tableName);
+
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Cannot create the persistent object")
+        .hasMessageContaining(viewName)
+        .hasMessageContaining("of the type VIEW because it references to the temporary object")
+        .hasMessageContaining(tempViewInCTE);
+  }
+
   @Test
   public void createViewWithNonExistingQueryColumn() {
     assertThatThrownBy(
@@ -572,7 +572,7 @@ public View createView(
     if (null != asViewCatalog) {
       Schema icebergSchema = SparkSchemaUtil.convert(schema);
 
-      StringJoiner joiner = new StringJoiner(", ");
+      StringJoiner joiner = new StringJoiner(",");
       Arrays.stream(queryColumnNames).forEach(joiner::add);
 
       try {
@@ -88,7 +88,7 @@ public StructType schema() {
   @Override
   public String[] queryColumnNames() {
     return icebergView.properties().containsKey(QUERY_COLUMN_NAMES)
-        ? icebergView.properties().get(QUERY_COLUMN_NAMES).split(", ")
+        ? icebergView.properties().get(QUERY_COLUMN_NAMES).split(",")
         : new String[0];
   }
 
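Together with the SparkCatalog change above, both sides of the queryColumnNames view property now use a bare "," delimiter, so the value written at view creation parses back into the same array. Below is a minimal standalone sketch of that round trip using only the JDK; the class and variable names are illustrative, not from the commit.

import java.util.Arrays;
import java.util.StringJoiner;

public class QueryColumnNamesRoundTrip {
  public static void main(String[] args) {
    String[] queryColumnNames = {"id", "data"};

    // Write side (as in SparkCatalog#createView): join the query's column names with ",".
    StringJoiner joiner = new StringJoiner(",");
    Arrays.stream(queryColumnNames).forEach(joiner::add);
    String property = joiner.toString(); // "id,data" -- matches the updated test assertion

    // Read side (as in SparkView#queryColumnNames): split on the same delimiter.
    String[] roundTripped = property.split(",");

    System.out.println(property + " -> " + Arrays.toString(roundTripped));
  }
}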