diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index 47e88d9afd..f7231fb17a 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -1056,9 +1056,11 @@ def url_download( ) -> PyExpr: ... def url_upload( expr: PyExpr, - folder_location: str, + folder_location: PyExpr, max_connections: int, + raise_error_on_failure: bool, multi_thread: bool, + is_single_folder: bool, io_config: IOConfig | None, ) -> PyExpr: ... def tokenize_encode( diff --git a/daft/expressions/expressions.py b/daft/expressions/expressions.py index 1dfd4730a7..d190f7801e 100644 --- a/daft/expressions/expressions.py +++ b/daft/expressions/expressions.py @@ -1381,21 +1381,28 @@ def download( def upload( self, - location: str, + location: str | Expression, max_connections: int = 32, + on_error: Literal["raise", "null"] = "raise", io_config: IOConfig | None = None, ) -> Expression: - """Uploads a column of binary data to the provided location (also supports S3, local etc). + """Uploads a column of binary data to the provided location(s) (also supports S3, local etc). - Files will be written into the location (folder) with a generated UUID filename, and the result + Files will be written into the location (folder(s)) with a generated UUID filename, and the result will be returned as a column of string paths that is compatible with the ``.url.download()`` Expression. Example: >>> col("data").url.upload("s3://my-bucket/my-folder") # doctest: +SKIP + Upload to row-specific URLs + + >>> col("data").url.upload(col("paths")) # doctest: +SKIP + Args: - location: a folder location to upload data into + location: a folder location or column of folder locations to upload data into max_connections: The maximum number of connections to use per thread to use for uploading data. Defaults to 32. + on_error: Behavior when a URL upload error is encountered - "raise" to raise the error immediately or "null" to log + the error but fallback to a Null value. Defaults to "raise". io_config: IOConfig to use when uploading data Returns: @@ -1404,10 +1411,29 @@ def upload( if not (isinstance(max_connections, int) and max_connections > 0): raise ValueError(f"Invalid value for `max_connections`: {max_connections}") + location_expr = Expression._to_expression(location) + raise_on_error = False + if on_error == "raise": + raise_on_error = True + elif on_error == "null": + raise_on_error = False + else: + raise NotImplementedError(f"Unimplemented on_error option: {on_error}.") multi_thread = ExpressionUrlNamespace._should_use_multithreading_tokio_runtime() + # If the user specifies a single location via a string, we should upload to a single folder. Otherwise, + # if the user gave an expression, we assume that each row has a specific url to upload to. + is_single_folder = isinstance(location, str) io_config = ExpressionUrlNamespace._override_io_config_max_connections(max_connections, io_config) return Expression._from_pyexpr( - native.url_upload(self._expr, location, max_connections, multi_thread, io_config) + native.url_upload( + self._expr, + location_expr._expr, + max_connections, + raise_on_error, + multi_thread, + is_single_folder, + io_config, + ) ) diff --git a/src/daft-functions/src/python/uri.rs b/src/daft-functions/src/python/uri.rs index a7f23f2d3d..65f7e1fc8c 100644 --- a/src/daft-functions/src/python/uri.rs +++ b/src/daft-functions/src/python/uri.rs @@ -29,9 +29,11 @@ pub fn url_download( #[pyfunction] pub fn url_upload( expr: PyExpr, - folder_location: &str, + folder_location: PyExpr, max_connections: i64, + raise_error_on_failure: bool, multi_thread: bool, + is_single_folder: bool, io_config: Option, ) -> PyResult { if max_connections <= 0 { @@ -41,9 +43,11 @@ pub fn url_upload( } Ok(crate::uri::upload( expr.into(), - folder_location, + folder_location.into(), max_connections as usize, + raise_error_on_failure, multi_thread, + is_single_folder, io_config.map(|io_config| io_config.config), ) .into()) diff --git a/src/daft-functions/src/uri/mod.rs b/src/daft-functions/src/uri/mod.rs index cb74a6f045..67418fa1df 100644 --- a/src/daft-functions/src/uri/mod.rs +++ b/src/daft-functions/src/uri/mod.rs @@ -29,19 +29,22 @@ pub fn download( #[must_use] pub fn upload( input: ExprRef, - location: &str, + location: ExprRef, max_connections: usize, + raise_error_on_failure: bool, multi_thread: bool, + is_single_folder: bool, config: Option, ) -> ExprRef { ScalarFunction::new( UploadFunction { - location: location.to_string(), max_connections, + raise_error_on_failure, multi_thread, + is_single_folder, config: config.unwrap_or_default().into(), }, - vec![input], + vec![input, location], ) .into() } diff --git a/src/daft-functions/src/uri/upload.rs b/src/daft-functions/src/uri/upload.rs index 1ad91b888b..5b01858b94 100644 --- a/src/daft-functions/src/uri/upload.rs +++ b/src/daft-functions/src/uri/upload.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{collections::HashSet, iter::repeat, path::Path, sync::Arc}; use common_error::{DaftError, DaftResult}; use common_runtime::get_io_runtime; @@ -10,9 +10,10 @@ use serde::Serialize; #[derive(Debug, Clone, Serialize, serde::Deserialize, PartialEq, Eq, Hash)] pub(super) struct UploadFunction { - pub(super) location: String, pub(super) max_connections: usize, + pub(super) raise_error_on_failure: bool, pub(super) multi_thread: bool, + pub(super) is_single_folder: bool, pub(super) config: Arc, } @@ -28,23 +29,26 @@ impl ScalarUDF for UploadFunction { fn evaluate(&self, inputs: &[Series]) -> DaftResult { let Self { - location, config, max_connections, + raise_error_on_failure, multi_thread, + is_single_folder, } = self; match inputs { - [data] => url_upload( + [data, location] => url_upload( data, location, *max_connections, + *raise_error_on_failure, *multi_thread, + *is_single_folder, config.clone(), None, ), _ => Err(DaftError::ValueError(format!( - "Expected 1 input args, got {}", + "Expected 2 input args, got {}", inputs.len() ))), } @@ -52,35 +56,128 @@ impl ScalarUDF for UploadFunction { fn to_field(&self, inputs: &[ExprRef], schema: &Schema) -> DaftResult { match inputs { - [data] => { + [data, location] => { let data_field = data.to_field(schema)?; + let location_field = location.to_field(schema)?; match data_field.dtype { - DataType::Binary | DataType::FixedSizeBinary(..) | DataType::Utf8 => Ok(Field::new(data_field.name, DataType::Utf8)), - _ => Err(DaftError::TypeError(format!("Expects input to url_upload to be Binary, FixedSizeBinary or String, but received {data_field}"))), + DataType::Binary | DataType::FixedSizeBinary(..) | DataType::Utf8 => (), + _ => return Err(DaftError::TypeError(format!("Expects input to url_upload to be Binary, FixedSizeBinary or String, but received {data_field}"))), } + if !location_field.dtype.is_string() { + return Err(DaftError::TypeError(format!( + "Expected location to be string, received: {}", + location_field.dtype + ))); + } + Ok(Field::new(data_field.name, DataType::Utf8)) } _ => Err(DaftError::SchemaMismatch(format!( - "Expected 1 input arg, got {}", + "Expected 2 input args, got {}", inputs.len() ))), } } } +/// Helper function that takes a given folder path, a boolean `is_single_folder` that indicates if we were given a single folder +/// for uploading files or if we were given a row-specific path, and a set of instantiated folder paths, then creates the folder +/// if they has not yet been instantiated. +/// +/// This function also parses the given folder path and trims '/' from the end of the path. +fn instantiate_and_trim_path( + folder_path: &str, + is_single_folder: bool, + instantiated_folder_paths: &mut HashSet, +) -> DaftResult { + // HACK: Creates folders if running locally. This is a bit of a hack to do it here because we'd rather delegate this to + // the appropriate source. However, most sources such as the object stores don't have the concept of "folders". + let (source, folder_path) = daft_io::parse_url(folder_path)?; + if matches!(source, SourceType::File) { + let local_prefixless_folder_path = match folder_path.strip_prefix("file://") { + Some(p) => p, + None => folder_path.as_ref(), + }; + if is_single_folder { + // If we were given a single folder, create a directory at the given path. + if instantiated_folder_paths.insert(local_prefixless_folder_path.to_string()) { + std::fs::create_dir_all(local_prefixless_folder_path).map_err(|e| { + daft_io::Error::UnableToCreateDir { + path: local_prefixless_folder_path.to_string(), + source: e, + } + })?; + } + } else { + // If we were given row-specific paths, then we create directories at the parents of the given paths. + let path = Path::new(local_prefixless_folder_path); + if let Some(parent_dir) = path.parent() { + if let Some(parent_dir) = parent_dir.to_str() { + if instantiated_folder_paths.insert(parent_dir.to_string()) { + std::fs::create_dir_all(parent_dir).map_err(|e| { + daft_io::Error::UnableToCreateDir { + path: parent_dir.to_string(), + source: e, + } + })?; + } + } + } + } + } + Ok(folder_path.trim_end_matches('/').to_string()) +} + +/// Helper function that takes an utf8 array of folder paths that may have either 1 or `len` paths, +/// and returns a Vec of `len` folder paths. This function will also instantiate and trim the given +/// folder paths as needed. +/// +/// If `is_single_folder` is set, the prepared path is repeated `len` times. +/// Otherwise, we return the array of prepared paths. +fn prepare_folder_paths( + arr: &Utf8Array, + len: usize, + is_single_folder: bool, +) -> DaftResult> { + let mut instantiated_folder_paths = HashSet::new(); + if is_single_folder { + let folder_path = arr.get(0).unwrap(); + let folder_path = + instantiate_and_trim_path(folder_path, true, &mut instantiated_folder_paths)?; + Ok(repeat(folder_path).take(len).collect()) + } else { + debug_assert_eq!(arr.len(), len); + Ok(arr + .as_arrow() + .iter() + .map(|folder_path| { + instantiate_and_trim_path( + folder_path.unwrap(), + false, + &mut instantiated_folder_paths, + ) + }) + .collect::>>()?) + } +} + /// Uploads data from a Binary/FixedSizeBinary/Utf8 Series to the provided folder_path /// /// This performs an async upload of each row, and creates in-memory copies of the data that is currently in-flight. /// Memory consumption should be tunable by configuring `max_connections`, which tunes the number of in-flight tokio tasks. +#[allow(clippy::too_many_arguments)] pub fn url_upload( series: &Series, - folder_path: &str, + folder_paths: &Series, max_connections: usize, + raise_error_on_failure: bool, multi_thread: bool, + is_single_folder: bool, config: Arc, io_stats: Option, ) -> DaftResult { + #[allow(clippy::too_many_arguments)] fn _upload_bytes_to_folder( - folder_path: &str, + folder_path_iter: Vec, // TODO: We can further optimize this for larger rows by using instead an Iterator // This would allow us to iteratively copy smaller chunks of data and feed it to the AWS SDKs, instead // of materializing the entire row at once as a single bytes::Bytes. @@ -89,51 +186,50 @@ pub fn url_upload( // arrow2 buffer, without making a copy. This would be the ideal case. to_upload: Vec>, max_connections: usize, + raise_error_on_failure: bool, multi_thread: bool, + is_single_folder: bool, config: Arc, io_stats: Option, ) -> DaftResult>> { - // HACK: Creates folders if running locally. This is a bit of a hack to do it here because we'd rather delegate this to - // the appropriate source. However, most sources such as the object stores don't have the concept of "folders". - let (source, folder_path) = daft_io::parse_url(folder_path)?; - if matches!(source, SourceType::File) { - let local_prefixless_folder_path = match folder_path.strip_prefix("file://") { - Some(p) => p, - None => folder_path.as_ref(), - }; - - std::fs::create_dir_all(local_prefixless_folder_path).map_err(|e| { - daft_io::Error::UnableToCreateDir { - path: folder_path.as_ref().to_string(), - source: e, - } - })?; - } - let runtime_handle = get_io_runtime(multi_thread); let max_connections = match multi_thread { false => max_connections, true => max_connections * usize::from(std::thread::available_parallelism()?), }; let io_client = get_io_client(multi_thread, config)?; - let folder_path = folder_path.as_ref().trim_end_matches('/').to_string(); let uploads = async move { - futures::stream::iter(to_upload.into_iter().enumerate().map(|(i, data)| { - let owned_client = io_client.clone(); - let owned_io_stats = io_stats.clone(); - - // TODO: Allow configuration of this path (e.g. providing a file extension, or a corresponding Series with matching length with filenames) - let path = format!("{}/{}", folder_path, uuid::Uuid::new_v4()); - tokio::spawn(async move { - ( - i, - owned_client - .single_url_upload(i, path, data, owned_io_stats) - .await, - ) - }) - })) + futures::stream::iter(to_upload.into_iter().zip(folder_path_iter).enumerate().map( + |(i, (data, folder_path))| { + let owned_client = io_client.clone(); + let owned_io_stats = io_stats.clone(); + + // TODO: Allow configuration of the folder path (e.g. providing a file extension, or a corresponding Series with matching length with filenames) + + // If the user specifies a single location via a string, we should upload to a single folder by appending a UUID to each path. Otherwise, + // if the user gave an expression, we assume that each row has a specific url to upload to. + let path = if is_single_folder { + format!("{}/{}", folder_path, uuid::Uuid::new_v4()) + } else { + folder_path + }; + tokio::spawn(async move { + ( + i, + owned_client + .single_url_upload( + i, + path, + data, + raise_error_on_failure, + owned_io_stats, + ) + .await, + ) + }) + }, + )) .buffer_unordered(max_connections) .then(async move |r| match r { Ok((i, Ok(v))) => Ok((i, v)), @@ -150,6 +246,9 @@ pub fn url_upload( Ok(results.into_iter().map(|(_, path)| path).collect()) } + let folder_path_series = folder_paths.cast(&DataType::Utf8)?; + let folder_path_arr = folder_path_series.utf8().unwrap(); + let folder_path_arr = prepare_folder_paths(folder_path_arr, series.len(), is_single_folder)?; let results = match series.data_type() { DataType::Binary => { let bytes_array = series @@ -160,10 +259,12 @@ pub fn url_upload( .map(|v| v.map(|b| bytes::Bytes::from(b.to_vec()))) .collect(); _upload_bytes_to_folder( - folder_path, + folder_path_arr, bytes_array, max_connections, + raise_error_on_failure, multi_thread, + is_single_folder, config, io_stats, ) @@ -177,10 +278,12 @@ pub fn url_upload( .map(|v| v.map(|b| bytes::Bytes::from(b.to_vec()))) .collect(); _upload_bytes_to_folder( - folder_path, + folder_path_arr, bytes_array, max_connections, + raise_error_on_failure, multi_thread, + is_single_folder, config, io_stats, ) @@ -194,10 +297,12 @@ pub fn url_upload( .map(|utf8_slice| utf8_slice.map(|s| bytes::Bytes::from(s.as_bytes().to_vec()))) .collect(); _upload_bytes_to_folder( - folder_path, + folder_path_arr, bytes_array, max_connections, + raise_error_on_failure, multi_thread, + is_single_folder, config, io_stats, ) diff --git a/src/daft-io/src/lib.rs b/src/daft-io/src/lib.rs index 5530b17882..641ef5f763 100644 --- a/src/daft-io/src/lib.rs +++ b/src/daft-io/src/lib.rs @@ -317,9 +317,9 @@ impl IOClient { Err(err) } else { log::warn!( - "Error occurred during url_download at index: {index} {} (falling back to Null)", - err - ); + "Error occurred during url_download at index: {index} {} (falling back to Null)", + err + ); Ok(None) } } @@ -332,6 +332,7 @@ impl IOClient { index: usize, dest: String, data: Option, + raise_error_on_failure: bool, io_stats: Option, ) -> Result> { let value = if let Some(data) = data { @@ -344,11 +345,15 @@ impl IOClient { match value { Some(Ok(())) => Ok(Some(dest)), Some(Err(err)) => { - log::warn!( - "Error occurred during file upload at index: {index} {} (falling back to Null)", - err - ); - Err(err) + if raise_error_on_failure { + Err(err) + } else { + log::warn!( + "Error occurred during file upload at index: {index} {} (falling back to Null)", + err + ); + Ok(None) + } } None => Ok(None), } diff --git a/tests/io/test_url_upload_local.py b/tests/io/test_url_upload_local.py index 2f3fcbd5e3..3a23d0d8a5 100644 --- a/tests/io/test_url_upload_local.py +++ b/tests/io/test_url_upload_local.py @@ -1,5 +1,7 @@ from __future__ import annotations +import pytest + import daft @@ -7,7 +9,49 @@ def test_upload_local(tmpdir): bytes_data = [b"a", b"b", b"c"] data = {"data": bytes_data} df = daft.from_pydict(data) - df = df.with_column("files", df["data"].url.upload(str(tmpdir))) + df = df.with_column("files", df["data"].url.upload(str(tmpdir + "/nested"))) + df.collect() + + results = df.to_pydict() + assert results["data"] == bytes_data + assert len(results["files"]) == len(bytes_data) + for path, expected in zip(results["files"], bytes_data): + assert path.startswith("file://") + path = path[len("file://") :] + with open(path, "rb") as f: + assert f.read() == expected + + +def test_upload_local_single_file_url(tmpdir): + bytes_data = [b"a"] + paths = [f"{tmpdir}/0"] + data = {"data": bytes_data, "paths": paths} + df = daft.from_pydict(data) + # Even though there is only one row, since we pass in the upload URL via an expression, we + # should treat the given path as a per-row path and write directly to that path, instead of + # treating the path as a directory and writing to `{path}/uuid`. + df = df.with_column("files", df["data"].url.upload(df["paths"])) + df.collect() + + results = df.to_pydict() + assert results["data"] == bytes_data + assert len(results["files"]) == len(bytes_data) + for path, expected in zip(results["files"], bytes_data): + assert path.startswith("file://") + path = path[len("file://") :] + with open(path, "rb") as f: + assert f.read() == expected + # Check that data was uploaded to the correct paths. + for path, expected in zip(results["files"], paths): + assert path == "file://" + expected + + +def test_upload_local_row_specifc_urls(tmpdir): + bytes_data = [b"a", b"b", b"c"] + paths = [f"{tmpdir}/0", f"{tmpdir}/1", f"{tmpdir}/2"] + data = {"data": bytes_data, "paths": paths} + df = daft.from_pydict(data) + df = df.with_column("files", df["data"].url.upload(df["paths"])) df.collect() results = df.to_pydict() @@ -18,3 +62,30 @@ def test_upload_local(tmpdir): path = path[len("file://") :] with open(path, "rb") as f: assert f.read() == expected + # Check that data was uploaded to the correct paths. + for path, expected in zip(results["files"], paths): + assert path == "file://" + expected + + +def test_upload_local_no_write_permissions(tmpdir): + bytes_data = [b"a", b"b", b"c"] + # We have no write permissions to the first and third paths. + paths = ["/some-root-path", f"{tmpdir}/normal_path", "/another-bad-path"] + expected_paths = [None, f"file://{tmpdir}/normal_path", None] + expected_data = b"b" + data = {"data": bytes_data, "paths": paths} + df = daft.from_pydict(data) + df_raise_error = df.with_column("files", df["data"].url.upload(df["paths"])) + with pytest.raises(ValueError, match="Unable to write data to file"): + df_raise_error.collect() + # Retry with `on_error` set to `null`. + df_null = df.with_column("files", df["data"].url.upload(df["paths"], on_error="null")) + df_null.collect() + results = df_null.to_pydict() + for path, expected_path in zip(results["files"], expected_paths): + assert (path is None and expected_path is None) or path == expected_path + if path is not None: + assert path.startswith("file://") + path = path[len("file://") :] + with open(path, "rb") as f: + assert f.read() == expected_data