fix typo and reduce 1 alloc
Colin Ho authored and Colin Ho committed Sep 26, 2024
1 parent 3321272 commit 355c05d
Showing 6 changed files with 33 additions and 29 deletions.
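
The two changes in this commit are a docstring fix in daft/io/_parquet.py (the _multithreaded_io description appears to have been copied from the file_path_column entry) and a refactor of the file_path_column parameter across the CSV, JSON, and Parquet read paths from Option<String> to Option<&str>, so callers can lend the column name with .as_deref() instead of cloning a String. The snippet below is a minimal, self-contained sketch of that owned-vs-borrowed change; the function names and bodies are invented for illustration and are not Daft's actual signatures.

// Hypothetical stand-ins for the readers touched in this commit; the point is
// only the parameter type, Option<String> (owned) vs Option<&str> (borrowed).

// Before: each call needed an owned String, so callers holding an
// Option<String> had to clone it.
fn read_single_owned(uri: &str, file_path_column: Option<String>) -> String {
    match file_path_column {
        Some(name) => format!("{uri} with column {name}"),
        None => uri.to_string(),
    }
}

// After: the reader only inspects the name, so borrowing is enough.
fn read_single_borrowed(uri: &str, file_path_column: Option<&str>) -> String {
    match file_path_column {
        Some(name) => format!("{uri} with column {name}"),
        None => uri.to_string(),
    }
}

fn main() {
    let file_path_column: Option<String> = Some("path".to_string());
    // Old call style: one extra String allocation per call.
    let a = read_single_owned("file://x.parquet", file_path_column.clone());
    // New call style: Option<String> -> Option<&str> without allocating.
    let b = read_single_borrowed("file://x.parquet", file_path_column.as_deref());
    assert_eq!(a, b);
}

In the diff, the call sites in src/daft-micropartition now pass .as_deref(), and the bulk readers convert back to an owned String only where a spawned task needs it, which is presumably where the "1 alloc" in the commit message goes away.
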
2 changes: 1 addition & 1 deletion daft/io/_parquet.py
@@ -49,7 +49,7 @@ def read_parquet(
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet.
coerce_int96_timestamp_unit: TimeUnit to coerce Int96 TimeStamps to. e.g.: [ns, us, ms], Defaults to None.
_multithreaded_io: Include the source path(s) as a column called
_multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing
the amount of system resources (number of connections and thread contention) when running in the Ray runner.
Defaults to None, which will let Daft decide based on the runner it is currently using.
13 changes: 7 additions & 6 deletions src/daft-csv/src/read.rs
@@ -80,7 +80,7 @@ pub fn read_csv_bulk(
multithreaded_io: bool,
max_chunks_in_flight: Option<usize>,
num_parallel_tasks: usize,
file_path_column: Option<String>,
file_path_column: Option<&str>,
) -> DaftResult<Vec<Table>> {
let runtime_handle = get_runtime(multithreaded_io)?;
let tables = runtime_handle.block_on_current_thread(async move {
@@ -101,7 +101,7 @@
read_options.clone(),
io_client.clone(),
io_stats.clone(),
file_path_column.clone(),
file_path_column.map(|s| s.to_string()),
);
tokio::task::spawn(async move {
read_csv_single_into_table(
@@ -112,7 +112,7 @@
io_client,
io_stats,
max_chunks_in_flight,
file_path_column,
file_path_column.as_deref(),
)
.await
})
@@ -220,7 +220,7 @@ async fn read_csv_single_into_table(
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
max_chunks_in_flight: Option<usize>,
file_path_column: Option<String>,
file_path_column: Option<&str>,
) -> DaftResult<Table> {
let predicate = convert_options
.as_ref()
@@ -340,9 +340,10 @@ async fn read_csv_single_into_table(
}
}?;

if let Some(file_path_col_name) = file_path_column {
let trimmed = uri.trim_start_matches("file://");
let file_paths_column = Utf8Array::from_iter(
file_path_col_name.as_str(),
std::iter::repeat(Some(uri.trim_start_matches("file://"))).take(output_table.len()),
file_path_col_name,
std::iter::repeat(Some(trimmed)).take(output_table.len()),
)
.into_series();
return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?);
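
One subtlety visible in the bulk-reader hunks above: the future handed to tokio::task::spawn must be 'static, so a borrowed Option<&str> cannot cross the spawn boundary directly. The diff therefore converts back to an owned Option<String> once per spawned task (file_path_column.map(|s| s.to_string())) and re-borrows it inside the task with .as_deref(). The sketch below reproduces that shape with made-up helper functions, assuming the tokio crate with its default features; it is not the real read_csv_bulk.

// Simplified stand-in for the bulk readers: copy the column name into each
// spawned task, then lend it to the per-file reader as Option<&str>.
async fn read_single(uri: String, file_path_column: Option<&str>) -> String {
    match file_path_column {
        Some(name) => format!("{uri} -> {name}"),
        None => uri,
    }
}

async fn read_bulk(uris: Vec<String>, file_path_column: Option<&str>) -> Vec<String> {
    let mut handles = Vec::new();
    for uri in uris {
        // Owned copy so the spawned future owns everything it captures.
        let owned_file_path_column = file_path_column.map(|s| s.to_string());
        handles.push(tokio::task::spawn(async move {
            read_single(uri, owned_file_path_column.as_deref()).await
        }));
    }
    let mut tables = Vec::new();
    for handle in handles {
        tables.push(handle.await.expect("task panicked"));
    }
    tables
}

#[tokio::main]
async fn main() {
    let out = read_bulk(
        vec!["file://a.csv".to_string(), "file://b.csv".to_string()],
        Some("path"),
    )
    .await;
    println!("{out:?}");
}

Each task still makes its own owned copy, which the 'static bound more or less forces (short of reaching for an Arc<str>); the saving comes from the outer callers such as materialize_scan_task, which now pass .as_deref() instead of .clone().
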
7 changes: 4 additions & 3 deletions src/daft-json/src/local.rs
@@ -28,7 +28,7 @@ pub fn read_json_local(
parse_options: Option<JsonParseOptions>,
read_options: Option<JsonReadOptions>,
max_chunks_in_flight: Option<usize>,
file_path_column: Option<String>,
file_path_column: Option<&str>,
) -> DaftResult<Table> {
let uri = uri.trim_start_matches("file://");
let file = std::fs::File::open(uri)?;
@@ -46,9 +46,10 @@
)?;
let output_table = reader.finish()?;
if let Some(file_path_col_name) = file_path_column {
let trimmed = uri.trim_start_matches("file://");
let file_paths_column = Utf8Array::from_iter(
file_path_col_name.as_str(),
std::iter::repeat(Some(uri.trim_start_matches("file://"))).take(output_table.len()),
file_path_col_name,
std::iter::repeat(Some(trimmed)).take(output_table.len()),
)
.into_series();
return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?);
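
The other repeated change in the single-file readers is small: uri.trim_start_matches("file://") is bound to a local trimmed before the column of repeated path values is built, rather than being written inline inside std::iter::repeat(...). Below is a rough, std-only sketch of that repeated-value column, using Vec<Option<String>> in place of Daft's Utf8Array/Series (which the real code constructs and unions into the output Table); the function name is invented.

// Build one value per output row: the source path with its "file://" scheme
// stripped, repeated table.len() times.
fn file_path_column_values(uri: &str, num_rows: usize) -> Vec<Option<String>> {
    // trim_start_matches returns a borrowed slice of `uri`; binding it once
    // mirrors the diff's `trimmed` local and keeps the repeat expression short.
    let trimmed = uri.trim_start_matches("file://");
    std::iter::repeat(Some(trimmed))
        .take(num_rows)
        .map(|opt| opt.map(String::from))
        .collect()
}

fn main() {
    let col = file_path_column_values("file:///data/part-0.json", 3);
    assert_eq!(col.len(), 3);
    assert_eq!(col[0].as_deref(), Some("/data/part-0.json"));
}
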
13 changes: 7 additions & 6 deletions src/daft-json/src/read.rs
@@ -73,7 +73,7 @@ pub fn read_json_bulk(
multithreaded_io: bool,
max_chunks_in_flight: Option<usize>,
num_parallel_tasks: usize,
file_path_column: Option<String>,
file_path_column: Option<&str>,
) -> DaftResult<Vec<Table>> {
let runtime_handle = get_runtime(multithreaded_io)?;
let tables = runtime_handle.block_on_current_thread(async move {
@@ -94,7 +94,7 @@
read_options.clone(),
io_client.clone(),
io_stats.clone(),
file_path_column.clone(),
file_path_column.map(|s| s.to_string()),
);
tokio::task::spawn(async move {
let table = read_json_single_into_table(
@@ -105,7 +105,7 @@
io_client,
io_stats,
max_chunks_in_flight,
file_path_column,
file_path_column.as_deref(),
)
.await?;
DaftResult::Ok(table)
@@ -189,7 +189,7 @@ async fn read_json_single_into_table(
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
max_chunks_in_flight: Option<usize>,
file_path_column: Option<String>,
file_path_column: Option<&str>,
) -> DaftResult<Table> {
let (source_type, fixed_uri) = parse_url(uri)?;
let is_compressed = CompressionCodec::from_uri(uri).is_some();
@@ -303,9 +303,10 @@ async fn read_json_single_into_table(
}
}?;

if let Some(file_path_col_name) = file_path_column {
let trimmed = uri.trim_start_matches("file://");
let file_paths_column = Utf8Array::from_iter(
file_path_col_name.as_str(),
std::iter::repeat(Some(uri.trim_start_matches("file://"))).take(output_table.len()),
file_path_col_name,
std::iter::repeat(Some(trimmed)).take(output_table.len()),
)
.into_series();
return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?);

14 changes: 7 additions & 7 deletions src/daft-micropartition/src/micropartition.rs
@@ -181,7 +181,7 @@ fn materialize_scan_task(
metadatas,
Some(delete_map),
*chunk_size,
scan_task.file_path_column.clone(),
scan_task.file_path_column.as_deref(),
)
.context(DaftCoreComputeSnafu)?
}
@@ -236,7 +236,7 @@ fn materialize_scan_task(
native_storage_config.multithreaded_io,
None,
8,
scan_task.file_path_column.clone(),
scan_task.file_path_column.as_deref(),
)
.context(DaftCoreComputeSnafu)?
}
@@ -267,7 +267,7 @@ fn materialize_scan_task(
native_storage_config.multithreaded_io,
None,
8,
scan_task.file_path_column.clone(),
scan_task.file_path_column.as_deref(),
)
.context(DaftCoreComputeSnafu)?
}
@@ -656,7 +656,7 @@ impl MicroPartition {
field_id_mapping.clone(),
parquet_metadata,
chunk_size,
scan_task.file_path_column.clone(),
scan_task.file_path_column.as_deref(),
)
.context(DaftCoreComputeSnafu)
}
@@ -1039,7 +1039,7 @@ fn _read_parquet_into_loaded_micropartition<T: AsRef<str>>(
catalog_provided_schema: Option<SchemaRef>,
field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
chunk_size: Option<usize>,
file_path_column: Option<String>,
file_path_column: Option<&str>,
) -> DaftResult<MicroPartition> {
let delete_map = iceberg_delete_files
.map(|files| {
@@ -1125,7 +1125,7 @@ pub(crate) fn read_parquet_into_micropartition<T: AsRef<str>>(
field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
parquet_metadata: Option<Vec<Arc<FileMetaData>>>,
chunk_size: Option<usize>,
file_path_column: Option<String>,
file_path_column: Option<&str>,
) -> DaftResult<MicroPartition> {
if let Some(so) = start_offset
&& so > 0
@@ -1315,7 +1315,7 @@ pub(crate) fn read_parquet_into_micropartition<T: AsRef<str>>(
}),
num_rows,
),
file_path_column,
file_path_column.map(|s| s.to_string()),
);

let fill_map = scan_task.partition_spec().map(|pspec| pspec.to_fill_map());
13 changes: 7 additions & 6 deletions src/daft-parquet/src/read.rs
@@ -156,7 +156,7 @@ async fn read_parquet_single(
metadata: Option<Arc<FileMetaData>>,
delete_rows: Option<Vec<i64>>,
chunk_size: Option<usize>,
file_path_column: Option<String>,
file_path_column: Option<&str>,
) -> DaftResult<Table> {
let field_id_mapping_provided = field_id_mapping.is_some();
let mut columns_to_read = columns.clone();
@@ -358,9 +358,10 @@ async fn read_parquet_single(
}

if let Some(file_path_col_name) = file_path_column {
let trimmed = uri.trim_start_matches("file://");
let file_paths_column = Utf8Array::from_iter(
file_path_col_name.as_str(),
std::iter::repeat(Some(uri.trim_start_matches("file://"))).take(table.len()),
file_path_col_name,
std::iter::repeat(Some(trimmed)).take(table.len()),
)
.into_series();
return table.union(&Table::from_nonempty_columns(vec![file_paths_column])?);
@@ -762,7 +763,7 @@ pub fn read_parquet_bulk<T: AsRef<str>>(
metadata: Option<Vec<Arc<FileMetaData>>>,
delete_map: Option<HashMap<String, Vec<i64>>>,
chunk_size: Option<usize>,
file_path_column: Option<String>,
file_path_column: Option<&str>,
) -> DaftResult<Vec<Table>> {
let runtime_handle = daft_io::get_runtime(multithreaded_io)?;

@@ -790,7 +791,7 @@ pub fn read_parquet_bulk<T: AsRef<str>>(
let schema_infer_options = *schema_infer_options;
let owned_field_id_mapping = field_id_mapping.clone();
let delete_rows = delete_map.as_ref().and_then(|m| m.get(&uri).cloned());
let owned_file_path_column = file_path_column.clone();
let owned_file_path_column = file_path_column.map(|s| s.to_string());
tokio::task::spawn(async move {
read_parquet_single(
&uri,
Expand All @@ -806,7 +807,7 @@ pub fn read_parquet_bulk<T: AsRef<str>>(
metadata,
delete_rows,
chunk_size,
owned_file_path_column,
owned_file_path_column.as_deref(),
)
.await
})
