[BUG] Fix Deadlock with ScanOperators in to_physical_plan_scheduler
… and show iostats for glob and from_scan_task (#1581)
samster25 authored Nov 7, 2023
1 parent da120a3 commit f80dc85
Showing 3 changed files with 45 additions and 25 deletions.
8 changes: 7 additions & 1 deletion src/daft-micropartition/src/python.rs
@@ -72,7 +72,13 @@ impl PyMicroPartition {
     #[staticmethod]
     pub fn from_scan_task(scan_task: PyScanTask, py: Python) -> PyResult<Self> {
         Ok(py
-            .allow_threads(|| MicroPartition::from_scan_task(scan_task.into(), None))?
+            .allow_threads(|| {
+                let io_stats = IOStatsContext::new(format!(
+                    "MicroPartition::from_scan_task for {:?}",
+                    scan_task.0.sources
+                ));
+                MicroPartition::from_scan_task(scan_task.into(), Some(io_stats))
+            })?
             .into())
     }
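The from_scan_task change above follows the pattern used throughout this commit: build a named IOStatsContext for the operation and pass it down as Some(io_stats) so the reads it triggers are attributed to that label. A rough, self-contained sketch of the idea — the types here are simplified stand-ins, not daft_io's actual IOStatsContext/IOStatsRef:

```rust
// Simplified stand-ins for daft_io's IOStatsContext / IOStatsRef, for
// illustration only: a named, shareable counter that read paths can
// optionally report into.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

struct StatsContext {
    name: String,
    bytes_read: AtomicU64,
}

type StatsRef = Arc<StatsContext>;

impl StatsContext {
    fn new(name: impl Into<String>) -> StatsRef {
        Arc::new(StatsContext {
            name: name.into(),
            bytes_read: AtomicU64::new(0),
        })
    }
}

// Stand-in for a read path like MicroPartition::from_scan_task: it does the
// work either way, and only records stats when a context is supplied.
fn read_source(data: &[u8], io_stats: Option<StatsRef>) -> Vec<u8> {
    if let Some(stats) = &io_stats {
        stats.bytes_read.fetch_add(data.len() as u64, Ordering::Relaxed);
    }
    data.to_vec()
}

fn main() {
    let io_stats = StatsContext::new("MicroPartition::from_scan_task for [\"s3://bucket/file.parquet\"]");
    let _bytes = read_source(b"example bytes", Some(io_stats.clone()));
    println!(
        "{}: {} bytes read",
        io_stats.name,
        io_stats.bytes_read.load(Ordering::Relaxed)
    );
}
```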
10 changes: 6 additions & 4 deletions src/daft-plan/src/builder.rs
@@ -459,10 +459,12 @@ impl PyLogicalPlanBuilder {
     /// Finalize the logical plan, translate the logical plan to a physical plan, and return
     /// a physical plan scheduler that's capable of launching the work necessary to compute the output
     /// of the physical plan.
-    pub fn to_physical_plan_scheduler(&self) -> PyResult<PhysicalPlanScheduler> {
-        let logical_plan = self.builder.build();
-        let physical_plan: Arc<PhysicalPlan> = plan(logical_plan.as_ref())?.into();
-        Ok(physical_plan.into())
+    pub fn to_physical_plan_scheduler(&self, py: Python) -> PyResult<PhysicalPlanScheduler> {
+        py.allow_threads(|| {
+            let logical_plan = self.builder.build();
+            let physical_plan: Arc<PhysicalPlan> = plan(logical_plan.as_ref())?.into();
+            Ok(physical_plan.into())
+        })
     }

     pub fn repr_ascii(&self, simple: bool) -> PyResult<String> {
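The builder.rs change above is the deadlock fix itself: to_physical_plan_scheduler previously ran planning while holding the GIL, so any work during planning that needed the GIL on another thread (for example a ScanOperator calling back into Python) could never make progress. Wrapping the body in py.allow_threads releases the GIL for the duration. A minimal sketch of the hazard and the fix, assuming the pyo3 crate and using illustrative names rather than Daft's:

```rust
// Sketch only: shows why blocking while holding the GIL can deadlock, and
// how `allow_threads` avoids it. Assumes the `pyo3` crate; function names
// are illustrative, not Daft's.
use pyo3::prelude::*;
use std::thread;

// Deadlock-prone shape: the caller holds the GIL and blocks on a worker
// thread, but the worker needs the GIL to finish.
fn plan_holding_gil(_py: Python<'_>) -> PyResult<()> {
    // The GIL stays held for this whole function; nothing ever releases it.
    let worker = thread::spawn(|| {
        // e.g. a ScanOperator materializing a Python-backed source
        Python::with_gil(|_inner| { /* needs the GIL */ });
    });
    let _ = worker.join(); // blocks forever: the worker is waiting on our GIL
    Ok(())
}

// Fixed shape (what this commit does in to_physical_plan_scheduler):
// release the GIL around the blocking work.
fn plan_releasing_gil(py: Python<'_>) -> PyResult<()> {
    py.allow_threads(|| {
        let worker = thread::spawn(|| {
            Python::with_gil(|_inner| { /* can now acquire the GIL */ });
        });
        let _ = worker.join();
    });
    Ok(())
}
```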
52 changes: 32 additions & 20 deletions src/daft-scan/src/glob.rs
@@ -2,7 +2,7 @@ use std::{fmt::Display, sync::Arc};

 use common_error::DaftResult;
 use daft_core::schema::SchemaRef;
-use daft_io::{get_io_client, get_runtime, parse_url, IOClient, IOStatsContext};
+use daft_io::{get_io_client, get_runtime, parse_url, IOClient, IOStatsContext, IOStatsRef};
 use daft_parquet::read::ParquetSchemaInferenceOptions;

 use crate::{
@@ -23,13 +23,14 @@ fn run_glob(
     limit: Option<usize>,
     io_client: Arc<IOClient>,
     runtime: Arc<tokio::runtime::Runtime>,
+    io_stats: Option<IOStatsRef>,
 ) -> DaftResult<Vec<String>> {
     let (_, parsed_glob_path) = parse_url(glob_path)?;
     let _rt_guard = runtime.enter();
     runtime.block_on(async {
         Ok(io_client
             .as_ref()
-            .glob(&parsed_glob_path, None, None, limit, None)
+            .glob(&parsed_glob_path, None, None, limit, io_stats)
             .await?
             .into_iter()
             .map(|fm| fm.filepath)
@@ -78,34 +79,35 @@ impl GlobScanOperator {
             Some(s) => s,
             None => {
                 let (io_runtime, io_client) = get_io_client_and_runtime(storage_config.as_ref())?;
-                let paths = run_glob(glob_path, Some(1), io_client.clone(), io_runtime)?;
+                let io_stats = IOStatsContext::new(format!(
+                    "GlobScanOperator::try_new schema inference for {glob_path}"
+                ));
+                let paths = run_glob(
+                    glob_path,
+                    Some(1),
+                    io_client.clone(),
+                    io_runtime,
+                    Some(io_stats.clone()),
+                )?;
                 let first_filepath = paths[0].as_str();
                 let inferred_schema = match file_format_config.as_ref() {
                     FileFormatConfig::Parquet(ParquetSourceConfig {
                         coerce_int96_timestamp_unit,
                         ..
-                    }) => {
-                        let io_stats = IOStatsContext::new(format!(
-                            "GlobScanOperator constructor read_parquet_schema: for uri {first_filepath}"
-                        ));
-                        daft_parquet::read::read_parquet_schema(
-                            first_filepath,
-                            io_client.clone(),
-                            Some(io_stats),
-                            ParquetSchemaInferenceOptions {
-                                coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit,
-                            },
-                        )?
-                    }
+                    }) => daft_parquet::read::read_parquet_schema(
+                        first_filepath,
+                        io_client.clone(),
+                        Some(io_stats),
+                        ParquetSchemaInferenceOptions {
+                            coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit,
+                        },
+                    )?,
                     FileFormatConfig::Csv(CsvSourceConfig {
                         delimiter,
                         has_headers,
                         double_quote,
                         ..
                     }) => {
-                        let io_stats = IOStatsContext::new(format!(
-                            "GlobScanOperator constructor read_csv_schema: for uri {first_filepath}"
-                        ));
                         let (schema, _, _, _, _) = daft_csv::metadata::read_csv_schema(
                             first_filepath,
                             *has_headers,
@@ -166,9 +168,19 @@ impl ScanOperator for GlobScanOperator {
         pushdowns: Pushdowns,
     ) -> DaftResult<Box<dyn Iterator<Item = DaftResult<ScanTask>>>> {
         let (io_runtime, io_client) = get_io_client_and_runtime(self.storage_config.as_ref())?;
+        let io_stats = IOStatsContext::new(format!(
+            "GlobScanOperator::to_scan_tasks for {}",
+            self.glob_path
+        ));

         // TODO: This runs the glob to exhaustion, but we should return an iterator instead
-        let files = run_glob(self.glob_path.as_str(), None, io_client, io_runtime)?;
+        let files = run_glob(
+            self.glob_path.as_str(),
+            None,
+            io_client,
+            io_runtime,
+            Some(io_stats),
+        )?;
         let file_format_config = self.file_format_config.clone();
         let schema = self.schema.clone();
         let storage_config = self.storage_config.clone();
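Both call sites in glob.rs go through run_glob, which bridges the synchronous ScanOperator code onto the shared async IO client by entering the tokio runtime and blocking on the glob future, now with the optional stats context threaded through. A rough standalone sketch of that sync-to-async bridge, assuming the tokio crate with its runtime features enabled; list_files is a hypothetical stand-in for the async IOClient::glob call:

```rust
// Sketch of the enter-then-block_on bridge used by run_glob. Assumes the
// `tokio` crate; `list_files` is a hypothetical stand-in for IOClient::glob.
use std::sync::Arc;

async fn list_files(glob: &str) -> Vec<String> {
    // Pretend we listed object storage matching `glob`.
    vec![format!("{glob}/part-0.parquet")]
}

fn run_glob_sync(glob: &str, runtime: Arc<tokio::runtime::Runtime>) -> Vec<String> {
    // Make this runtime current for the thread (so spawns, timers, etc. have
    // a context), then drive the future to completion on the calling thread.
    let _rt_guard = runtime.enter();
    runtime.block_on(async { list_files(glob).await })
}

fn main() {
    let runtime = Arc::new(tokio::runtime::Runtime::new().expect("failed to build runtime"));
    let files = run_glob_sync("s3://bucket/data", runtime);
    println!("{files:?}");
}
```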
