Skip to content

Commit

Permalink
Spark 3.5: Remove legacy configs for timestamps without zone (#8654)
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored Sep 26, 2023
1 parent a582968 commit ad6d21a
Show file tree
Hide file tree
Showing 8 changed files with 0 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,6 @@ public final void initialize(String name, CaseInsensitiveStringMap options) {
}
}

SparkUtil.validateTimestampWithoutTimezoneConfig(sparkSession.conf());

EnvironmentContext.put(EnvironmentContext.ENGINE_NAME, "spark");
EnvironmentContext.put(
EnvironmentContext.ENGINE_VERSION, sparkSession.sparkContext().version());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ public SparkReadConf(
this.branch = branch;
this.readOptions = readOptions;
this.confParser = new SparkConfParser(spark, table, readOptions);

SparkUtil.validateTimestampWithoutTimezoneConfig(spark.conf(), readOptions);
}

public boolean caseSensitive() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,6 @@ private SparkReadOptions() {}
"streaming-skip-overwrite-snapshots";
public static final boolean STREAMING_SKIP_OVERWRITE_SNAPSHOTS_DEFAULT = false;

// Controls whether to allow reading timestamps without zone info
@Deprecated
public static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE =
"handle-timestamp-without-timezone";

// Controls whether to report locality information to Spark while allocating input partitions
public static final String LOCALITY = "locality";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,6 @@ private SparkSQLProperties() {}
// Controls whether vectorized reads are enabled
public static final String VECTORIZATION_ENABLED = "spark.sql.iceberg.vectorization.enabled";

// Controls whether reading/writing timestamps without timezones is allowed
@Deprecated
public static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE =
"spark.sql.iceberg.handle-timestamp-without-timezone";

// Controls whether timestamp types for new tables should be stored with timezone info
@Deprecated
public static final String USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES =
"spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables";

// Controls whether to perform the nullability check during writes
public static final String CHECK_NULLABILITY = "spark.sql.iceberg.check-nullability";
public static final boolean CHECK_NULLABILITY_DEFAULT = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.UnknownTransform;
import org.apache.iceberg.util.Pair;
import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.BoundReference;
import org.apache.spark.sql.catalyst.expressions.EqualTo;
Expand Down Expand Up @@ -157,40 +155,6 @@ private static String hadoopConfPrefixForCatalog(String catalogName) {
return String.format(SPARK_CATALOG_HADOOP_CONF_OVERRIDE_FMT_STR, catalogName);
}

/** Validates only the session configuration, with no read or write options supplied. */
public static void validateTimestampWithoutTimezoneConfig(RuntimeConfig conf) {
  validateTimestampWithoutTimezoneConfig(conf, ImmutableMap.of());
}

/**
 * Checks for properties both supplied by Spark's RuntimeConfig and the read or write options.
 *
 * <p>Rejects every legacy timestamp-without-timezone setting with an {@link
 * UnsupportedOperationException} naming the offending key. The three checks run in a fixed
 * order, so when several settings are present the first match determines the message.
 *
 * @param conf The RuntimeConfig of the active Spark session
 * @param options The read or write options supplied when reading/writing a table
 */
public static void validateTimestampWithoutTimezoneConfig(
    RuntimeConfig conf, Map<String, String> options) {
  // Shared tail of every rejection message; the prefix identifies where the key was set.
  String reason =
      " is not supported in Spark 3.4 due to the introduction of native support for timestamp without timezone.";

  if (conf.contains(SparkSQLProperties.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE)) {
    throw new UnsupportedOperationException(
        "Spark configuration " + SparkSQLProperties.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE + reason);
  }

  if (options.containsKey(SparkReadOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE)) {
    throw new UnsupportedOperationException(
        "Option " + SparkReadOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE + reason);
  }

  if (conf.contains(SparkSQLProperties.USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES)) {
    throw new UnsupportedOperationException(
        "Spark configuration "
            + SparkSQLProperties.USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES
            + reason);
  }
}

/**
* Get a List of Spark filter Expression.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ public SparkWriteConf(
this.sessionConf = spark.conf();
this.writeOptions = writeOptions;
this.confParser = new SparkConfParser(spark, table, writeOptions);

SparkUtil.validateTimestampWithoutTimezoneConfig(spark.conf(), writeOptions);
}

public boolean checkNullability() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,6 @@ private SparkWriteOptions() {}
// File scan task set ID that indicates which files must be replaced
public static final String REWRITTEN_FILE_SCAN_TASK_SET_ID = "rewritten-file-scan-task-set-id";

// Controls whether to allow writing timestamps without zone info
@Deprecated
public static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE =
"handle-timestamp-without-timezone";

public static final String OUTPUT_SPEC_ID = "output-spec-id";

public static final String OVERWRITE_MODE = "overwrite-mode";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,10 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.assertj.core.api.Assertions;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -95,97 +91,6 @@ public void removeTables() {
sql("DROP TABLE IF EXISTS %s", newTableName);
}

@Test
public void testDeprecatedTimezoneProperty() {
  // Catalog initialization must fail while the removed session property is set.
  String expectedMessage =
      "Spark configuration spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables is not supported in Spark 3.4 due to the introduction of native support for timestamp without timezone.";
  withSQLConf(
      ImmutableMap.of(SparkSQLProperties.USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES, "true"),
      () ->
          Assertions.assertThatThrownBy(
                  () ->
                      spark
                          .sessionState()
                          .catalogManager()
                          .currentCatalog()
                          .initialize(catalog.name(), new CaseInsensitiveStringMap(config)))
              .isInstanceOf(UnsupportedOperationException.class)
              .hasMessage(expectedMessage));
}

@Test
public void testReadWithDeprecatedTimezoneProperty() {
  // A plain SQL read must fail while the removed session property is set.
  String expectedMessage =
      "Spark configuration spark.sql.iceberg.handle-timestamp-without-timezone is not supported in Spark 3.4 due to the introduction of native support for timestamp without timezone.";
  withSQLConf(
      ImmutableMap.of(SparkSQLProperties.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true"),
      () ->
          Assertions.assertThatThrownBy(() -> sql("SELECT count(*) FROM %s", tableName))
              .isInstanceOf(UnsupportedOperationException.class)
              .hasMessage(expectedMessage));
}

@Test
public void testReadWithDeprecatedTimezonePropertyReadOption() {
  // Passing the removed key as a DataFrame read option must fail as well.
  String expectedMessage =
      "Option handle-timestamp-without-timezone is not supported in Spark 3.4 due to the introduction of native support for timestamp without timezone.";
  Assertions.assertThatThrownBy(
          () ->
              spark
                  .read()
                  .option(SparkReadOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true")
                  .table(tableName)
                  .count())
      .isInstanceOf(UnsupportedOperationException.class)
      .hasMessage(expectedMessage);
}

@Test
public void testWriteWithDeprecatedTimezoneProperty() {
  // A CTAS write must fail while the removed session property is set; the Hive-default
  // override keeps CREATE TABLE routed to the Iceberg catalog in this test setup.
  String expectedMessage =
      "Spark configuration spark.sql.iceberg.handle-timestamp-without-timezone is not supported in Spark 3.4 due to the introduction of native support for timestamp without timezone.";
  ImmutableMap<String, String> sessionConf =
      ImmutableMap.of(
          SparkSQLProperties.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE,
          "true",
          "spark.sql.legacy.createHiveTableByDefault",
          "false");
  withSQLConf(
      sessionConf,
      () ->
          Assertions.assertThatThrownBy(
                  () ->
                      sql(
                          "CREATE OR REPLACE TABLE %s USING ICEBERG AS SELECT * FROM %s",
                          newTableName, tableName))
              .isInstanceOf(UnsupportedOperationException.class)
              .hasMessage(expectedMessage));
}

@Test
public void testWriteWithDeprecatedTimezonePropertyReadOption() {
  // Passing the removed key as a write option must fail; the Hive-default override keeps
  // createOrReplace routed to the Iceberg catalog in this test setup.
  String expectedMessage =
      "Option handle-timestamp-without-timezone is not supported in Spark 3.4 due to the introduction of native support for timestamp without timezone.";
  Assertions.assertThatThrownBy(
          () ->
              withSQLConf(
                  ImmutableMap.of("spark.sql.legacy.createHiveTableByDefault", "false"),
                  () ->
                      spark
                          .read()
                          .table(tableName)
                          .writeTo(newTableName)
                          .option(SparkWriteOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true")
                          .using("iceberg")
                          .createOrReplace()))
      .isInstanceOf(UnsupportedOperationException.class)
      .hasMessage(expectedMessage);
}

/*
Spark does not really care about the timezone, it will just convert it
Expand Down

0 comments on commit ad6d21a

Please sign in to comment.