From f80dc85d48e9857dc01f8d4284087bcc0e56fad3 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Tue, 7 Nov 2023 23:54:49 +0530 Subject: [PATCH] [BUG] Fix Deadlock with ScanOperators in `to_physical_plan_scheduler` and show iostats for glob and from_scan_task (#1581) --- src/daft-micropartition/src/python.rs | 8 ++++- src/daft-plan/src/builder.rs | 10 +++--- src/daft-scan/src/glob.rs | 52 ++++++++++++++++----------- 3 files changed, 45 insertions(+), 25 deletions(-) diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs index 5b75772526..2ebb3c7627 100644 --- a/src/daft-micropartition/src/python.rs +++ b/src/daft-micropartition/src/python.rs @@ -72,7 +72,13 @@ impl PyMicroPartition { #[staticmethod] pub fn from_scan_task(scan_task: PyScanTask, py: Python) -> PyResult { Ok(py - .allow_threads(|| MicroPartition::from_scan_task(scan_task.into(), None))? + .allow_threads(|| { + let io_stats = IOStatsContext::new(format!( + "MicroPartition::from_scan_task for {:?}", + scan_task.0.sources + )); + MicroPartition::from_scan_task(scan_task.into(), Some(io_stats)) + })? .into()) } diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 5907c23bd9..47c5571afb 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -459,10 +459,12 @@ impl PyLogicalPlanBuilder { /// Finalize the logical plan, translate the logical plan to a physical plan, and return /// a physical plan scheduler that's capable of launching the work necessary to compute the output /// of the physical plan. - pub fn to_physical_plan_scheduler(&self) -> PyResult { - let logical_plan = self.builder.build(); - let physical_plan: Arc = plan(logical_plan.as_ref())?.into(); - Ok(physical_plan.into()) + pub fn to_physical_plan_scheduler(&self, py: Python) -> PyResult { + py.allow_threads(|| { + let logical_plan = self.builder.build(); + let physical_plan: Arc = plan(logical_plan.as_ref())?.into(); + Ok(physical_plan.into()) + }) } pub fn repr_ascii(&self, simple: bool) -> PyResult { diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 9c2c7a9025..c72e381ff4 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -2,7 +2,7 @@ use std::{fmt::Display, sync::Arc}; use common_error::DaftResult; use daft_core::schema::SchemaRef; -use daft_io::{get_io_client, get_runtime, parse_url, IOClient, IOStatsContext}; +use daft_io::{get_io_client, get_runtime, parse_url, IOClient, IOStatsContext, IOStatsRef}; use daft_parquet::read::ParquetSchemaInferenceOptions; use crate::{ @@ -23,13 +23,14 @@ fn run_glob( limit: Option, io_client: Arc, runtime: Arc, + io_stats: Option, ) -> DaftResult> { let (_, parsed_glob_path) = parse_url(glob_path)?; let _rt_guard = runtime.enter(); runtime.block_on(async { Ok(io_client .as_ref() - .glob(&parsed_glob_path, None, None, limit, None) + .glob(&parsed_glob_path, None, None, limit, io_stats) .await? .into_iter() .map(|fm| fm.filepath) @@ -78,34 +79,35 @@ impl GlobScanOperator { Some(s) => s, None => { let (io_runtime, io_client) = get_io_client_and_runtime(storage_config.as_ref())?; - let paths = run_glob(glob_path, Some(1), io_client.clone(), io_runtime)?; + let io_stats = IOStatsContext::new(format!( + "GlobScanOperator::try_new schema inference for {glob_path}" + )); + let paths = run_glob( + glob_path, + Some(1), + io_client.clone(), + io_runtime, + Some(io_stats.clone()), + )?; let first_filepath = paths[0].as_str(); let inferred_schema = match file_format_config.as_ref() { FileFormatConfig::Parquet(ParquetSourceConfig { coerce_int96_timestamp_unit, .. - }) => { - let io_stats = IOStatsContext::new(format!( - "GlobScanOperator constructor read_parquet_schema: for uri {first_filepath}" - )); - daft_parquet::read::read_parquet_schema( - first_filepath, - io_client.clone(), - Some(io_stats), - ParquetSchemaInferenceOptions { - coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit, - }, - )? - } + }) => daft_parquet::read::read_parquet_schema( + first_filepath, + io_client.clone(), + Some(io_stats), + ParquetSchemaInferenceOptions { + coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit, + }, + )?, FileFormatConfig::Csv(CsvSourceConfig { delimiter, has_headers, double_quote, .. }) => { - let io_stats = IOStatsContext::new(format!( - "GlobScanOperator constructor read_csv_schema: for uri {first_filepath}" - )); let (schema, _, _, _, _) = daft_csv::metadata::read_csv_schema( first_filepath, *has_headers, @@ -166,9 +168,19 @@ impl ScanOperator for GlobScanOperator { pushdowns: Pushdowns, ) -> DaftResult>>> { let (io_runtime, io_client) = get_io_client_and_runtime(self.storage_config.as_ref())?; + let io_stats = IOStatsContext::new(format!( + "GlobScanOperator::to_scan_tasks for {}", + self.glob_path + )); // TODO: This runs the glob to exhaustion, but we should return an iterator instead - let files = run_glob(self.glob_path.as_str(), None, io_client, io_runtime)?; + let files = run_glob( + self.glob_path.as_str(), + None, + io_client, + io_runtime, + Some(io_stats), + )?; let file_format_config = self.file_format_config.clone(); let schema = self.schema.clone(); let storage_config = self.storage_config.clone();