diff --git a/README.md b/README.md
index fbb557099a..1bc0f174c3 100644
--- a/README.md
+++ b/README.md
@@ -112,6 +112,22 @@ In Microsoft Fabric notebooks SynapseML is already installed. To change the vers
 
 In Azure Synapse notebooks please place the following in the first cell of your notebook.
 
+- For Spark 3.5 Pools:
+
+```bash
+%%configure -f
+{
+  "name": "synapseml",
+  "conf": {
+      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.3",
+      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
+      "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind",
+      "spark.yarn.user.classpath.first": "true",
+      "spark.sql.parquet.enableVectorizedReader": "false"
+  }
+}
+```
+
 - For Spark 3.4 Pools:
 
 ```bash
diff --git a/build.sbt b/build.sbt
index 0b6f9f6f79..138fd2c3de 100644
--- a/build.sbt
+++ b/build.sbt
@@ -7,7 +7,7 @@ import scala.xml.transform.{RewriteRule, RuleTransformer}
 import scala.xml.{Node => XmlNode, NodeSeq => XmlNodeSeq, _}
 
 val condaEnvName = "synapseml"
-val sparkVersion = "3.4.1"
+val sparkVersion = "3.5.0"
 name := "synapseml"
 ThisBuild / organization := "com.microsoft.azure"
 ThisBuild / scalaVersion := "2.12.17"
@@ -34,7 +34,7 @@ val extraDependencies = Seq(
   "com.jcraft" % "jsch" % "0.1.54",
   "org.apache.httpcomponents.client5" % "httpclient5" % "5.1.3",
   "org.apache.httpcomponents" % "httpmime" % "4.5.13",
-  "com.linkedin.isolation-forest" %% "isolation-forest_3.4.2" % "3.0.4"
+  "com.linkedin.isolation-forest" %% "isolation-forest_3.5.0" % "3.0.5"
     exclude("com.google.protobuf", "protobuf-java") exclude("org.apache.spark", "spark-mllib_2.12")
     exclude("org.apache.spark", "spark-core_2.12") exclude("org.apache.spark", "spark-avro_2.12")
     exclude("org.apache.spark", "spark-sql_2.12"),
diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/bing/BingImageSearch.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/bing/BingImageSearch.scala
index b90cef5881..7d77704f5b 100644
--- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/bing/BingImageSearch.scala
+++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/bing/BingImageSearch.scala
@@ -15,7 +15,7 @@ import org.apache.spark.injections.UDFUtils
 import org.apache.spark.ml.ComplexParamsReadable
 import org.apache.spark.ml.util._
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.functions.{col, explode}
 import org.apache.spark.sql.types._
 import spray.json.DefaultJsonProtocol._
@@ -44,7 +44,7 @@ object BingImageSearch extends ComplexParamsReadable[BingImageSearch] with Seria
                       ): Lambda = {
     Lambda({ df =>
       val outputSchema = df.schema.add(bytesCol, BinaryType, nullable = true)
-      val encoder = RowEncoder(outputSchema)
+      val encoder = ExpressionEncoder(outputSchema)
       df.toDF().mapPartitions { rows =>
         val futures = rows.map { row: Row =>
           (Future {
diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeakerEmotionInference.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeakerEmotionInference.scala
index 442cc21118..000a07b695 100644
--- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeakerEmotionInference.scala
+++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeakerEmotionInference.scala
@@ -12,7 +12,7 @@ import org.apache.http.entity.{AbstractHttpEntity, StringEntity}
 import org.apache.spark.ml.util.Identifiable
 import org.apache.spark.ml.{ComplexParamsReadable, NamespaceInjections, PipelineModel, Transformer}
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.types.{DataType, StringType, StructType}
 import spray.json.DefaultJsonProtocol.StringJsonFormat
@@ -93,7 +93,7 @@ class SpeakerEmotionInference(override val uid: String)
         converter(row.getAs[Row](row.fieldIndex(getOutputCol)))
       )
       new GenericRowWithSchema((row.toSeq.dropRight(1) ++ Seq(ssml)).toArray, newSchema): Row
-    })(RowEncoder({
+    })(ExpressionEncoder({
      newSchema
     }))
   })
diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala
index 4a034b1f80..7596a07fa1 100644
--- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala
+++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala
@@ -24,7 +24,7 @@ import org.apache.spark.injections.SConf
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
@@ -400,7 +400,7 @@ abstract class SpeechSDKBase extends Transformer
       ArrayType(responseTypeBinding.schema)
     }
 
-    val enc = RowEncoder(enrichedDf.schema.add(getOutputCol, addedSchema))
+    val enc = ExpressionEncoder(enrichedDf.schema.add(getOutputCol, addedSchema))
     val sc = df.sparkSession.sparkContext
     val bConf = sc.broadcast(new SConf(sc.hadoopConfiguration))
     val isUriAudio = df.schema(getAudioDataCol).dataType match {
diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/TextToSpeech.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/TextToSpeech.scala
index bedd949cc4..67761ee4e3 100644
--- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/TextToSpeech.scala
+++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/TextToSpeech.scala
@@ -15,7 +15,7 @@ import org.apache.hadoop.io.{IOUtils => HUtils}
 import org.apache.spark.ml.param.{Param, ParamMap}
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.util.SerializableConfiguration
@@ -152,7 +152,7 @@ class TextToSpeech(override val uid: String)
         }
         Row.fromSeq(row.toSeq ++ Seq(errorRow))
       }.get
-    }(RowEncoder(dataset.schema.add(getErrorCol, SpeechSynthesisError.schema)))
+    }(ExpressionEncoder(dataset.schema.add(getErrorCol, SpeechSynthesisError.schema)))
   }
 
   override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/PackageUtils.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/PackageUtils.scala
index bc7d07087e..d9f648109f 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/PackageUtils.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/PackageUtils.scala
@@ -21,7 +21,7 @@ object PackageUtils {
   // Use a fixed version for local testing
   // val PackageMavenCoordinate = s"$PackageGroup:$PackageName:1.0.5"
 
-  private val AvroCoordinate = "org.apache.spark:spark-avro_2.12:3.4.1"
+  private val AvroCoordinate = "org.apache.spark:spark-avro_2.12:3.5.0"
   val PackageRepository: String = SparkMLRepository
 
   // If testing onnx package with snapshots repo, make sure to switch to using
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/core/schema/SparkBindings.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/core/schema/SparkBindings.scala
index 2fb39424ab..9d03df8285 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/core/schema/SparkBindings.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/core/schema/SparkBindings.scala
@@ -5,7 +5,7 @@ package com.microsoft.azure.synapse.ml.core.schema
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.types.StructType
 
 import scala.reflect.runtime.universe.TypeTag
@@ -14,7 +14,7 @@ abstract class SparkBindings[T: TypeTag] extends Serializable {
 
   lazy val schema: StructType = enc.schema
   private lazy val enc: ExpressionEncoder[T] = ExpressionEncoder[T]().resolveAndBind()
-  private lazy val rowEnc: ExpressionEncoder[Row] = RowEncoder(enc.schema).resolveAndBind()
+  private lazy val rowEnc: ExpressionEncoder[Row] = ExpressionEncoder(enc.schema).resolveAndBind()
 
   // WARNING: each time you use this function on a dataframe, you should make a new converter.
   // Spark does some magic that makes this leak memory if re-used on a
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/LIMEBase.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/LIMEBase.scala
index 483b058f17..f4abbc7cec 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/LIMEBase.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/LIMEBase.scala
@@ -14,7 +14,7 @@ import org.apache.spark.ml.Transformer
 import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.param._
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
@@ -44,7 +44,7 @@ object LIMEUtils extends SLogging {
       case field if colsToSquish.contains(field.name) => StructField(field.name, ArrayType(field.dataType))
       case f => f
     })
-    val encoder = RowEncoder(schema)
+    val encoder = ExpressionEncoder(schema)
     val indiciesToSquish = colsToSquish.map(df.schema.fieldIndex)
     df.mapPartitions { it =>
       val isEmpty = it.isEmpty
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/MultiNGram.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/MultiNGram.scala
index fd958d1216..1388d84045 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/MultiNGram.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/MultiNGram.scala
@@ -12,7 +12,7 @@ import org.apache.spark.ml._
 import org.apache.spark.ml.feature._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.util._
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 
@@ -56,7 +56,7 @@ class MultiNGram(override val uid: String)
           .map(col => row.getAs[Seq[String]](col))
           .reduce(_ ++ _)
         Row.fromSeq(row.toSeq :+ mergedNGrams)
-      }(RowEncoder(intermediateDF.schema.add(getOutputCol, ArrayType(StringType))))
+      }(ExpressionEncoder(intermediateDF.schema.add(getOutputCol, ArrayType(StringType))))
         .drop(intermediateOutputCols: _*)
     }, dataset.columns.length)
   }
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/binary/BinaryFileReader.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/binary/BinaryFileReader.scala
index 1268750cc7..0bed65a49e 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/binary/BinaryFileReader.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/binary/BinaryFileReader.scala
@@ -8,7 +8,7 @@ import com.microsoft.azure.synapse.ml.core.schema.BinaryFileSchema
 import com.microsoft.azure.synapse.ml.core.utils.AsyncUtils
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.types.BinaryType
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 
@@ -85,7 +85,7 @@ object BinaryFileReader {
                      timeout: Int
                     ): DataFrame = {
     val outputSchema = df.schema.add(bytesCol, BinaryType, nullable = true)
-    val encoder = RowEncoder(outputSchema)
+    val encoder = ExpressionEncoder(outputSchema)
     val hconf = ConfUtils.getHConf(df)
     df.mapPartitions {
       rows =>
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPTransformer.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPTransformer.scala
index 8d942e34b1..ad74bd11c4 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPTransformer.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPTransformer.scala
@@ -13,7 +13,7 @@ import org.apache.spark.injections.UDFUtils
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.util.Identifiable
 import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
@@ -118,7 +118,7 @@ class HTTPTransformer(val uid: String)
   override def transform(dataset: Dataset[_]): DataFrame = {
     logTransform[DataFrame]({
       val df = dataset.toDF()
-      val enc = RowEncoder(transformSchema(df.schema))
+      val enc = ExpressionEncoder(transformSchema(df.schema))
       val colIndex = df.schema.fieldNames.indexOf(getInputCol)
       val fromRow = HTTPRequestData.makeFromRowConverter
       val toRow = HTTPResponseData.makeToRowConverter
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/image/ImageUtils.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/image/ImageUtils.scala
index 6d0d4f8e1a..c4ef7130d1 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/image/ImageUtils.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/image/ImageUtils.scala
@@ -11,7 +11,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.ml.ImageInjections
 import org.apache.spark.ml.image.ImageSchema
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.{DataFrame, Row}
 
 import java.awt.color.ColorSpace
@@ -117,7 +117,7 @@ object ImageUtils {
 
   def readFromPaths(df: DataFrame, pathCol: String, imageCol: String = "image"): DataFrame = {
     val outputSchema = df.schema.add(imageCol, ImageSchema.columnSchema)
-    val encoder = RowEncoder(outputSchema)
+    val encoder = ExpressionEncoder(outputSchema)
     val hconf = ConfUtils.getHConf(df)
     df.mapPartitions { rows =>
       rows.map { row =>
@@ -133,7 +133,7 @@ object ImageUtils {
 
   def readFromBytes(df: DataFrame, pathCol: String, bytesCol: String, imageCol: String = "image"): DataFrame = {
     val outputSchema = df.schema.add(imageCol, ImageSchema.columnSchema)
-    val encoder = RowEncoder(outputSchema)
+    val encoder = ExpressionEncoder(outputSchema)
     df.mapPartitions { rows =>
       rows.map { row =>
         val path = row.getAs[String](pathCol)
@@ -150,7 +150,7 @@ object ImageUtils {
                     imageCol: String = "image",
                     dropPrefix: Boolean = false): DataFrame = {
     val outputSchema = df.schema.add(imageCol, ImageSchema.columnSchema)
-    val encoder = RowEncoder(outputSchema)
+    val encoder = ExpressionEncoder(outputSchema)
     df.mapPartitions { rows =>
       rows.map { row =>
         val encoded = row.getAs[String](bytesCol)
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformer.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformer.scala
index 0fef316649..9c15aa5809 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformer.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformer.scala
@@ -9,7 +9,7 @@ import com.microsoft.azure.synapse.ml.param.TransformerParam
 import org.apache.spark.ml.Transformer
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
-import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
@@ -35,7 +35,7 @@ trait MiniBatchBase extends Transformer with DefaultParamsWritable with Wrappabl
   def transform(dataset: Dataset[_]): DataFrame = {
     logTransform[DataFrame]({
       val outputSchema = transformSchema(dataset.schema)
-      implicit val outputEncoder: ExpressionEncoder[Row] = RowEncoder(outputSchema)
+      implicit val outputEncoder: ExpressionEncoder[Row] = ExpressionEncoder(outputSchema)
       dataset.toDF().mapPartitions { it =>
         if (it.isEmpty) {
           it
@@ -215,7 +215,7 @@ class FlattenBatch(val uid: String)
   override def transform(dataset: Dataset[_]): DataFrame = {
     logTransform[DataFrame]({
       val outputSchema = transformSchema(dataset.schema)
-      implicit val outputEncoder: ExpressionEncoder[Row] = RowEncoder(outputSchema)
+      implicit val outputEncoder: ExpressionEncoder[Row] = ExpressionEncoder(outputSchema)
 
       dataset.toDF().mapPartitions(it =>
         it.flatMap { rowOfLists =>
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/PartitionConsolidator.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/PartitionConsolidator.scala
index bd6e1340bf..494f2a945f 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/PartitionConsolidator.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/PartitionConsolidator.scala
@@ -9,7 +9,7 @@ import com.microsoft.azure.synapse.ml.logging.{FeatureNames, SynapseMLLogging}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable}
 import org.apache.spark.ml.{ComplexParamsWritable, Transformer}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 
@@ -39,7 +39,7 @@ class PartitionConsolidator(val uid: String)
         } else {
           Iterator()
         }
-      }(RowEncoder(dataset.schema))
+      }(ExpressionEncoder(dataset.schema))
     }, dataset.columns.length)
   }
 
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputeModelStatistics.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputeModelStatistics.scala
index 277dbf0f97..fa609a6e51 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputeModelStatistics.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputeModelStatistics.scala
@@ -17,7 +17,7 @@ import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, Multiclas
 import org.apache.spark.mllib.linalg.{Matrices, Matrix}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 
@@ -252,7 +252,7 @@ class ComputeModelStatistics(override val uid: String) extends Transformer
                                   confusionMatrix: Matrix,
                                   resultDF: DataFrame): DataFrame = {
     val schema = resultDF.schema.add(MetricConstants.ConfusionMatrix, SQLDataTypes.MatrixType)
-    resultDF.map { row => Row.fromSeq(row.toSeq :+ confusionMatrix.asML) }(RowEncoder(schema))
+    resultDF.map { row => Row.fromSeq(row.toSeq :+ confusionMatrix.asML) }(ExpressionEncoder(schema))
   }
 
   private def selectAndCastToDF(dataset: Dataset[_],
diff --git a/core/src/main/scala/org/apache/spark/ml/source/image/PatchedImageFileFormat.scala b/core/src/main/scala/org/apache/spark/ml/source/image/PatchedImageFileFormat.scala
index 299881b1bb..4346d88b22 100644
--- a/core/src/main/scala/org/apache/spark/ml/source/image/PatchedImageFileFormat.scala
+++ b/core/src/main/scala/org/apache/spark/ml/source/image/PatchedImageFileFormat.scala
@@ -13,7 +13,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.ml.image.ImageSchema
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
@@ -118,7 +118,7 @@ class PatchedImageFileFormat extends ImageFileFormat with Serializable with Logg
       if (requiredSchema.isEmpty) {
         filteredResult.map(_ => emptyUnsafeRow)
       } else {
-        val converter = RowEncoder(requiredSchema)
+        val converter = ExpressionEncoder(requiredSchema)
         filteredResult.map(row => converter.createSerializer()(row))
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/sql/execution/streaming/DistributedHTTPSource.scala b/core/src/main/scala/org/apache/spark/sql/execution/streaming/DistributedHTTPSource.scala
index 15f6886619..f41218b0e0 100644
--- a/core/src/main/scala/org/apache/spark/sql/execution/streaming/DistributedHTTPSource.scala
+++ b/core/src/main/scala/org/apache/spark/sql/execution/streaming/DistributedHTTPSource.scala
@@ -7,7 +7,7 @@ import com.microsoft.azure.synapse.ml.io.http.{HTTPRequestData, HTTPResponseData
 import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
 import org.apache.spark.sql.execution.streaming.continuous.HTTPSourceV2
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider, StreamSourceProvider}
@@ -218,7 +218,7 @@ class DistributedHTTPSource(name: String,
 
   private[spark] val infoSchema = new StructType()
     .add("machine", StringType).add("ip", StringType).add("id", StringType)
-  private[spark] val infoEnc = RowEncoder(infoSchema)
+  private[spark] val infoEnc = ExpressionEncoder(infoSchema)
 
   // Access point to run code on nodes through mapPartitions
   // TODO do this by hooking deeper into spark,
@@ -284,7 +284,7 @@ class DistributedHTTPSource(name: String,
         .map{ case (id, request) =>
           Row.fromSeq(Seq(Row(null, id, null), toRow(request))) //scalastyle:ignore null
         }.toIterator
-      }(RowEncoder(HTTPSourceV2.Schema))
+      }(ExpressionEncoder(HTTPSourceV2.Schema))
   }
 
   override def commit(end: OffsetV2): Unit = synchronized {
diff --git a/core/src/main/scala/org/apache/spark/sql/types/injections/OptimizedCKNNFitting.scala b/core/src/main/scala/org/apache/spark/sql/types/injections/OptimizedCKNNFitting.scala
index 3ab64e7548..306991c0f5 100644
--- a/core/src/main/scala/org/apache/spark/sql/types/injections/OptimizedCKNNFitting.scala
+++ b/core/src/main/scala/org/apache/spark/sql/types/injections/OptimizedCKNNFitting.scala
@@ -8,6 +8,7 @@ import com.microsoft.azure.synapse.ml.logging.SynapseMLLogging
 import com.microsoft.azure.synapse.ml.nn._
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
 import org.apache.spark.sql.types._
 
 trait OptimizedCKNNFitting extends ConditionalKNNParams with SynapseMLLogging {
@@ -35,12 +36,12 @@ trait OptimizedCKNNFitting extends ConditionalKNNParams with SynapseMLLogging {
 
   protected def fitOptimized(dataset: Dataset[_]): ConditionalKNNModel = {
 
-    val vt = dataset.schema(getValuesCol).dataType
-    val lt = dataset.schema(getLabelCol).dataType
+    val vt = PhysicalDataType.apply(dataset.schema(getValuesCol).dataType)
+    val lt = PhysicalDataType.apply(dataset.schema(getLabelCol).dataType)
     (vt, lt) match {
-      case (avt: AtomicType, alt: AtomicType) => fitGeneric[avt.InternalType, alt.InternalType](dataset)
-      case (avt: AtomicType, _) => fitGeneric[avt.InternalType, Any](dataset)
-      case (_, alt: AtomicType) => fitGeneric[Any, alt.InternalType](dataset)
+      case (avt: PhysicalDataType, alt: PhysicalDataType) => fitGeneric[avt.InternalType, alt.InternalType](dataset)
+      case (avt: PhysicalDataType, _) => fitGeneric[avt.InternalType, Any](dataset)
+      case (_, alt: PhysicalDataType) => fitGeneric[Any, alt.InternalType](dataset)
       case _ => fitGeneric[Any, Any](dataset)
     }
   }
@@ -69,8 +70,8 @@ trait OptimizedKNNFitting extends KNNParams with SynapseMLLogging {
 
   protected def fitOptimized(dataset: Dataset[_]): KNNModel = {
 
-    dataset.schema(getValuesCol).dataType match {
-      case avt: AtomicType => fitGeneric[avt.InternalType](dataset)
+    PhysicalDataType.apply(dataset.schema(getValuesCol).dataType) match {
+      case avt: PhysicalDataType => fitGeneric[avt.InternalType](dataset)
       case _ => fitGeneric[Any](dataset)
     }
   }
diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/codegen/RTestGen.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/codegen/RTestGen.scala
index 2a86894bc2..a409ad0d0b 100644
--- a/core/src/test/scala/com/microsoft/azure/synapse/ml/codegen/RTestGen.scala
+++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/codegen/RTestGen.scala
@@ -101,7 +101,7 @@ object RTestGen {
       |          "spark.sql.shuffle.partitions=10",
       |          "spark.sql.crossJoin.enabled=true")
       |
-      |sc <- spark_connect(master = "local", version = "3.4.1", config = conf)
+      |sc <- spark_connect(master = "local", version = "3.5.0", config = conf)
       |
       |""".stripMargin, StandardOpenOption.CREATE)
diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/flaky/PartitionConsolidatorSuite.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/flaky/PartitionConsolidatorSuite.scala
index acbc5e3e12..3469ee6975 100644
--- a/core/src/test/scala/com/microsoft/azure/synapse/ml/flaky/PartitionConsolidatorSuite.scala
+++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/flaky/PartitionConsolidatorSuite.scala
@@ -7,7 +7,7 @@ import com.microsoft.azure.synapse.ml.core.test.base.{TestBase, TimeLimitedFlaky
 import com.microsoft.azure.synapse.ml.core.test.fuzzing.{TestObject, TransformerFuzzing}
 import com.microsoft.azure.synapse.ml.stages.PartitionConsolidator
 import org.apache.spark.ml.util.MLReadable
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.types.{DoubleType, StructType}
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.scalatest.Assertion
@@ -66,7 +66,7 @@ class PartitionConsolidatorSuite extends TransformerFuzzing[PartitionConsolidato
     println(baseDF.count())
 
     def getDF: Dataset[Row] = baseDF.map { x => Thread.sleep(10); x }(
-      RowEncoder(new StructType().add("values", DoubleType)))
+      ExpressionEncoder(new StructType().add("values", DoubleType)))
 
     val t1 = getTime(3)(
       getDF.foreach(_ => ()))._2
diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split1/PowerBiSuite.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split1/PowerBiSuite.scala
index 50d2d09bc4..2d932a9991 100644
--- a/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split1/PowerBiSuite.scala
+++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split1/PowerBiSuite.scala
@@ -8,7 +8,7 @@ import com.microsoft.azure.synapse.ml.core.test.base.TestBase
 import com.microsoft.azure.synapse.ml.io.powerbi.PowerBIWriter
 import org.apache.spark.SparkException
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.functions.{current_timestamp, lit}
 
 import java.io.File
@@ -33,7 +33,7 @@ class PowerBiSuite extends TestBase with FileReaderUtils {
       .createDataFrame(rows, df.schema)
       .coalesce(1).cache()
     df2.count()
-    df2.map({x => Thread.sleep(10); x})(RowEncoder(df2.schema))
+    df2.map({x => Thread.sleep(10); x})(ExpressionEncoder(df2.schema))
   }
 
   test("write to powerBi") {
diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split2/DistributedHTTPSuite.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split2/DistributedHTTPSuite.scala
index 4303384f03..bac6d152ea 100644
--- a/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split2/DistributedHTTPSuite.scala
+++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split2/DistributedHTTPSuite.scala
@@ -14,7 +14,7 @@ import org.apache.commons.io.IOUtils
 import org.apache.http.client.methods.HttpPost
 import org.apache.http.entity.{FileEntity, StringEntity}
 import org.apache.http.impl.client.{BasicResponseHandler, CloseableHttpClient}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.streaming.DistributedHTTPSourceProvider
 import org.apache.spark.sql.functions.{col, length}
 import org.apache.spark.sql.streaming.{DataStreamReader, DataStreamWriter, StreamingQuery}
@@ -396,12 +396,12 @@ class DistributedHTTPSuite extends TestBase with Flaky with HTTPTestUtils {
       .mapPartitions { _ =>
         Foo.get.increment()
         Iterator(Row(Foo.get.state))
-      }(RowEncoder(new StructType().add("state", IntegerType))).cache()
+      }(ExpressionEncoder(new StructType().add("state", IntegerType))).cache()
     val States1: Array[Row] = DF.collect()
 
     val DF2: DataFrame = DF.mapPartitions { _ =>
       Iterator(Row(Foo.get.state))
-    }(RowEncoder(new StructType().add("state", IntegerType)))
+    }(ExpressionEncoder(new StructType().add("state", IntegerType)))
     val States2: Array[Row] = DF2.collect()
     assert(States2.forall(_.getInt(0) === States2.length))
   }
diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala
index 55e0fbdfce..28a0286f0a 100644
--- a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala
+++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala
@@ -31,11 +31,11 @@ object DatabricksUtilities {
 
   // ADB Info
   val Region = "eastus"
-  val PoolName = "synapseml-build-13.3"
-  val GpuPoolName = "synapseml-build-13.3-gpu"
-  val AdbRuntime = "13.3.x-scala2.12"
-  // https://docs.databricks.com/en/release-notes/runtime/13.3lts-ml.html
-  val AdbGpuRuntime = "13.3.x-gpu-ml-scala2.12"
+  val PoolName = "synapseml-build-14.3"
+  val GpuPoolName = "synapseml-build-14.3-gpu"
+  val AdbRuntime = "14.3.x-scala2.12"
+  // https://docs.databricks.com/en/release-notes/runtime/14.3lts-ml.html
+  val AdbGpuRuntime = "14.3.x-gpu-ml-scala2.12"
   val NumWorkers = 5
   val AutoTerminationMinutes = 15
 
@@ -65,7 +65,7 @@ object DatabricksUtilities {
     "pdf2image",
     "pdfminer.six",
     "sqlparse",
-    "raiwidgets",
+    // "raiwidgets", // Broken on ADB
     "interpret-community",
     "numpy==1.22.4",
     "unstructured==0.10.24",
@@ -109,6 +109,7 @@ object DatabricksUtilities {
     .filterNot(_.getAbsolutePath.contains("Audiobooks")) // TODO Remove this by fixing auth
     .filterNot(_.getAbsolutePath.contains("Art")) // TODO Remove this by fixing performance
     .filterNot(_.getAbsolutePath.contains("Explanation Dashboard")) // TODO Remove this exclusion
+    .filterNot(_.getAbsolutePath.contains("Isolation Forests")) // TODO Remove this exclusion when raiwidgets is fixed
 
   val GPUNotebooks: Seq[File] = ParallelizableNotebooks.filter(_.getAbsolutePath.contains("Fine-tune"))
diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseExtension/SynapseExtensionUtilities.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseExtension/SynapseExtensionUtilities.scala
index f2264025f4..9f7a76993b 100644
--- a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseExtension/SynapseExtensionUtilities.scala
+++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseExtension/SynapseExtensionUtilities.scala
@@ -83,7 +83,7 @@ object SynapseExtensionUtilities {
       |"{
       |  'Default${store}ArtifactId': '$storeId',
       |  'ExecutableFile': '$path',
-      |  'SparkVersion':'3.4',
+      |  'SparkVersion':'3.5',
       |  'SparkSettings': {
       |    'spark.jars.packages' : '$SparkMavenPackageList',
      |    'spark.jars.repositories' : '$SparkMavenRepositoryList',
diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseUtilities.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseUtilities.scala
index 433c0c6601..2c3089b968 100644
--- a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseUtilities.scala
+++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseUtilities.scala
@@ -255,7 +255,7 @@ object SynapseUtilities {
       |    "nodeSizeFamily": "MemoryOptimized",
       |    "provisioningState": "Succeeded",
       |    "sessionLevelPackagesEnabled": "true",
-      |    "sparkVersion": "3.4"
+      |    "sparkVersion": "3.5"
       |  }
       |}
       |""".stripMargin
diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformerSuite.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformerSuite.scala
index 8d43eddbcf..d446305e5a 100644
--- a/core/src/test/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformerSuite.scala
+++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformerSuite.scala
@@ -8,7 +8,7 @@ import com.microsoft.azure.synapse.ml.core.test.fuzzing.{TestObject, Transformer
 import com.microsoft.azure.synapse.ml.param.DataFrameEquality
 import org.apache.spark.injections.UDFUtils
 import org.apache.spark.ml.util.MLReadable
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.functions.{col, lit}
 import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructType}
@@ -30,10 +30,10 @@ trait MiniBatchTestUtils extends TestBase with DataFrameEquality {
   def basicTest(t: MiniBatchBase): Assertion = {
     val delay = 5
-    val slowDf = df.map { x => Thread.sleep(3 * delay.toLong); x }(RowEncoder(df.schema))
+    val slowDf = df.map { x => Thread.sleep(3 * delay.toLong); x }(ExpressionEncoder(df.schema))
     val df2 = t.transform(slowDf)
 
-    val df3 = df2.map { x => Thread.sleep(10 * delay.toLong); x }(RowEncoder(df2.schema))
+    val df3 = df2.map { x => Thread.sleep(10 * delay.toLong); x }(ExpressionEncoder(df2.schema))
 
     assert(df3.schema == new StructType()
       .add("in1", ArrayType(IntegerType))
diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/stages/StratifiedRepartitionSuite.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/stages/StratifiedRepartitionSuite.scala
index 16ca75c13e..2e77eebf1e 100644
--- a/core/src/test/scala/com/microsoft/azure/synapse/ml/stages/StratifiedRepartitionSuite.scala
+++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/stages/StratifiedRepartitionSuite.scala
@@ -8,7 +8,7 @@ import com.microsoft.azure.synapse.ml.core.test.fuzzing.{TestObject, Transformer
 import org.apache.spark.TaskContext
 import org.apache.spark.ml.util.MLReadable
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 
 class StratifiedRepartitionSuite extends TestBase with TransformerFuzzing[StratifiedRepartition] {
@@ -37,7 +37,7 @@ class StratifiedRepartitionSuite extends TestBase with TransformerFuzzing[Strati
   test("Assert doing a stratified repartition will ensure all keys exist across all partitions") {
     val inputSchema = new StructType()
       .add(values, IntegerType).add(colors, StringType).add(const, IntegerType)
-    val inputEnc = RowEncoder(inputSchema)
+    val inputEnc = ExpressionEncoder(inputSchema)
     val valuesFieldIndex = inputSchema.fieldIndex(values)
     val numPartitions = 3
     val trainData = input.repartition(numPartitions).select(values, colors, const)
diff --git a/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXModel.scala b/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXModel.scala
index 3378cd9721..491ee64d67 100644
--- a/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXModel.scala
+++ b/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXModel.scala
@@ -24,7 +24,7 @@ import org.apache.spark.ml.linalg.{SQLDataTypes, Vector}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.util.Identifiable
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types._
 import org.apache.spark.{SparkContext, TaskContext}
@@ -230,7 +230,7 @@ class ONNXModel(override val uid: String)
   def transformInner(dataset: Dataset[_], inputSchema: StructType): DataFrame = logTransform ({
     val modelOutputSchema = getModelOutputSchema(inputSchema)
 
-    implicit val enc: Encoder[Row] = RowEncoder(
+    implicit val enc: Encoder[Row] = ExpressionEncoder(
       StructType(modelOutputSchema.map(f => StructField(f.name, ArrayType(f.dataType))))
     )
diff --git a/docs/Explore Algorithms/AI Services/Multivariate Anomaly Detection.ipynb b/docs/Explore Algorithms/AI Services/Multivariate Anomaly Detection.ipynb
index d96d707df9..28da5ad135 100644
--- a/docs/Explore Algorithms/AI Services/Multivariate Anomaly Detection.ipynb
+++ b/docs/Explore Algorithms/AI Services/Multivariate Anomaly Detection.ipynb
@@ -431,7 +431,7 @@
     "\n",
     "anoms = list(rdf[\"severity\"] >= minSeverity)\n",
     "_, _, ymin, ymax = plt.axis()\n",
-    "plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color=\"r\", alpha=0.8)\n",
+    "plt.vlines(list(np.where(anoms)[0]), ymin=ymin, ymax=ymax, color=\"r\", alpha=0.8)\n",
     "\n",
     "plt.legend()\n",
     "plt.title(\n",
diff --git a/docs/Get Started/Install SynapseML.md b/docs/Get Started/Install SynapseML.md
index a15f60590c..0369a40787 100644
--- a/docs/Get Started/Install SynapseML.md
+++ b/docs/Get Started/Install SynapseML.md
@@ -26,6 +26,21 @@ SynapseML is already installed in Microsoft Fabric notebooks. To change the vers
 
 SynapseML is already installed in Synapse Analytics notebooks. To change the version please place the following in the first cell of your notebook:
 
+For Spark3.5 pools
+```python
+%%configure -f
+{
+  "name": "synapseml",
+  "conf": {
+      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.3",
+      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
+      "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind",
+      "spark.yarn.user.classpath.first": "true",
+      "spark.sql.parquet.enableVectorizedReader": "false"
+  }
+}
+```
+
 For Spark3.4 pools
 ```python
 %%configure -f
diff --git a/environment.yml b/environment.yml
index e9361ad5de..33c4f5f9c6 100644
--- a/environment.yml
+++ b/environment.yml
@@ -11,7 +11,7 @@ dependencies:
   - r-devtools=2.4.2
   - pip:
       - pyarrow>=0.15.0
-      - pyspark==3.4.1
+      - pyspark==3.5.0
       - pandas==1.4.0
       - wheel
       - sphinx==5.0.2
diff --git a/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split5/VerifyLightGBMClassifierStream.scala b/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split5/VerifyLightGBMClassifierStream.scala
index a99355cc69..ea96e91b57 100644
--- a/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split5/VerifyLightGBMClassifierStream.scala
+++ b/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split5/VerifyLightGBMClassifierStream.scala
@@ -11,7 +11,7 @@ import com.microsoft.azure.synapse.ml.lightgbm.split1._
 import org.apache.spark.TaskContext
 import org.apache.spark.ml.feature.{LabeledPoint, VectorAssembler}
 import org.apache.spark.ml.linalg.{DenseVector, Vector, Vectors}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{DataFrame, Row}
 
@@ -455,7 +455,7 @@ class VerifyLightGBMClassifierStream extends LightGBMClassifierTestData {
       } else {
         rows
       }
-    }(RowEncoder(baseDF.schema))
+    }(ExpressionEncoder(baseDF.schema))
 
     assertFitWithoutErrors(baseModel, df)
   }
@@ -470,7 +470,7 @@ class VerifyLightGBMClassifierStream extends LightGBMClassifierTestData {
       } else {
         rows
       }
-    })(RowEncoder(baseDF.schema))
+    })(ExpressionEncoder(baseDF.schema))
 
     val model = new LightGBMClassifier()
       .setLabelCol(labelCol)
@@ -493,7 +493,7 @@ class VerifyLightGBMClassifierStream extends LightGBMClassifierTestData {
       } else {
         rows
       }
-    })(RowEncoder(baseDF.schema))
+    })(ExpressionEncoder(baseDF.schema))
 
     // Validate fit works and doesn't get stuck
     assertFitWithoutErrors(baseModel, df)
diff --git a/opencv/src/main/scala/com/microsoft/azure/synapse/ml/opencv/OpenCVUtils.scala b/opencv/src/main/scala/com/microsoft/azure/synapse/ml/opencv/OpenCVUtils.scala
index 57fa63a0d2..a943bc3660 100644
--- a/opencv/src/main/scala/com/microsoft/azure/synapse/ml/opencv/OpenCVUtils.scala
+++ b/opencv/src/main/scala/com/microsoft/azure/synapse/ml/opencv/OpenCVUtils.scala
@@ -5,7 +5,7 @@ package com.microsoft.azure.synapse.ml.opencv
 
 import com.microsoft.azure.synapse.ml.core.env.NativeLoader
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 
 object OpenCVUtils {
   /** This object will load the openCV binaries when the object is referenced
@@ -27,7 +27,7 @@ object OpenCVUtils {
   }
 
   private[ml] def loadOpenCV(df: DataFrame): DataFrame = {
-    val encoder = RowEncoder(df.schema)
+    val encoder = ExpressionEncoder(df.schema)
     df.mapPartitions(loadOpenCVFunc)(encoder)
   }
diff --git a/pipeline.yaml b/pipeline.yaml
index 0e75e509c1..c87d7cfffc 100644
--- a/pipeline.yaml
+++ b/pipeline.yaml
@@ -457,7 +457,7 @@ jobs:
           (timeout 5m sbt setup) || (echo "retrying" && timeout 5m sbt setup) || (echo "retrying" && timeout 5m sbt setup)
           sbt codegen
           sbt publishM2
-          SPARK_VERSION=3.4.1
+          SPARK_VERSION=3.5.0
           HADOOP_VERSION=3
           wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
       - task: AzureCLI@2
diff --git a/start b/start
index ac866307f1..42ae484330 100644
--- a/start
+++ b/start
@@ -2,7 +2,7 @@
 
 export OPENMPI_VERSION="3.1.2"
-export SPARK_VERSION="3.4.1"
+export SPARK_VERSION="3.5.0"
 export HADOOP_VERSION="3.3"
 export SYNAPSEML_VERSION="1.0.5" # Binder compatibility version
diff --git a/tools/docker/demo/Dockerfile b/tools/docker/demo/Dockerfile
index c5db1030e4..6f9d2468be 100644
--- a/tools/docker/demo/Dockerfile
+++ b/tools/docker/demo/Dockerfile
@@ -3,7 +3,7 @@ FROM mcr.microsoft.com/oss/mirror/docker.io/library/ubuntu:20.04
 ARG SYNAPSEML_VERSION=1.0.5
 ARG DEBIAN_FRONTEND=noninteractive
 
-ENV SPARK_VERSION=3.4.1
+ENV SPARK_VERSION=3.5.0
 ENV HADOOP_VERSION=3
 ENV SYNAPSEML_VERSION=${SYNAPSEML_VERSION}
 ENV JAVA_HOME /usr/lib/jvm/java-1.11.0-openjdk-amd64
diff --git a/tools/docker/minimal/Dockerfile b/tools/docker/minimal/Dockerfile
index 33fcc54366..18da661de0 100644
--- a/tools/docker/minimal/Dockerfile
+++ b/tools/docker/minimal/Dockerfile
@@ -3,7 +3,7 @@ FROM mcr.microsoft.com/oss/mirror/docker.io/library/ubuntu:20.04
 ARG SYNAPSEML_VERSION=1.0.5
 ARG DEBIAN_FRONTEND=noninteractive
 
-ENV SPARK_VERSION=3.4.1
+ENV SPARK_VERSION=3.5.0
 ENV HADOOP_VERSION=3
 ENV SYNAPSEML_VERSION=${SYNAPSEML_VERSION}
 ENV JAVA_HOME /usr/lib/jvm/java-1.11.0-openjdk-amd64
diff --git a/tools/tests/run_r_tests.R b/tools/tests/run_r_tests.R
index a5a61260f2..47d255b9d6 100644
--- a/tools/tests/run_r_tests.R
+++ b/tools/tests/run_r_tests.R
@@ -3,7 +3,7 @@ if (!require("sparklyr")) {
   library("sparklyr")
 }
 
-spark_install_tar(paste(getwd(), "/../../../../../../spark-3.4.1-bin-hadoop3.tgz", sep = ""))
+spark_install_tar(paste(getwd(), "/../../../../../../spark-3.5.0-bin-hadoop3.tgz", sep = ""))
 options("testthat.output_file" = "../../../../r-test-results.xml")
 devtools::test(reporter = JunitReporter$new())
diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseLearner.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseLearner.scala
index 8c0178aebe..699a737da8 100644
--- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseLearner.scala
+++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseLearner.scala
@@ -8,7 +8,7 @@ import com.microsoft.azure.synapse.ml.core.utils.{FaultToleranceUtils, ParamsStr
 import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.ml.param.{Param, StringArrayParam}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.functions.{col, lit, spark_partition_id}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row, SparkSession}
@@ -312,7 +312,7 @@ trait VowpalWabbitBaseLearner extends VowpalWabbitBase {
 
     // construct buffer & schema for buffered predictions
     val predictionBuffer = createPredictionBuffer(schema)
-    val encoder = RowEncoder(predictionBuffer.schema)
+    val encoder = ExpressionEncoder(predictionBuffer.schema)
 
     // always include preserve perf counters to make sure all information is retained in serialized model for
     // model merging
diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseModelSpark.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseModelSpark.scala
index 92163a942f..d030167151 100644
--- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseModelSpark.scala
+++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseModelSpark.scala
@@ -3,7 +3,7 @@ package com.microsoft.azure.synapse.ml.vw
 
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.types.{StructField}
 import org.vowpalwabbit.spark.VowpalWabbitExample
@@ -41,7 +41,7 @@ trait VowpalWabbitBaseModelSpark
     val outputSchema = dataset.schema.add(StructField(vowpalWabbitPredictionCol, schemaForPredictionType, false))
 
     // create a fitting row encoder
-    val rowEncoder = RowEncoder(outputSchema)
+    val rowEncoder = ExpressionEncoder(outputSchema)
 
     dataset.toDF.mapPartitions(inputRows => {
       inputRows.map { row => {
diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseProgressive.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseProgressive.scala
index 8a265b0717..7c781175ee 100644
--- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseProgressive.scala
+++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseProgressive.scala
@@ -6,7 +6,7 @@ package com.microsoft.azure.synapse.ml.vw
 import org.apache.spark.TaskContext
 import org.apache.spark.ml.Transformer
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.types.StructType
 import org.vowpalwabbit.spark.VowpalWabbitNative
 
@@ -114,7 +114,7 @@ trait VowpalWabbitBaseProgressive
 
     // TODO: barrier mode?
     // TODO: check w/ Stage ID (different stages)
-    val encoder = RowEncoder(schema)
+    val encoder = ExpressionEncoder(schema)
 
     df
       .mapPartitions(inputRows =>
diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitGeneric.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitGeneric.scala
index c1b3f8e0bc..7537a3511f 100644
--- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitGeneric.scala
+++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitGeneric.scala
@@ -9,7 +9,7 @@ import com.microsoft.azure.synapse.ml.logging.{FeatureNames, SynapseMLLogging}
 import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Estimator, Model}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.util.Identifiable
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession, functions => F}
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
@@ -105,7 +105,7 @@ class VowpalWabbitGenericModel(override val uid: String)
     val inputColIdx = df.schema.fieldIndex(getInputCol)
 
     val predictToSeq = VowpalWabbitPrediction.getPredictionFunc(vw)
-    val rowEncoder = RowEncoder(schemaForPredictionType)
+    val rowEncoder = ExpressionEncoder(schemaForPredictionType)
 
     df.mapPartitions(inputRows => {
       inputRows.map { row => {
diff --git a/vw/src/test/scala/com/microsoft/azure/synapse/ml/vw/VerifyVowpalWabbitClassifier.scala b/vw/src/test/scala/com/microsoft/azure/synapse/ml/vw/VerifyVowpalWabbitClassifier.scala
index 3a9ca46cd4..7d89a001d1 100644
--- a/vw/src/test/scala/com/microsoft/azure/synapse/ml/vw/VerifyVowpalWabbitClassifier.scala
+++ b/vw/src/test/scala/com/microsoft/azure/synapse/ml/vw/VerifyVowpalWabbitClassifier.scala
@@ -9,7 +9,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}
 import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
 import org.apache.spark.ml.util.MLReadable
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DoubleType, IntegerType}
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
@@ -199,7 +199,7 @@ class VerifyVowpalWabbitClassifier extends Benchmarks with EstimatorFuzzing[Vowp
       .setNumPasses(3)
       .setLabelConversion(false)
 
-    val infoEnc = RowEncoder(dataset.schema)
+    val infoEnc = ExpressionEncoder(dataset.schema)
     val trainData = dataset
       .mapPartitions(iter => {
        val ctx = TaskContext.get