diff --git a/schemaregistry-avro/README.md b/schemaregistry-avro/README.md new file mode 100644 index 000000000..0aa6cf663 --- /dev/null +++ b/schemaregistry-avro/README.md @@ -0,0 +1,113 @@ +# azure-schemaregistry-spark (WIP) + +## Overview + +Schema Registry support in Java is provided by the official Schema Registry SDK in the Azure Java SDK repository. + +Schema Registry serializer craft payloads that contain a schema ID and an encoded payload. The ID references a registry-stored schema that can be used to decode the user-specified payload. + +However, consuming Schema Registry-backed payloads in Spark is particularly difficult, since - +- Spark Kafka does not support plug-in with KafkaSerializer and KafkaDeserializer objects, and +- Object management is non-trivial given Spark's driver-executor model. + +For these reasons, Spark functions are required to simplify SR UX in Spark. This repository contains packages that will provide Spark support in Scala for serialization and deserialization of registry-backed payloads. + +Currently, only Avro encodings are supported by Azure Schema Registry clients. `from_avro` and `to_avro` found in the `functions.scala` file will be usable for converting Spark SQL columns from registry-backed payloads to columns of the correct Spark SQL datatype (e.g. `StringType`, `StructType`, etc.). + +## Usage + +Compile the JAR and build with dependencies using the following Maven command: +```bash +mvn clean compile assembly:assembly +``` + +The JAR can then be uploaded without additional required dependencies in your environment. If using `spark-submit`, use the `--jars` option to submit the path of the custom JAR. + +## Environment Support + +|Environment|Package Version| +|-------------|----------------| +|Databricks Runtime 10.X|azure-schemaregistry-spark-avro-1.0.0| +|Databricks Runtime 9.X|azure-schemaregistry-spark-avro-1.0.0| +|Databricks Runtime 8.X|azure-schemaregistry-spark-avro-1.0.0| +|Synapse Spark pool 3.1|azure-schemaregistry-spark-avro-1.0.0| + + +## Available API + +Both `from_avro` and `to_avro` functions can be used by either providing the schema GUID or the schema itself. Note that if you are providing the schema GUID it should be wrapped in a SchemaGUID object. +Below you can find more info about available APIs: + +```scala + /** + * @param data column with SR payloads + * @param schemaString The avro schema in JSON string format. + * @param clientOptions map of configuration properties, including Spark run mode (permissive vs. fail-fast) and schema exact match flag + */ + def from_avro( + data: Column, + schemaString: String, + clientOptions: java.util.Map[String, String]): Column + + /** + * @param data column with SR payloads + * @param schemaId The GUID of the expected schema. + * @param clientOptions map of configuration properties, including Spark run mode (permissive vs. fail-fast) and schema exact match flag + */ + def from_avro( + data: Column, + schemaId: SchemaGUID, + clientOptions: java.util.Map[String, String]): Column + + /** + * @param data the data column. + * @param schemaString The avro schema in JSON string format. + * @param clientOptions map of configuration properties, including Spark run mode (permissive vs. fail-fast) + */ + def to_avro(data: Column, + schemaString: String, + clientOptions: java.util.Map[java.lang.String, java.lang.String]): Column + + /** + * @param data the data column. 
+ * @param schemaId The GUID of the expected schema
+ * @param clientOptions map of configuration properties, including Spark run mode (permissive vs. fail-fast)
+ */
+ def to_avro(data: Column,
+ schemaId: SchemaGUID,
+ clientOptions: java.util.Map[java.lang.String, java.lang.String]): Column
+
+```
+
+You can find examples of how to use the above APIs in the [Scala](docs/schema-registry-example.md) and [PySpark](docs/PySpark/schema-registry-example-pyspark.md) documents under the `docs` directory.
+
+
+## Schema Evolution
+
+In the context of stream processing, the primary use case is the one where the schema GUID in each payload references a schema that matches the data in the stream.
+
+However, there are two edge cases, common in streaming scenarios, in which we are concerned with schema evolution:
+ * Backward compatibility: stream jobs reading old data with new schemas.
+ * Forward compatibility: stream jobs reading new data with old schemas.
+
+To handle these scenarios, we have introduced the `schema.exact.match.required` flag, which can be set in the properties map:
+ * If true, the schema in the payload must exactly match the specified schema, otherwise the job throws an `IncompatibleSchemaException`.
+ * If false, the job will attempt to read the incoming data even if the payload schema differs from the specified schema. In this case:
+    * if the payload contains a field not present in the specified schema, the value for that field in the payload is ignored.
+    * if the specified schema contains a field not present in the payload, and the field has a default value, then the default value is added to the stream.
+    * if the specified schema contains a field not present in the payload, and the field does not have a default value, the job throws an `IncompatibleSchemaException`.
+
+Please note that the default value for the `schema.exact.match.required` flag is `false`. However, we suggest setting this flag to `true` in production jobs.
+
+
+## Failure Modes
+
+Two modes are supported, as dictated by Spark SQL -
+- `FailFastMode` - fail on catching any exception
+- `PermissiveMode` - continue processing if parsing exceptions are caught
+
+You can configure the stream with a specific failure mode using the `failure.mode` key in the properties map. The default failure mode is `FailFastMode` to prevent perceived data loss with `PermissiveMode`.
+
+See also:
+- aka.ms/schemaregistry
+- https://github.com/Azure/azure-schema-registry-for-kafka
diff --git a/schemaregistry-avro/docs/PySpark/schema-registry-example-pyspark.md b/schemaregistry-avro/docs/PySpark/schema-registry-example-pyspark.md
new file mode 100644
index 000000000..9a8755a4f
--- /dev/null
+++ b/schemaregistry-avro/docs/PySpark/schema-registry-example-pyspark.md
@@ -0,0 +1,303 @@
+# Spark Streaming + Azure Schema Registry for PySpark
+
+This document provides producer and consumer examples using the schema registry in PySpark.
+Similar to the [schema registry example for Scala](../schema-registry-example.md), first you need to create a schema group with a schema
+in a schema registry and register an application in Azure Active Directory (Azure AD). Please refer to the [Register a Schema](../schema-registry-example.md#register-a-schema)
+and [Azure Role-Based Access Control](../schema-registry-example.md#azure-role-based-access-control) sections for more information.
+
+
+## Example Schema
+We use the following schema in both producer and consumer examples in this document.
+```json +{ + "namespace": "Microsoft.Azure.Data.SchemaRegistry.example", + "type": "record", + "name": "Order", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "amount", + "type": "double" + }, + { + "name": "description", + "type": "string" + } + ] +} +``` + +## Producer Examples +In order to send data to an eventhub using the schema registry in PySpark you need to: + * Create a wrapper `to_avro` function which works properly in PySpark. + * Use a property object which contains required information to connect to your schema registry. + * Create records matching the schema and serialize those using `to_avro` function. + * Send serialized bytes to the Eventhub instance. + +Please note that for accessing the schema registry the below information must be provided in the property map: + * The schema registry endpoint url, which should be set using the key "schema.registry.url" + * The tenant ID from the registered application, which should be set using the key "schema.registry.tenant.id" + * The client ID (application ID) from the registered application, which should be set using the key "schema.registry.client.id" + * The secret from the registered application, which should be set using the key "schema.registry.client.secret" + + +### Wrapper `to_avro` Function for PySpark +First, we need to define a wrapper function in order to access and use the `com.microsoft.azure.schemaregistry.spark.avro.functions.to_avro` +from JVM properly. + +```python +from pyspark.sql.column import Column, _to_java_column + +def to_avro(col, schemaObj, propMap): + jf = getattr(sc._jvm.com.microsoft.azure.schemaregistry.spark.avro.functions, "to_avro") + return Column(jf(_to_java_column(col), schemaObj, propMap)) +``` + +### Producer Example 1: Using `to_avro` with schema GUID +In order to serialize payloads using the schema GUID, you need to create a property object which contains the +required information to access your schema registry and pass the schema GUId to the `to_avro` function. 
+ +#### Create a Schema Registry Object +```python +schemaRegistryURL = "http://.servicebus.windows.net" +schemaRegistryTenantID = "" +schemaRegistryClientID = "" +schemaRegistryClientSecret = "" + +properties = sc._jvm.java.util.HashMap() +properties.put("schema.registry.url", schemaRegistryURL) +properties.put("schema.registry.tenant.id", schemaRegistryTenantID) +properties.put("schema.registry.client.id", schemaRegistryClientID) +properties.put("schema.registry.client.secret", schemaRegistryClientSecret) +``` + +#### Create Records Matching the Schema and Serialize Those Using the Schema GUID +```python +from pyspark.sql.types import StructField, StructType, StringType, DoubleType +from pyspark.sql.functions import udf + +data = [("id1", 11.11, "order1"), ("id2", 22.22, "order2"), ("id3", 33.33, "order3")] +columns = ["id", "amount", "description"] +df = spark.sparkContext.parallelize(data).toDF(columns) + +df_schema = StructType([ + StructField('id', StringType(), nullable=False), + StructField('amount', DoubleType(), nullable=False), + StructField('description', StringType(), nullable=False) +]) + +def orderGen(rId, rAmount, rDescription): + return [rId, rAmount, rDescription] + +orderGenUDF = udf(lambda x, y, z : orderGen(x, y, z), df_schema) +dfRecords = df.withColumn("orders", orderGenUDF('id', 'amount', 'description')).drop("id").drop("amount").drop("description") + +schemaGUID = "" +schemaGuidObj = sc._jvm.com.microsoft.azure.schemaregistry.spark.avro.SchemaGUID(schemaGUID) +dfPayload = dfRecords.withColumn("body", to_avro(dfRecords.orders, schemaGuidObj, properties)).drop("orders") +``` + + +#### Send Data to Your Eventhub Instance +```python +connectionString = "Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=;EntityPath=" + +ehConf = {} +# For 2.3.15 version and above, the configuration dictionary requires that connection string be encrypted. +ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString) + +ds = dfPayload.select("body").write.format("eventhubs").options(**ehConf).save() +print("Sent!") +``` + + +### Producer Example 2: Using `to_avro` with Schema Definition +In order to serialize payloads using the schema definition, the property object requires two more values in addition to the required information to access your schema registry: + * The schema group where your schema has been registered, which should be set using the key "schema.group" + * The schema name of your registered schema, which should be set using the key "schema.name" + +Both schema group and schema name are needed to retrieve the unique schema GUID. Note that the schema GUID is being added to every payload so that all consumers know exactly which schema has been used to serialize the payload. + +If you want to use a new schema which has not been registered in your schema group, you need to enable the schema auto registry option by setting the `schema.auto.register.flag` to `true` in your property object. +The schema auto registry option simply registers a new schema under the schema group and name provided in the properties object if it cannot find the schema in the given schema group. +Using a new schema with disabled auto registry option results in an exception. Note that the schema auto registry option is off by default. + +Once you create the property map with all the required information, you can use the schema definition instead of the schema GUID in the `to_avro` function. 
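+
+If you do want a not-yet-registered schema to be registered automatically, the flag can be added to the same property map that is built in the next subsection. A minimal sketch (the key and the string value `"true"` are the ones described above):
+
+```python
+# Optional: allow to_avro to auto-register the schema under the given
+# "schema.group"/"schema.name" if it is not already in the registry.
+properties.put("schema.auto.register.flag", "true")
+```
+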
+#### Create a Schema Registry Object, Including Schema Group and Schema Name
+```python
+schemaRegistryURL = "http://.servicebus.windows.net"
+schemaRegistryTenantID = ""
+schemaRegistryClientID = ""
+schemaRegistryClientSecret = ""
+schemaGroup = ""
+schemaName = ""
+
+properties = sc._jvm.java.util.HashMap()
+properties.put("schema.registry.url", schemaRegistryURL)
+properties.put("schema.registry.tenant.id", schemaRegistryTenantID)
+properties.put("schema.registry.client.id", schemaRegistryClientID)
+properties.put("schema.registry.client.secret", schemaRegistryClientSecret)
+properties.put("schema.group", schemaGroup)
+properties.put("schema.name", schemaName)
+```
+
+#### Create Records Matching the Schema and Serialize Those Using the Schema Definition
+```python
+from pyspark.sql.types import StructField, StructType, StringType, DoubleType
+from pyspark.sql.functions import udf
+
+data = [("id1", 11.11, "order1"), ("id2", 22.22, "order2"), ("id3", 33.33, "order3")]
+columns = ["id", "amount", "description"]
+df = spark.sparkContext.parallelize(data).toDF(columns)
+
+df_schema = StructType([
+    StructField('id', StringType(), nullable=False),
+    StructField('amount', DoubleType(), nullable=False),
+    StructField('description', StringType(), nullable=False)
+])
+
+def orderGen(rId, rAmount, rDescription):
+    return [rId, rAmount, rDescription]
+
+orderGenUDF = udf(lambda x, y, z : orderGen(x, y, z), df_schema)
+dfRecords = df.withColumn("orders", orderGenUDF('id', 'amount', 'description')).drop("id").drop("amount").drop("description")
+
+schemaString = """
+{
+    "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
+    "type": "record",
+    "name": "Order",
+    "fields": [
+        {
+            "name": "id",
+            "type": "string"
+        },
+        {
+            "name": "amount",
+            "type": "double"
+        },
+        {
+            "name": "description",
+            "type": "string"
+        }
+    ]
+}
+"""
+dfPayload = dfRecords.withColumn("body", to_avro(dfRecords.orders, schemaString, properties)).drop("orders")
+```
+
+Finally, you can send the payloads in `dfPayload` to your Eventhub instance using the sample code provided in the [Send Data to Your Eventhub Instance](#send-data-to-your-eventhub-instance) subsection under Producer Example 1.
+
+
+## Consumer Examples
+We can perform the following steps to pull data from an Eventhub instance and parse it with respect to a schema from the schema registry:
+ * Create a wrapper `from_avro` function which works properly in PySpark.
+ * Pull data from the Eventhub instance using the azure-event-hubs-spark connector.
+ * Use a property object which contains the required information to connect to your schema registry.
+ * Deserialize the data using the `from_avro` function.
+
+Please refer to the [Producer Examples](#producer-examples) section for the required information in the property map.
+
+
+### Wrapper `from_avro` Function for PySpark
+First, we need to define a wrapper function in order to access and use the `com.microsoft.azure.schemaregistry.spark.avro.functions.from_avro`
+from the JVM properly.
+ +```python +from pyspark.sql.column import Column, _to_java_column + +def from_avro(col, schemaObj, propMap): + jf = getattr(sc._jvm.com.microsoft.azure.schemaregistry.spark.avro.functions, "from_avro") + return Column(jf(_to_java_column(col), schemaObj, propMap)) +``` + +### Consumer Example 1: Using `from_avro` with Schema GUID + +#### Pull Data +```python +import json + +connectionString = "Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=;EntityPath=" + +ehConf = {} +# For 2.3.15 version and above, the configuration dictionary requires that connection string be encrypted. +ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString) + +# Create the positions +startingEventPosition = { + "offset": "-1", + "seqNo": -1, #not in use + "enqueuedTime": None, #not in use + "isInclusive": True +} +# Put the positions into the Event Hub config dictionary +ehConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition) + +df = spark.read.format("eventhubs").options(**ehConf).load() +``` + +#### Create a Schema Registry Object +In case you want to set either `schema.exact.match.required` or `failure.mode` options, you should set their corresponding values in the property map. +For more information about these options please refer to the [schema registry README](../../README.md) file. + +```python +schemaRegistryURL = "http://.servicebus.windows.net" +schemaRegistryTenantID = "" +schemaRegistryClientID = "" +schemaRegistryClientSecret = "" + +properties = sc._jvm.java.util.HashMap() +properties.put("schema.registry.url", schemaRegistryURL) +properties.put("schema.registry.tenant.id", schemaRegistryTenantID) +properties.put("schema.registry.client.id", schemaRegistryClientID) +properties.put("schema.registry.client.secret", schemaRegistryClientSecret) +#optional: in case you want to enable the exact schema match option, you should set the "schema.exact.match.required" to "true" in the property map +#properties.put("schema.exact.match.required", "true") +``` + +#### Deserialize Data +```python +schemaGUID = "" +schemaGuidObj = sc._jvm.com.microsoft.azure.schemaregistry.spark.avro.SchemaGUID(schemaGUID) +parsed_df = df.withColumn("jsondata", from_avro(df.body, schemaGuidObj, properties)).select("jsondata") + +ds = parsed_df.select("jsondata.id", "jsondata.amount", "jsondata.description").write.format("console").save() +``` + +### Consumer Example 2: Using `from_avro` with Schema Definition + +Using `from_avro` with schema definition is very similar to using it with the schema GUID. The first two steps of (I) pulling data from the Eventhub instance and (II) creating a property map +are exactly the same as steps [Pull Data](#pull-data) and [Create a Schema Registry Object](#create-a-schema-registry-object) in the consumer example 1, respectively. +The only difference is when you use `from_avro` to deserialize the data where you should pass the schema definition instead of the schema GUID. 
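+
+As in the first consumer example, the optional `schema.exact.match.required` and `failure.mode` settings from the [schema registry README](../../README.md) go into the same property map before `from_avro` is called. A minimal sketch (assuming the permissive mode value is spelled `"PERMISSIVE"`, following Spark SQL's parse-mode names):
+
+```python
+# Optional: continue processing when parsing exceptions are caught,
+# instead of failing the query (the default is fail-fast).
+properties.put("failure.mode", "PERMISSIVE")
+```
+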
+ +#### Deserialize Data +```python +schemaString = """ +{ + "namespace": "Microsoft.Azure.Data.SchemaRegistry.example", + "type": "record", + "name": "Order", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "amount", + "type": "double" + }, + { + "name": "description", + "type": "string" + } + ] +} +""" +parsed_df = df.withColumn("jsondata", from_avro(df.body, schemaString, properties)).select("jsondata") + +ds = parsed_df.select("jsondata.id", "jsondata.amount", "jsondata.description").write.format("console").save() +``` diff --git a/schemaregistry-avro/docs/schema-registry-example.md b/schemaregistry-avro/docs/schema-registry-example.md new file mode 100644 index 000000000..6b914df38 --- /dev/null +++ b/schemaregistry-avro/docs/schema-registry-example.md @@ -0,0 +1,299 @@ +# Spark Streaming + Azure Schema Registry + +## Register a Schema + +First, you need to create a schema group with a schema in a schema registry hosted by Azure Event Hubs. Please refer to [Create an Event Hubs schema registry using the Azure portal](https://docs.microsoft.com/en-us/azure/event-hubs/create-schema-registry) for detailed instructions. + +In this example, we use the following schema. Please follow the steps in the link above and create the below schema in your schema group. Please note down the *Schema GUID* to use in the producer/consumer code later. +```json +{ + "namespace": "Microsoft.Azure.Data.SchemaRegistry.example", + "type": "record", + "name": "Order", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "amount", + "type": "double" + }, + { + "name": "description", + "type": "string" + } + ] +} +``` + +### Azure Role-Based Access Control + +In order to be able to access the schema registry programmatically, you need to register an application in Azure Active Directory (Azure AD) and add the security principal of the application to one of the Azure role-based access control (Azure RBAC) roles mentioned in [Azure role-based access control](https://docs.microsoft.com/en-us/azure/event-hubs/schema-registry-overview#azure-role-based-access-control) section in the schema registery overview page. +Also, you can refer to [Register an app with Azure AD](https://docs.microsoft.com/en-us/azure/active-directory/develop/quickstart-register-app) for instructions on registering an application using the Azure portal. + +Please make sure to note down the client ID (application ID), tenant ID, and the secret to use in the code. + +## Producer Examples + +In order to send data to an eventhub using the schema registry you need to: + * Use a property object which contains required information to connect to your schema registry. + * Create records matching the schema and serialize those using `to_avro` function defined in azure-schemaregistry-spark-avro. + * Send serialized bytes to the Eventhub instance. 
+Please note that in order to access the schema registry, the following information must be provided in the property map:
+ * The schema registry endpoint URL, which should be set using the key "schema.registry.url"
+ * The tenant ID from the registered application, which should be set using the key "schema.registry.tenant.id"
+ * The client ID (application ID) from the registered application, which should be set using the key "schema.registry.client.id"
+ * The secret from the registered application, which should be set using the key "schema.registry.client.secret"
+
+
+### Producer Example 1: Using `to_avro` with Schema GUID
+
+In order to serialize payloads using the schema GUID, you need to create a property object which contains the required information to access your schema registry and pass the schema GUID to the `to_avro` function.
+
+#### Create a Schema Registry Object
+```scala
+import com.microsoft.azure.schemaregistry.spark.avro.functions._
+import java.util._
+
+val schemaRegistryURL = "http://.servicebus.windows.net"
+val schemaRegistryTenantID = ""
+val schemaRegistryClientID = ""
+val schemaRegistryClientSecret = ""
+
+val props: HashMap[String, String] = new HashMap()
+ props.put(SCHEMA_REGISTRY_URL, schemaRegistryURL)
+ props.put(SCHEMA_REGISTRY_TENANT_ID_KEY, schemaRegistryTenantID)
+ props.put(SCHEMA_REGISTRY_CLIENT_ID_KEY, schemaRegistryClientID)
+ props.put(SCHEMA_REGISTRY_CLIENT_SECRET_KEY, schemaRegistryClientSecret)
+```
+
+#### Create Records Matching the Schema and Serialize Those Using the Schema GUID
+```scala
+import com.microsoft.azure.schemaregistry.spark.avro.functions._
+import com.microsoft.azure.schemaregistry.spark.avro.SchemaGUID
+import org.apache.spark.sql.functions.{col, udf}
+import spark.sqlContext.implicits._
+
+case class Order(id: String, amount: Double, description: String)
+val makeOrderRecord = udf((id: String, amount: Double, description: String) => Order(id, amount, description))
+
+val data = Seq(("id1", 11.11, "order1"), ("id2", 22.22, "order2"), ("id3", 33.33, "order3"))
+val df = data.toDF("id", "amount", "description")
+
+val dfRecord = df.
+  withColumn("orders", makeOrderRecord(col("id"), col("amount"), col("description"))).
+  drop("id").
+  drop("amount").
+  drop("description")
+
+val schemaGUIDString: String = ""
+val dfAvro = dfRecord.select(to_avro($"orders", SchemaGUID(schemaGUIDString), props) as "body")
+```
+
+#### Send Data to Your Eventhub Instance
+```scala
+import org.apache.spark.eventhubs._
+
+val connectionString = "Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=;EntityPath="
+val ehConf = EventHubsConf(connectionString)
+
+dfAvro.
+  write.
+  format("eventhubs").
+  options(ehConf.toMap).
+  save()
+
+print("Sent!")
+```
+
+### Producer Example 2: Using `to_avro` with Schema Definition
+
+In order to serialize payloads using the schema definition, the property object requires two more values in addition to the required information to access your schema registry:
+ * The schema group where your schema has been registered, which should be set using the key "schema.group"
+ * The schema name of your registered schema, which should be set using the key "schema.name"
+
+Both schema group and schema name are needed to retrieve the unique schema GUID. Note that the schema GUID is added to every payload so that all consumers know exactly which schema has been used to serialize the payload.
+If you want to use a new schema which has not been registered in your schema group, you need to enable the schema auto registry option by setting the `schema.auto.register.flag` to `true` in your property object.
+The schema auto registry option simply registers a new schema under the schema group and name provided in the properties object if it cannot find the schema in the given schema group.
+Using a new schema with the auto registry option disabled results in an exception. Note that the schema auto registry option is off by default.
+
+Once you create the property map with all the required information, you can use the schema definition instead of the schema GUID in the `to_avro` function.
+
+#### Create a Schema Registry Object, Including Schema Group and Schema Name
+```scala
+import com.microsoft.azure.schemaregistry.spark.avro.functions._
+import java.util._
+
+val schemaRegistryURL = "http://.servicebus.windows.net"
+val schemaRegistryTenantID = ""
+val schemaRegistryClientID = ""
+val schemaRegistryClientSecret = ""
+val schemaGroup = ""
+val schemaName = ""
+
+val props: HashMap[String, String] = new HashMap()
+ props.put(SCHEMA_REGISTRY_URL, schemaRegistryURL)
+ props.put(SCHEMA_REGISTRY_TENANT_ID_KEY, schemaRegistryTenantID)
+ props.put(SCHEMA_REGISTRY_CLIENT_ID_KEY, schemaRegistryClientID)
+ props.put(SCHEMA_REGISTRY_CLIENT_SECRET_KEY, schemaRegistryClientSecret)
+ props.put(SCHEMA_GROUP_KEY, schemaGroup)
+ props.put("schema.name", schemaName)
+ // optional: in case you want to enable the schema auto registry, you should set the "schema.auto.register.flag" to "true" in the property map
+ // props.put(SCHEMA_AUTO_REGISTER_FLAG_KEY, "true")
+```
+
+#### Create Records Matching the Schema and Serialize Those Using the Schema Definition
+```scala
+import com.microsoft.azure.schemaregistry.spark.avro.functions._
+import org.apache.spark.sql.functions.{col, udf}
+import spark.sqlContext.implicits._
+
+case class Order(id: String, amount: Double, description: String)
+val makeOrderRecord = udf((id: String, amount: Double, description: String) => Order(id, amount, description))
+
+val data = Seq(("id1", 11.11, "order1"), ("id2", 22.22, "order2"), ("id3", 33.33, "order3"))
+val df = data.toDF("id", "amount", "description")
+
+val dfRecord = df.
+  withColumn("orders", makeOrderRecord(col("id"), col("amount"), col("description"))).
+  drop("id").
+  drop("amount").
+  drop("description")
+
+val schemaString = """
+{
+    "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
+    "type": "record",
+    "name": "Order",
+    "fields": [
+        {
+            "name": "id",
+            "type": "string"
+        },
+        {
+            "name": "amount",
+            "type": "double"
+        },
+        {
+            "name": "description",
+            "type": "string"
+        }
+    ]
+}
+ """
+
+val dfAvro = dfRecord.select(to_avro($"orders", schemaString, props) as "body")
+```
+
+Finally, you can send the payloads in `dfAvro` to your Eventhub instance using the sample code provided in the [Send Data to Your Eventhub Instance](#send-data-to-your-eventhub-instance) subsection under Producer Example 1.
+
+
+## Consumer Examples
+
+We can perform the following steps to pull data from an Eventhub instance and parse it with respect to a schema from the schema registry:
+ * Pull data from the Eventhub instance using the azure-event-hubs-spark connector.
+ * Use a property object which contains the required information to connect to your schema registry.
+ * Deserialize the data using the `from_avro` function defined in azure-schemaregistry-spark-avro.
+ +Please refer to the [Producer Examples](#producer-examples) section for the required information in the property map. + + +### Consumer Example 1: Using `from_avro` with Schema GUID + +#### Pull Data +```scala +import org.apache.spark.eventhubs._ + +val connectionString = "Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=;EntityPath=" + +val ehConf = EventHubsConf(connectionString). + setStartingPosition(EventPosition.fromStartOfStream) + +val df = spark. + readStream. + format("eventhubs"). + options(ehConf.toMap). + load() +``` + +#### Create a Schema Registry Object + +In case you want to set either `schema.exact.match.required` or `failure.mode` options, you should set their corresponding values in the property map. +For more information about these options please refer to the [schema registry README](../README.md) file. + +```scala +import com.microsoft.azure.schemaregistry.spark.avro.functions._ +import java.util._ + +val schemaRegistryURL = "http://.servicebus.windows.net" +val schemaRegistryTenantID = "" +val schemaRegistryClientID = "" +val schemaRegistryClientSecret = "" + +val props: HashMap[String, String] = new HashMap() + props.put(SCHEMA_REGISTRY_URL, schemaRegistryURL) + props.put(SCHEMA_REGISTRY_TENANT_ID_KEY, schemaRegistryTenantID) + props.put(SCHEMA_REGISTRY_CLIENT_ID_KEY, schemaRegistryClientID) + props.put(SCHEMA_REGISTRY_CLIENT_SECRET_KEY, schemaRegistryClientSecret) + // optional: in case you want to enable the exact schema match option, you should set the "schema.exact.match.required" to "true" in the property map + // props.put(SCHEMA_EXACT_MATCH_REQUIRED, "true") +``` + +#### Deserialize Data +```scala +import com.microsoft.azure.schemaregistry.spark.avro.functions._ +import com.microsoft.azure.schemaregistry.spark.avro.SchemaGUID + +val schemaGUIDString = "" +val parsed_df = df.select(from_avro($"body", SchemaGUID(schemaGUIDString), props) as "jsondata") + +val query = parsed_df. + select($"jsondata.id", $"jsondata.amount", $"jsondata.description"). + writeStream. + format("console"). + start() +``` + +### Consumer Example 2: Using `from_avro` with Schema Definition + +Using `from_avro` with schema definition is very similar to using it with the schema GUID. The first two steps of (I) pulling data from the Eventhub instance and (II) creating a property map +are exactly the same as steps [Pull Data](#pull-data) and [Create a Schema Registry Object](#create-a-schema-registry-object) in the consumer example 1, respectively. +The only difference is when you use `from_avro` to deserialize the data where you should pass the schema definition instead of the schema GUID. + +#### Deserialize Data +```scala +import com.microsoft.azure.schemaregistry.spark.avro.functions._; + +val schemaString = """ +{ + "namespace": "Microsoft.Azure.Data.SchemaRegistry.example", + "type": "record", + "name": "Order", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "amount", + "type": "double" + }, + { + "name": "description", + "type": "string" + } + ] +} + """ + +val parsed_df = df.select(from_avro($"body", schemaString, props) as "jsondata") + +val query = parsed_df. + select($"jsondata.id", $"jsondata.amount", $"jsondata.description"). + writeStream. + format("console"). 
+ start() +``` \ No newline at end of file diff --git a/schemaregistry-avro/jar-with-dependencies.xml b/schemaregistry-avro/jar-with-dependencies.xml new file mode 100644 index 000000000..a4795ee95 --- /dev/null +++ b/schemaregistry-avro/jar-with-dependencies.xml @@ -0,0 +1,13 @@ + + jar-with-dependencies-and-exclude-classes + + jar + + + + + org.slf4j:slf4j-log4j12 + + + + \ No newline at end of file diff --git a/schemaregistry-avro/pom.xml b/schemaregistry-avro/pom.xml new file mode 100644 index 000000000..08dd947f5 --- /dev/null +++ b/schemaregistry-avro/pom.xml @@ -0,0 +1,206 @@ + + 4.0.0 + com.microsoft.azure + azure-schemaregistry-spark-avro + 1.0.0 + Azure Schema Registry Spark Plugin + Azure Schema Registry support for Spark + 2021 + + + + 1.8 + 1.8 + UTF-8 + 2.12 + 3.0.1 + 4.2.0 + + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.project.version} + provided + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.project.version} + test-jar + test + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark.project.version} + test-jar + test + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.project.version} + test-jar + test + + + com.azure + azure-data-schemaregistry + 1.0.0 + + + com.azure + azure-data-schemaregistry-avro + 1.0.0-beta.5 + + + com.azure + azure-identity + 1.4.1 + + + org.slf4j + slf4j-simple + 1.7.29 + + + com.fasterxml.jackson.core + jackson-annotations + 2.10.0 + + + com.fasterxml.jackson.core + jackson-core + 2.10.0 + + + com.fasterxml.jackson.core + jackson-databind + 2.10.0 + + + com.fasterxml.jackson.dataformat + jackson-dataformat-xml + 2.10.0 + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + 2.10.0 + + + com.azure + azure-core + 1.22.0 + + + + + + junit + junit + 4.12 + test + + + org.scalatest + scalatest_${scala.binary.version} + 3.0.5 + test + + + org.specs2 + specs2-core_${scala.binary.version} + ${spec2.version} + test + + + org.specs2 + specs2-junit_${scala.binary.version} + ${spec2.version} + test + + + + com.google.guava + guava + 23.6-jre + + + + + src/main/scala + src/test/scala + + + + net.alchim31.maven + scala-maven-plugin + 3.3.2 + + + + compile + testCompile + + + + -dependencyfile + ${project.build.directory}/.scala_dependencies + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.21.0 + + + true + + + + org.scalatest + scalatest-maven-plugin + 2.0.0 + + ${project.build.directory}/surefire-reports + . + TestSuiteReport.txt + + + + + + test + + test + + + + + + maven-assembly-plugin + + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + fully.qualified.MainClass + + + + jar-with-dependencies + + + + + + \ No newline at end of file diff --git a/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/AvroDataToCatalyst.scala b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/AvroDataToCatalyst.scala new file mode 100644 index 000000000..17e49ab08 --- /dev/null +++ b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/AvroDataToCatalyst.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.azure.schemaregistry.spark.avro + +import java.io.ByteArrayInputStream +import java.nio.charset.StandardCharsets + +import com.azure.core.util.serializer.TypeReference +import com.azure.data.schemaregistry.SchemaRegistryClientBuilder +import com.azure.data.schemaregistry.avro.{SchemaRegistryAvroSerializerBuilder} +import com.azure.identity.ClientSecretCredentialBuilder +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenerator, CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types._ + +import scala.util.control.NonFatal +import scala.util.Try +import org.apache.spark.internal.Logging + +case class AvroDataToCatalyst( + child: Expression, + schemaId: String, + schemaDefinition: String, + options: Map[java.lang.String, java.lang.String]) + extends UnaryExpression with ExpectsInputTypes with Logging { + + override def inputTypes: Seq[BinaryType] = Seq(BinaryType) + + override lazy val dataType: DataType = { + val dt = SchemaConverters.toSqlType(schemaReader.expectedSchema).dataType; + dt + } + + override def nullable: Boolean = true + + @transient private lazy val schemaReader = SchemaReader.createSchemaReader(schemaId, schemaDefinition, options) + + @transient private lazy val avroConverter = { + new AvroDeserializer(schemaReader.expectedSchema, dataType) + } + + @transient private lazy val requireExactSchemaMatch: Boolean = { + val requiredExactMatchStr: String = options.getOrElse(functions.SCHEMA_EXACT_MATCH_REQUIRED, "false") + Try(requiredExactMatchStr.toLowerCase.toBoolean).getOrElse(false) + } + + @transient private lazy val parseMode: ParseMode = { + val modeStr = schemaReader.options.getOrElse(functions.SCHEMA_PARSE_MODE, FailFastMode.name) + val mode = ParseMode.fromString(modeStr) + if (mode != PermissiveMode && mode != FailFastMode) { + throw new IllegalArgumentException(mode + " parse mode not supported.") + } + mode + } + + @transient private lazy val nullResultRow: Any = dataType match { + case st: StructType => + val resultRow = new SpecificInternalRow(st.map(_.dataType)) + for(i <- 0 until st.length) { + resultRow.setNullAt(i) + } + resultRow + + case _ => + null + } + + override def nullSafeEval(input: Any): Any = { + val binary = new ByteArrayInputStream(input.asInstanceOf[Array[Byte]]) + // compare schema version and datatype version + val genericRecord = schemaReader.serializer.deserialize(binary, TypeReference.createInstance(classOf[GenericRecord])) + + if (requireExactSchemaMatch) { + if (!schemaReader.expectedSchema.equals(genericRecord.getSchema)) { + throw new IncompatibleSchemaException(s"Schema not exact match, payload schema did not match expected schema. 
Payload schema: ${genericRecord.getSchema}") + } + } + + try { + avroConverter.deserialize(genericRecord) + } catch { + case NonFatal(e) => parseMode match { + case PermissiveMode => //nullResultRow + throw new Exception(s"nave Permissive --> error message is $e") + case FailFastMode => + throw new Exception("Malformed records are detected in record parsing. " + + s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " + + "result, try setting the option 'mode' as 'PERMISSIVE'.", e) + case _ => + throw new Exception(s"Unknown parse mode: ${parseMode.name}") + } + } + } + + override def prettyName: String = "from_avro" + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val expr = ctx.addReferenceObj("this", this) + defineCodeGen(ctx, ev, input => + s"(${CodeGenerator.boxedType(dataType)})$expr.nullSafeEval($input)") + } + +} \ No newline at end of file diff --git a/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/AvroDeserializer.scala b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/AvroDeserializer.scala new file mode 100644 index 000000000..6d2884f59 --- /dev/null +++ b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/AvroDeserializer.scala @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.azure.schemaregistry.spark.avro + +import java.math.BigDecimal +import java.nio.ByteBuffer + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder} +import org.apache.avro.Conversions.DecimalConversion +import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis} +import org.apache.avro.Schema.Type._ +import org.apache.avro.generic._ +import org.apache.avro.util.Utf8 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData, DateTimeConstants} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * A deserializer to deserialize data in avro format to data in catalyst format. + */ +class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) { + private lazy val decimalConversions = new DecimalConversion() + + private val converter: Any => Any = rootCatalystType match { + // A shortcut for empty schema. 
+ case st: StructType if st.isEmpty => + (data: Any) => InternalRow.empty + + case st: StructType => + val resultRow = new SpecificInternalRow(st.map(_.dataType)) + val fieldUpdater = new RowUpdater(resultRow) + val writer = getRecordWriter(rootAvroType, st, Nil) + (data: Any) => { + val record = data.asInstanceOf[GenericRecord] + writer(fieldUpdater, record) + resultRow + } + + case _ => + val tmpRow = new SpecificInternalRow(Seq(rootCatalystType)) + val fieldUpdater = new RowUpdater(tmpRow) + val writer = newWriter(rootAvroType, rootCatalystType, Nil) + (data: Any) => { + writer(fieldUpdater, 0, data) + tmpRow.get(0, rootCatalystType) + } + } + + def deserialize(data: Any): Any = converter(data) + + /** + * Creates a writer to write avro values to Catalyst values at the given ordinal with the given + * updater. + */ + private def newWriter( + avroType: Schema, + catalystType: DataType, + path: List[String]): (CatalystDataUpdater, Int, Any) => Unit = + (avroType.getType, catalystType) match { + case (NULL, NullType) => (updater, ordinal, _) => + updater.setNullAt(ordinal) + + // TODO: we can avoid boxing if future version of avro provide primitive accessors. + case (BOOLEAN, BooleanType) => (updater, ordinal, value) => + updater.setBoolean(ordinal, value.asInstanceOf[Boolean]) + + case (INT, IntegerType) => (updater, ordinal, value) => + updater.setInt(ordinal, value.asInstanceOf[Int]) + + case (INT, DateType) => (updater, ordinal, value) => + updater.setInt(ordinal, value.asInstanceOf[Int]) + + case (LONG, LongType) => (updater, ordinal, value) => + updater.setLong(ordinal, value.asInstanceOf[Long]) + + case (LONG, TimestampType) => avroType.getLogicalType match { + case _: TimestampMillis => (updater, ordinal, value) => + updater.setLong(ordinal, value.asInstanceOf[Long] * 1000) + case _: TimestampMicros => (updater, ordinal, value) => + updater.setLong(ordinal, value.asInstanceOf[Long]) + case null => (updater, ordinal, value) => + // For backward compatibility, if the Avro type is Long and it is not logical type, + // the value is processed as timestamp type with millisecond precision. + updater.setLong(ordinal, value.asInstanceOf[Long] * 1000) + case other => throw new IncompatibleSchemaException( + s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.") + } + + // Before we upgrade Avro to 1.8 for logical type support, spark-avro converts Long to Date. + // For backward compatibility, we still keep this conversion. 
+ case (LONG, DateType) => (updater, ordinal, value) => + updater.setInt(ordinal, (value.asInstanceOf[Long] / DateTimeConstants.MILLIS_PER_DAY /*DateTimeUtils.MILLIS_PER_DAY*/).toInt) + + case (FLOAT, FloatType) => (updater, ordinal, value) => + updater.setFloat(ordinal, value.asInstanceOf[Float]) + + case (DOUBLE, DoubleType) => (updater, ordinal, value) => + updater.setDouble(ordinal, value.asInstanceOf[Double]) + + case (STRING, StringType) => (updater, ordinal, value) => + val str = value match { + case s: String => UTF8String.fromString(s) + case s: Utf8 => + val bytes = new Array[Byte](s.getByteLength) + System.arraycopy(s.getBytes, 0, bytes, 0, s.getByteLength) + UTF8String.fromBytes(bytes) + } + updater.set(ordinal, str) + + case (ENUM, StringType) => (updater, ordinal, value) => + updater.set(ordinal, UTF8String.fromString(value.toString)) + + case (FIXED, BinaryType) => (updater, ordinal, value) => + updater.set(ordinal, value.asInstanceOf[GenericFixed].bytes().clone()) + + case (BYTES, BinaryType) => (updater, ordinal, value) => + val bytes = value match { + case b: ByteBuffer => + val bytes = new Array[Byte](b.remaining) + b.get(bytes) + bytes + case b: Array[Byte] => b + case other => throw new RuntimeException(s"$other is not a valid avro binary.") + } + updater.set(ordinal, bytes) + + case (FIXED, d: DecimalType) => (updater, ordinal, value) => + val bigDecimal = decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, + LogicalTypes.decimal(d.precision, d.scale)) + val decimal = createDecimal(bigDecimal, d.precision, d.scale) + updater.setDecimal(ordinal, decimal) + + case (BYTES, d: DecimalType) => (updater, ordinal, value) => + val bigDecimal = decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, + LogicalTypes.decimal(d.precision, d.scale)) + val decimal = createDecimal(bigDecimal, d.precision, d.scale) + updater.setDecimal(ordinal, decimal) + + case (RECORD, st: StructType) => + val writeRecord = getRecordWriter(avroType, st, path) + (updater, ordinal, value) => + val row = new SpecificInternalRow(st) + writeRecord(new RowUpdater(row), value.asInstanceOf[GenericRecord]) + updater.set(ordinal, row) + + case (ARRAY, ArrayType(elementType, containsNull)) => + val elementWriter = newWriter(avroType.getElementType, elementType, path) + (updater, ordinal, value) => + val array = value.asInstanceOf[GenericData.Array[Any]] + val len = array.size() + val result = createArrayData(elementType, len) + val elementUpdater = new ArrayDataUpdater(result) + + var i = 0 + while (i < len) { + val element = array.get(i) + if (element == null) { + if (!containsNull) { + throw new RuntimeException(s"Array value at path ${path.mkString(".")} is not " + + "allowed to be null") + } else { + elementUpdater.setNullAt(i) + } + } else { + elementWriter(elementUpdater, i, element) + } + i += 1 + } + + updater.set(ordinal, result) + + case (MAP, MapType(keyType, valueType, valueContainsNull)) if keyType == StringType => + val keyWriter = newWriter(SchemaBuilder.builder().stringType(), StringType, path) + val valueWriter = newWriter(avroType.getValueType, valueType, path) + (updater, ordinal, value) => + val map = value.asInstanceOf[java.util.Map[AnyRef, AnyRef]] + val keyArray = createArrayData(keyType, map.size()) + val keyUpdater = new ArrayDataUpdater(keyArray) + val valueArray = createArrayData(valueType, map.size()) + val valueUpdater = new ArrayDataUpdater(valueArray) + val iter = map.entrySet().iterator() + var i = 0 + while (iter.hasNext) { + val entry = 
iter.next() + assert(entry.getKey != null) + keyWriter(keyUpdater, i, entry.getKey) + if (entry.getValue == null) { + if (!valueContainsNull) { + throw new RuntimeException(s"Map value at path ${path.mkString(".")} is not " + + "allowed to be null") + } else { + valueUpdater.setNullAt(i) + } + } else { + valueWriter(valueUpdater, i, entry.getValue) + } + i += 1 + } + + updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray)) + + case (UNION, _) => + val allTypes = avroType.getTypes.asScala + val nonNullTypes = allTypes.filter(_.getType != NULL) + val nonNullAvroType = Schema.createUnion(nonNullTypes.asJava) + if (nonNullTypes.nonEmpty) { + if (nonNullTypes.length == 1) { + newWriter(nonNullTypes.head, catalystType, path) + } else { + nonNullTypes.map(_.getType) match { + case Seq(a, b) if Set(a, b) == Set(INT, LONG) && catalystType == LongType => + (updater, ordinal, value) => value match { + case null => updater.setNullAt(ordinal) + case l: java.lang.Long => updater.setLong(ordinal, l) + case i: java.lang.Integer => updater.setLong(ordinal, i.longValue()) + } + + case Seq(a, b) if Set(a, b) == Set(FLOAT, DOUBLE) && catalystType == DoubleType => + (updater, ordinal, value) => value match { + case null => updater.setNullAt(ordinal) + case d: java.lang.Double => updater.setDouble(ordinal, d) + case f: java.lang.Float => updater.setDouble(ordinal, f.doubleValue()) + } + + case _ => + catalystType match { + case st: StructType if st.length == nonNullTypes.size => + val fieldWriters = nonNullTypes.zip(st.fields).map { + case (schema, field) => newWriter(schema, field.dataType, path :+ field.name) + }.toArray + (updater, ordinal, value) => { + val row = new SpecificInternalRow(st) + val fieldUpdater = new RowUpdater(row) + val i = GenericData.get().resolveUnion(nonNullAvroType, value) + fieldWriters(i)(fieldUpdater, i, value) + updater.set(ordinal, row) + } + + case _ => + throw new IncompatibleSchemaException( + s"Cannot convert Avro to catalyst because schema at path " + + s"${path.mkString(".")} is not compatible " + + s"(avroType = $avroType, sqlType = $catalystType).\n" + + s"Source Avro schema: $rootAvroType.\n" + + s"Target Catalyst type: $rootCatalystType") + } + } + } + } else { + (updater, ordinal, value) => updater.setNullAt(ordinal) + } + + case _ => + throw new IncompatibleSchemaException( + s"Cannot convert Avro to catalyst because schema at path ${path.mkString(".")} " + + s"is not compatible (avroType = $avroType, sqlType = $catalystType).\n" + + s"Source Avro schema: $rootAvroType.\n" + + s"Target Catalyst type: $rootCatalystType") + } + + // TODO: move the following method in Decimal object on creating Decimal from BigDecimal? + private def createDecimal(decimal: BigDecimal, precision: Int, scale: Int): Decimal = { + if (precision <= Decimal.MAX_LONG_DIGITS) { + // Constructs a `Decimal` with an unscaled `Long` value if possible. + Decimal(decimal.unscaledValue().longValue(), precision, scale) + } else { + // Otherwise, resorts to an unscaled `BigInteger` instead. 
+ Decimal(decimal, precision, scale) + } + } + + private def getRecordWriter( + avroType: Schema, + sqlType: StructType, + path: List[String]): (CatalystDataUpdater, GenericRecord) => Unit = { + val validFieldNames = ArrayBuffer.empty[String] + val fieldWritersMap = scala.collection.mutable.Map[String, (CatalystDataUpdater, Any) => Unit]() + val defaultValues = scala.collection.mutable.Map[String, Object]() + + val length = sqlType.length + var i = 0 + while (i < length) { + val sqlField = sqlType.fields(i) + val avroField = avroType.getField(sqlField.name) + if (avroField != null) { + validFieldNames += sqlField.name + if(avroField.defaultVal != null) { + defaultValues(sqlField.name) = avroField.defaultVal + } + + val baseWriter = newWriter(avroField.schema(), sqlField.dataType, path :+ sqlField.name) + val ordinal = i + val fieldWriter = (fieldUpdater: CatalystDataUpdater, value: Any) => { + if (value == null) { + fieldUpdater.setNullAt(ordinal) + } else { + baseWriter(fieldUpdater, ordinal, value) + } + } + fieldWritersMap(sqlField.name) = fieldWriter + } else if (!sqlField.nullable) { + throw new IncompatibleSchemaException( + s""" + |Cannot find non-nullable field ${path.mkString(".")}.${sqlField.name} in Avro schema. + |Source Avro schema: $rootAvroType. + |Target Catalyst type: $rootCatalystType. + """.stripMargin) + } + i += 1 + } + + (fieldUpdater, record) => { + var i = 0 + while (i < validFieldNames.length) { + var valueObj : Object = record.get(validFieldNames(i)) + + if(valueObj == null) { + if(defaultValues.contains(validFieldNames(i))) { + valueObj = defaultValues(validFieldNames(i)) + } + else { + throw new IncompatibleSchemaException(s"The schema = $rootAvroType contains non-optional field " + + s"${validFieldNames(i)} (i.e without a deafult value) that does not exist in the record producer schema.") + } + } + + fieldWritersMap(validFieldNames(i))(fieldUpdater, valueObj) + i += 1 + } + } + } + + private def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match { + case BooleanType => UnsafeArrayData.fromPrimitiveArray(new Array[Boolean](length)) + case ByteType => UnsafeArrayData.fromPrimitiveArray(new Array[Byte](length)) + case ShortType => UnsafeArrayData.fromPrimitiveArray(new Array[Short](length)) + case IntegerType => UnsafeArrayData.fromPrimitiveArray(new Array[Int](length)) + case LongType => UnsafeArrayData.fromPrimitiveArray(new Array[Long](length)) + case FloatType => UnsafeArrayData.fromPrimitiveArray(new Array[Float](length)) + case DoubleType => UnsafeArrayData.fromPrimitiveArray(new Array[Double](length)) + case _ => new GenericArrayData(new Array[Any](length)) + } + + /** + * A base interface for updating values inside catalyst data structure like `InternalRow` and + * `ArrayData`. 
+ */ + sealed trait CatalystDataUpdater { + def set(ordinal: Int, value: Any): Unit + + def setNullAt(ordinal: Int): Unit = set(ordinal, null) + def setBoolean(ordinal: Int, value: Boolean): Unit = set(ordinal, value) + def setByte(ordinal: Int, value: Byte): Unit = set(ordinal, value) + def setShort(ordinal: Int, value: Short): Unit = set(ordinal, value) + def setInt(ordinal: Int, value: Int): Unit = set(ordinal, value) + def setLong(ordinal: Int, value: Long): Unit = set(ordinal, value) + def setDouble(ordinal: Int, value: Double): Unit = set(ordinal, value) + def setFloat(ordinal: Int, value: Float): Unit = set(ordinal, value) + def setDecimal(ordinal: Int, value: Decimal): Unit = set(ordinal, value) + } + + final class RowUpdater(row: InternalRow) extends CatalystDataUpdater { + override def set(ordinal: Int, value: Any): Unit = row.update(ordinal, value) + + override def setNullAt(ordinal: Int): Unit = row.setNullAt(ordinal) + override def setBoolean(ordinal: Int, value: Boolean): Unit = row.setBoolean(ordinal, value) + override def setByte(ordinal: Int, value: Byte): Unit = row.setByte(ordinal, value) + override def setShort(ordinal: Int, value: Short): Unit = row.setShort(ordinal, value) + override def setInt(ordinal: Int, value: Int): Unit = row.setInt(ordinal, value) + override def setLong(ordinal: Int, value: Long): Unit = row.setLong(ordinal, value) + override def setDouble(ordinal: Int, value: Double): Unit = row.setDouble(ordinal, value) + override def setFloat(ordinal: Int, value: Float): Unit = row.setFloat(ordinal, value) + override def setDecimal(ordinal: Int, value: Decimal): Unit = + row.setDecimal(ordinal, value, value.precision) + } + + final class ArrayDataUpdater(array: ArrayData) extends CatalystDataUpdater { + override def set(ordinal: Int, value: Any): Unit = array.update(ordinal, value) + + override def setNullAt(ordinal: Int): Unit = array.setNullAt(ordinal) + override def setBoolean(ordinal: Int, value: Boolean): Unit = array.setBoolean(ordinal, value) + override def setByte(ordinal: Int, value: Byte): Unit = array.setByte(ordinal, value) + override def setShort(ordinal: Int, value: Short): Unit = array.setShort(ordinal, value) + override def setInt(ordinal: Int, value: Int): Unit = array.setInt(ordinal, value) + override def setLong(ordinal: Int, value: Long): Unit = array.setLong(ordinal, value) + override def setDouble(ordinal: Int, value: Double): Unit = array.setDouble(ordinal, value) + override def setFloat(ordinal: Int, value: Float): Unit = array.setFloat(ordinal, value) + override def setDecimal(ordinal: Int, value: Decimal): Unit = array.update(ordinal, value) + } +} \ No newline at end of file diff --git a/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/AvroSerializer.scala b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/AvroSerializer.scala new file mode 100644 index 000000000..e536a40c8 --- /dev/null +++ b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/AvroSerializer.scala @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.azure.schemaregistry.spark.avro + +import java.nio.ByteBuffer + +import scala.collection.JavaConverters._ + +import org.apache.avro.Conversions.DecimalConversion +import org.apache.avro.LogicalTypes +import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis} +import org.apache.avro.Schema +import org.apache.avro.Schema.Type +import org.apache.avro.Schema.Type._ +import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed} +import org.apache.avro.generic.GenericData.Record +import org.apache.avro.util.Utf8 + +import org.apache.spark.internal.Logging +import com.microsoft.azure.schemaregistry.spark.avro.AvroUtils.{toFieldStr, AvroMatchedField} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.DataSourceUtils +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy +import org.apache.spark.sql.types._ + +/** + * A serializer to serialize data in catalyst format to data in avro format. + */ +class AvroSerializer( + rootCatalystType: DataType, + rootAvroType: Schema, + nullable: Boolean) + extends Logging { + + def serialize(catalystData: Any): Any = { + converter.apply(catalystData) + } + + private val converter: Any => Any = { + val actualAvroType = resolveNullableType(rootAvroType, nullable) + val baseConverter = try { + rootCatalystType match { + case st: StructType => + newStructConverter(st, actualAvroType, Nil, Nil).asInstanceOf[Any => Any] + case _ => + val tmpRow = new SpecificInternalRow(Seq(rootCatalystType)) + val converter = newConverter(rootCatalystType, actualAvroType, Nil, Nil) + (data: Any) => + tmpRow.update(0, data) + converter.apply(tmpRow, 0) + } + } catch { + case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException( + s"Cannot convert SQL type ${rootCatalystType.sql} to Avro type $rootAvroType.", ise) + } + if (nullable) { + (data: Any) => + if (data == null) { + null + } else { + baseConverter.apply(data) + } + } else { + baseConverter + } + } + + private type Converter = (SpecializedGetters, Int) => Any + + private lazy val decimalConversions = new DecimalConversion() + + private def newConverter( + catalystType: DataType, + avroType: Schema, + catalystPath: Seq[String], + avroPath: Seq[String]): Converter = { + val errorPrefix = s"Cannot convert SQL ${toFieldStr(catalystPath)} " + + s"to Avro ${toFieldStr(avroPath)} because " + (catalystType, avroType.getType) match { + case (NullType, NULL) => + (getter, ordinal) => null + case (BooleanType, BOOLEAN) => + (getter, ordinal) => getter.getBoolean(ordinal) + case (ByteType, INT) => + (getter, ordinal) => getter.getByte(ordinal).toInt + case (ShortType, INT) => + (getter, ordinal) => getter.getShort(ordinal).toInt + case (IntegerType, INT) => + (getter, ordinal) => getter.getInt(ordinal) + case (LongType, LONG) => + (getter, ordinal) => getter.getLong(ordinal) + case (FloatType, FLOAT) => + 
(getter, ordinal) => getter.getFloat(ordinal) + case (DoubleType, DOUBLE) => + (getter, ordinal) => getter.getDouble(ordinal) + case (d: DecimalType, FIXED) + if avroType.getLogicalType == LogicalTypes.decimal(d.precision, d.scale) => + (getter, ordinal) => + val decimal = getter.getDecimal(ordinal, d.precision, d.scale) + decimalConversions.toFixed(decimal.toJavaBigDecimal, avroType, + LogicalTypes.decimal(d.precision, d.scale)) + + case (d: DecimalType, BYTES) + if avroType.getLogicalType == LogicalTypes.decimal(d.precision, d.scale) => + (getter, ordinal) => + val decimal = getter.getDecimal(ordinal, d.precision, d.scale) + decimalConversions.toBytes(decimal.toJavaBigDecimal, avroType, + LogicalTypes.decimal(d.precision, d.scale)) + + case (StringType, ENUM) => + val enumSymbols: Set[String] = avroType.getEnumSymbols.asScala.toSet + (getter, ordinal) => + val data = getter.getUTF8String(ordinal).toString + if (!enumSymbols.contains(data)) { + throw new IncompatibleSchemaException(errorPrefix + + s""""$data" cannot be written since it's not defined in enum """ + + enumSymbols.mkString("\"", "\", \"", "\"")) + } + new EnumSymbol(avroType, data) + + case (StringType, STRING) => + (getter, ordinal) => new Utf8(getter.getUTF8String(ordinal).getBytes) + + case (BinaryType, FIXED) => + val size = avroType.getFixedSize + (getter, ordinal) => + val data: Array[Byte] = getter.getBinary(ordinal) + if (data.length != size) { + def len2str(len: Int): String = s"$len ${if (len > 1) "bytes" else "byte"}" + throw new IncompatibleSchemaException(errorPrefix + len2str(data.length) + + " of binary data cannot be written into FIXED type with size of " + len2str(size)) + } + new Fixed(avroType, data) + + case (BinaryType, BYTES) => + (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal)) + + case (DateType, INT) => + (getter, ordinal) => getter.getInt(ordinal) + + case (TimestampType, LONG) => avroType.getLogicalType match { + // For backward compatibility, if the Avro type is Long and it is not logical type + // (the `null` case), output the timestamp value as with millisecond precision. + case null | _: TimestampMillis => (getter, ordinal) => + Math.floorDiv(getter.getLong(ordinal), 1000L) + case _: TimestampMicros => (getter, ordinal) => + getter.getLong(ordinal) + case other => throw new IncompatibleSchemaException(errorPrefix + + s"SQL type ${TimestampType.sql} cannot be converted to Avro logical type $other") + } + + case (ArrayType(et, containsNull), ARRAY) => + val elementConverter = newConverter( + et, resolveNullableType(avroType.getElementType, containsNull), + catalystPath :+ "element", avroPath :+ "element") + (getter, ordinal) => { + val arrayData = getter.getArray(ordinal) + val len = arrayData.numElements() + val result = new Array[Any](len) + var i = 0 + while (i < len) { + if (containsNull && arrayData.isNullAt(i)) { + result(i) = null + } else { + result(i) = elementConverter(arrayData, i) + } + i += 1 + } + // avro writer is expecting a Java Collection, so we convert it into + // `ArrayList` backed by the specified array without data copying. 
+ java.util.Arrays.asList(result: _*) + } + + case (st: StructType, RECORD) => + val structConverter = newStructConverter(st, avroType, catalystPath, avroPath) + val numFields = st.length + (getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields)) + + case (MapType(kt, vt, valueContainsNull), MAP) if kt == StringType => + val valueConverter = newConverter( + vt, resolveNullableType(avroType.getValueType, valueContainsNull), + catalystPath :+ "value", avroPath :+ "value") + (getter, ordinal) => + val mapData = getter.getMap(ordinal) + val len = mapData.numElements() + val result = new java.util.HashMap[String, Any](len) + val keyArray = mapData.keyArray() + val valueArray = mapData.valueArray() + var i = 0 + while (i < len) { + val key = keyArray.getUTF8String(i).toString + if (valueContainsNull && valueArray.isNullAt(i)) { + result.put(key, null) + } else { + result.put(key, valueConverter(valueArray, i)) + } + i += 1 + } + result + + case _ => + throw new IncompatibleSchemaException(errorPrefix + + s"schema is incompatible (sqlType = ${catalystType.sql}, avroType = $avroType)") + } + } + + private def newStructConverter( + catalystStruct: StructType, + avroStruct: Schema, + catalystPath: Seq[String], + avroPath: Seq[String]): InternalRow => Record = { + + val avroSchemaHelper = new AvroUtils.AvroSchemaHelper( + avroStruct, catalystStruct, avroPath, catalystPath) + + avroSchemaHelper.validateNoExtraCatalystFields(ignoreNullable = false) + avroSchemaHelper.validateNoExtraAvroFields() + + val (avroIndices, fieldConverters) = avroSchemaHelper.matchedFields.map { + case AvroMatchedField(catalystField, _, avroField) => + val converter = newConverter(catalystField.dataType, + resolveNullableType(avroField.schema(), catalystField.nullable), + catalystPath :+ catalystField.name, avroPath :+ avroField.name) + (avroField.pos(), converter) + }.toArray.unzip + + val numFields = catalystStruct.length + row: InternalRow => + val result = new Record(avroStruct) + var i = 0 + while (i < numFields) { + if (row.isNullAt(i)) { + result.put(avroIndices(i), null) + } else { + result.put(avroIndices(i), fieldConverters(i).apply(row, i)) + } + i += 1 + } + result + } + + /** + * Resolve a possibly nullable Avro Type. + * + * An Avro type is nullable when it is a [[UNION]] of two types: one null type and another + * non-null type. This method will check the nullability of the input Avro type and return the + * non-null type within when it is nullable. Otherwise it will return the input Avro type + * unchanged. It will throw an [[UnsupportedAvroTypeException]] when the input Avro type is an + * unsupported nullable type. + * + * It will also log a warning message if the nullability for Avro and catalyst types are + * different. + */ + private def resolveNullableType(avroType: Schema, nullable: Boolean): Schema = { + val (avroNullable, resolvedAvroType) = resolveAvroType(avroType) + warnNullabilityDifference(avroNullable, nullable) + resolvedAvroType + } + + /** + * Check the nullability of the input Avro type and resolve it when it is nullable. The first + * return value is a [[Boolean]] indicating if the input Avro type is nullable. The second + * return value is the possibly resolved type. 
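+   * For example, the Avro union `["null", "string"]` resolves to `(true, <string schema>)`,
+   * while a plain `"int"` schema resolves to `(false, <int schema>)`.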
+ */ + private def resolveAvroType(avroType: Schema): (Boolean, Schema) = { + if (avroType.getType == Type.UNION) { + val fields = avroType.getTypes.asScala + val actualType = fields.filter(_.getType != Type.NULL) + if (fields.length != 2 || actualType.length != 1) { + throw new UnsupportedAvroTypeException( + s"Unsupported Avro UNION type $avroType: Only UNION of a null type and a non-null " + + "type is supported") + } + (true, actualType.head) + } else { + (false, avroType) + } + } + + /** + * log a warning message if the nullability for Avro and catalyst types are different. + */ + private def warnNullabilityDifference(avroNullable: Boolean, catalystNullable: Boolean): Unit = { + if (avroNullable && !catalystNullable) { + logWarning("Writing Avro files with nullable Avro schema and non-nullable catalyst schema.") + } + if (!avroNullable && catalystNullable) { + logWarning("Writing Avro files with non-nullable Avro schema and nullable catalyst " + + "schema will throw runtime exception if there is a record with null value.") + } + } +} diff --git a/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/AvroUtils.scala b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/AvroUtils.scala new file mode 100644 index 000000000..06f8adc9a --- /dev/null +++ b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/AvroUtils.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.azure.schemaregistry.spark.avro + +import java.util.Locale +import scala.collection.JavaConverters._ + +import org.apache.avro.Schema +import org.apache.spark.sql.types._ +import org.apache.spark.sql.internal.SQLConf + +object AvroUtils{ + + /** Wrapper for a pair of matched fields, one Catalyst and one corresponding Avro field. */ + case class AvroMatchedField( + catalystField: StructField, + catalystPosition: Int, + avroField: Schema.Field) + + /** + * Helper class to perform field lookup/matching on Avro schemas. + * + * This will match `avroSchema` against `catalystSchema`, attempting to find a matching field in + * the Avro schema for each field in the Catalyst schema and vice-versa, respecting settings for + * case sensitivity. The match results can be accessed using the getter methods. + * + * @param avroSchema The schema in which to search for fields. Must be of type RECORD. + * @param catalystSchema The Catalyst schema to use for matching. + * @param avroPath The seq of parent field names leading to `avroSchema`. + * @param catalystPath The seq of parent field names leading to `catalystSchema`. 
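+   *
+   * For example, matching an Avro record with fields `a` and `b` against a Catalyst schema with
+   * fields `b` and `c` yields a single matched pair (for `b`); `validateNoExtraCatalystFields(ignoreNullable = false)`
+   * then reports `c`, and `validateNoExtraAvroFields()` reports `a`.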
+ */ + class AvroSchemaHelper( + avroSchema: Schema, + catalystSchema: StructType, + avroPath: Seq[String], + catalystPath: Seq[String]) { + if (avroSchema.getType != Schema.Type.RECORD) { + throw new IncompatibleSchemaException( + s"Attempting to treat ${avroSchema.getName} as a RECORD, but it was: ${avroSchema.getType}") + } + + private[this] val avroFieldArray = avroSchema.getFields.asScala.toArray + private[this] val fieldMap = avroSchema.getFields.asScala + .groupBy(_.name.toLowerCase(Locale.ROOT)) + .mapValues(_.toSeq) // toSeq needed for scala 2.13 + + /** The fields which have matching equivalents in both Avro and Catalyst schemas. */ + val matchedFields: Seq[AvroMatchedField] = catalystSchema.zipWithIndex.flatMap { + case (sqlField, sqlPos) => + getAvroField(sqlField.name, sqlPos).map(AvroMatchedField(sqlField, sqlPos, _)) + } + + /** + * Validate that there are no Catalyst fields which don't have a matching Avro field, throwing + * [[IncompatibleSchemaException]] if such extra fields are found. If `ignoreNullable` is false, + * consider nullable Catalyst fields to be eligible to be an extra field; otherwise, + * ignore nullable Catalyst fields when checking for extras. + */ + def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit = + catalystSchema.zipWithIndex.foreach { case (sqlField, sqlPos) => + if (getAvroField(sqlField.name, sqlPos).isEmpty && + (!ignoreNullable || !sqlField.nullable)) { + throw new IncompatibleSchemaException( + s"Cannot find ${toFieldStr(catalystPath :+ sqlField.name)} in Avro schema") + } + } + + /** + * Validate that there are no Avro fields which don't have a matching Catalyst field, throwing + * [[IncompatibleSchemaException]] if such extra fields are found. + */ + def validateNoExtraAvroFields(): Unit = { + (avroFieldArray.toSet -- matchedFields.map(_.avroField)).foreach { extraField => + throw new IncompatibleSchemaException( + s"Found ${toFieldStr(avroPath :+ extraField.name())} in Avro schema but there is no " + + "match in the SQL schema") + } + } + + /** + * Extract a single field from the contained avro schema which has the desired field name, + * performing the matching with proper case sensitivity according to SQLConf.resolver. + * + * @param name The name of the field to search for. + * @return `Some(match)` if a matching Avro field is found, otherwise `None`. + */ + private[avro] def getFieldByName(name: String): Option[Schema.Field] = { + + // get candidates, ignoring case of field name + val candidates = fieldMap.getOrElse(name.toLowerCase(Locale.ROOT), Seq.empty) + + // search candidates, taking into account case sensitivity settings + candidates.filter(f => SQLConf.get.resolver(f.name(), name)) match { + case Seq(avroField) => Some(avroField) + case Seq() => None + case matches => throw new IncompatibleSchemaException(s"Searching for '$name' in Avro " + + s"schema at ${toFieldStr(avroPath)} gave ${matches.size} matches. Candidates: " + + matches.map(_.name()).mkString("[", ", ", "]") + ) + } + } + + /** Get the Avro field corresponding to the provided Catalyst field name/position, if any. 
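+     * Note that the lookup delegates to `getFieldByName` and is purely name-based; the Catalyst
+     * position argument is accepted but not currently used for matching.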
*/ + def getAvroField(fieldName: String, catalystPos: Int): Option[Schema.Field] = { + getFieldByName(fieldName) + } + } + + def toFieldStr(names: Seq[String]): String = names match { + case Seq() => "top-level record" + case n => s"field '${n.mkString(".")}'" + } + +} \ No newline at end of file diff --git a/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/CatalystDataToAvro.scala b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/CatalystDataToAvro.scala new file mode 100644 index 000000000..2af683731 --- /dev/null +++ b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/CatalystDataToAvro.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.azure.schemaregistry.spark.avro + +import java.io.ByteArrayOutputStream + +import com.azure.data.schemaregistry.SchemaRegistryClientBuilder +import com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializerBuilder +import com.azure.identity.ClientSecretCredentialBuilder + +import org.apache.avro.Schema +import org.apache.avro.generic.GenericDatumWriter +import org.apache.avro.io.{BinaryEncoder, EncoderFactory} + +import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.types.{BinaryType, DataType} + +case class CatalystDataToAvro( + child: Expression, + schemaId: String, + schemaDefinition: String, + options: Map[java.lang.String, java.lang.String]) + extends UnaryExpression { + + override def dataType: DataType = BinaryType + + @transient private lazy val schemaReader = SchemaReader.createSchemaReader(schemaId, schemaDefinition, options, true) + + @transient private lazy val avroConverter = + new AvroSerializer(child.dataType, schemaReader.expectedSchema, child.nullable) + + @transient private lazy val writer = + new GenericDatumWriter[Any](schemaReader.expectedSchema) + + @transient private var encoder: BinaryEncoder = _ + + @transient private lazy val out = new ByteArrayOutputStream + + override def nullSafeEval(input: Any): Any = { + out.reset() + encoder = EncoderFactory.get().directBinaryEncoder(out, encoder) + val avroData = avroConverter.serialize(input) + val prefixBytes = Array[Byte](0, 0, 0, 0) + val payloadPrefixBytes = prefixBytes ++ schemaReader.schemaId.getBytes() + + writer.write(avroData, encoder) + encoder.flush() + + val payloadOut = payloadPrefixBytes ++ out.toByteArray + payloadOut + } + + override def prettyName: String = "to_avro" + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val expr = ctx.addReferenceObj("this", this) + defineCodeGen(ctx, ev, input => + 
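+      // The generated code defers to nullSafeEval above, which prepends the 4-byte zero prefix
+      // and the schema ID bytes to the Avro-encoded payload.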
s"(byte[]) $expr.nullSafeEval($input)") + } + +} diff --git a/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/SchemaConverters.scala b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/SchemaConverters.scala new file mode 100644 index 000000000..71b31a38d --- /dev/null +++ b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/SchemaConverters.scala @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.azure.schemaregistry.spark.avro + +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder} +import org.apache.avro.LogicalTypes.{Date, Decimal, TimestampMicros, TimestampMillis} +import org.apache.avro.Schema.Type._ + +import org.apache.spark.sql.catalyst.util.RandomUUIDGenerator +import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.Decimal.{maxPrecisionForBytes, minBytesForPrecision} + +/** + * This object contains method that are used to convert sparkSQL schemas to avro schemas and vice + * versa. + */ +object SchemaConverters { + private lazy val uuidGenerator = RandomUUIDGenerator(new Random().nextLong()) + + private lazy val nullSchema = Schema.create(Schema.Type.NULL) + + case class SchemaType(dataType: DataType, nullable: Boolean) + + /** + * This function takes an avro schema and returns a sql schema. + */ + def toSqlType(avroSchema: Schema): SchemaType = { + toSqlTypeHelper(avroSchema, Set.empty) + } + + def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: Set[String]): SchemaType = { + avroSchema.getType match { + case INT => avroSchema.getLogicalType match { + case _: Date => SchemaType(DateType, nullable = false) + case _ => SchemaType(IntegerType, nullable = false) + } + case STRING => SchemaType(StringType, nullable = false) + case BOOLEAN => SchemaType(BooleanType, nullable = false) + case BYTES | FIXED => avroSchema.getLogicalType match { + // For FIXED type, if the precision requires more bytes than fixed size, the logical + // type will be null, which is handled by Avro library. 
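+        // e.g. Avro bytes/fixed with logical type decimal(38, 18) maps to Spark DecimalType(38, 18).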
+ case d: Decimal => SchemaType(DecimalType(d.getPrecision, d.getScale), nullable = false) + case _ => SchemaType(BinaryType, nullable = false) + } + + case DOUBLE => SchemaType(DoubleType, nullable = false) + case FLOAT => SchemaType(FloatType, nullable = false) + case LONG => avroSchema.getLogicalType match { + case _: TimestampMillis | _: TimestampMicros => SchemaType(TimestampType, nullable = false) + case _ => SchemaType(LongType, nullable = false) + } + + case ENUM => SchemaType(StringType, nullable = false) + + case RECORD => + if (existingRecordNames.contains(avroSchema.getFullName)) { + throw new IncompatibleSchemaException(s""" + |Found recursive reference in Avro schema, which can not be processed by Spark: + |${avroSchema.toString(true)} + """.stripMargin) + } + val newRecordNames = existingRecordNames + avroSchema.getFullName + val fields = avroSchema.getFields.asScala.map { f => + val schemaType = toSqlTypeHelper(f.schema(), newRecordNames) + StructField(f.name, schemaType.dataType, schemaType.nullable) + } + + SchemaType(StructType(fields), nullable = false) + + case ARRAY => + val schemaType = toSqlTypeHelper(avroSchema.getElementType, existingRecordNames) + SchemaType( + ArrayType(schemaType.dataType, containsNull = schemaType.nullable), + nullable = false) + + case MAP => + val schemaType = toSqlTypeHelper(avroSchema.getValueType, existingRecordNames) + SchemaType( + MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable), + nullable = false) + + case UNION => + if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) { + // In case of a union with null, eliminate it and make a recursive call + val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL) + if (remainingUnionTypes.size == 1) { + toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames).copy(nullable = true) + } else { + toSqlTypeHelper(Schema.createUnion(remainingUnionTypes.asJava), existingRecordNames) + .copy(nullable = true) + } + } else avroSchema.getTypes.asScala.map(_.getType) match { + case Seq(t1) => + toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames) + case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) => + SchemaType(LongType, nullable = false) + case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) => + SchemaType(DoubleType, nullable = false) + case _ => + // Convert complex unions to struct types where field names are member0, member1, etc. + // This is consistent with the behavior when converting between Avro and Parquet. 
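+            // e.g. a union of [int, string, boolean] (with no null branch) becomes
+            // struct<member0: int, member1: string, member2: boolean>, with each member nullable.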
+ val fields = avroSchema.getTypes.asScala.zipWithIndex.map { + case (s, i) => + val schemaType = toSqlTypeHelper(s, existingRecordNames) + // All fields are nullable because only one of them is set at a time + StructField(s"member$i", schemaType.dataType, nullable = true) + } + + SchemaType(StructType(fields), nullable = false) + } + + case other => throw new IncompatibleSchemaException(s"Unsupported type $other") + } + } + + def toAvroType( + catalystType: DataType, + nullable: Boolean = false, + recordName: String = "topLevelRecord", + nameSpace: String = "") + : Schema = { + val builder = SchemaBuilder.builder() + + val schema = catalystType match { + case BooleanType => builder.booleanType() + case ByteType | ShortType | IntegerType => builder.intType() + case LongType => builder.longType() + case DateType => + LogicalTypes.date().addToSchema(builder.intType()) + case TimestampType => + LogicalTypes.timestampMicros().addToSchema(builder.longType()) + + case FloatType => builder.floatType() + case DoubleType => builder.doubleType() + case StringType => builder.stringType() + case d: DecimalType => + val avroType = LogicalTypes.decimal(d.precision, d.scale) + val fixedSize = minBytesForPrecision(d.precision) + // Need to avoid naming conflict for the fixed fields + val name = nameSpace match { + case "" => s"$recordName.fixed" + case _ => s"$nameSpace.$recordName.fixed" + } + avroType.addToSchema(SchemaBuilder.fixed(name).size(fixedSize)) + + case BinaryType => builder.bytesType() + case ArrayType(et, containsNull) => + builder.array() + .items(toAvroType(et, containsNull, recordName, nameSpace)) + case MapType(StringType, vt, valueContainsNull) => + builder.map() + .values(toAvroType(vt, valueContainsNull, recordName, nameSpace)) + case st: StructType => + val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName" else recordName + val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields() + st.foreach { f => + val fieldAvroType = + toAvroType(f.dataType, f.nullable, f.name, childNameSpace) + fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault() + } + fieldsAssembler.endRecord() + + // This should never happen. + case other => throw new IncompatibleSchemaException(s"Unexpected type $other.") + } + if (nullable) { + Schema.createUnion(schema, nullSchema) + } else { + schema + } + } +} + +class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex) + +class UnsupportedAvroTypeException(msg: String) extends Exception(msg) \ No newline at end of file diff --git a/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/SchemaReader.scala b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/SchemaReader.scala new file mode 100644 index 000000000..b4f74c478 --- /dev/null +++ b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/SchemaReader.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.azure.schemaregistry.spark.avro + +import com.azure.data.schemaregistry.implementation.models.ErrorException +import com.azure.data.schemaregistry.models.{SchemaFormat, SchemaProperties, SchemaRegistrySchema} +import com.azure.data.schemaregistry.SchemaRegistryClientBuilder +import com.azure.data.schemaregistry.avro.{SchemaRegistryAvroSerializerBuilder} +import com.azure.identity.ClientSecretCredentialBuilder +import org.apache.avro.Schema +import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow +import org.apache.spark.sql.types._ +import scala.util.Try + +import functions._ + +class SchemaReader( + var schemaId: String, + var expectedSchemaString: String, + val options: Map[java.lang.String, java.lang.String]) { + + @transient private lazy val schemaRegistryCredential = new ClientSecretCredentialBuilder() + .tenantId(options.getOrElse(SCHEMA_REGISTRY_TENANT_ID_KEY, null)) + .clientId(options.getOrElse(SCHEMA_REGISTRY_CLIENT_ID_KEY, null)) + .clientSecret(options.getOrElse(SCHEMA_REGISTRY_CLIENT_SECRET_KEY, null)) + .build() + + @transient private lazy val schemaRegistryAsyncClient = new SchemaRegistryClientBuilder() + .fullyQualifiedNamespace(options.getOrElse(SCHEMA_REGISTRY_URL, null)) + .credential(schemaRegistryCredential) + .buildAsyncClient() + + @transient lazy val serializer = new SchemaRegistryAvroSerializerBuilder() + .schemaRegistryAsyncClient(schemaRegistryAsyncClient) + .schemaGroup(options.getOrElse(SCHEMA_GROUP_KEY, null)) + //.autoRegisterSchema(options.getOrElse(SCHEMA_AUTO_REGISTER_FLAG_KEY, false).asInstanceOf[Boolean]) + .buildSerializer() + + def setSchemaString = { + val schemaRegistrySchema = schemaRegistryAsyncClient.getSchema(schemaId).block() + expectedSchemaString = schemaRegistrySchema.getDefinition + } + + def setSchemaId = { + val schemaGroup: String = options.getOrElse(SCHEMA_GROUP_KEY, null) + val schemaName: String = options.getOrElse(SCHEMA_NAME_KEY, null) + try { + val schemaProperties = schemaRegistryAsyncClient.getSchemaProperties(schemaGroup, schemaName, expectedSchemaString, SchemaFormat.AVRO).block() + schemaId = schemaProperties.getId + } catch { + case e: ErrorException => { + val errorStatusCode = e.getResponse.getStatusCode + errorStatusCode match { + case 404 => { // schema not found + val autoRegistryStr: String = options.getOrElse(SCHEMA_AUTO_REGISTER_FLAG_KEY, "false") + val autoRegistryFlag: Boolean = Try(autoRegistryStr.toLowerCase.toBoolean).getOrElse(false) + if(autoRegistryFlag) { + val schemaProperties = schemaRegistryAsyncClient + .registerSchema(schemaGroup, schemaName, expectedSchemaString, SchemaFormat.AVRO) + .block() + schemaId = schemaProperties.getId + } else { + throw new SchemaNotFoundException(s"Schema with name=$schemaName and content=$expectedSchemaString does not" + + s"exist in schemaGroup=$schemaGroup and $SCHEMA_AUTO_REGISTER_FLAG_KEY is set to false. 
If you want to" + + s"auto register a new schema make sure to set $SCHEMA_AUTO_REGISTER_FLAG_KEY to true in the properties.") + } + } + case _ => throw e + } + } + case e: Throwable => throw e + } + } + + @transient lazy val expectedSchema = new Schema.Parser().parse(expectedSchemaString) +} + +object SchemaReader { + val VALUE_NOT_PROVIDED: String = "NOTHING" + + def createSchemaReader( + schemaId: String, + schemaDefinition: String, + options: Map[java.lang.String, java.lang.String], + calledByTo_avro: Boolean = false) : SchemaReader = { + // check for null schema string or schema guid + if((schemaId == VALUE_NOT_PROVIDED) && (schemaDefinition == null)) { + throw new NullPointerException("Schema definition cannot be null.") + } + else if((schemaDefinition == VALUE_NOT_PROVIDED) && (schemaId == null)) { + throw new NullPointerException("Schema Id cannot be null.") + } + + validateOptions(options) + if(schemaId != VALUE_NOT_PROVIDED) { + //schema Id is provided + val schemaReader = new SchemaReader(schemaId, schemaDefinition , options) + schemaReader.setSchemaString + schemaReader + } else { + // schema definition is provided + // in case to_avro has been called with the schema itself, the schema group and schema name should be provided to get the schema id + if(calledByTo_avro) { + schemaGroupAndNameAreSet(options) + } + // ensure schema doesn't have any whitespaces, otherwise getSchemaId API won't work properly! + val schemaDefinitionWithoutSpaces = schemaDefinition.replaceAll("[\\n\\t ]", "") + val schemaReader = new SchemaReader(schemaId, schemaDefinitionWithoutSpaces, options) + if(calledByTo_avro) { + schemaReader.setSchemaId + } + schemaReader + } + } + + private def validateOptions( + options: Map[java.lang.String, java.lang.String]) = { + // tenant id, client id, client secret and endpoint url should be present in all cases + if(!options.contains(SCHEMA_REGISTRY_TENANT_ID_KEY)) { + throw new MissingPropertyException(s"schemaRegistryCredential requires the tenant id. Please provide the " + + s"tenant id in the properties, using the $SCHEMA_REGISTRY_TENANT_ID_KEY key.") + } + if(!options.contains(SCHEMA_REGISTRY_CLIENT_ID_KEY)) { + throw new MissingPropertyException(s"schemaRegistryCredential requires the client id. Please provide the " + + s"client id in the properties, using the $SCHEMA_REGISTRY_CLIENT_ID_KEY key.") + } + if(!options.contains(SCHEMA_REGISTRY_CLIENT_SECRET_KEY)) { + throw new MissingPropertyException(s"schemaRegistryCredential requires the client secret. Please provide the " + + s"client secret in the properties, using the $SCHEMA_REGISTRY_CLIENT_SECRET_KEY key.") + } + if(!options.contains(SCHEMA_REGISTRY_URL)) { + throw new MissingPropertyException(s"schemaRegistryClient requires the endpoint url. Please provide the " + + s"endpoint url in the properties, using the $SCHEMA_REGISTRY_URL key.") + } + } + + private def schemaGroupAndNameAreSet(options: Map[java.lang.String, java.lang.String]) = { + if(!options.contains(SCHEMA_GROUP_KEY)) { + throw new MissingPropertyException(s"schemaRegistryClient requires the schema group to get the schema Guid. " + + s"Please provide the schema group in the properties, using the $SCHEMA_GROUP_KEY key.") + } + if(!options.contains(SCHEMA_NAME_KEY)) { + throw new MissingPropertyException(s"schemaRegistryClient requires the schema name to get the schema Guid. 
" + + s"Please provide the schema name in the properties, using the $SCHEMA_NAME_KEY key.") + } + } +} diff --git a/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/SchemaRegistryExceptions.scala b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/SchemaRegistryExceptions.scala new file mode 100644 index 000000000..320bcbd8c --- /dev/null +++ b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/SchemaRegistryExceptions.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.azure.schemaregistry.spark.avro + +final case class MissingPropertyException(private val message: String) extends Exception(message) {} + +final case class SchemaNotFoundException(private val message: String) extends Exception(message) {} diff --git a/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/functions.scala b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/functions.scala new file mode 100644 index 000000000..422b5072a --- /dev/null +++ b/schemaregistry-avro/src/main/scala/com/microsoft/azure/schemaregistry/spark/avro/functions.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.azure.schemaregistry.spark.avro + +import com.azure.data.schemaregistry.avro.{SchemaRegistryAvroSerializer} +import scala.collection.JavaConverters._ +import org.apache.spark.sql.Column + +/*** + * Scala object containing utility methods for serialization/deserialization with Azure Schema Registry and Spark SQL + * columns. 
+ * + * Functions are agnostic to data source or sink and can be used with any Schema Registry payloads, including: + * - Kafka Spark connector ($value) + * - Event Hubs Spark connector ($Body) + * - Event Hubs Avro Capture blobs ($Body) + */ +object functions { + + val SCHEMA_REGISTRY_TENANT_ID_KEY: String = "schema.registry.tenant.id" + val SCHEMA_REGISTRY_CLIENT_ID_KEY: String = "schema.registry.client.id" + val SCHEMA_REGISTRY_CLIENT_SECRET_KEY: String = "schema.registry.client.secret" + val SCHEMA_REGISTRY_URL: String = "schema.registry.url" + val SCHEMA_GROUP_KEY: String = "schema.group" + val SCHEMA_NAME_KEY: String = "schema.name" + val SCHEMA_AUTO_REGISTER_FLAG_KEY: String = "schema.auto.register.flag" + val SCHEMA_PARSE_MODE: String = "failure.mode" + val SCHEMA_EXACT_MATCH_REQUIRED: String = "schema.exact.match.required" + + /*** + * Converts Spark SQL Column containing SR payloads into a into its corresponding catalyst value. + * This methods either uses schema GUID or the schema in JSON string format. + * If schemaId is provided, it must be the Schema GUID of the actual schema used to serialize the data. + * If jsonFormatSchema is provided, it must match the actual schema used to serialize the data. + * + * @param data column with SR payloads + * @param schemaString The avro schema in JSON string format. + * @param clientOptions map of configuration properties, including Spark run mode (permissive vs. fail-fast) and schema exact match flag + */ + def from_avro( + data: Column, + schemaString: String, + clientOptions: java.util.Map[String, String]): Column = { + new Column(AvroDataToCatalyst(data.expr, SchemaReader.VALUE_NOT_PROVIDED, schemaString, clientOptions.asScala.toMap)) + } + + /*** + * Converts Spark SQL Column containing SR payloads into a into its corresponding catalyst value. + * This methods either uses schema GUID or the schema in JSON string format. + * If schemaId is provided, it must be the Schema GUID of the actual schema used to serialize the data. + * If jsonFormatSchema is provided, it must match the actual schema used to serialize the data. + * + * @param data column with SR payloads + * @param schemaId The GUID of the expected schema. + * @param clientOptions map of configuration properties, including Spark run mode (permissive vs. fail-fast) and schema exact match flag + */ + def from_avro( + data: Column, + schemaId: SchemaGUID, + clientOptions: java.util.Map[String, String]): Column = { + if(schemaId == null) { + throw new NullPointerException("Schema Id cannot be null.") + } + new Column(AvroDataToCatalyst(data.expr, schemaId.schemaIdStringValue, SchemaReader.VALUE_NOT_PROVIDED, clientOptions.asScala.toMap)) + } + + /** + * Converts a Spark SQL Column into a column containing SR payloads. + * + * @param data the data column. + * @param schemaString The avro schema in JSON string format. + * @param clientOptions map of configuration properties, including Spark run mode (permissive vs. fail-fast) + * @return column with SR payloads + * + */ + def to_avro(data: Column, + schemaString: String, + clientOptions: java.util.Map[java.lang.String, java.lang.String]): Column = { + new Column(CatalystDataToAvro(data.expr, SchemaReader.VALUE_NOT_PROVIDED, schemaString, clientOptions.asScala.toMap)) + } + + /** + * Converts a Spark SQL Column into a column containing SR payloads. + * + * @param data the data column. + * @param schemaId The GUID of the expected schema + * @param clientOptions map of configuration properties, including Spark run mode (permissive vs. 
fail-fast) + * @return column with SR payloads + * + */ + def to_avro(data: Column, + schemaId: SchemaGUID, + clientOptions: java.util.Map[java.lang.String, java.lang.String]): Column = { + if(schemaId == null) { + throw new NullPointerException("Schema Id cannot be null.") + } + new Column(CatalystDataToAvro(data.expr, schemaId.schemaIdStringValue, SchemaReader.VALUE_NOT_PROVIDED, clientOptions.asScala.toMap)) + } + +} + +case class SchemaGUID(schemaIdStringValue: String) {} + diff --git a/schemaregistry-avro/src/test/scala/com/microsoft/azure/schemaregistry/spark/avro/AvroFunctionsSuite.scala b/schemaregistry-avro/src/test/scala/com/microsoft/azure/schemaregistry/spark/avro/AvroFunctionsSuite.scala new file mode 100644 index 000000000..62e2c8e0b --- /dev/null +++ b/schemaregistry-avro/src/test/scala/com/microsoft/azure/schemaregistry/spark/avro/AvroFunctionsSuite.scala @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.azure.schemaregistry.spark.avro + +import java.util +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkException +import org.apache.spark.sql.{Column, QueryTest, Row} +import org.apache.spark.sql.execution.LocalTableScanExec +import org.apache.spark.sql.functions.{col, lit, struct} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession + +class AvroFunctionsSuite extends QueryTest with SharedSparkSession { + + import testImplicits._ + + test("from_avro do not handle null column") { + try { + functions.from_avro(null, "schema_id", null) + fail() + } + catch { + case _: NullPointerException => + } + } + + test("from_avro do not handle null client options") { + try { + functions.from_avro(new Column("empty"), "schema_id", null) + fail() + } + catch { + case _: NullPointerException => + } + } + + + test("schema Id cannot be Null") { + try { + val configMap: util.Map[String, String] = new util.HashMap[String, String]() + val schemaReader = SchemaReader.createSchemaReader(null, SchemaReader.VALUE_NOT_PROVIDED, configMap.asScala.toMap) + fail() + } + catch { + case _: NullPointerException => + } + } + + test("schema Definition cannot be Null") { + try { + val configMap: util.Map[String, String] = new util.HashMap[String, String]() + val schemaReader = SchemaReader.createSchemaReader(SchemaReader.VALUE_NOT_PROVIDED, null, configMap.asScala.toMap) + fail() + } + catch { + case _: NullPointerException => + } + } + + test("from_avro invalid client options -- missing tenant_id") { + // tenant_id, client_id and client_secret must be provided + val configMap: util.Map[String, String] = new util.HashMap[String, String]() + configMap.put(functions.SCHEMA_REGISTRY_URL, "https://namespace.servicebus.windows.net") + 
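+    // the tenant id is deliberately left out here so that createSchemaReader fails with MissingPropertyException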
configMap.put(functions.SCHEMA_REGISTRY_CLIENT_ID_KEY, "client_id") + configMap.put(functions.SCHEMA_REGISTRY_CLIENT_SECRET_KEY, "client_secret") + val caughtEx = intercept[MissingPropertyException] { + val schemaReader = SchemaReader.createSchemaReader("schema_id", SchemaReader.VALUE_NOT_PROVIDED, configMap.asScala.toMap) + //functions.from_avro(new Column("empty"), "schema_id", configMap) + } + assert(caughtEx.getMessage == "schemaRegistryCredential requires the tenant id. Please provide the tenant id in the properties, using the schema.registry.tenant.id key.") + } + + test("from_avro invalid client options -- missing endpoint url") { + // tenant_id, client_id and client_secret must be provided + val configMap = new util.HashMap[String, String]() + configMap.put(functions.SCHEMA_REGISTRY_TENANT_ID_KEY, "tenant_id") + configMap.put(functions.SCHEMA_REGISTRY_CLIENT_ID_KEY, "client_id") + configMap.put(functions.SCHEMA_REGISTRY_CLIENT_SECRET_KEY, "client_secret") + val caughtEx = intercept[MissingPropertyException] { + val schemaReader = SchemaReader.createSchemaReader("schema_id", SchemaReader.VALUE_NOT_PROVIDED, configMap.asScala.toMap) + //functions.from_avro(new Column("empty"), "schema_id", configMap) + } + assert(caughtEx.getMessage == "schemaRegistryClient requires the endpoint url. Please provide the endpoint url in the properties, using the schema.registry.url key.") + } + + test("to_avro do not handle null column") { + try { + functions.to_avro(null, "schema_id", null) + fail() + } + catch { + case _: NullPointerException => + } + } + + test("to_avro do not handle null client options") { + try { + functions.to_avro(new Column("empty"), "schema_id", null) + fail() + } + catch { + case _: NullPointerException => + } + } + + test("to_avro invalid client options -- missing client_id") { + // tenant_id, client_id and client_secret must be provided + val configMap = new util.HashMap[String, String]() + configMap.put(functions.SCHEMA_REGISTRY_URL, "https://namespace.servicebus.windows.net") + configMap.put(functions.SCHEMA_REGISTRY_TENANT_ID_KEY, "tenant_id") + configMap.put(functions.SCHEMA_REGISTRY_CLIENT_SECRET_KEY, "client_secret") + val caughtEx = intercept[MissingPropertyException] { + val schemaReader = SchemaReader.createSchemaReader("schema_id", SchemaReader.VALUE_NOT_PROVIDED, configMap.asScala.toMap, true) + //functions.to_avro(new Column("empty"), "schema_id", configMap) + } + assert(caughtEx.getMessage == "schemaRegistryCredential requires the client id. 
Please provide the client id in the properties, using the schema.registry.client.id key.") + } + + test("to_avro with schema content requires the schema group and schema name in client options -- missing schema group") { + // tenant_id, client_id and client_secret must be provided + val configMap = new util.HashMap[String, String]() + configMap.put(functions.SCHEMA_REGISTRY_URL, "https://namespace.servicebus.windows.net") + configMap.put(functions.SCHEMA_REGISTRY_TENANT_ID_KEY, "tenant_id") + configMap.put(functions.SCHEMA_REGISTRY_CLIENT_ID_KEY, "client_id") + configMap.put(functions.SCHEMA_REGISTRY_CLIENT_SECRET_KEY, "client_secret") + configMap.put(functions.SCHEMA_REGISTRY_CLIENT_SECRET_KEY, "client_secret") + configMap.put(functions.SCHEMA_NAME_KEY, "schema_name") + val caughtEx = intercept[MissingPropertyException] { + val schemaReader = SchemaReader.createSchemaReader(SchemaReader.VALUE_NOT_PROVIDED, "schema_content", configMap.asScala.toMap, true) + //functions.to_avro(new Column("empty"), "schema_content", configMap, false) + } + assert(caughtEx.getMessage == "schemaRegistryClient requires the schema group to get the schema Guid. " + + s"Please provide the schema group in the properties, using the schema.group key.") + } + + test("to_avro with schema content requires the schema group and schema name in client options -- missing schema name") { + // tenant_id, client_id and client_secret must be provided + val configMap = new util.HashMap[String, String]() + configMap.put(functions.SCHEMA_REGISTRY_URL, "https://namespace.servicebus.windows.net") + configMap.put(functions.SCHEMA_REGISTRY_TENANT_ID_KEY, "tenant_id") + configMap.put(functions.SCHEMA_REGISTRY_CLIENT_ID_KEY, "client_id") + configMap.put(functions.SCHEMA_REGISTRY_CLIENT_SECRET_KEY, "client_secret") + configMap.put(functions.SCHEMA_REGISTRY_CLIENT_SECRET_KEY, "client_secret") + configMap.put(functions.SCHEMA_GROUP_KEY, "schema_group") + val caughtEx = intercept[MissingPropertyException] { + val schemaReader = SchemaReader.createSchemaReader(SchemaReader.VALUE_NOT_PROVIDED, "schema_content", configMap.asScala.toMap, true) + //functions.to_avro(new Column("empty"), "schema_content", configMap, false) + } + assert(caughtEx.getMessage == "schemaRegistryClient requires the schema name to get the schema Guid. " + + s"Please provide the schema name in the properties, using the schema.name key.") + } + +}