I'm working on a PySpark job on Google Cloud Platform (GCP) using the Splink library within a Dataproc cluster, which is managed dynamically through Airflow using DataprocCreateClusterOperator and DataprocDeleteClusterOperator. The job is submitted with DataprocSubmitJobOperator.
The main challenge is loading and registering the custom Spark SQL UDFs (such as JaroWinklerSimilarity) provided by Splink's JAR files. Despite configuring the classpath and trying multiple approaches, I keep receiving an error indicating that the classes cannot be found on the classpath. The error persists even when I skip loading the JAR explicitly (it should be bundled with the Splink installation).
Error:
"Unable to load custom Spark SQL functions such as jaro_winkler from the jar that's provided with Splink."
"Failed to register UDF jaro_winkler_sim: Cannot load class JaroWinklerSimilarity. Please make sure it is on the classpath."
Environment Details:
PySpark Version: 3.5.3
Splink Version: 4.0.4
Python Version: 3.12
Google Cloud Dataproc
Orchestrated through Google Cloud Composer (Airflow)
Implementation Details:
I’ll provide some code snippets below to give a clearer understanding of the configurations and methods I've attempted.
For clarity, I’ve tried switching between specifying the JAR path directly (scala-udf-similarity-0.1.1_spark3.x.jar) and using similarity_jar_location() from Splink, as well as referencing the classes explicitly in various ways (JaroWinklerSimilarity.class). Additionally, I attempted to pass the JAR files via the properties setting in Dataproc (see the job submission sketch in section 2 below).
1. Cluster Creation and Package Configuration:
PySpark and Splink (in addition to other dependencies) are installed via the cluster's software configuration:
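Roughly as follows (a sketch of the cluster spec passed to DataprocCreateClusterOperator; the project, region, cluster name, machine types, and exact pip pins are placeholders, not my real values):

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-4"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
    "software_config": {
        "properties": {
            # Dataproc installs these pip packages on every node at cluster creation
            "dataproc:pip.packages": "splink==4.0.4",
        },
    },
}

create_cluster = DataprocCreateClusterOperator(
    task_id="create_splink_cluster",
    project_id="my-project",        # placeholder
    region="europe-west1",          # placeholder
    cluster_name="splink-cluster",  # placeholder
    cluster_config=CLUSTER_CONFIG,
)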
2. Job Submission Parameters:
Main PySpark file and submodules are specified in DataprocSubmitJobOperator:
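A sketch of the submission (GCS paths and IDs are placeholders; this also shows the attempt to ship the JAR via the job properties mentioned above):

from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

PYSPARK_JOB = {
    "reference": {"project_id": "my-project"},
    "placement": {"cluster_name": "splink-cluster"},
    "pyspark_job": {
        "main_python_file_uri": "gs://my-bucket/jobs/main.py",
        "python_file_uris": ["gs://my-bucket/jobs/submodules.zip"],
        # One of the attempts: passing the Splink similarity JAR at submit time
        "jar_file_uris": ["gs://my-bucket/jars/scala-udf-similarity-0.1.1_spark3.x.jar"],
        "properties": {
            "spark.jars": "gs://my-bucket/jars/scala-udf-similarity-0.1.1_spark3.x.jar",
        },
    },
}

submit_job = DataprocSubmitJobOperator(
    task_id="submit_splink_job",
    project_id="my-project",  # placeholder
    region="europe-west1",    # placeholder
    job=PYSPARK_JOB,
)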
3. Code to Load and Register UDFs:
Below are the main methods I attempted to register the JAR, each using different configuration options:
- Attempt 1: Basic Spark Session Initialization
from pyspark.sql import SparkSession
from splink.backends.spark import similarity_jar_location

# Note: I also attempted passing the function itself (similarity_jar_location)
# instead of calling it.
spark = (
    SparkSession.builder
    .appName("PySpark-Splink-Job")
    .config("spark.jars", similarity_jar_location())
    .getOrCreate()
)
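- Attempt 2: Using SparkConf to Set JAR Location
Roughly as follows (a sketch: the same spark.jars setting, but supplied through a SparkConf object before the session is built):

from pyspark import SparkConf
from pyspark.sql import SparkSession
from splink.backends.spark import similarity_jar_location

conf = SparkConf()
conf.setAppName("PySpark-Splink-Job")
# Same JAR as Attempt 1, set on the conf object instead of the builder
conf.set("spark.jars", similarity_jar_location())

spark = SparkSession.builder.config(conf=conf).getOrCreate()

- Attempt 3: Setting spark.driver.extraClassPath and spark.executor.extraClassPath
A sketch of this variant, which puts the JAR on both JVM classpaths in addition to spark.jars:

from pyspark.sql import SparkSession
from splink.backends.spark import similarity_jar_location

jar_path = similarity_jar_location()
spark = (
    SparkSession.builder
    .appName("PySpark-Splink-Job")
    .config("spark.jars", jar_path)
    # Classpath entries for the driver and executor JVMs
    .config("spark.driver.extraClassPath", jar_path)
    .config("spark.executor.extraClassPath", jar_path)
    .getOrCreate()
)

- Attempt 4: Explicitly Adding JAR Using addFile
A sketch of the addFile variant (note that SparkContext.addFile only distributes the file to the workers; it does not place it on the JVM classpath):

from pyspark.sql import SparkSession
from splink.backends.spark import similarity_jar_location

spark = SparkSession.builder.appName("PySpark-Splink-Job").getOrCreate()
# Ships the JAR to every node's working directory
spark.sparkContext.addFile(similarity_jar_location())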
4. Function to Register UDFs:
In each approach, I attempted to register UDFs like JaroWinklerSimilarity, followed by creating a Linker instance in Splink:
UDF registration:
from pyspark.sql.types import DoubleType

def add_udfs(spark_sess):
    udfs = [
        # I tried a lot of mixing and matching for the class path as well
        ("JARO_WINKLER", "uk.gov.moj.dash.linkage.JaroWinklerSimilarity", DoubleType()),
        ...
    ]
    for function_name, class_name, return_type in udfs:
        try:
            spark_sess.udf.registerJavaFunction(
                function_name,
                class_name,
                return_type
            )
            print(f"Registered UDF: {function_name} as {class_name}")
        except Exception as e:
            print(f"Failed to register UDF {function_name}: {str(e)}")
    return spark_sess

add_udfs(spark)
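Linker instance:
After UDF registration, the Linker is created along these lines (a sketch assuming the Splink 4 API; the input DataFrame, comparisons, and blocking rule are illustrative placeholders):

import splink.comparison_library as cl
from splink import Linker, SettingsCreator, SparkAPI

settings = SettingsCreator(
    link_type="dedupe_only",
    comparisons=[
        # This comparison is what ultimately needs the jaro_winkler UDF
        cl.JaroWinklerAtThresholds("first_name"),
    ],
    blocking_rules_to_generate_predictions=["l.surname = r.surname"],
)

db_api = SparkAPI(spark_session=spark)  # the session configured above
linker = Linker(df, settings, db_api=db_api)  # df: a Spark DataFrame of input records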
Error Message:
Failed to register UDF jaro_winkler_sim: Cannot load class JaroWinklerSimilarity, please make sure it is on the classpath.
Unable to load custom Spark SQL functions such as jaro_winkler from the jar that's provided with Splink.
You need to ensure the Splink jar is registered.
See https://moj-analytical-services.github.io/splink/demos/example_simple_pyspark.html for an example.
You will not be able to use these functions in your linkage.
You can find the location of the jar by calling the following function:
from splink.spark.jar_location import similarity_jar_location
Full error:
Can not load class uk.gov.moj.dash.linkage.JaroSimilarity, please make sure it is on the classpath.
Additional Notes:
I have verified that the scala-udf-similarity-0.1.1_spark3.x.jar file, as well as the JAR returned by the similarity_jar_location() function, exists on the cluster. However, none of the approaches allowed successful UDF registration.
I have also tried older Splink versions, such as 3.7.3.
Any pointers or alternative methods to solve this error or load and register the JAR file correctly would be greatly appreciated.