Skip to content

Commit

Permalink
Spark: Correct partition transform functions to match Spec (apache#8192)
Browse files Browse the repository at this point in the history
  • Loading branch information
clettieri authored Sep 25, 2023
1 parent 34f8c35 commit c0bed74
Show file tree
Hide file tree
Showing 15 changed files with 525 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.spark.sql.connector.catalog.CatalogManager;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -179,6 +180,82 @@ public void testAddHoursPartition() {
Assert.assertEquals("Should have new spec field", expected, table.spec());
}

/**
 * Checks that {@code ADD PARTITION FIELD year(ts)} is accepted and that the resulting table spec
 * equals one built with {@code year("ts")} under spec id 1.
 */
@Test
public void testAddYearPartition() {
  createTable("id bigint NOT NULL, category string, ts timestamp, data string");
  Table loaded = validationCatalog.loadTable(tableIdent);

  // Guard: the table must not carry any partition fields before the ALTER runs.
  boolean unpartitionedAtStart = loaded.spec().isUnpartitioned();
  Assertions.assertThat(unpartitionedAtStart).as("Table should start unpartitioned").isTrue();

  sql("ALTER TABLE %s ADD PARTITION FIELD year(ts)", tableName);
  loaded.refresh();

  // Build the reference spec from the refreshed schema, then compare it with the table's spec.
  PartitionSpec yearSpec =
      PartitionSpec.builderFor(loaded.schema()).withSpecId(1).year("ts").build();
  PartitionSpec currentSpec = loaded.spec();
  Assertions.assertThat(currentSpec).as("Should have new spec field").isEqualTo(yearSpec);
}

/**
 * Checks that {@code ADD PARTITION FIELD month(ts)} is accepted and that the resulting table spec
 * equals one built with {@code month("ts")} under spec id 1.
 */
@Test
public void testAddMonthPartition() {
  createTable("id bigint NOT NULL, category string, ts timestamp, data string");
  Table loaded = validationCatalog.loadTable(tableIdent);

  // Guard: the table must not carry any partition fields before the ALTER runs.
  boolean unpartitionedAtStart = loaded.spec().isUnpartitioned();
  Assertions.assertThat(unpartitionedAtStart).as("Table should start unpartitioned").isTrue();

  sql("ALTER TABLE %s ADD PARTITION FIELD month(ts)", tableName);
  loaded.refresh();

  // Build the reference spec from the refreshed schema, then compare it with the table's spec.
  PartitionSpec monthSpec =
      PartitionSpec.builderFor(loaded.schema()).withSpecId(1).month("ts").build();
  PartitionSpec currentSpec = loaded.spec();
  Assertions.assertThat(currentSpec).as("Should have new spec field").isEqualTo(monthSpec);
}

/**
 * Checks that {@code ADD PARTITION FIELD day(ts)} is accepted and that the resulting table spec
 * equals one built with {@code day("ts")} under spec id 1.
 */
@Test
public void testAddDayPartition() {
  createTable("id bigint NOT NULL, category string, ts timestamp, data string");
  Table loaded = validationCatalog.loadTable(tableIdent);

  // Guard: the table must not carry any partition fields before the ALTER runs.
  boolean unpartitionedAtStart = loaded.spec().isUnpartitioned();
  Assertions.assertThat(unpartitionedAtStart).as("Table should start unpartitioned").isTrue();

  sql("ALTER TABLE %s ADD PARTITION FIELD day(ts)", tableName);
  loaded.refresh();

  // Build the reference spec from the refreshed schema, then compare it with the table's spec.
  PartitionSpec daySpec =
      PartitionSpec.builderFor(loaded.schema()).withSpecId(1).day("ts").build();
  PartitionSpec currentSpec = loaded.spec();
  Assertions.assertThat(currentSpec).as("Should have new spec field").isEqualTo(daySpec);
}

/**
 * Checks that {@code ADD PARTITION FIELD hour(ts)} is accepted and that the resulting table spec
 * equals one built with {@code hour("ts")} under spec id 1.
 */
@Test
public void testAddHourPartition() {
  createTable("id bigint NOT NULL, category string, ts timestamp, data string");
  Table loaded = validationCatalog.loadTable(tableIdent);

  // Guard: the table must not carry any partition fields before the ALTER runs.
  boolean unpartitionedAtStart = loaded.spec().isUnpartitioned();
  Assertions.assertThat(unpartitionedAtStart).as("Table should start unpartitioned").isTrue();

  sql("ALTER TABLE %s ADD PARTITION FIELD hour(ts)", tableName);
  loaded.refresh();

  // Build the reference spec from the refreshed schema, then compare it with the table's spec.
  PartitionSpec hourSpec =
      PartitionSpec.builderFor(loaded.schema()).withSpecId(1).hour("ts").build();
  PartitionSpec currentSpec = loaded.spec();
  Assertions.assertThat(currentSpec).as("Should have new spec field").isEqualTo(hourSpec);
}

@Test
public void testAddNamedPartition() {
createTable("id bigint NOT NULL, category string, ts timestamp, data string");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,14 +374,18 @@ public static Term toIcebergTerm(Transform transform) {
return org.apache.iceberg.expressions.Expressions.ref(colName);
case "bucket":
return org.apache.iceberg.expressions.Expressions.bucket(colName, findWidth(transform));
case "year":
case "years":
return org.apache.iceberg.expressions.Expressions.year(colName);
case "month":
case "months":
return org.apache.iceberg.expressions.Expressions.month(colName);
case "date":
case "day":
case "days":
return org.apache.iceberg.expressions.Expressions.day(colName);
case "date_hour":
case "hour":
case "hours":
return org.apache.iceberg.expressions.Expressions.hour(colName);
case "truncate":
Expand Down Expand Up @@ -417,17 +421,21 @@ public static PartitionSpec toPartitionSpec(Schema schema, Transform[] partition
case "bucket":
builder.bucket(colName, findWidth(transform));
break;
case "year":
case "years":
builder.year(colName);
break;
case "month":
case "months":
builder.month(colName);
break;
case "date":
case "day":
case "days":
builder.day(colName);
break;
case "date_hour":
case "hour":
case "hours":
builder.hour(colName);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,26 @@ public void testTransformIgnoreCase() {
Assert.assertTrue("Table should already exist", validationCatalog.tableExists(tableIdent));
}

/**
 * Creates a table partitioned by the singular {@code hour(ts)} transform and checks that the
 * table exists afterwards. Only existence is asserted; the resulting spec is not inspected.
 */
@Test
public void testTransformSingularForm() {
  boolean presentBefore = validationCatalog.tableExists(tableIdent);
  Assert.assertFalse("Table should not already exist", presentBefore);

  // The DDL template is kept in a local so the statement reads separately from its argument.
  String createWithHour =
      "CREATE TABLE IF NOT EXISTS %s (id BIGINT NOT NULL, ts timestamp) "
          + "USING iceberg partitioned by (hour(ts))";
  sql(createWithHour, tableName);

  boolean presentAfter = validationCatalog.tableExists(tableIdent);
  Assert.assertTrue("Table should exist", presentAfter);
}

/**
 * Creates a table partitioned by the plural {@code hours(ts)} transform and checks that the
 * table exists afterwards. Only existence is asserted; the resulting spec is not inspected.
 */
@Test
public void testTransformPluralForm() {
  boolean presentBefore = validationCatalog.tableExists(tableIdent);
  Assert.assertFalse("Table should not already exist", presentBefore);

  // The DDL template is kept in a local so the statement reads separately from its argument.
  String createWithHours =
      "CREATE TABLE IF NOT EXISTS %s (id BIGINT NOT NULL, ts timestamp) "
          + "USING iceberg partitioned by (hours(ts))";
  sql(createWithHours, tableName);

  boolean presentAfter = validationCatalog.tableExists(tableIdent);
  Assert.assertTrue("Table should exist", presentAfter);
}

@Test
public void testCreateTable() {
Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.spark.sql.connector.catalog.CatalogManager;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -179,6 +180,82 @@ public void testAddHoursPartition() {
Assert.assertEquals("Should have new spec field", expected, table.spec());
}

/**
 * Adds a {@code year(ts)} partition field through SQL and verifies that the refreshed table
 * reports a spec equal to the one built with {@code year("ts")} and spec id 1.
 */
@Test
public void testAddYearPartition() {
  createTable("id bigint NOT NULL, category string, ts timestamp, data string");
  Table target = validationCatalog.loadTable(tableIdent);

  // Precondition: no partition fields exist before the spec is evolved.
  boolean initiallyUnpartitioned = target.spec().isUnpartitioned();
  Assertions.assertThat(initiallyUnpartitioned).as("Table should start unpartitioned").isTrue();

  sql("ALTER TABLE %s ADD PARTITION FIELD year(ts)", tableName);
  target.refresh();

  PartitionSpec wanted =
      PartitionSpec.builderFor(target.schema()).withSpecId(1).year("ts").build();
  PartitionSpec observed = target.spec();
  Assertions.assertThat(observed).as("Should have new spec field").isEqualTo(wanted);
}

/**
 * Adds a {@code month(ts)} partition field through SQL and verifies that the refreshed table
 * reports a spec equal to the one built with {@code month("ts")} and spec id 1.
 */
@Test
public void testAddMonthPartition() {
  createTable("id bigint NOT NULL, category string, ts timestamp, data string");
  Table target = validationCatalog.loadTable(tableIdent);

  // Precondition: no partition fields exist before the spec is evolved.
  boolean initiallyUnpartitioned = target.spec().isUnpartitioned();
  Assertions.assertThat(initiallyUnpartitioned).as("Table should start unpartitioned").isTrue();

  sql("ALTER TABLE %s ADD PARTITION FIELD month(ts)", tableName);
  target.refresh();

  PartitionSpec wanted =
      PartitionSpec.builderFor(target.schema()).withSpecId(1).month("ts").build();
  PartitionSpec observed = target.spec();
  Assertions.assertThat(observed).as("Should have new spec field").isEqualTo(wanted);
}

/**
 * Adds a {@code day(ts)} partition field through SQL and verifies that the refreshed table
 * reports a spec equal to the one built with {@code day("ts")} and spec id 1.
 */
@Test
public void testAddDayPartition() {
  createTable("id bigint NOT NULL, category string, ts timestamp, data string");
  Table target = validationCatalog.loadTable(tableIdent);

  // Precondition: no partition fields exist before the spec is evolved.
  boolean initiallyUnpartitioned = target.spec().isUnpartitioned();
  Assertions.assertThat(initiallyUnpartitioned).as("Table should start unpartitioned").isTrue();

  sql("ALTER TABLE %s ADD PARTITION FIELD day(ts)", tableName);
  target.refresh();

  PartitionSpec wanted =
      PartitionSpec.builderFor(target.schema()).withSpecId(1).day("ts").build();
  PartitionSpec observed = target.spec();
  Assertions.assertThat(observed).as("Should have new spec field").isEqualTo(wanted);
}

/**
 * Adds an {@code hour(ts)} partition field through SQL and verifies that the refreshed table
 * reports a spec equal to the one built with {@code hour("ts")} and spec id 1.
 */
@Test
public void testAddHourPartition() {
  createTable("id bigint NOT NULL, category string, ts timestamp, data string");
  Table target = validationCatalog.loadTable(tableIdent);

  // Precondition: no partition fields exist before the spec is evolved.
  boolean initiallyUnpartitioned = target.spec().isUnpartitioned();
  Assertions.assertThat(initiallyUnpartitioned).as("Table should start unpartitioned").isTrue();

  sql("ALTER TABLE %s ADD PARTITION FIELD hour(ts)", tableName);
  target.refresh();

  PartitionSpec wanted =
      PartitionSpec.builderFor(target.schema()).withSpecId(1).hour("ts").build();
  PartitionSpec observed = target.spec();
  Assertions.assertThat(observed).as("Should have new spec field").isEqualTo(wanted);
}

@Test
public void testAddNamedPartition() {
createTable("id bigint NOT NULL, category string, ts timestamp, data string");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,18 @@ public static Term toIcebergTerm(Expression expr) {
return org.apache.iceberg.expressions.Expressions.ref(colName);
case "bucket":
return org.apache.iceberg.expressions.Expressions.bucket(colName, findWidth(transform));
case "year":
case "years":
return org.apache.iceberg.expressions.Expressions.year(colName);
case "month":
case "months":
return org.apache.iceberg.expressions.Expressions.month(colName);
case "date":
case "day":
case "days":
return org.apache.iceberg.expressions.Expressions.day(colName);
case "date_hour":
case "hour":
case "hours":
return org.apache.iceberg.expressions.Expressions.hour(colName);
case "truncate":
Expand Down Expand Up @@ -399,17 +403,21 @@ public static PartitionSpec toPartitionSpec(Schema schema, Transform[] partition
case "bucket":
builder.bucket(colName, findWidth(transform));
break;
case "year":
case "years":
builder.year(colName);
break;
case "month":
case "months":
builder.month(colName);
break;
case "date":
case "day":
case "days":
builder.day(colName);
break;
case "date_hour":
case "hour":
case "hours":
builder.hour(colName);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,26 @@ public void testTransformIgnoreCase() {
Assert.assertTrue("Table should already exist", validationCatalog.tableExists(tableIdent));
}

/**
 * Issues a {@code CREATE TABLE} partitioned by the singular {@code hour(ts)} transform and
 * asserts that the table is absent before and present after. The spec itself is not checked.
 */
@Test
public void testTransformSingularForm() {
  boolean existedBefore = validationCatalog.tableExists(tableIdent);
  Assert.assertFalse("Table should not already exist", existedBefore);

  String ddl =
      "CREATE TABLE IF NOT EXISTS %s (id BIGINT NOT NULL, ts timestamp) "
          + "USING iceberg partitioned by (hour(ts))";
  sql(ddl, tableName);

  boolean existsNow = validationCatalog.tableExists(tableIdent);
  Assert.assertTrue("Table should exist", existsNow);
}

/**
 * Issues a {@code CREATE TABLE} partitioned by the plural {@code hours(ts)} transform and
 * asserts that the table is absent before and present after. The spec itself is not checked.
 */
@Test
public void testTransformPluralForm() {
  boolean existedBefore = validationCatalog.tableExists(tableIdent);
  Assert.assertFalse("Table should not already exist", existedBefore);

  String ddl =
      "CREATE TABLE IF NOT EXISTS %s (id BIGINT NOT NULL, ts timestamp) "
          + "USING iceberg partitioned by (hours(ts))";
  sql(ddl, tableName);

  boolean existsNow = validationCatalog.tableExists(tableIdent);
  Assert.assertTrue("Table should exist", existsNow);
}

@Test
public void testCreateTable() {
Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent));
Expand Down
Loading

0 comments on commit c0bed74

Please sign in to comment.