Skip to content

Commit

Permalink
Added spark 3.4 to the project
Browse files Browse the repository at this point in the history
  • Loading branch information
alfonsorr committed Apr 17, 2023
1 parent 55f96b9 commit eaa7afb
Show file tree
Hide file tree
Showing 40 changed files with 97 additions and 76 deletions.
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ val spark30Version = "3.0.3"
val spark31Version = "3.1.3"
val spark32Version = "3.2.3"
val spark33Version = "3.3.2"
val spark34Version = "3.4.0"

val versionRegex = """^(.*)\.(.*)\.(.*)$""".r
val versionRegexShort = """^(.*)\.(.*)$""".r
Expand All @@ -24,6 +25,7 @@ val parserSparkVersion: String => String = {
case versionRegexShort("3", "1") => spark31Version
case versionRegexShort("3", "2") => spark32Version
case versionRegexShort("3", "3") => spark33Version
case versionRegexShort("3", "4") => spark34Version
case versionRegex(a, b, c) => s"$a.$b.$c"
}

Expand All @@ -37,7 +39,7 @@ val scalaVersionSelect: String => List[String] = {
case versionRegex("3", "1", _) => List(scala212)
case versionRegex("3", "2", _) => List(scala212, scala213)
case versionRegex("3", "3", _) => List(scala212, scala213)

case versionRegex("3", "4", _) => List(scala212, scala213)
}

val catsVersion: String => String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package doric
package syntax

import doric.types.NumericType

import org.apache.spark.sql.{functions => f}

private[syntax] trait NumericColumns31 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package doric
package syntax

import doric.sem.Location

import org.apache.spark.sql.{functions => f}

private[syntax] trait StringColumns31 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package doric
package syntax

import cats.implicits._
import org.apache.spark.sql.Column
import org.apache.spark.sql.{functions => f}

import org.apache.spark.sql.{Column, functions => f}
import org.apache.spark.sql.catalyst.expressions.{ShiftLeft, ShiftRight, ShiftRightUnsigned}

private[syntax] trait NumericColumns32 {
Expand Down
30 changes: 30 additions & 0 deletions core/src/main/spark_3.4_mount/scala/doric/syntax/All.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package doric.syntax

/** Single aggregation point for the doric column DSL in the Spark 3.4
  * source mount: mixing in `All` exposes every syntax trait at once.
  *
  * NOTE(review): mixin order fixes the trait linearization; it appears to
  * mirror the other `spark_*_mount` copies of this file — confirm against
  * them before reordering anything here.
  */
private[doric] trait All
    extends ArrayColumns
    with TypeMatcher
    with CommonColumns
    with DStructs
    with LiteralConversions
    with MapColumns
    with NumericColumns
    with DateColumns
    with TimestampColumns
    with BooleanColumns
    with StringColumns
    with ControlStructures
    with AggregationColumns
    with CNameOps
    with BinaryColumns
    with Interpolators
    // Suffixed traits below are presumably version-gated additions
    // (names suggest Spark 3.1 / 3.2 minimums) — TODO confirm against
    // the per-version source mounts.
    with AggregationColumns31
    with BooleanColumns31
    with NumericColumns31
    with NumericColumns32
    with StringColumns31
    with BinaryColumns32
    with ArrayColumns3x
    with CommonColumns3x
    with MapColumns3x
    with StringColumn3x
    with AggregationColumns32
13 changes: 11 additions & 2 deletions core/src/test/scala/doric/sem/ChildColumnNotFound.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
package doric.sem

import org.apache.spark.sql.SparkSession

object ChildColumnNotFound {
def apply(expectedCol: String, foundCols: List[String]): SparkErrorWrapper = {
def apply(expectedCol: String, foundCols: List[String])(implicit
location: Location,
sparkSession: SparkSession
): SparkErrorWrapper = {
SparkErrorWrapper(
new Throwable(
s"No such struct field $expectedCol in ${foundCols.mkString(", ")}"
if (!sparkSession.version.startsWith("3.4"))
s"No such struct field $expectedCol in ${foundCols.mkString(", ")}"
else
s"[FIELD_NOT_FOUND] No such struct field `$expectedCol` in ${foundCols
.mkString("`", "`, `", "`")}."
)
)
}
Expand Down
3 changes: 3 additions & 0 deletions core/src/test/scala/doric/sem/ColumnNotFound.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ object ColumnNotFound {

SparkErrorWrapper(
new Throwable(
if (!sparkSession.version.startsWith("3.4"))
s"""Cannot resolve column name "$expectedCol" among (${foundCols
.mkString(", ")})"""
else
s"[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `$expectedCol` cannot be resolved. Did you mean one of the following? [${foundCols.mkString("`", "`, `", "`")}]."
)
)
}
Expand Down
4 changes: 1 addition & 3 deletions core/src/test/scala/doric/sem/ErrorsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ class ErrorsSpec
val err = intercept[DoricMultiError] {
Seq(1, 2, 3).toDF("value").select(colInt("notFound"))
}
val err2 = SparkErrorWrapper(
new Exception("Cannot resolve column name \"notFound\" among (value)")
)
val err2 = ColumnNotFound("notFound", List("value"))

err.errors.head.equals(err2) shouldBe true
}
Expand Down
14 changes: 7 additions & 7 deletions core/src/test/scala/doric/sem/JoinOpsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package doric
package sem

import doric.implicitConversions._

import org.apache.spark.sql.types.{LongType, StringType}

class JoinOpsSpec extends DoricTestElements {
Expand Down Expand Up @@ -58,7 +59,7 @@ class JoinOpsSpec extends DoricTestElements {

val badJoinFunction: DoricJoinColumn =
LeftDF.colString(id) ===
RightDF.colString(id + "entifier")
RightDF.colString("identifier")

intercept[DoricMultiError] {
left.join(right, "inner", badJoinFunction)
Expand All @@ -70,7 +71,10 @@ class JoinOpsSpec extends DoricTestElements {
JoinDoricSingleError(
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"" + id + "entifier\" among (" + id + ", " + otherColumn + ")"
if (!spark.version.startsWith("3.4"))
"Cannot resolve column name \"identifier\" among (" + id + ", " + otherColumn + ")"
else
"[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `identifier` cannot be resolved. Did you mean one of the following? [`id`, `otherColumn`]."
)
),
isLeft = false
Expand All @@ -94,11 +98,7 @@ class JoinOpsSpec extends DoricTestElements {
isLeft = true
),
JoinDoricSingleError(
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"" + id + "entifier\" among (" + id + ", " + otherColumn + ")"
)
),
ColumnNotFound("identifier", List("id", "otherColumn")),
isLeft = false
)
)
Expand Down
9 changes: 7 additions & 2 deletions core/src/test/scala/doric/sem/TransformOpsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,15 @@ class TransformOpsSpec
)
}
error.getMessage should startWith(
"Found duplicate column(s) in given column names:"
if (!spark.version.startsWith("3.4"))
"Found duplicate column(s) in given column names:"
else
"[COLUMN_ALREADY_EXISTS] The column `a` already exists. Consider to choose another name or rename the existing column."
)
error.getMessage should include("`a`")
error.getMessage should include("`b`")
if (!spark.version.startsWith("3.4")) {
error.getMessage should include("`b`")
}
}

it("should work with 'withNamedColumns' as with 'namedColumns'") {
Expand Down
41 changes: 9 additions & 32 deletions core/src/test/scala/doric/syntax/ArrayColumnsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package doric
package syntax

import doric.SparkAuxFunctions.createLambda
import doric.sem.{ChildColumnNotFound, ColumnTypeError, DoricMultiError, SparkErrorWrapper}
import doric.sem.{ChildColumnNotFound, ColumnNotFound, ColumnTypeError, DoricMultiError}
import doric.types.SparkType

import org.apache.spark.sql.{Column, Row, functions => f}
import org.apache.spark.sql.catalyst.expressions.ArrayExists
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
import org.apache.spark.sql.{Column, Row, functions => f}

class ArrayColumnsSpec extends DoricTestElements {

Expand Down Expand Up @@ -49,11 +50,7 @@ class ArrayColumnsSpec extends DoricTestElements {
.transform(_ => colString("something"))
)
} should containAllErrors(
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"something2\" among (col, something)"
)
),
ColumnNotFound("something2", List("col", "something")),
ColumnTypeError("something", StringType, IntegerType)
)

Expand All @@ -69,11 +66,7 @@ class ArrayColumnsSpec extends DoricTestElements {
)
}
errors should containAllErrors(
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"something2\" among (col, something)"
)
),
ColumnNotFound("something2", List("col", "something")),
ColumnTypeError("_1", LongType, IntegerType),
ChildColumnNotFound("_3", List("_1", "_2"))
)
Expand Down Expand Up @@ -176,11 +169,7 @@ class ArrayColumnsSpec extends DoricTestElements {
.transformWithIndex(_ + _ + colInt("something2"))
)
} should containAllErrors(
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"something2\" among (col, something)"
)
),
ColumnNotFound("something2", List("col", "something")),
ColumnTypeError("something", IntegerType, StringType)
)
}
Expand Down Expand Up @@ -209,11 +198,7 @@ class ArrayColumnsSpec extends DoricTestElements {
.aggregate(colInt("something2"))(_ + _ + colInt("something"))
)
} should containAllErrors(
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"something2\" among (col, something)"
)
),
ColumnNotFound("something2", List("col", "something")),
ColumnTypeError("something", IntegerType, StringType)
)
}
Expand Down Expand Up @@ -246,16 +231,8 @@ class ArrayColumnsSpec extends DoricTestElements {
)
)
} should containAllErrors(
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"something2\" among (col, something)"
)
),
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"something3\" among (col, something)"
)
),
ColumnNotFound("something2", List("col", "something")),
ColumnNotFound("something3", List("col", "something")),
ColumnTypeError("something", IntegerType, StringType)
)
}
Expand Down
8 changes: 2 additions & 6 deletions core/src/test/scala/doric/syntax/AsSpec.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package doric
package syntax

import doric.sem.{ColumnTypeError, DoricMultiError, SparkErrorWrapper}
import doric.sem.{ColumnNotFound, ColumnTypeError, DoricMultiError}

import org.apache.spark.sql.functions.{col => sparkCol}
import org.apache.spark.sql.types.{IntegerType, StringType}
Expand All @@ -19,11 +19,7 @@ class AsSpec extends DoricTestElements {
intercept[DoricMultiError] {
df.select(originalColumn)
} should containAllErrors(
SparkErrorWrapper(
new Exception(
"""Cannot resolve column name "error" among (int, str)"""
)
)
ColumnNotFound("error", List("int", "str"))
)
}

Expand Down
7 changes: 7 additions & 0 deletions core/src/test/scala/doric/syntax/StringColumnsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -483,11 +483,18 @@ class StringColumnsSpec
df.testColumns2("col1", "")(
(str, pattern) => colString(str).split(pattern.lit),
(str, pattern) => f.split(f.col(str), pattern),
if (!spark.version.startsWith("3.4"))
List(
Array("h", "e", "l", "l", "o", " ", "w", "o", "r", "l", "d", ""),
Array("1", "2", "3", "4", "5", ""),
null
).map(Option(_))
else
List(
Array("h", "e", "l", "l", "o", " ", "w", "o", "r", "l", "d"),
Array("1", "2", "3", "4", "5"),
null
).map(Option(_))
)
}
}
Expand Down
15 changes: 4 additions & 11 deletions core/src/test/scala/doric/syntax/TypeMatcherSpec.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package doric
package syntax

import doric.sem.{ColumnMultiTypeError, DoricMultiError, SparkErrorWrapper}
import doric.sem.{ColumnMultiTypeError, ColumnNotFound, DoricMultiError}

import org.apache.spark.sql.types.{IntegerType, StringType}

class TypeMatcherSpec
Expand Down Expand Up @@ -62,11 +63,7 @@ class TypeMatcherSpec
intercept[DoricMultiError] {
df.select(testColumn)
} should containAllErrors(
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"int2\" among (colArr, int, str)"
)
)
ColumnNotFound("int2", List("colArr", "int", "str"))
)
}

Expand All @@ -81,11 +78,7 @@ class TypeMatcherSpec
intercept[DoricMultiError] {
df.select(testColumn)
} should containAllErrors(
SparkErrorWrapper(
new Exception(
"Cannot resolve column name \"int3\" among (colArr, int, str)"
)
)
ColumnNotFound("int3", List("colArr", "int", "str"))
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package doric
package syntax

import doric.SparkAuxFunctions.createLambda
import org.apache.spark.sql.catalyst.expressions.{ArrayFilter, ArraySort}
import org.scalatest.EitherValues
import org.scalatest.matchers.should.Matchers

import org.apache.spark.sql.{Column, functions => f}
import org.apache.spark.sql.catalyst.expressions.ArraySort

class ArrayColumns3xSpec
extends DoricTestElements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import java.sql.Date
import java.time.{Instant, LocalDate}
import org.scalatest.EitherValues
import org.scalatest.matchers.should.Matchers

import org.apache.spark.sql.{DataFrame, functions => f}

class DateColumns3xSpec
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package doric
package syntax

import Equalities._
import scala.reflect.ClassTag

import doric.Equalities._
import doric.types.NumericType
import doric.types.SparkType.Primitive
import org.apache.spark.sql.{DataFrame, SparkSession, functions => f}
import java.sql.Timestamp
import org.scalatest.funspec.AnyFunSpecLike

import java.sql.Timestamp
import scala.reflect.ClassTag
import org.apache.spark.sql.{DataFrame, SparkSession, functions => f}

trait NumericOperations31Spec
extends AnyFunSpecLike
Expand Down
Loading

0 comments on commit eaa7afb

Please sign in to comment.