Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark: Correct partition transform functions to match spec.md #8192

Merged
merged 12 commits into from
Sep 25, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.spark.sql.connector.catalog.CatalogManager;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -179,6 +180,82 @@ public void testAddHoursPartition() {
Assert.assertEquals("Should have new spec field", expected, table.spec());
}

/**
 * Runs {@code ALTER TABLE ... ADD PARTITION FIELD year(ts)} against a table that is asserted to
 * start unpartitioned, then checks that the table spec equals a spec with id 1 holding a year
 * transform on {@code ts}.
 */
@Test
public void testAddYearPartition() {
  createTable("id bigint NOT NULL, category string, ts timestamp, data string");
  Table icebergTable = validationCatalog.loadTable(tableIdent);

  boolean startsUnpartitioned = icebergTable.spec().isUnpartitioned();
  Assertions.assertThat(startsUnpartitioned).as("Table should start unpartitioned").isTrue();

  sql("ALTER TABLE %s ADD PARTITION FIELD year(ts)", tableName);

  // Refresh the loaded table handle before reading its spec back.
  icebergTable.refresh();

  PartitionSpec expectedSpec =
      PartitionSpec.builderFor(icebergTable.schema()).withSpecId(1).year("ts").build();
  PartitionSpec actualSpec = icebergTable.spec();

  Assertions.assertThat(actualSpec).as("Should have new spec field").isEqualTo(expectedSpec);
}

/**
 * Runs {@code ALTER TABLE ... ADD PARTITION FIELD month(ts)} against a table that is asserted to
 * start unpartitioned, then checks that the table spec equals a spec with id 1 holding a month
 * transform on {@code ts}.
 */
@Test
public void testAddMonthPartition() {
  createTable("id bigint NOT NULL, category string, ts timestamp, data string");
  Table icebergTable = validationCatalog.loadTable(tableIdent);

  boolean startsUnpartitioned = icebergTable.spec().isUnpartitioned();
  Assertions.assertThat(startsUnpartitioned).as("Table should start unpartitioned").isTrue();

  sql("ALTER TABLE %s ADD PARTITION FIELD month(ts)", tableName);

  // Refresh the loaded table handle before reading its spec back.
  icebergTable.refresh();

  PartitionSpec expectedSpec =
      PartitionSpec.builderFor(icebergTable.schema()).withSpecId(1).month("ts").build();
  PartitionSpec actualSpec = icebergTable.spec();

  Assertions.assertThat(actualSpec).as("Should have new spec field").isEqualTo(expectedSpec);
}

/**
 * Runs {@code ALTER TABLE ... ADD PARTITION FIELD day(ts)} against a table that is asserted to
 * start unpartitioned, then checks that the table spec equals a spec with id 1 holding a day
 * transform on {@code ts}.
 */
@Test
public void testAddDayPartition() {
  createTable("id bigint NOT NULL, category string, ts timestamp, data string");
  Table icebergTable = validationCatalog.loadTable(tableIdent);

  boolean startsUnpartitioned = icebergTable.spec().isUnpartitioned();
  Assertions.assertThat(startsUnpartitioned).as("Table should start unpartitioned").isTrue();

  sql("ALTER TABLE %s ADD PARTITION FIELD day(ts)", tableName);

  // Refresh the loaded table handle before reading its spec back.
  icebergTable.refresh();

  PartitionSpec expectedSpec =
      PartitionSpec.builderFor(icebergTable.schema()).withSpecId(1).day("ts").build();
  PartitionSpec actualSpec = icebergTable.spec();

  Assertions.assertThat(actualSpec).as("Should have new spec field").isEqualTo(expectedSpec);
}

/**
 * Runs {@code ALTER TABLE ... ADD PARTITION FIELD hour(ts)} against a table that is asserted to
 * start unpartitioned, then checks that the table spec equals a spec with id 1 holding an hour
 * transform on {@code ts}.
 */
@Test
public void testAddHourPartition() {
  createTable("id bigint NOT NULL, category string, ts timestamp, data string");
  Table icebergTable = validationCatalog.loadTable(tableIdent);

  boolean startsUnpartitioned = icebergTable.spec().isUnpartitioned();
  Assertions.assertThat(startsUnpartitioned).as("Table should start unpartitioned").isTrue();

  sql("ALTER TABLE %s ADD PARTITION FIELD hour(ts)", tableName);

  // Refresh the loaded table handle before reading its spec back.
  icebergTable.refresh();

  PartitionSpec expectedSpec =
      PartitionSpec.builderFor(icebergTable.schema()).withSpecId(1).hour("ts").build();
  PartitionSpec actualSpec = icebergTable.spec();

  Assertions.assertThat(actualSpec).as("Should have new spec field").isEqualTo(expectedSpec);
}

@Test
public void testAddNamedPartition() {
createTable("id bigint NOT NULL, category string, ts timestamp, data string");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,14 +374,18 @@ public static Term toIcebergTerm(Transform transform) {
return org.apache.iceberg.expressions.Expressions.ref(colName);
case "bucket":
return org.apache.iceberg.expressions.Expressions.bucket(colName, findWidth(transform));
case "year":
case "years":
return org.apache.iceberg.expressions.Expressions.year(colName);
case "month":
case "months":
return org.apache.iceberg.expressions.Expressions.month(colName);
case "date":
case "day":
case "days":
return org.apache.iceberg.expressions.Expressions.day(colName);
case "date_hour":
case "hour":
case "hours":
return org.apache.iceberg.expressions.Expressions.hour(colName);
case "truncate":
Expand Down Expand Up @@ -417,17 +421,21 @@ public static PartitionSpec toPartitionSpec(Schema schema, Transform[] partition
case "bucket":
builder.bucket(colName, findWidth(transform));
break;
case "year":
case "years":
builder.year(colName);
break;
case "month":
case "months":
builder.month(colName);
break;
case "date":
case "day":
case "days":
builder.day(colName);
break;
case "date_hour":
case "hour":
case "hours":
builder.hour(colName);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,26 @@ public void testTransformIgnoreCase() {
Assert.assertTrue("Table should already exist", validationCatalog.tableExists(tableIdent));
}

/**
 * Creates a table partitioned by the singular transform name {@code hour(ts)} and asserts that
 * the table is absent before the statement and present after it.
 */
@Test
public void testTransformSingularForm() {
  boolean existedBefore = validationCatalog.tableExists(tableIdent);
  Assert.assertFalse("Table should not already exist", existedBefore);

  String createStatement =
      "CREATE TABLE IF NOT EXISTS %s (id BIGINT NOT NULL, ts timestamp) "
          + "USING iceberg partitioned by (hour(ts))";
  sql(createStatement, tableName);

  boolean existsAfter = validationCatalog.tableExists(tableIdent);
  Assert.assertTrue("Table should exist", existsAfter);
}

/**
 * Creates a table partitioned by the plural transform name {@code hours(ts)} and asserts that
 * the table is absent before the statement and present after it.
 */
@Test
public void testTransformPluralForm() {
  boolean existedBefore = validationCatalog.tableExists(tableIdent);
  Assert.assertFalse("Table should not already exist", existedBefore);

  String createStatement =
      "CREATE TABLE IF NOT EXISTS %s (id BIGINT NOT NULL, ts timestamp) "
          + "USING iceberg partitioned by (hours(ts))";
  sql(createStatement, tableName);

  boolean existsAfter = validationCatalog.tableExists(tableIdent);
  Assert.assertTrue("Table should exist", existsAfter);
}

@Test
public void testCreateTable() {
Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.spark.sql.connector.catalog.CatalogManager;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -179,6 +180,82 @@ public void testAddHoursPartition() {
Assert.assertEquals("Should have new spec field", expected, table.spec());
}

/**
 * Runs {@code ALTER TABLE ... ADD PARTITION FIELD year(ts)} against a table that is asserted to
 * start unpartitioned, then checks that the table spec equals a spec with id 1 holding a year
 * transform on {@code ts}.
 */
@Test
public void testAddYearPartition() {
  createTable("id bigint NOT NULL, category string, ts timestamp, data string");
  Table icebergTable = validationCatalog.loadTable(tableIdent);

  boolean startsUnpartitioned = icebergTable.spec().isUnpartitioned();
  Assertions.assertThat(startsUnpartitioned).as("Table should start unpartitioned").isTrue();

  sql("ALTER TABLE %s ADD PARTITION FIELD year(ts)", tableName);

  // Refresh the loaded table handle before reading its spec back.
  icebergTable.refresh();

  PartitionSpec expectedSpec =
      PartitionSpec.builderFor(icebergTable.schema()).withSpecId(1).year("ts").build();
  PartitionSpec actualSpec = icebergTable.spec();

  Assertions.assertThat(actualSpec).as("Should have new spec field").isEqualTo(expectedSpec);
}

/**
 * Runs {@code ALTER TABLE ... ADD PARTITION FIELD month(ts)} against a table that is asserted to
 * start unpartitioned, then checks that the table spec equals a spec with id 1 holding a month
 * transform on {@code ts}.
 */
@Test
public void testAddMonthPartition() {
  createTable("id bigint NOT NULL, category string, ts timestamp, data string");
  Table icebergTable = validationCatalog.loadTable(tableIdent);

  boolean startsUnpartitioned = icebergTable.spec().isUnpartitioned();
  Assertions.assertThat(startsUnpartitioned).as("Table should start unpartitioned").isTrue();

  sql("ALTER TABLE %s ADD PARTITION FIELD month(ts)", tableName);

  // Refresh the loaded table handle before reading its spec back.
  icebergTable.refresh();

  PartitionSpec expectedSpec =
      PartitionSpec.builderFor(icebergTable.schema()).withSpecId(1).month("ts").build();
  PartitionSpec actualSpec = icebergTable.spec();

  Assertions.assertThat(actualSpec).as("Should have new spec field").isEqualTo(expectedSpec);
}

/**
 * Runs {@code ALTER TABLE ... ADD PARTITION FIELD day(ts)} against a table that is asserted to
 * start unpartitioned, then checks that the table spec equals a spec with id 1 holding a day
 * transform on {@code ts}.
 */
@Test
public void testAddDayPartition() {
  createTable("id bigint NOT NULL, category string, ts timestamp, data string");
  Table icebergTable = validationCatalog.loadTable(tableIdent);

  boolean startsUnpartitioned = icebergTable.spec().isUnpartitioned();
  Assertions.assertThat(startsUnpartitioned).as("Table should start unpartitioned").isTrue();

  sql("ALTER TABLE %s ADD PARTITION FIELD day(ts)", tableName);

  // Refresh the loaded table handle before reading its spec back.
  icebergTable.refresh();

  PartitionSpec expectedSpec =
      PartitionSpec.builderFor(icebergTable.schema()).withSpecId(1).day("ts").build();
  PartitionSpec actualSpec = icebergTable.spec();

  Assertions.assertThat(actualSpec).as("Should have new spec field").isEqualTo(expectedSpec);
}

/**
 * Runs {@code ALTER TABLE ... ADD PARTITION FIELD hour(ts)} against a table that is asserted to
 * start unpartitioned, then checks that the table spec equals a spec with id 1 holding an hour
 * transform on {@code ts}.
 */
@Test
public void testAddHourPartition() {
  createTable("id bigint NOT NULL, category string, ts timestamp, data string");
  Table icebergTable = validationCatalog.loadTable(tableIdent);

  boolean startsUnpartitioned = icebergTable.spec().isUnpartitioned();
  Assertions.assertThat(startsUnpartitioned).as("Table should start unpartitioned").isTrue();

  sql("ALTER TABLE %s ADD PARTITION FIELD hour(ts)", tableName);

  // Refresh the loaded table handle before reading its spec back.
  icebergTable.refresh();

  PartitionSpec expectedSpec =
      PartitionSpec.builderFor(icebergTable.schema()).withSpecId(1).hour("ts").build();
  PartitionSpec actualSpec = icebergTable.spec();

  Assertions.assertThat(actualSpec).as("Should have new spec field").isEqualTo(expectedSpec);
}

@Test
public void testAddNamedPartition() {
createTable("id bigint NOT NULL, category string, ts timestamp, data string");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,18 @@ public static Term toIcebergTerm(Expression expr) {
return org.apache.iceberg.expressions.Expressions.ref(colName);
case "bucket":
return org.apache.iceberg.expressions.Expressions.bucket(colName, findWidth(transform));
case "year":
case "years":
return org.apache.iceberg.expressions.Expressions.year(colName);
case "month":
case "months":
return org.apache.iceberg.expressions.Expressions.month(colName);
case "date":
case "day":
case "days":
return org.apache.iceberg.expressions.Expressions.day(colName);
case "date_hour":
case "hour":
case "hours":
return org.apache.iceberg.expressions.Expressions.hour(colName);
case "truncate":
Expand Down Expand Up @@ -399,17 +403,21 @@ public static PartitionSpec toPartitionSpec(Schema schema, Transform[] partition
case "bucket":
builder.bucket(colName, findWidth(transform));
break;
case "year":
case "years":
builder.year(colName);
break;
case "month":
case "months":
builder.month(colName);
break;
case "date":
case "day":
case "days":
builder.day(colName);
break;
case "date_hour":
case "hour":
case "hours":
builder.hour(colName);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,26 @@ public void testTransformIgnoreCase() {
Assert.assertTrue("Table should already exist", validationCatalog.tableExists(tableIdent));
}

/**
 * Creates a table partitioned by the singular transform name {@code hour(ts)} and asserts that
 * the table is absent before the statement and present after it.
 */
@Test
public void testTransformSingularForm() {
  boolean existedBefore = validationCatalog.tableExists(tableIdent);
  Assert.assertFalse("Table should not already exist", existedBefore);

  String createStatement =
      "CREATE TABLE IF NOT EXISTS %s (id BIGINT NOT NULL, ts timestamp) "
          + "USING iceberg partitioned by (hour(ts))";
  sql(createStatement, tableName);

  boolean existsAfter = validationCatalog.tableExists(tableIdent);
  Assert.assertTrue("Table should exist", existsAfter);
}

/**
 * Creates a table partitioned by the plural transform name {@code hours(ts)} and asserts that
 * the table is absent before the statement and present after it.
 */
@Test
public void testTransformPluralForm() {
  boolean existedBefore = validationCatalog.tableExists(tableIdent);
  Assert.assertFalse("Table should not already exist", existedBefore);

  String createStatement =
      "CREATE TABLE IF NOT EXISTS %s (id BIGINT NOT NULL, ts timestamp) "
          + "USING iceberg partitioned by (hours(ts))";
  sql(createStatement, tableName);

  boolean existsAfter = validationCatalog.tableExists(tableIdent);
  Assert.assertTrue("Table should exist", existsAfter);
}

@Test
public void testCreateTable() {
Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent));
Expand Down
Loading