Skip to content

Commit

Permalink
chore: Adding Spark35 support
Browse files Browse the repository at this point in the history
  • Loading branch information
BrendanWalsh authored and mhamilton723 committed Aug 30, 2024
1 parent 392f601 commit 6e53e62
Show file tree
Hide file tree
Showing 44 changed files with 123 additions and 90 deletions.
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,22 @@ In Microsoft Fabric notebooks SynapseML is already installed. To change the vers

In Azure Synapse notebooks please place the following in the first cell of your notebook.

- For Spark 3.5 Pools:

```bash
%%configure -f
{
"name": "synapseml",
"conf": {
"spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.3",
"spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
"spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind",
"spark.yarn.user.classpath.first": "true",
"spark.sql.parquet.enableVectorizedReader": "false"
}
}
```

- For Spark 3.4 Pools:

```bash
Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import scala.xml.transform.{RewriteRule, RuleTransformer}
import scala.xml.{Node => XmlNode, NodeSeq => XmlNodeSeq, _}

val condaEnvName = "synapseml"
val sparkVersion = "3.4.1"
val sparkVersion = "3.5.0"
name := "synapseml"
ThisBuild / organization := "com.microsoft.azure"
ThisBuild / scalaVersion := "2.12.17"
Expand All @@ -34,7 +34,7 @@ val extraDependencies = Seq(
"com.jcraft" % "jsch" % "0.1.54",
"org.apache.httpcomponents.client5" % "httpclient5" % "5.1.3",
"org.apache.httpcomponents" % "httpmime" % "4.5.13",
"com.linkedin.isolation-forest" %% "isolation-forest_3.4.2" % "3.0.4"
"com.linkedin.isolation-forest" %% "isolation-forest_3.5.0" % "3.0.5"
exclude("com.google.protobuf", "protobuf-java") exclude("org.apache.spark", "spark-mllib_2.12")
exclude("org.apache.spark", "spark-core_2.12") exclude("org.apache.spark", "spark-avro_2.12")
exclude("org.apache.spark", "spark-sql_2.12"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import org.apache.spark.injections.UDFUtils
import org.apache.spark.ml.ComplexParamsReadable
import org.apache.spark.ml.util._
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types._
import spray.json.DefaultJsonProtocol._
Expand Down Expand Up @@ -44,7 +44,7 @@ object BingImageSearch extends ComplexParamsReadable[BingImageSearch] with Seria
): Lambda = {
Lambda({ df =>
val outputSchema = df.schema.add(bytesCol, BinaryType, nullable = true)
val encoder = RowEncoder(outputSchema)
val encoder = ExpressionEncoder(outputSchema)
df.toDF().mapPartitions { rows =>
val futures = rows.map { row: Row =>
(Future {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.apache.http.entity.{AbstractHttpEntity, StringEntity}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.ml.{ComplexParamsReadable, NamespaceInjections, PipelineModel, Transformer}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{DataType, StringType, StructType}
import spray.json.DefaultJsonProtocol.StringJsonFormat
Expand Down Expand Up @@ -93,7 +93,7 @@ class SpeakerEmotionInference(override val uid: String)
converter(row.getAs[Row](row.fieldIndex(getOutputCol)))
)
new GenericRowWithSchema((row.toSeq.dropRight(1) ++ Seq(ssml)).toArray, newSchema): Row
})(RowEncoder({
})(ExpressionEncoder({
newSchema
}))
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.injections.SConf
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}
Expand Down Expand Up @@ -400,7 +400,7 @@ abstract class SpeechSDKBase extends Transformer
ArrayType(responseTypeBinding.schema)
}

val enc = RowEncoder(enrichedDf.schema.add(getOutputCol, addedSchema))
val enc = ExpressionEncoder(enrichedDf.schema.add(getOutputCol, addedSchema))
val sc = df.sparkSession.sparkContext
val bConf = sc.broadcast(new SConf(sc.hadoopConfiguration))
val isUriAudio = df.schema(getAudioDataCol).dataType match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import org.apache.hadoop.io.{IOUtils => HUtils}
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util._
import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.util.SerializableConfiguration
Expand Down Expand Up @@ -152,7 +152,7 @@ class TextToSpeech(override val uid: String)
}
Row.fromSeq(row.toSeq ++ Seq(errorRow))
}.get
}(RowEncoder(dataset.schema.add(getErrorCol, SpeechSynthesisError.schema)))
}(ExpressionEncoder(dataset.schema.add(getErrorCol, SpeechSynthesisError.schema)))
}

override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ object PackageUtils {
// Use a fixed version for local testing
// val PackageMavenCoordinate = s"$PackageGroup:$PackageName:1.0.5"

private val AvroCoordinate = "org.apache.spark:spark-avro_2.12:3.4.1"
private val AvroCoordinate = "org.apache.spark:spark-avro_2.12:3.5.0"
val PackageRepository: String = SparkMLRepository

// If testing onnx package with snapshots repo, make sure to switch to using
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package com.microsoft.azure.synapse.ml.core.schema

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.StructType

import scala.reflect.runtime.universe.TypeTag
Expand All @@ -14,7 +14,7 @@ abstract class SparkBindings[T: TypeTag] extends Serializable {

lazy val schema: StructType = enc.schema
private lazy val enc: ExpressionEncoder[T] = ExpressionEncoder[T]().resolveAndBind()
private lazy val rowEnc: ExpressionEncoder[Row] = RowEncoder(enc.schema).resolveAndBind()
private lazy val rowEnc: ExpressionEncoder[Row] = ExpressionEncoder(enc.schema).resolveAndBind()

// WARNING: each time you use this function on a dataframe, you should make a new converter.
// Spark does some magic that makes this leak memory if re-used on a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.apache.spark.ml.Transformer
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -44,7 +44,7 @@ object LIMEUtils extends SLogging {
case field if colsToSquish.contains(field.name) => StructField(field.name, ArrayType(field.dataType))
case f => f
})
val encoder = RowEncoder(schema)
val encoder = ExpressionEncoder(schema)
val indiciesToSquish = colsToSquish.map(df.schema.fieldIndex)
df.mapPartitions { it =>
val isEmpty = it.isEmpty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.apache.spark.ml._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}

Expand Down Expand Up @@ -56,7 +56,7 @@ class MultiNGram(override val uid: String)
.map(col => row.getAs[Seq[String]](col))
.reduce(_ ++ _)
Row.fromSeq(row.toSeq :+ mergedNGrams)
}(RowEncoder(intermediateDF.schema.add(getOutputCol, ArrayType(StringType))))
}(ExpressionEncoder(intermediateDF.schema.add(getOutputCol, ArrayType(StringType))))
.drop(intermediateOutputCols: _*)
}, dataset.columns.length)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.microsoft.azure.synapse.ml.core.schema.BinaryFileSchema
import com.microsoft.azure.synapse.ml.core.utils.AsyncUtils
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.BinaryType
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

Expand Down Expand Up @@ -85,7 +85,7 @@ object BinaryFileReader {
timeout: Int
): DataFrame = {
val outputSchema = df.schema.add(bytesCol, BinaryType, nullable = true)
val encoder = RowEncoder(outputSchema)
val encoder = ExpressionEncoder(outputSchema)
val hconf = ConfUtils.getHConf(df)

df.mapPartitions { rows =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import org.apache.spark.injections.UDFUtils
import org.apache.spark.ml.param._
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}
Expand Down Expand Up @@ -118,7 +118,7 @@ class HTTPTransformer(val uid: String)
override def transform(dataset: Dataset[_]): DataFrame = {
logTransform[DataFrame]({
val df = dataset.toDF()
val enc = RowEncoder(transformSchema(df.schema))
val enc = ExpressionEncoder(transformSchema(df.schema))
val colIndex = df.schema.fieldNames.indexOf(getInputCol)
val fromRow = HTTPRequestData.makeFromRowConverter
val toRow = HTTPResponseData.makeToRowConverter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.ml.ImageInjections
import org.apache.spark.ml.image.ImageSchema
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{DataFrame, Row}

import java.awt.color.ColorSpace
Expand Down Expand Up @@ -117,7 +117,7 @@ object ImageUtils {

def readFromPaths(df: DataFrame, pathCol: String, imageCol: String = "image"): DataFrame = {
val outputSchema = df.schema.add(imageCol, ImageSchema.columnSchema)
val encoder = RowEncoder(outputSchema)
val encoder = ExpressionEncoder(outputSchema)
val hconf = ConfUtils.getHConf(df)
df.mapPartitions { rows =>
rows.map { row =>
Expand All @@ -133,7 +133,7 @@ object ImageUtils {

def readFromBytes(df: DataFrame, pathCol: String, bytesCol: String, imageCol: String = "image"): DataFrame = {
val outputSchema = df.schema.add(imageCol, ImageSchema.columnSchema)
val encoder = RowEncoder(outputSchema)
val encoder = ExpressionEncoder(outputSchema)
df.mapPartitions { rows =>
rows.map { row =>
val path = row.getAs[String](pathCol)
Expand All @@ -150,7 +150,7 @@ object ImageUtils {
imageCol: String = "image",
dropPrefix: Boolean = false): DataFrame = {
val outputSchema = df.schema.add(imageCol, ImageSchema.columnSchema)
val encoder = RowEncoder(outputSchema)
val encoder = ExpressionEncoder(outputSchema)
df.mapPartitions { rows =>
rows.map { row =>
val encoded = row.getAs[String](bytesCol)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import com.microsoft.azure.synapse.ml.param.TransformerParam
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param._
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}
Expand All @@ -35,7 +35,7 @@ trait MiniBatchBase extends Transformer with DefaultParamsWritable with Wrappabl
def transform(dataset: Dataset[_]): DataFrame = {
logTransform[DataFrame]({
val outputSchema = transformSchema(dataset.schema)
implicit val outputEncoder: ExpressionEncoder[Row] = RowEncoder(outputSchema)
implicit val outputEncoder: ExpressionEncoder[Row] = ExpressionEncoder(outputSchema)
dataset.toDF().mapPartitions { it =>
if (it.isEmpty) {
it
Expand Down Expand Up @@ -215,7 +215,7 @@ class FlattenBatch(val uid: String)
override def transform(dataset: Dataset[_]): DataFrame = {
logTransform[DataFrame]({
val outputSchema = transformSchema(dataset.schema)
implicit val outputEncoder: ExpressionEncoder[Row] = RowEncoder(outputSchema)
implicit val outputEncoder: ExpressionEncoder[Row] = ExpressionEncoder(outputSchema)

dataset.toDF().mapPartitions(it =>
it.flatMap { rowOfLists =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import com.microsoft.azure.synapse.ml.logging.{FeatureNames, SynapseMLLogging}
import org.apache.spark.ml.param._
import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable}
import org.apache.spark.ml.{ComplexParamsWritable, Transformer}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}

Expand Down Expand Up @@ -39,7 +39,7 @@ class PartitionConsolidator(val uid: String)
} else {
Iterator()
}
}(RowEncoder(dataset.schema))
}(ExpressionEncoder(dataset.schema))
}, dataset.columns.length)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, Multiclas
import org.apache.spark.mllib.linalg.{Matrices, Matrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -252,7 +252,7 @@ class ComputeModelStatistics(override val uid: String) extends Transformer
confusionMatrix: Matrix,
resultDF: DataFrame): DataFrame = {
val schema = resultDF.schema.add(MetricConstants.ConfusionMatrix, SQLDataTypes.MatrixType)
resultDF.map { row => Row.fromSeq(row.toSeq :+ confusionMatrix.asML) }(RowEncoder(schema))
resultDF.map { row => Row.fromSeq(row.toSeq :+ confusionMatrix.asML) }(ExpressionEncoder(schema))
}

private def selectAndCastToDF(dataset: Dataset[_],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.ml.image.ImageSchema
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
Expand Down Expand Up @@ -118,7 +118,7 @@ class PatchedImageFileFormat extends ImageFileFormat with Serializable with Logg
if (requiredSchema.isEmpty) {
filteredResult.map(_ => emptyUnsafeRow)
} else {
val converter = RowEncoder(requiredSchema)
val converter = ExpressionEncoder(requiredSchema)
filteredResult.map(row => converter.createSerializer()(row))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.microsoft.azure.synapse.ml.io.http.{HTTPRequestData, HTTPResponseData
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
import org.apache.spark.sql.execution.streaming.continuous.HTTPSourceV2
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider, StreamSourceProvider}
Expand Down Expand Up @@ -218,7 +218,7 @@ class DistributedHTTPSource(name: String,
private[spark] val infoSchema = new StructType()
.add("machine", StringType).add("ip", StringType).add("id", StringType)

private[spark] val infoEnc = RowEncoder(infoSchema)
private[spark] val infoEnc = ExpressionEncoder(infoSchema)

// Access point to run code on nodes through mapPartitions
// TODO do this by hooking deeper into spark,
Expand Down Expand Up @@ -284,7 +284,7 @@ class DistributedHTTPSource(name: String,
.map{ case (id, request) =>
Row.fromSeq(Seq(Row(null, id, null), toRow(request))) //scalastyle:ignore null
}.toIterator
}(RowEncoder(HTTPSourceV2.Schema))
}(ExpressionEncoder(HTTPSourceV2.Schema))
}

override def commit(end: OffsetV2): Unit = synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.microsoft.azure.synapse.ml.logging.SynapseMLLogging
import com.microsoft.azure.synapse.ml.nn._
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.types.PhysicalDataType
import org.apache.spark.sql.types._

trait OptimizedCKNNFitting extends ConditionalKNNParams with SynapseMLLogging {
Expand Down Expand Up @@ -35,12 +36,12 @@ trait OptimizedCKNNFitting extends ConditionalKNNParams with SynapseMLLogging {

protected def fitOptimized(dataset: Dataset[_]): ConditionalKNNModel = {

val vt = dataset.schema(getValuesCol).dataType
val lt = dataset.schema(getLabelCol).dataType
val vt = PhysicalDataType.apply(dataset.schema(getValuesCol).dataType)
val lt = PhysicalDataType.apply(dataset.schema(getLabelCol).dataType)
(vt, lt) match {
case (avt: AtomicType, alt: AtomicType) => fitGeneric[avt.InternalType, alt.InternalType](dataset)
case (avt: AtomicType, _) => fitGeneric[avt.InternalType, Any](dataset)
case (_, alt: AtomicType) => fitGeneric[Any, alt.InternalType](dataset)
case (avt: PhysicalDataType, alt: PhysicalDataType) => fitGeneric[avt.InternalType, alt.InternalType](dataset)
case (avt: PhysicalDataType, _) => fitGeneric[avt.InternalType, Any](dataset)
case (_, alt: PhysicalDataType) => fitGeneric[Any, alt.InternalType](dataset)
case _ => fitGeneric[Any, Any](dataset)
}
}
Expand Down Expand Up @@ -69,8 +70,8 @@ trait OptimizedKNNFitting extends KNNParams with SynapseMLLogging {

protected def fitOptimized(dataset: Dataset[_]): KNNModel = {

dataset.schema(getValuesCol).dataType match {
case avt: AtomicType => fitGeneric[avt.InternalType](dataset)
PhysicalDataType.apply(dataset.schema(getValuesCol).dataType) match {
case avt: PhysicalDataType => fitGeneric[avt.InternalType](dataset)
case _ => fitGeneric[Any](dataset)
}
}
Expand Down
Loading

0 comments on commit 6e53e62

Please sign in to comment.