diff --git a/Cargo.lock b/Cargo.lock index 364b71f805..36e8fa2c79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1919,7 +1919,6 @@ dependencies = [ "lazy_static", "log", "md5", - "oneshot", "openssl-sys", "pyo3", "rand 0.8.5", diff --git a/src/common/runtime/src/compute.rs b/src/common/runtime/src/compute.rs deleted file mode 100644 index 8635b3a4ba..0000000000 --- a/src/common/runtime/src/compute.rs +++ /dev/null @@ -1,65 +0,0 @@ -use std::{ - future::Future, - panic::AssertUnwindSafe, - sync::{Arc, OnceLock}, -}; - -use common_error::{DaftError, DaftResult}; -use futures::FutureExt; -use tokio::task::JoinHandle; - -pub(crate) static COMPUTE_RUNTIME: OnceLock = OnceLock::new(); -pub type ComputeRuntimeRef = Arc; - -pub struct ComputeRuntime { - runtime: tokio::runtime::Runtime, -} - -impl ComputeRuntime { - pub(crate) fn new(runtime: tokio::runtime::Runtime) -> ComputeRuntimeRef { - Arc::new(Self { runtime }) - } - - async fn execute_task(future: F) -> DaftResult - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - AssertUnwindSafe(future).catch_unwind().await.map_err(|e| { - let s = if let Some(s) = e.downcast_ref::() { - s.clone() - } else if let Some(s) = e.downcast_ref::<&str>() { - (*s).to_string() - } else { - "unknown internal error".to_string() - }; - DaftError::ComputeError(format!( - "Caught panic when spawning blocking task in compute pool {s})" - )) - }) - } - - /// Similar to block_on, but is async and can be awaited - pub async fn await_on(&self, future: F) -> DaftResult - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let (tx, rx) = oneshot::channel(); - let _join_handle = self.spawn(async move { - let task_output = Self::execute_task(future).await; - if tx.send(task_output).is_err() { - log::warn!("Spawned task output ignored: receiver dropped"); - } - }); - rx.await.expect("Spawned task transmitter dropped") - } - - fn spawn(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - self.runtime.spawn(future) - } -} diff --git a/src/common/runtime/src/io.rs b/src/common/runtime/src/io.rs deleted file mode 100644 index e496717a52..0000000000 --- a/src/common/runtime/src/io.rs +++ /dev/null @@ -1,90 +0,0 @@ -use std::{ - future::Future, - panic::AssertUnwindSafe, - sync::{Arc, OnceLock}, -}; - -use common_error::{DaftError, DaftResult}; -use futures::FutureExt; -use tokio::task::JoinHandle; - -pub(crate) static THREADED_IO_RUNTIME: OnceLock = OnceLock::new(); -pub(crate) static SINGLE_THREADED_IO_RUNTIME: OnceLock = OnceLock::new(); - -pub type IORuntimeRef = Arc; - -pub struct IORuntime { - runtime: tokio::runtime::Runtime, -} - -impl IORuntime { - pub(crate) fn new(runtime: tokio::runtime::Runtime) -> IORuntimeRef { - Arc::new(Self { runtime }) - } - - async fn execute_task(future: F) -> DaftResult - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - AssertUnwindSafe(future).catch_unwind().await.map_err(|e| { - let s = if let Some(s) = e.downcast_ref::() { - s.clone() - } else if let Some(s) = e.downcast_ref::<&str>() { - (*s).to_string() - } else { - "unknown internal error".to_string() - }; - DaftError::ComputeError(format!( - "Caught panic when spawning blocking task in IO pool {s})" - )) - }) - } - - /// Similar to tokio's Runtime::block_on but requires static lifetime + Send - /// You should use this when you are spawning IO tasks from an Expression Evaluator or in the Executor - pub fn block_on(&self, future: F) -> DaftResult - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let (tx, rx) = oneshot::channel(); - let _join_handle = self.spawn(async move { - let task_output = Self::execute_task(future).await; - if tx.send(task_output).is_err() { - log::warn!("Spawned task output ignored: receiver dropped"); - } - }); - rx.recv().expect("Spawned task transmitter dropped") - } - - /// Similar to block_on, but is async and can be awaited - pub async fn await_on(&self, future: F) -> DaftResult - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let (tx, rx) = oneshot::channel(); - let _join_handle = self.spawn(async move { - let task_output = Self::execute_task(future).await; - if tx.send(task_output).is_err() { - log::warn!("Spawned task output ignored: receiver dropped"); - } - }); - rx.await.expect("Spawned task transmitter dropped") - } - - /// Blocks current thread to compute future. Can not be called in tokio runtime context - /// - pub fn block_on_current_thread(&self, future: F) -> F::Output { - self.runtime.block_on(future) - } - - pub fn spawn(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - self.runtime.spawn(future) - } -} diff --git a/src/common/runtime/src/lib.rs b/src/common/runtime/src/lib.rs index af423a37a1..d0c34f5b54 100644 --- a/src/common/runtime/src/lib.rs +++ b/src/common/runtime/src/lib.rs @@ -1,12 +1,16 @@ -pub mod compute; -pub mod io; +use std::{ + future::Future, + panic::AssertUnwindSafe, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, OnceLock, + }, +}; -use std::sync::atomic::{AtomicUsize, Ordering}; - -use compute::{ComputeRuntime, ComputeRuntimeRef, COMPUTE_RUNTIME}; -use io::{IORuntime, IORuntimeRef, SINGLE_THREADED_IO_RUNTIME, THREADED_IO_RUNTIME}; +use common_error::{DaftError, DaftResult}; +use futures::FutureExt; use lazy_static::lazy_static; -use tokio::runtime::RuntimeFlavor; +use tokio::{runtime::RuntimeFlavor, task::JoinHandle}; lazy_static! { static ref NUM_CPUS: usize = std::thread::available_parallelism().unwrap().get(); @@ -15,7 +19,99 @@ lazy_static! { static ref COMPUTE_RUNTIME_MAX_BLOCKING_THREADS: usize = 1; // Compute thread should not use blocking threads, limit this to the minimum, i.e. 1 } -fn init_compute_runtime() -> ComputeRuntimeRef { +static THREADED_IO_RUNTIME: OnceLock = OnceLock::new(); +static SINGLE_THREADED_IO_RUNTIME: OnceLock = OnceLock::new(); +static COMPUTE_RUNTIME: OnceLock = OnceLock::new(); + +pub type RuntimeRef = Arc; + +#[derive(Debug, Clone, Copy)] +enum PoolType { + Compute, + IO, +} + +pub struct Runtime { + runtime: tokio::runtime::Runtime, + pool_type: PoolType, +} + +impl Runtime { + pub(crate) fn new(runtime: tokio::runtime::Runtime, pool_type: PoolType) -> RuntimeRef { + Arc::new(Self { runtime, pool_type }) + } + + async fn execute_task(future: F, pool_type: PoolType) -> DaftResult + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + AssertUnwindSafe(future).catch_unwind().await.map_err(|e| { + let s = if let Some(s) = e.downcast_ref::() { + s.clone() + } else if let Some(s) = e.downcast_ref::<&str>() { + (*s).to_string() + } else { + "unknown internal error".to_string() + }; + DaftError::ComputeError(format!( + "Caught panic when spawning blocking task in the {:?} runtime: {})", + pool_type, s + )) + }) + } + + /// Similar to tokio's Runtime::block_on but requires static lifetime + Send + /// You should use this when you are spawning IO tasks from an Expression Evaluator or in the Executor + pub fn block_on(&self, future: F) -> DaftResult + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + let pool_type = self.pool_type; + let _join_handle = self.spawn(async move { + let task_output = Self::execute_task(future, pool_type).await; + if tx.send(task_output).is_err() { + log::warn!("Spawned task output ignored: receiver dropped"); + } + }); + rx.recv().expect("Spawned task transmitter dropped") + } + + /// Similar to block_on, but is async and can be awaited + pub async fn await_on(&self, future: F) -> DaftResult + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + let pool_type = self.pool_type; + let _join_handle = self.spawn(async move { + let task_output = Self::execute_task(future, pool_type).await; + if tx.send(task_output).is_err() { + log::warn!("Spawned task output ignored: receiver dropped"); + } + }); + rx.await.expect("Spawned task transmitter dropped") + } + + /// Blocks current thread to compute future. Can not be called in tokio runtime context + /// + pub fn block_on_current_thread(&self, future: F) -> F::Output { + self.runtime.block_on(future) + } + + pub fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.runtime.spawn(future) + } +} + +fn init_compute_runtime() -> RuntimeRef { std::thread::spawn(move || { let mut builder = tokio::runtime::Builder::new_multi_thread(); builder @@ -27,13 +123,13 @@ fn init_compute_runtime() -> ComputeRuntimeRef { format!("Compute-Thread-{}", id) }) .max_blocking_threads(*COMPUTE_RUNTIME_MAX_BLOCKING_THREADS); - ComputeRuntime::new(builder.build().unwrap()) + Runtime::new(builder.build().unwrap(), PoolType::Compute) }) .join() .unwrap() } -fn init_io_runtime(multi_thread: bool) -> IORuntimeRef { +fn init_io_runtime(multi_thread: bool) -> RuntimeRef { std::thread::spawn(move || { let mut builder = tokio::runtime::Builder::new_multi_thread(); builder @@ -48,17 +144,17 @@ fn init_io_runtime(multi_thread: bool) -> IORuntimeRef { let id = COMPUTE_THREAD_ATOMIC_ID.fetch_add(1, Ordering::SeqCst); format!("IO-Thread-{}", id) }); - IORuntime::new(builder.build().unwrap()) + Runtime::new(builder.build().unwrap(), PoolType::IO) }) .join() .unwrap() } -pub fn get_compute_runtime() -> ComputeRuntimeRef { +pub fn get_compute_runtime() -> RuntimeRef { COMPUTE_RUNTIME.get_or_init(init_compute_runtime).clone() } -pub fn get_io_runtime(multi_thread: bool) -> IORuntimeRef { +pub fn get_io_runtime(multi_thread: bool) -> RuntimeRef { if !multi_thread { SINGLE_THREADED_IO_RUNTIME .get_or_init(|| init_io_runtime(false)) diff --git a/src/daft-io/Cargo.toml b/src/daft-io/Cargo.toml index 16e9038f01..88f7ca85d0 100644 --- a/src/daft-io/Cargo.toml +++ b/src/daft-io/Cargo.toml @@ -27,7 +27,6 @@ hyper-tls = "0.5.0" itertools = {workspace = true} lazy_static = {workspace = true} log = {workspace = true} -oneshot = "0.1.8" openssl-sys = {version = "0.9.102", features = ["vendored"]} pyo3 = {workspace = true, optional = true} rand = "0.8.5" diff --git a/src/daft-local-execution/src/intermediate_ops/aggregate.rs b/src/daft-local-execution/src/intermediate_ops/aggregate.rs index dac61c445b..eb3d3bb495 100644 --- a/src/daft-local-execution/src/intermediate_ops/aggregate.rs +++ b/src/daft-local-execution/src/intermediate_ops/aggregate.rs @@ -5,7 +5,7 @@ use daft_dsl::ExprRef; use tracing::instrument; use super::intermediate_op::{ - IntermediateOperator, IntermediateOperatorResult, IntermediateOperatorState, + IntermediateOperator, IntermediateOperatorResult, IntermediateOperatorStateWrapper, }; use crate::pipeline::PipelineResultType; @@ -29,7 +29,7 @@ impl IntermediateOperator for AggregateOperator { &self, _idx: usize, input: &PipelineResultType, - _state: &mut dyn IntermediateOperatorState, + _state: &IntermediateOperatorStateWrapper, ) -> DaftResult { let out = input.as_data().agg(&self.agg_exprs, &self.group_by)?; Ok(IntermediateOperatorResult::NeedMoreInput(Some(Arc::new( diff --git a/src/daft-local-execution/src/intermediate_ops/anti_semi_hash_join_probe.rs b/src/daft-local-execution/src/intermediate_ops/anti_semi_hash_join_probe.rs index 69a5c6eb87..0ef6cd562a 100644 --- a/src/daft-local-execution/src/intermediate_ops/anti_semi_hash_join_probe.rs +++ b/src/daft-local-execution/src/intermediate_ops/anti_semi_hash_join_probe.rs @@ -10,6 +10,7 @@ use tracing::{info_span, instrument}; use super::intermediate_op::{ IntermediateOperator, IntermediateOperatorResult, IntermediateOperatorState, + IntermediateOperatorStateWrapper, }; use crate::pipeline::PipelineResultType; @@ -108,9 +109,10 @@ impl IntermediateOperator for AntiSemiProbeOperator { &self, idx: usize, input: &PipelineResultType, - state: &mut dyn IntermediateOperatorState, + state: &IntermediateOperatorStateWrapper, ) -> DaftResult { - let state = state + let mut guard = state.inner.lock().unwrap(); + let state = guard .as_any_mut() .downcast_mut::() .expect("AntiSemiProbeOperator state should be AntiSemiProbeState"); diff --git a/src/daft-local-execution/src/intermediate_ops/filter.rs b/src/daft-local-execution/src/intermediate_ops/filter.rs index 3075235a29..960788dfe5 100644 --- a/src/daft-local-execution/src/intermediate_ops/filter.rs +++ b/src/daft-local-execution/src/intermediate_ops/filter.rs @@ -5,7 +5,7 @@ use daft_dsl::ExprRef; use tracing::instrument; use super::intermediate_op::{ - IntermediateOperator, IntermediateOperatorResult, IntermediateOperatorState, + IntermediateOperator, IntermediateOperatorResult, IntermediateOperatorStateWrapper, }; use crate::pipeline::PipelineResultType; @@ -25,7 +25,7 @@ impl IntermediateOperator for FilterOperator { &self, _idx: usize, input: &PipelineResultType, - _state: &mut dyn IntermediateOperatorState, + _state: &IntermediateOperatorStateWrapper, ) -> DaftResult { let out = input.as_data().filter(&[self.predicate.clone()])?; Ok(IntermediateOperatorResult::NeedMoreInput(Some(Arc::new( diff --git a/src/daft-local-execution/src/intermediate_ops/inner_hash_join_probe.rs b/src/daft-local-execution/src/intermediate_ops/inner_hash_join_probe.rs index 52e7bef936..7672115b66 100644 --- a/src/daft-local-execution/src/intermediate_ops/inner_hash_join_probe.rs +++ b/src/daft-local-execution/src/intermediate_ops/inner_hash_join_probe.rs @@ -10,6 +10,7 @@ use tracing::{info_span, instrument}; use super::intermediate_op::{ IntermediateOperator, IntermediateOperatorResult, IntermediateOperatorState, + IntermediateOperatorStateWrapper, }; use crate::pipeline::PipelineResultType; @@ -164,9 +165,10 @@ impl IntermediateOperator for InnerHashJoinProbeOperator { &self, idx: usize, input: &PipelineResultType, - state: &mut dyn IntermediateOperatorState, + state: &IntermediateOperatorStateWrapper, ) -> DaftResult { - let state = state + let mut guard = state.inner.lock().unwrap(); + let state = guard .as_any_mut() .downcast_mut::() .expect("InnerHashJoinProbeOperator state should be InnerHashJoinProbeState"); diff --git a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs index 33f1cd7c3f..de1e4b6982 100644 --- a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs +++ b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs @@ -1,10 +1,9 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use common_display::tree::TreeDisplay; use common_error::DaftResult; use common_runtime::get_compute_runtime; use daft_micropartition::MicroPartition; -use tokio::sync::Mutex; use tracing::{info_span, instrument}; use super::buffer::OperatorBuffer; @@ -15,16 +14,29 @@ use crate::{ ExecutionRuntimeHandle, NUM_CPUS, }; -pub trait IntermediateOperatorState: Send + Sync { +pub(crate) trait IntermediateOperatorState: Send + Sync { fn as_any_mut(&mut self) -> &mut dyn std::any::Any; } -pub struct DefaultIntermediateOperatorState {} + +struct DefaultIntermediateOperatorState {} impl IntermediateOperatorState for DefaultIntermediateOperatorState { fn as_any_mut(&mut self) -> &mut dyn std::any::Any { self } } +pub(crate) struct IntermediateOperatorStateWrapper { + pub inner: Mutex>, +} + +impl IntermediateOperatorStateWrapper { + fn new(inner: Box) -> Arc { + Arc::new(Self { + inner: Mutex::new(inner), + }) + } +} + pub enum IntermediateOperatorResult { NeedMoreInput(Option>), #[allow(dead_code)] @@ -36,7 +48,7 @@ pub trait IntermediateOperator: Send + Sync { &self, idx: usize, input: &PipelineResultType, - state: &mut dyn IntermediateOperatorState, + state: &IntermediateOperatorStateWrapper, ) -> DaftResult; fn name(&self) -> &'static str; fn make_state(&self) -> Box { @@ -84,17 +96,16 @@ impl IntermediateNode { ) -> DaftResult<()> { let span = info_span!("IntermediateOp::execute"); let compute_runtime = get_compute_runtime(); - let state = Arc::new(Mutex::new(op.make_state())); + let state_wrapper = IntermediateOperatorStateWrapper::new(op.make_state()); while let Some((idx, morsel)) = receiver.recv().await { loop { - let state = state.clone(); let op = op.clone(); let morsel = morsel.clone(); let span = span.clone(); let rt_context = rt_context.clone(); + let state_wrapper = state_wrapper.clone(); let fut = async move { - let mut state_guard = state.lock().await; - rt_context.in_span(&span, || op.execute(idx, &morsel, state_guard.as_mut())) + rt_context.in_span(&span, || op.execute(idx, &morsel, &state_wrapper)) }; let result = compute_runtime.await_on(fut).await??; match result { diff --git a/src/daft-local-execution/src/intermediate_ops/pivot.rs b/src/daft-local-execution/src/intermediate_ops/pivot.rs index 9edbc91406..cc540cbf37 100644 --- a/src/daft-local-execution/src/intermediate_ops/pivot.rs +++ b/src/daft-local-execution/src/intermediate_ops/pivot.rs @@ -5,7 +5,7 @@ use daft_dsl::ExprRef; use tracing::instrument; use super::intermediate_op::{ - IntermediateOperator, IntermediateOperatorResult, IntermediateOperatorState, + IntermediateOperator, IntermediateOperatorResult, IntermediateOperatorStateWrapper, }; use crate::pipeline::PipelineResultType; @@ -38,7 +38,7 @@ impl IntermediateOperator for PivotOperator { &self, _idx: usize, input: &PipelineResultType, - _state: &mut dyn IntermediateOperatorState, + _state: &IntermediateOperatorStateWrapper, ) -> DaftResult { let out = input.as_data().pivot( &self.group_by, diff --git a/src/daft-local-execution/src/intermediate_ops/project.rs b/src/daft-local-execution/src/intermediate_ops/project.rs index 48c0681731..190a18ab0d 100644 --- a/src/daft-local-execution/src/intermediate_ops/project.rs +++ b/src/daft-local-execution/src/intermediate_ops/project.rs @@ -5,7 +5,7 @@ use daft_dsl::ExprRef; use tracing::instrument; use super::intermediate_op::{ - IntermediateOperator, IntermediateOperatorResult, IntermediateOperatorState, + IntermediateOperator, IntermediateOperatorResult, IntermediateOperatorStateWrapper, }; use crate::pipeline::PipelineResultType; @@ -25,7 +25,7 @@ impl IntermediateOperator for ProjectOperator { &self, _idx: usize, input: &PipelineResultType, - _state: &mut dyn IntermediateOperatorState, + _state: &IntermediateOperatorStateWrapper, ) -> DaftResult { let out = input.as_data().eval_expression_list(&self.projection)?; Ok(IntermediateOperatorResult::NeedMoreInput(Some(Arc::new( diff --git a/src/daft-local-execution/src/intermediate_ops/sample.rs b/src/daft-local-execution/src/intermediate_ops/sample.rs index 177b3c45b2..9123fd3efe 100644 --- a/src/daft-local-execution/src/intermediate_ops/sample.rs +++ b/src/daft-local-execution/src/intermediate_ops/sample.rs @@ -4,7 +4,7 @@ use common_error::DaftResult; use tracing::instrument; use super::intermediate_op::{ - IntermediateOperator, IntermediateOperatorResult, IntermediateOperatorState, + IntermediateOperator, IntermediateOperatorResult, IntermediateOperatorStateWrapper, }; use crate::pipeline::PipelineResultType; @@ -30,7 +30,7 @@ impl IntermediateOperator for SampleOperator { &self, _idx: usize, input: &PipelineResultType, - _state: &mut dyn IntermediateOperatorState, + _state: &IntermediateOperatorStateWrapper, ) -> DaftResult { let out = input diff --git a/src/daft-local-execution/src/intermediate_ops/unpivot.rs b/src/daft-local-execution/src/intermediate_ops/unpivot.rs index cbeb99bd95..f927e12465 100644 --- a/src/daft-local-execution/src/intermediate_ops/unpivot.rs +++ b/src/daft-local-execution/src/intermediate_ops/unpivot.rs @@ -5,7 +5,7 @@ use daft_dsl::ExprRef; use tracing::instrument; use super::intermediate_op::{ - IntermediateOperator, IntermediateOperatorResult, IntermediateOperatorState, + IntermediateOperator, IntermediateOperatorResult, IntermediateOperatorStateWrapper, }; use crate::pipeline::PipelineResultType; @@ -38,7 +38,7 @@ impl IntermediateOperator for UnpivotOperator { &self, _idx: usize, input: &PipelineResultType, - _state: &mut dyn IntermediateOperatorState, + _state: &IntermediateOperatorStateWrapper, ) -> DaftResult { let out = input.as_data().unpivot( &self.ids, diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 06293219a7..883de475eb 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -2,7 +2,7 @@ use std::{sync::Arc, vec}; use common_error::{DaftError, DaftResult}; use common_file_formats::{CsvSourceConfig, FileFormat, FileFormatConfig, ParquetSourceConfig}; -use common_runtime::io::IORuntimeRef; +use common_runtime::RuntimeRef; use daft_core::{prelude::Utf8Array, series::IntoSeries}; use daft_csv::CsvParseOptions; use daft_io::{parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef}; @@ -30,7 +30,7 @@ pub struct GlobScanOperator { /// Wrapper struct that implements a sync Iterator for a BoxStream struct BoxStreamIterator<'a, T> { boxstream: BoxStream<'a, T>, - runtime_handle: IORuntimeRef, + runtime_handle: RuntimeRef, } impl<'a, T> Iterator for BoxStreamIterator<'a, T> { @@ -69,7 +69,7 @@ fn run_glob( glob_path: &str, limit: Option, io_client: Arc, - runtime: IORuntimeRef, + runtime: RuntimeRef, io_stats: Option, file_format: FileFormat, ) -> DaftResult { @@ -94,7 +94,7 @@ fn run_glob( fn run_glob_parallel( glob_paths: Vec, io_client: Arc, - runtime: IORuntimeRef, + runtime: RuntimeRef, io_stats: Option, file_format: FileFormat, ) -> DaftResult>> { diff --git a/src/daft-scan/src/storage_config.rs b/src/daft-scan/src/storage_config.rs index e54a50107c..c502ae62dc 100644 --- a/src/daft-scan/src/storage_config.rs +++ b/src/daft-scan/src/storage_config.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use common_error::DaftResult; use common_io_config::IOConfig; use common_py_serde::impl_bincode_py_state_serialization; -use common_runtime::{get_io_runtime, io::IORuntimeRef}; +use common_runtime::{get_io_runtime, RuntimeRef}; use daft_io::{get_io_client, IOClient}; use serde::{Deserialize, Serialize}; #[cfg(feature = "python")] @@ -23,7 +23,7 @@ pub enum StorageConfig { } impl StorageConfig { - pub fn get_io_client_and_runtime(&self) -> DaftResult<(IORuntimeRef, Arc)> { + pub fn get_io_client_and_runtime(&self) -> DaftResult<(RuntimeRef, Arc)> { // Grab an IOClient and Runtime // TODO: This should be cleaned up and hidden behind a better API from daft-io match self {