Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-50310][PYTHON] Add a flag to disable DataFrameQueryContext for PySpark #48964

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 31 additions & 13 deletions python/pyspark/errors/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,41 @@
Type,
Optional,
Union,
TYPE_CHECKING,
overload,
cast,
)
import pyspark
from pyspark.errors.error_classes import ERROR_CLASSES_MAP

if TYPE_CHECKING:
from pyspark.sql import SparkSession

T = TypeVar("T")
FuncT = TypeVar("FuncT", bound=Callable[..., Any])

_current_origin = threading.local()

# Providing DataFrame debugging options to reduce performance slowdown.
# Default is True.
_enable_debugging_cache = None


def is_debugging_enabled() -> bool:
global _enable_debugging_cache

if _enable_debugging_cache is None:
from pyspark.sql import SparkSession

spark = SparkSession.getActiveSession()
if spark is not None:
_enable_debugging_cache = (
spark.conf.get(
"spark.sql.dataFrameDebugging.enabled", "true" # type: ignore[union-attr]
).lower()
== "true"
)
else:
_enable_debugging_cache = False

return _enable_debugging_cache


def current_origin() -> threading.local:
global _current_origin
Expand Down Expand Up @@ -164,17 +184,12 @@ def get_message_template(self, errorClass: str) -> str:
return message_template


def _capture_call_site(spark_session: "SparkSession", depth: int) -> str:
def _capture_call_site(depth: int) -> str:
"""
Capture the call site information including file name, line number, and function name.
This function updates the thread-local storage from JVM side (PySparkCurrentOrigin)
with the current call site information when a PySpark API function is called.

Parameters
----------
spark_session : SparkSession
Current active Spark session.

Notes
-----
The call site information is used to enhance error messages with the exact location
Expand Down Expand Up @@ -245,7 +260,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:

# Getting the configuration requires RPC call. Uses the default value for now.
depth = 1
set_current_origin(func.__name__, _capture_call_site(spark, depth))
set_current_origin(func.__name__, _capture_call_site(depth))

try:
return func(*args, **kwargs)
Expand All @@ -262,7 +277,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
)
)
# Update call site when the function is called
jvm_pyspark_origin.set(func.__name__, _capture_call_site(spark, depth))
jvm_pyspark_origin.set(func.__name__, _capture_call_site(depth))

try:
return func(*args, **kwargs)
Expand Down Expand Up @@ -297,7 +312,10 @@ def with_origin_to_class(
return lambda cls: with_origin_to_class(cls, ignores)
else:
cls = cls_or_ignores
if os.environ.get("PYSPARK_PIN_THREAD", "true").lower() == "true":
if (
os.environ.get("PYSPARK_PIN_THREAD", "true").lower() == "true"
and is_debugging_enabled()
):
skipping = set(
["__init__", "__new__", "__iter__", "__nonzero__", "__repr__", "__bool__"]
+ (ignores or [])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,4 +295,14 @@ object StaticSQLConf {
.version("3.1.0")
.stringConf
.createWithDefault("")

val DATA_FRAME_DEBUGGING_ENABLED =
buildStaticConf("spark.sql.dataFrameDebugging.enabled")
itholic marked this conversation as resolved.
Show resolved Hide resolved
itholic marked this conversation as resolved.
Show resolved Hide resolved
.internal()
.doc(
"Enable the DataFrame debugging. This feature is enabled by default, but has a " +
"non-trivial performance overhead because of the stack trace collection.")
.version("4.0.0")
.booleanConf
.createWithDefault(true)
}