Issue with Registering Splink UDFs in Dataproc PySpark Job on GCP #2486

Open

jessicadahdouh opened this issue Oct 29, 2024 · 0 comments
Issue:

I'm working on a PySpark job on Google Cloud Platform (GCP) using the Splink library within a Dataproc cluster, which is managed dynamically through Airflow using DataprocCreateClusterOperator and DataprocDeleteClusterOperator. The job is submitted with DataprocSubmitJobOperator.

The main challenge is loading and registering the custom Splink UDFs (such as JaroWinklerSimilarity) provided by the Splink JAR. Despite configuring the classpath and trying multiple approaches, I keep getting an error saying the classes cannot be found on the classpath. The error also persists if I skip loading the JAR explicitly (it should already be bundled with the Splink installation).

Error:

"Unable to load custom Spark SQL functions such as jaro_winkler from the jar that's provided with Splink."
"Failed to register UDF jaro_winkler_sim: Cannot load class JaroWinklerSimilarity. Please make sure it is on the classpath."

Environment Details:

  • PySpark Version: 3.5.3
  • Splink Version: 4.0.4
  • Python Version: 3.12
  • Google Cloud Dataproc
  • Orchestrated through Google Cloud Composer (Airflow)

Implementation Details:

I’ll provide some code snippets below to give a clearer understanding of the configurations and methods I've attempted.

For clarity, I’ve tried switching between specifying the JAR path directly (scala-udf-similarity-0.1.1_spark3.x.jar) and using similarity_jar_location() from Splink, as well as referencing the classes explicitly in various ways (JaroWinklerSimilarity.class). I also attempted to pass the JAR files through the Dataproc properties setting.

1. Cluster Creation and Package Configuration:

PySpark and Splink (in addition to other dependencies) are installed via the cluster's software configuration:

software_config = {
    "properties": {
        "dataproc:pip.packages": "splink==4.0.4, pyspark==3.5.3, ..."
    }
}
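
A variant not shown above: Dataproc cluster properties can also carry Spark settings via the spark: prefix, which would put the JAR on the classpath before any session is created. A rough sketch, where the gs:// path is an assumption:

software_config = {
    "properties": {
        "dataproc:pip.packages": "splink==4.0.4, pyspark==3.5.3, ...",
        # Hypothetical: stage the Splink similarity jar in GCS and point spark.jars at it
        # so it is on the driver/executor classpath from the start (the path is an assumption).
        "spark:spark.jars": "gs://bucket-name/scala-udf-similarity-0.1.1_spark3.x.jar"
    }
}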

2. Job Submission Parameters:

Main PySpark file and submodules are specified in DataprocSubmitJobOperator:

PYSPARK_FILE_PATH = "gs://GCS_BUCKET_FILE_PATH/"
PYSPARK_HELPERS_FILE_PATHS = ["gs://bucket-name/helper.py", "gs://bucket-name/scala-udf-similarity-0.1.1_spark3.x.jar"]

pyspark_job = {
    "main_python_file_uri": PYSPARK_FILE_PATH,
    "python_file_uris": PYSPARK_HELPERS_FILE_PATHS
}
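
(Note: the Dataproc PySpark job spec also has a dedicated jar_file_uris field; python_file_uris only accepts .py, .zip and .egg files. A sketch of that variant, which is not among the attempts above:)

# Hypothetical variant: pass the JAR via jar_file_uris instead of python_file_uris
pyspark_job = {
    "main_python_file_uri": PYSPARK_FILE_PATH,
    "python_file_uris": ["gs://bucket-name/helper.py"],
    "jar_file_uris": ["gs://bucket-name/scala-udf-similarity-0.1.1_spark3.x.jar"]
}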

3. Code to Load and Register UDFs:

Below are the main methods I attempted for loading and registering the JAR, each using different configuration options:

- Attempt 1: Basic Spark Session Initialization

from pyspark.sql import SparkSession
from splink.backends.spark import similarity_jar_location

# Attempted this without the parentheses as well -> similarity_jar_location
spark = SparkSession.builder \
    .appName("PySpark-Splink-Job") \
    .config("spark.jars", similarity_jar_location()) \
    .getOrCreate()

- Attempt 2: Using SparkConf to Set JAR Location

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from splink.backends.spark import similarity_jar_location

conf = SparkConf().set("spark.driver.memory", "12g") \
                  .set("spark.default.parallelism", "8") \
                  .set("spark.sql.codegen.wholeStage", "false") \
                  .set("spark.jars", similarity_jar_location())

sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

- Attempt 3: Setting spark.driver.extraClassPath and spark.executor.extraClassPath

splink_files = similarity_jar_location()  # or the explicit path to scala-udf-similarity-0.1.1_spark3.x.jar

spark = SparkSession.builder \
    .appName("PySpark-Splink-Job") \
    .config("spark.driver.extraClassPath", splink_files) \
    .config("spark.executor.extraClassPath", splink_files) \
    .config("spark.jars", splink_files) \
    .getOrCreate()

- Attempt 4: Explicitly Adding JAR Using addFile

spark = SparkSession.builder \
    .appName("PySpark-Splink-Job") \
    .config("spark.jars", splink_files) \
    .getOrCreate()

spark.sparkContext.addFile(splink_files)
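
(As far as I understand, addFile only distributes the file to the worker nodes; it does not put the JAR on the JVM classpath. A possible alternative is Spark SQL's ADD JAR statement, sketched below; I have not verified whether registerJavaFunction picks up classes added this way:)

# Hypothetical alternative: add the jar to the running session via Spark SQL.
# jar_path is assumed to be the value returned by similarity_jar_location()
# or a path to scala-udf-similarity-0.1.1_spark3.x.jar on the cluster.
jar_path = similarity_jar_location()
spark.sql(f"ADD JAR '{jar_path}'")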

4. Function to Register UDFs:

In each approach, I then attempted to register UDFs such as JaroWinklerSimilarity and to create a Splink Linker instance:

UDF registration:

from pyspark.sql.types import DoubleType  # needed for the UDF return types

def add_udfs(spark_sess):
    udfs = [
        # I tried a lot of mixing and matching for the class path as well
        ("JARO_WINKLER", "uk.gov.moj.dash.linkage.JaroWinklerSimilarity", DoubleType()),
        ...
    ]

    for function_name, class_name, return_type in udfs:
        try:
            spark_sess.udf.registerJavaFunction(
                function_name,
                class_name,
                return_type
            )
            print(f"Registered UDF: {function_name} as {class_name}")
        except Exception as e:
            print(f"Failed to register UDF {function_name}: {str(e)}")
    return spark_sess

add_udfs(spark)
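
(A minimal sanity check I would expect to pass if registration succeeded, assuming the JAR really is on the driver classpath:)

# If the class was found, the registered SQL function should be callable directly.
spark.sql("SELECT JARO_WINKLER('martha', 'marhta') AS sim").show()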

Linker instance:

from splink import Linker, SettingsCreator, SparkAPI, block_on
import splink.comparison_library as cl

settings = SettingsCreator(
    link_type="dedupe_only",
    comparisons=[
        cl.NameComparison("first_name"),
        cl.NameComparison("surname"),
        cl.LevenshteinAtThresholds("dob"),
        cl.ExactMatch("city").configure(term_frequency_adjustments=True),
        cl.EmailComparison("email"),
    ],
    blocking_rules_to_generate_predictions=[
        block_on("first_name"),
        "l.surname = r.surname",
    ],
    retain_intermediate_calculation_columns=True,
    em_convergence=0.01,
)

project_id = "project_id"
input_table = "input_table_id"

df = spark.read.format("bigquery") \
    .option("project", project_id) \
    .option("table", input_table) \
    .load().limit(10)

# Instantiate the Linker with the Spark DataFrame and settings
linker = Linker(df, settings, db_api=SparkAPI(spark_session=spark))

Error Message:

Failed to register UDF jaro_winkler_sim: Cannot load class JaroWinklerSimilarity, please make sure it is on the classpath.
Unable to load custom Spark SQL functions such as jaro_winkler from the jar that's provided with Splink.
You need to ensure the Splink jar is registered.
See https://moj-analytical-services.github.io/splink/demos/example_simple_pyspark.html for an example.
You will not be able to use these functions in your linkage.
You can find the location of the jar by calling the following function:
from splink.spark.jar_location import similarity_jar_location

Full error:
Can not load class uk.gov.moj.dash.linkage.JaroSimilarity, please make sure it is on the classpath.

Additional Notes:

I have verified that the scala-udf-similarity-0.1.1_spark3.x.jar file and the file returned by the similarity_jar_location() function exist on the cluster. However, none of the approaches resulted in successful UDF registration.
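
A minimal sketch of one way to check this from inside the job (assuming spark is the active session):

import os
from splink.backends.spark import similarity_jar_location

jar_path = similarity_jar_location()
print(jar_path, os.path.exists(jar_path))           # jar shipped with the installed splink package
print(spark.conf.get("spark.jars", "<not set>"))    # what the running session actually picked up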

I have also tried older Splink versions such as 3.7.3.

Any pointers or alternative methods to solve this error or load and register the JAR file correctly would be greatly appreciated.
