feat: [+] collection functions (issue #64) (#287)
* feat: [+] collection functions (issue #64)

* fix scala compatibility & testing & ci summary

* feat: [+] expected data transformation per spark version

* delete data transformation per spark version, fix some tests

* continue fixing spark tests

* java converters

* fix: [~] pos explode functions

* scalafmt

* fix rebase "conflicts"

* fix zip_with tests for spark < 3.0.0

* PR comments
eruizalo authored Apr 20, 2023
1 parent aacd432 commit b42bfdb
Showing 24 changed files with 1,566 additions and 90 deletions.
1 change: 1 addition & 0 deletions build.sbt
@@ -129,6 +129,7 @@ lazy val core = project
"org.typelevel" %% "cats-core" % catsVersion(sparkVersion.value),
"com.lihaoyi" %% "sourcecode" % "0.3.0",
"com.chuusai" %% "shapeless" % "2.3.10",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.8.1",
"com.github.mrpowers" %% "spark-fast-tests" % "1.3.0" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test"
),
1 change: 1 addition & 0 deletions core/src/main/scala/doric/DoricColumn.scala
@@ -9,6 +9,7 @@ import doric.types.{LiteralSparkType, SparkType}
import org.apache.spark.sql.{Column, Dataset}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{Column, Dataset}

sealed trait DoricColumn[T] {
val elem: Doric[Column]
171 changes: 158 additions & 13 deletions core/src/main/scala/doric/syntax/ArrayColumns.scala
@@ -1,16 +1,16 @@
package doric
package syntax

import scala.language.higherKinds
import scala.reflect.ClassTag

import cats.data.Kleisli
import cats.implicits._
import doric.types.{CollectionType, LiteralSparkType, SparkType}

import org.apache.spark.sql.{Column, Dataset, functions => f}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.LambdaFunction.identity
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.{Column, Dataset, Row, functions => f}

import scala.jdk.CollectionConverters._
import scala.language.higherKinds
import scala.reflect.ClassTag

protected final case class Zipper[T1, T2, F[_]: CollectionType](
col: DoricColumn[F[T1]],
@@ -92,7 +92,7 @@ private[syntax] trait ArrayColumns {
*
* @group Array Type
* @param n
* the index of the element to retreave.
* the index of the element to retrieve.
* @return
* the DoricColumn with the selected element.
*/
@@ -394,7 +394,7 @@ private[syntax] trait ArrayColumns {
* @group Array Type
* @see [[org.apache.spark.sql.functions.array_sort]]
*/
def sortAscNullsLast: ArrayColumn[T] = col.elem.map(f.array_sort).toDC
def sortAscNullsLast: DoricColumn[F[T]] = col.elem.map(f.array_sort).toDC

/**
* Sorts the input array for the given column in ascending order,
@@ -404,7 +404,7 @@ private[syntax] trait ArrayColumns {
* @group Array Type
* @see [[org.apache.spark.sql.functions.sort_array(e:org\.apache\.spark\.sql\.Column,asc* org.apache.spark.sql.functions.sort_array]]
*/
def sortAscNullsFirst: ArrayColumn[T] = col.elem.map(f.sort_array).toDC
def sortAscNullsFirst: DoricColumn[F[T]] = col.elem.map(f.sort_array).toDC

/**
* Sorts the input array for the given column in ascending or descending order,
Expand All @@ -415,7 +415,7 @@ private[syntax] trait ArrayColumns {
* @group Array Type
* @see [[org.apache.spark.sql.functions.sort_array(e:org\.apache\.spark\.sql\.Column)* org.apache.spark.sql.functions.sort_array]]
*/
def sort(asc: BooleanColumn): ArrayColumn[T] =
def sort(asc: BooleanColumn): DoricColumn[F[T]] =
(col.elem, asc.elem)
.mapN((c, a) => {
new Column(SortArray(c.expr, a.expr))
@@ -441,7 +441,7 @@ private[syntax] trait ArrayColumns {
* @group Array Type
* @see [[org.apache.spark.sql.functions.arrays_overlap]]
*/
def overlaps[B](col2: ArrayColumn[T]): BooleanColumn =
def overlaps(col2: DoricColumn[F[T]]): BooleanColumn =
(col.elem, col2.elem).mapN(f.arrays_overlap).toDC
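
A minimal usage sketch of overlaps after the signature cleanup; the DataFrame df, the column names "xs"/"ys", the colArray getter and the select extension doric adds to Dataset are illustrative assumptions, not part of this diff:

  import doric._

  // True when the two integer arrays share at least one non-null element.
  val anyCommon = df.select(colArray[Int]("xs").overlaps(colArray[Int]("ys")))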

/**
@@ -488,6 +488,61 @@ private[syntax] trait ArrayColumns {
*/
def explodeOuter: DoricColumn[T] = col.elem.map(f.explode_outer).toDC

/**
* Creates a new row for each element with position in the given array column.
*
* @note Uses the default column name pos for position, and value for elements in the array
* @note WARNING: Unlike Spark, doric returns a struct
* @example {{{
* ORIGINAL SPARK DORIC
* +------------+ +---+---+ +------+
* |col | |pos|col| |col |
* +------------+ +---+---+ +------+
* |[a, b, c, d]| |0 |a | |{0, a}|
* |[e] | |1 |b | |{1, b}|
* |[] | |2 |c | |{2, c}|
* |null | |3 |d | |{3, d}|
* +------------+ |0 |e | |{0, e}|
* +---+---+ +------+
* }}}
*
* @group Array Type
* @see [[org.apache.spark.sql.functions.posexplode]]
*/
def posExplode: DoricColumn[Row] =
col.zipWithIndex("pos".cname, "value".cname).elem.map(f.explode).toDC
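
A sketch of posExplode under the same assumptions as above, here with a hypothetical array<string> column "letters":

  // One row per array element, each holding a {pos, value} struct.
  val exploded = df.select(colArray[String]("letters").posExplode)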

/**
* Creates a new row for each element with position in the given array column.
* Unlike posexplode, if the array is null or empty then a null row is produced.
*
* @note Uses the default column name pos for position, and value for elements in the array
* @note WARNING: Unlike Spark, doric returns a struct
* @example {{{
* ORIGINAL SPARK DORIC
* +------------+ +----+----+ +------+
* |col | |pos |col | |col |
* +------------+ +----+----+ +------+
* |[a, b, c, d]| |0 |a | |{0, a}|
* |[e] | |1 |b | |{1, b}|
* |[] | |2 |c | |{2, c}|
* |null | |3 |d | |{3, d}|
* +------------+ |0 |e | |{0, e}|
* |null|null| |null |
* |null|null| |null |
* +----+----+ +------+
* }}}
*
* @group Array Type
* @see [[org.apache.spark.sql.functions.posexplode_outer]]
*/
def posExplodeOuter: DoricColumn[Row] =
col
.zipWithIndex("pos".cname, "value".cname)
.elem
.map(f.explode_outer)
.toDC
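
The outer variant differs only for null or empty arrays; a sketch under the same assumptions:

  // Rows whose "letters" array is null or empty are kept and yield a null struct.
  val explodedOuter = df.select(colArray[String]("letters").posExplodeOuter)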

/**
* Returns an array with reverse order of elements.
*
@@ -504,7 +559,7 @@ private[syntax] trait ArrayColumns {
* @group Array Type
* @see [[org.apache.spark.sql.functions.shuffle]]
*/
def shuffle: ArrayColumn[T] = col.elem.map(f.shuffle).toDC
def shuffle: DoricColumn[F[T]] = col.elem.map(f.shuffle).toDC

/**
* Returns length of array.
@@ -527,11 +582,26 @@ private[syntax] trait ArrayColumns {
* @group Array Type
* @see [[org.apache.spark.sql.functions.slice(x:org\.apache\.spark\.sql\.Column,start:org\.apache\.spark\.sql\.Column,length* org.apache.spark.sql.functions.slice]]
*/
def slice(start: IntegerColumn, length: IntegerColumn): ArrayColumn[T] =
def slice(start: IntegerColumn, length: IntegerColumn): DoricColumn[F[T]] =
(col.elem, start.elem, length.elem)
.mapN((a, b, c) => new Column(Slice(a.expr, b.expr, c.expr)))
.toDC

/**
* DORIC EXCLUSIVE! Given any array[e] column, this method returns a new
* array of struct[i, e] column, where the first element of each struct is the index and
* the second is the value itself.
*
* @group Array Type
*/
def zipWithIndex(
indexName: CName = "index".cname,
valueName: CName = "value".cname
): DoricColumn[F[Row]] =
col.transformWithIndex((value, index) =>
struct(index.asCName(indexName), value.asCName(valueName))
)
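
A sketch of zipWithIndex with custom field names, same hypothetical df and getters as above:

  // array("a", "b") becomes array({0, "a"}, {1, "b"}) with fields "idx" and "letter".
  val indexed = df.select(
    colArray[String]("letters").zipWithIndex("idx".cname, "letter".cname)
  )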

/**
* Merge two given arrays, element-wise, into a single array using a function.
* If one array is shorter, nulls are appended at the end to match the length of the longer
@@ -548,6 +618,81 @@ private[syntax] trait ArrayColumns {
): Zipper[T, T2, F] = {
Zipper(col, col2)
}

/**
* Returns a merged array of structs in which the N-th struct contains all N-th values of input arrays.
*
* @group Array Type
* @see [[org.apache.spark.sql.functions.arrays_zip]]
*/
def zip(
other: DoricColumn[F[T]],
others: DoricColumn[F[T]]*
): DoricColumn[F[Row]] = {
val cols = col +: (other +: others)
cols.toList.traverse(_.elem).map(f.arrays_zip).toDC
}
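
A sketch of zip for two integer array columns "xs" and "ys" (both sides must share the element type):

  // Merges the arrays element-wise into an array of structs, via arrays_zip.
  val pairs = df.select(colArray[Int]("xs").zip(colArray[Int]("ys")))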

/**
* Creates a new map column.
* The array in the first column is used for keys.
* The array in the second column is used for values.
*
* @throws RuntimeException if the arrays don't have the same length.
* @throws RuntimeException if a key is null
*
* @group Array Type
* @see [[org.apache.spark.sql.functions.map_from_arrays]]
*/
def mapFromArrays[V](values: DoricColumn[F[V]]): MapColumn[T, V] =
(col.elem, values.elem).mapN(f.map_from_arrays).toDC

/**
* Creates a new map column.
* The array in the first column is used for keys.
* The array in the second column is used for values.
*
* @throws RuntimeException if the arrays don't have the same length.
* @throws RuntimeException if a key is null
*
* @group Array Type
* @see [[mapFromArrays]]
*/
def toMap[V](values: DoricColumn[F[V]]): MapColumn[T, V] =
mapFromArrays(values)
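
A sketch of mapFromArrays and its toMap alias, assuming equally long array columns "keys" and "values":

  // Builds a map<string, int> column; toMap delegates to mapFromArrays.
  val kv  = df.select(colArray[String]("keys").mapFromArrays(colArray[Int]("values")))
  val kv2 = df.select(colArray[String]("keys").toMap(colArray[Int]("values")))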

/**
* Converts a column containing a StructType into a JSON string with the specified schema.
* @throws IllegalArgumentException in the case of an unsupported type.
*
* @group Array Type
* @see org.apache.spark.sql.functions.to_json(e:org\.apache\.spark\.sql\.Column,options:scala\.collection\.immutable\.Map\[java\.lang\.String,java\.lang\.String\]):* org.apache.spark.sql.functions.to_csv
* @todo scaladoc link (issue #135)
*/
def toJson(options: Map[String, String] = Map.empty): StringColumn =
col.elem.map(x => f.to_json(x, options.asJava)).toDC
}

/**
* Extension methods for arrays of tuples
*
* @group Array Type
*/
implicit class ArrayColumnTupleSyntax[K, V, F[_]: CollectionType](
private val col: DoricColumn[F[(K, V)]]
) {

/**
* Returns a map created from the given array of entries.
* Key elements in the entry array must not be null.
*
* @group Map Type
* @see [[org.apache.spark.sql.functions.map_from_entries]]
*/
def mapFromEntries: MapColumn[K, V] = col.elem.map(f.map_from_entries).toDC

@inline def toMap: MapColumn[K, V] = mapFromEntries
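
A sketch of mapFromEntries; typing the "entries" column as an array of (key, value) pairs is a hypothetical setup that assumes doric can derive a SparkType for the pair element:

  // Turns an array of (key, value) entries into a map column.
  val entriesAsMap = df.select(colArray[(String, Int)]("entries").mapFromEntries)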

}

implicit class ArrayArrayColumnSyntax[G[_]: CollectionType, F[_]
17 changes: 12 additions & 5 deletions core/src/main/scala/doric/syntax/CommonColumns.scala
@@ -82,13 +82,10 @@ private[syntax] trait CommonColumns extends ColGetters[NamedDoricColumn] {
}

/**
* Extension methods for any kind of column
* Casting methods
* @group All Types
*/
implicit class BasicCol[T: SparkType](private val column: DoricColumn[T]) {

private type CastToT[To] = Casting[T, To]
private type WCastToT[To] = UnsafeCasting[T, To]
implicit class CastingImpl[T](private val column: DoricColumn[T]) {

/**
* Gives the column an alias.
@@ -113,6 +110,16 @@ private[syntax] trait CommonColumns extends ColGetters[NamedDoricColumn] {
*/
def asCName(colName: CName): NamedDoricColumn[T] =
NamedDoricColumn[T](column, colName.value)
}
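
With aliasing moved into CastingImpl it no longer requires a SparkType instance; a sketch of asCName (the colInt getter and column names are assumptions):

  // Renames the column with a type-safe CName alias.
  val renamed = df.select(colInt("age").asCName("age_in_years".cname))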

/**
* Extension methods for any kind of column
* @group All Types
*/
implicit class BasicCol[T: SparkType](private val column: DoricColumn[T]) {

private type CastToT[To] = Casting[T, To]
private type WCastToT[To] = UnsafeCasting[T, To]

/**
* Type safe equals between Columns
31 changes: 21 additions & 10 deletions core/src/main/scala/doric/syntax/DStructs.scala
@@ -1,19 +1,19 @@
package doric
package syntax

import scala.language.dynamics

import cats.data.Kleisli
import cats.evidence.Is
import cats.implicits._
import doric.sem.{ColumnTypeError, Location, SparkErrorWrapper}
import doric.types.SparkType
import org.apache.spark.sql.{Column, Dataset, Row}
import org.apache.spark.sql.catalyst.expressions.ExtractValue
import org.apache.spark.sql.functions.{struct => sparkStruct}
import org.apache.spark.sql.{Column, Dataset, Row, functions => f}
import shapeless.labelled.FieldType
import shapeless.{::, HList, LabelledGeneric, Witness}

import scala.jdk.CollectionConverters._
import scala.language.dynamics

private[syntax] trait DStructs {

/**
@@ -25,7 +25,7 @@ private[syntax] trait DStructs {
* A DStruct DoricColumn.
*/
def struct(cols: DoricColumn[_]*): RowColumn =
cols.map(_.elem).toList.sequence.map(c => sparkStruct(c: _*)).toDC
cols.map(_.elem).toList.sequence.map(c => f.struct(c: _*)).toDC

implicit class DStructOps[T](private val col: DoricColumn[T])(implicit
st: SparkType.Custom[T, Row]
@@ -39,14 +39,14 @@ private[syntax] trait DStructs {
* the column name expected to find in the struct.
* @param location
* the location if an error is generated
* @tparam T
* @tparam T2
* the expected type of the child column.
* @return
* a reference to the child column of the provided type.
*/
def getChild[T: SparkType](
def getChild[T2: SparkType](
subColumnName: String
)(implicit location: Location): DoricColumn[T] = {
)(implicit location: Location): DoricColumn[T2] = {
(col.elem, subColumnName.lit.elem)
.mapN((a, b) => (a, b))
.mapK(toEither)
@@ -61,12 +61,12 @@ private[syntax] trait DStructs {
df.sparkSession.sessionState.analyzer.resolver
)
)
if (SparkType[T].isEqual(subColumn.expr.dataType))
if (SparkType[T2].isEqual(subColumn.expr.dataType))
subColumn.asRight
else
ColumnTypeError(
subColumnName,
SparkType[T].dataType,
SparkType[T2].dataType,
subColumn.expr.dataType
).leftNec
} else {
@@ -87,6 +87,17 @@ private[syntax] trait DStructs {
}
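
A usage sketch for getChild; the colStruct getter and the "person"/"age" names are illustrative assumptions:

  // Reads the "age" field of the struct column "person" as an integer column;
  // a ColumnTypeError is collected if the field exists with a different type.
  val age = df.select(colStruct("person").getChild[Int]("age"))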

def child: DynamicFieldAccessor[T] = new DynamicFieldAccessor(col)

/**
* Converts a column containing a StructType into a JSON string with the specified schema.
* @throws java.lang.IllegalArgumentException in the case of an unsupported type.
*
* @group Struct Type
* @see org.apache.spark.sql.functions.to_json(e:org\.apache\.spark\.sql\.Column,options:scala\.collection\.immutable\.Map\[java\.lang\.String,java\.lang\.String\]):* org.apache.spark.sql.functions.to_csv
* @todo scaladoc link (issue #135)
*/
def toJson(options: Map[String, String] = Map.empty): StringColumn =
col.elem.map(x => f.to_json(x, options.asJava)).toDC
}
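
A sketch of the struct toJson, reusing the hypothetical "person" struct column; the options map is forwarded to Spark's to_json:

  // Serializes the struct to a JSON string, passing Spark JSON options through.
  val personJson = df.select(
    colStruct("person").toJson(Map("timestampFormat" -> "yyyy-MM-dd"))
  )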

class DynamicFieldAccessor[T](dCol: DoricColumn[T])(implicit