Skip to content

Commit

Permalink
[AMORO-2272][Spark]:Fix table name mismatch in spark `show create tab…
Browse files Browse the repository at this point in the history
…le` (apache#2321)

* Fix table name mismatch in Spark `show create table`

* Show Create Table tests assume the Spark version is greater than 3.1

* Fix parameter order.

(cherry picked from commit 5f756e0)
Signed-off-by: zhoujinsong <[email protected]>
  • Loading branch information
baiyangtx authored and zhoujinsong committed Dec 21, 2023
1 parent 184782c commit bb4130d
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ public org.apache.spark.sql.connector.catalog.Identifier toSparkIdentifier() {
return org.apache.spark.sql.connector.catalog.Identifier.of(new String[] {database}, table);
}

/** @return spark identifier as string. */
@Override
public String toString() {
if (SOURCE_TYPE_VIEW.equalsIgnoreCase(sourceType)) {
Expand Down Expand Up @@ -282,4 +283,21 @@ public void assertTableDesc(List<Row> rows, List<String> primaryKeys, List<Strin
primaryKeys.stream().sorted().distinct().toArray(),
descPrimaryKeys.stream().sorted().distinct().toArray());
}

/**
 * Asserts that the result of {@code SHOW CREATE TABLE} starts with the expected
 * {@code create table <catalog>.<identifier> (} header.
 *
 * <p>The statement text is reassembled by concatenating column 0 of every result row.
 * The prefix comparison ignores case, which is more robust than the previous approach of
 * replacing only the upper-case literals "CREATE" and "TABLE" (that missed mixed-case output).
 *
 * @param rows rows returned by the SHOW CREATE TABLE statement
 * @param id identifier of the table that was queried
 * @param table the loaded Arctic table (currently unused; kept for signature symmetry
 *     with the other assertion helpers in this class)
 */
public void assertShowCreateTable(List<Row> rows, Identifier id, ArcticTable table) {
  StringBuilder showCreateSqlBuilder = new StringBuilder();
  for (Row r : rows) {
    showCreateSqlBuilder.append(r.getString(0));
  }
  String showCreateSql = showCreateSqlBuilder.toString();
  String expectCreateHeader = "create table " + id.catalog + "." + id + " (";
  // Case-insensitive prefix match; accepts everything the old keyword-replace accepted and more.
  boolean headerMatches =
      showCreateSql.regionMatches(true, 0, expectCreateHeader, 0, expectCreateHeader.length());
  Assertions.assertTrue(
      headerMatches,
      "expect ["
          + expectCreateHeader
          + "] in ShowCreateTable Result, but not found in :"
          + showCreateSql);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public Table loadTable(Identifier ident) throws NoSuchTableException {
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
}
return ArcticSparkTable.ofArcticTable(table, catalog);
return ArcticSparkTable.ofArcticTable(table, catalog, catalogName);
}

private Table loadInnerTable(ArcticTable table, ArcticTableStoreType type) {
Expand Down Expand Up @@ -177,7 +177,7 @@ && isIdentifierLocation(properties.get(TableCatalog.PROP_LOCATION), ident)) {
builder.withPartitionSpec(spec).withProperties(properties);
}
ArcticTable table = builder.create();
return ArcticSparkTable.ofArcticTable(table, catalog);
return ArcticSparkTable.ofArcticTable(table, catalog, catalogName);
} catch (AlreadyExistsException e) {
throw new TableAlreadyExistsException("Table " + ident + " already exists", Option.apply(e));
}
Expand Down Expand Up @@ -261,10 +261,10 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchT
}
if (table.isUnkeyedTable()) {
alterUnKeyedTable(table.asUnkeyedTable(), changes);
return ArcticSparkTable.ofArcticTable(table, catalog);
return ArcticSparkTable.ofArcticTable(table, catalog, catalogName);
} else if (table.isKeyedTable()) {
alterKeyedTable(table.asKeyedTable(), changes);
return ArcticSparkTable.ofArcticTable(table, catalog);
return ArcticSparkTable.ofArcticTable(table, catalog, catalogName);
}
throw new UnsupportedOperationException("Unsupported alter table");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,22 @@

public class ArcticIcebergSparkTable extends SparkTable {
private final UnkeyedTable unkeyedTable;
private final String sparkCatalogName;

/**
 * Creates a Spark table wrapper for an Iceberg-format unkeyed table.
 *
 * @param unkeyedTable the underlying unkeyed table
 * @param refreshEagerly passed through to the {@code SparkTable} superclass
 * @param sparkCatalogName name of the Spark catalog this table was loaded from; used to build
 *     the fully qualified {@code name()}
 */
public ArcticIcebergSparkTable(
    UnkeyedTable unkeyedTable, boolean refreshEagerly, String sparkCatalogName) {
  super(unkeyedTable, refreshEagerly);
  this.unkeyedTable = unkeyedTable;
  this.sparkCatalogName = sparkCatalogName;
}

/** @return the fully qualified Spark name, {@code <catalog>.<database>.<table>}. */
@Override
public String name() {
  StringBuilder fullName = new StringBuilder(sparkCatalogName);
  fullName.append('.').append(unkeyedTable.id().getDatabase());
  fullName.append('.').append(unkeyedTable.id().getTableName());
  return fullName.toString();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,24 @@ public class ArcticSparkTable
TableCapability.OVERWRITE_DYNAMIC);

private final ArcticTable arcticTable;
private final String sparkCatalogName;
private SparkSession lazySpark = null;
private final ArcticCatalog catalog;

public static Table ofArcticTable(ArcticTable table, ArcticCatalog catalog) {
public static Table ofArcticTable(
ArcticTable table, ArcticCatalog catalog, String sparkCatalogName) {
if (table.isUnkeyedTable()) {
if (!(table instanceof SupportHive)) {
return new ArcticIcebergSparkTable(table.asUnkeyedTable(), false);
return new ArcticIcebergSparkTable(table.asUnkeyedTable(), false, sparkCatalogName);
}
}
return new ArcticSparkTable(table, catalog);
return new ArcticSparkTable(table, catalog, sparkCatalogName);
}

/**
 * Creates a Spark table wrapper for an Arctic table.
 *
 * @param arcticTable the Arctic table to expose to Spark
 * @param catalog the Arctic catalog the table belongs to
 * @param sparkCatalogName name of the Spark catalog this table was loaded from; used to build
 *     the fully qualified {@code name()}
 */
public ArcticSparkTable(ArcticTable arcticTable, ArcticCatalog catalog, String sparkCatalogName) {
  this.arcticTable = arcticTable;
  this.catalog = catalog;
  this.sparkCatalogName = sparkCatalogName;
}

private SparkSession sparkSession() {
Expand All @@ -89,7 +92,11 @@ public ArcticTable table() {

/** @return the fully qualified Spark name, {@code <catalog>.<database>.<table>}. */
@Override
public String name() {
  // The old "arcticTable.id().toString()" form omitted the Spark catalog prefix, causing the
  // name mismatch in SHOW CREATE TABLE that this change fixes.
  return sparkCatalogName
      + "."
      + arcticTable.id().getDatabase()
      + "."
      + arcticTable.id().getTableName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.netease.arctic.spark.test.SparkTableTestBase;
import com.netease.arctic.spark.test.extensions.EnableCatalogSelect;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand Down Expand Up @@ -75,5 +76,9 @@ public void testDescTable(TableFormat format, String primaryKeyDDL, String parti
List<Row> rows2 =
sql("desc extended " + target().database + "." + target().table).collectAsList();
assertTableDesc(rows2, primaryKeys, partitions);

Assumptions.assumeFalse(spark().version().startsWith("3.1"));
rows = sql("show create table " + target()).collectAsList();
assertShowCreateTable(rows, target(), loadTable());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public Table loadTable(Identifier ident) throws NoSuchTableException {
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
}
return ArcticSparkTable.ofArcticTable(table, catalog);
return ArcticSparkTable.ofArcticTable(table, catalog, catalogName);
}

private Table loadInnerTable(ArcticTable table, ArcticTableStoreType type) {
Expand Down Expand Up @@ -191,7 +191,7 @@ && isIdentifierLocation(properties.get(TableCatalog.PROP_LOCATION), ident)) {
builder.withPartitionSpec(spec).withProperties(properties);
}
ArcticTable table = builder.create();
return ArcticSparkTable.ofArcticTable(table, catalog);
return ArcticSparkTable.ofArcticTable(table, catalog, catalogName);
} catch (AlreadyExistsException e) {
throw new TableAlreadyExistsException("Table " + ident + " already exists", Option.apply(e));
}
Expand Down Expand Up @@ -275,10 +275,10 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchT
}
if (table.isUnkeyedTable()) {
alterUnKeyedTable(table.asUnkeyedTable(), changes);
return ArcticSparkTable.ofArcticTable(table, catalog);
return ArcticSparkTable.ofArcticTable(table, catalog, catalogName);
} else if (table.isKeyedTable()) {
alterKeyedTable(table.asKeyedTable(), changes);
return ArcticSparkTable.ofArcticTable(table, catalog);
return ArcticSparkTable.ofArcticTable(table, catalog, catalogName);
}
throw new UnsupportedOperationException("Unsupported alter table");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,29 @@

public class ArcticIcebergSparkTable extends SparkTable implements SupportsPartitionManagement {
private final UnkeyedTable unkeyedTable;
private final String sparkCatalogName;

/**
 * Creates a Spark table wrapper for an Iceberg-format unkeyed table.
 *
 * @param unkeyedTable the underlying unkeyed table
 * @param refreshEagerly passed through to the {@code SparkTable} superclass
 * @param sparkCatalogName name of the Spark catalog this table was loaded from; used to build
 *     the fully qualified {@code name()}
 */
public ArcticIcebergSparkTable(
    UnkeyedTable unkeyedTable, boolean refreshEagerly, String sparkCatalogName) {
  super(unkeyedTable, refreshEagerly);
  this.unkeyedTable = unkeyedTable;
  this.sparkCatalogName = sparkCatalogName;
}

/** @return the underlying unkeyed table backing this Spark table. */
@Override
public UnkeyedTable table() {
return unkeyedTable;
}

/** @return the fully qualified Spark name, {@code <catalog>.<database>.<table>}. */
@Override
public String name() {
  return String.format(
      "%s.%s.%s",
      sparkCatalogName, unkeyedTable.id().getDatabase(), unkeyedTable.id().getTableName());
}

@Override
public Map<String, String> properties() {
Map<String, String> properties = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,39 +66,25 @@ public class ArcticSparkTable
TableCapability.OVERWRITE_DYNAMIC);

private final ArcticTable arcticTable;
private final StructType requestedSchema;
private final boolean refreshEagerly;
private final String sparkCatalogName;
private StructType lazyTableSchema = null;
private SparkSession lazySpark = null;
private final ArcticCatalog catalog;

public static Table ofArcticTable(ArcticTable table, ArcticCatalog catalog) {
public static Table ofArcticTable(
ArcticTable table, ArcticCatalog catalog, String sparkCatalogName) {
if (table.isUnkeyedTable()) {
if (!(table instanceof SupportHive)) {
return new ArcticIcebergSparkTable(table.asUnkeyedTable(), false);
return new ArcticIcebergSparkTable(table.asUnkeyedTable(), false, sparkCatalogName);
}
}
return new ArcticSparkTable(table, false, catalog);
return new ArcticSparkTable(table, catalog, sparkCatalogName);
}

/**
 * Creates a Spark table wrapper for an Arctic table.
 *
 * <p>The previous constructors' {@code requestedSchema} / {@code refreshEagerly} parameters were
 * removed along with their fields; assignments to them must not be reintroduced.
 *
 * @param arcticTable the Arctic table to expose to Spark
 * @param catalog the Arctic catalog the table belongs to
 * @param sparkCatalogName name of the Spark catalog this table was loaded from; used to build
 *     the fully qualified {@code name()}
 */
public ArcticSparkTable(ArcticTable arcticTable, ArcticCatalog catalog, String sparkCatalogName) {
  this.arcticTable = arcticTable;
  this.catalog = catalog;
  this.sparkCatalogName = sparkCatalogName;
}

private SparkSession sparkSession() {
Expand All @@ -115,19 +101,18 @@ public ArcticTable table() {

/** @return the fully qualified Spark name, {@code <catalog>.<database>.<table>}. */
@Override
public String name() {
  // The old "arcticTable.id().toString()" form omitted the Spark catalog prefix, causing the
  // name mismatch in SHOW CREATE TABLE that this change fixes.
  return sparkCatalogName
      + "."
      + arcticTable.id().getDatabase()
      + "."
      + arcticTable.id().getTableName();
}

@Override
public StructType schema() {
if (lazyTableSchema == null) {
Schema tableSchema = arcticTable.schema();
if (requestedSchema != null) {
Schema prunedSchema = SparkSchemaUtil.prune(tableSchema, requestedSchema);
this.lazyTableSchema = SparkSchemaUtil.convert(prunedSchema);
} else {
this.lazyTableSchema = SparkSchemaUtil.convert(tableSchema);
}
this.lazyTableSchema = SparkSchemaUtil.convert(tableSchema);
}

return lazyTableSchema;
Expand Down Expand Up @@ -195,13 +180,7 @@ public int hashCode() {

/**
 * Creates a scan builder over this Arctic table.
 *
 * <p>Column pruning via {@code requestedSchema} was removed together with that field; the scan
 * builder now always starts from the full table schema.
 *
 * @param options case-insensitive scan options supplied by Spark
 * @return a new {@code SparkScanBuilder} for this table
 */
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
  return new SparkScanBuilder(sparkSession(), arcticTable, options);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.netease.arctic.spark.test.SparkTableTestBase;
import com.netease.arctic.spark.test.extensions.EnableCatalogSelect;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand Down Expand Up @@ -75,5 +76,9 @@ public void testDescTable(TableFormat format, String primaryKeyDDL, String parti
List<Row> rows2 =
sql("desc extended " + target().database + "." + target().table).collectAsList();
assertTableDesc(rows2, primaryKeys, partitions);

Assumptions.assumeFalse(spark().version().startsWith("3.1"));
rows = sql("show create table " + target()).collectAsList();
assertShowCreateTable(rows, target(), loadTable());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public Table loadTable(Identifier ident) throws NoSuchTableException {
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
}
return ArcticSparkTable.ofArcticTable(table, catalog);
return ArcticSparkTable.ofArcticTable(table, catalog, catalogName);
}

private Table loadInnerTable(ArcticTable table, ArcticTableStoreType type) {
Expand Down Expand Up @@ -191,7 +191,7 @@ && isIdentifierLocation(properties.get(TableCatalog.PROP_LOCATION), ident)) {
builder.withPartitionSpec(spec).withProperties(properties);
}
ArcticTable table = builder.create();
return ArcticSparkTable.ofArcticTable(table, catalog);
return ArcticSparkTable.ofArcticTable(table, catalog, catalogName);
} catch (AlreadyExistsException e) {
throw new TableAlreadyExistsException("Table " + ident + " already exists", Option.apply(e));
}
Expand Down Expand Up @@ -275,10 +275,10 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchT
}
if (table.isUnkeyedTable()) {
alterUnKeyedTable(table.asUnkeyedTable(), changes);
return ArcticSparkTable.ofArcticTable(table, catalog);
return ArcticSparkTable.ofArcticTable(table, catalog, catalogName);
} else if (table.isKeyedTable()) {
alterKeyedTable(table.asKeyedTable(), changes);
return ArcticSparkTable.ofArcticTable(table, catalog);
return ArcticSparkTable.ofArcticTable(table, catalog, catalogName);
}
throw new UnsupportedOperationException("Unsupported alter table");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@

public class ArcticIcebergSparkTable extends SparkTable implements SupportsPartitionManagement {
private final UnkeyedTable unkeyedTable;
private final String sparkCatalogName;

/**
 * Creates a Spark table wrapper for an Iceberg-format unkeyed table.
 *
 * @param unkeyedTable the underlying unkeyed table
 * @param refreshEagerly passed through to the {@code SparkTable} superclass
 * @param sparkCatalogName name of the Spark catalog this table was loaded from; used to build
 *     the fully qualified {@code name()}
 */
public ArcticIcebergSparkTable(
    UnkeyedTable unkeyedTable, boolean refreshEagerly, String sparkCatalogName) {
  super(unkeyedTable, refreshEagerly);
  this.unkeyedTable = unkeyedTable;
  this.sparkCatalogName = sparkCatalogName;
}

@Override
Expand All @@ -52,6 +55,15 @@ public Map<String, String> properties() {
return properties;
}

/** @return the fully qualified Spark name, {@code <catalog>.<database>.<table>}. */
@Override
public String name() {
  String database = String.valueOf(unkeyedTable.id().getDatabase());
  String tableName = String.valueOf(unkeyedTable.id().getTableName());
  return sparkCatalogName + "." + database + "." + tableName;
}

@Override
public StructType partitionSchema() {
return SparkSchemaUtil.convert(new Schema(table().spec().partitionType().fields()));
Expand Down
Loading

0 comments on commit bb4130d

Please sign in to comment.