feat: Support writing partitioned parquet to cloud (#20590)
nameexhaustion authored Jan 8, 2025
1 parent 2a705b4 commit bd4dd65
Showing 3 changed files with 57 additions and 57 deletions.
68 changes: 31 additions & 37 deletions crates/polars-io/src/partition.rs
@@ -5,41 +5,52 @@ use std::path::Path;
 use polars_core::prelude::*;
 use polars_core::series::IsSorted;
 use polars_core::POOL;
-use polars_utils::create_file;
 use rayon::prelude::*;
 
+use crate::cloud::CloudOptions;
 use crate::parquet::write::ParquetWriteOptions;
 #[cfg(feature = "ipc")]
 use crate::prelude::IpcWriterOptions;
 use crate::prelude::URL_ENCODE_CHAR_SET;
-use crate::{SerWriter, WriteDataFrameToFile};
+use crate::utils::file::try_get_writeable;
+use crate::{is_cloud_url, SerWriter, WriteDataFrameToFile};
 
 impl WriteDataFrameToFile for ParquetWriteOptions {
-    fn write_df_to_file<W: std::io::Write>(&self, mut df: DataFrame, file: W) -> PolarsResult<()> {
-        self.to_writer(file).finish(&mut df)?;
+    fn write_df_to_file(
+        &self,
+        df: &mut DataFrame,
+        path: &str,
+        cloud_options: Option<&CloudOptions>,
+    ) -> PolarsResult<()> {
+        let f = try_get_writeable(path, cloud_options)?;
+        self.to_writer(f).finish(df)?;
         Ok(())
     }
 }
 
 #[cfg(feature = "ipc")]
 impl WriteDataFrameToFile for IpcWriterOptions {
-    fn write_df_to_file<W: std::io::Write>(&self, mut df: DataFrame, file: W) -> PolarsResult<()> {
-        self.to_writer(file).finish(&mut df)?;
+    fn write_df_to_file(
+        &self,
+        df: &mut DataFrame,
+        path: &str,
+        cloud_options: Option<&CloudOptions>,
+    ) -> PolarsResult<()> {
+        let f = try_get_writeable(path, cloud_options)?;
+        self.to_writer(f).finish(df)?;
         Ok(())
     }
 }
 
-fn write_partitioned_dataset_impl<W>(
+/// Write a partitioned parquet dataset. This functionality is unstable.
+pub fn write_partitioned_dataset(
     df: &mut DataFrame,
     path: &Path,
     partition_by: Vec<PlSmallStr>,
-    file_write_options: &W,
+    file_write_options: &(dyn WriteDataFrameToFile + Send + Sync),
+    cloud_options: Option<&CloudOptions>,
     chunk_size: usize,
-) -> PolarsResult<()>
-where
-    W: WriteDataFrameToFile + Send + Sync,
-{
-    let partition_by = partition_by.into_iter().collect::<Vec<PlSmallStr>>();
+) -> PolarsResult<()> {
     // Ensure we have a single chunk as the gather will otherwise rechunk per group.
     df.as_single_chunk_par();
 
@@ -86,12 +97,16 @@ where
     };
 
     let base_path = path;
+    let is_cloud = is_cloud_url(base_path);
     let groups = df.group_by(partition_by)?.take_groups();
 
     let init_part_base_dir = |part_df: &DataFrame| {
         let path_part = get_hive_path_part(part_df);
         let dir = base_path.join(path_part);
-        std::fs::create_dir_all(&dir)?;
+
+        if !is_cloud {
+            std::fs::create_dir_all(&dir)?;
+        }
 
         PolarsResult::Ok(dir)
     };
@@ -107,9 +122,8 @@ where
         (n_files, rows_per_file)
     };
 
-    let write_part = |df: DataFrame, path: &Path| {
-        let f = create_file(path)?;
-        file_write_options.write_df_to_file(df, f)?;
+    let write_part = |mut df: DataFrame, path: &Path| {
+        file_write_options.write_df_to_file(&mut df, path.to_str().unwrap(), cloud_options)?;
         PolarsResult::Ok(())
     };
 
@@ -184,23 +198,3 @@ where
 
     Ok(())
 }
-
-/// Write a partitioned parquet dataset. This functionality is unstable.
-pub fn write_partitioned_dataset<I, S, W>(
-    df: &mut DataFrame,
-    path: &Path,
-    partition_by: I,
-    file_write_options: &W,
-    chunk_size: usize,
-) -> PolarsResult<()>
-where
-    I: IntoIterator<Item = S>,
-    S: Into<PlSmallStr>,
-    W: WriteDataFrameToFile + Send + Sync,
-{
-    let partition_by = partition_by
-        .into_iter()
-        .map(Into::into)
-        .collect::<Vec<PlSmallStr>>();
-    write_partitioned_dataset_impl(df, path, partition_by, file_write_options, chunk_size)
-}
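
For reference, a rough sketch of how a Rust caller might invoke the revised `write_partitioned_dataset` signature after this change. The bucket URL, column name, chunk size, and option values are illustrative assumptions, not taken from the PR; it also assumes `ParquetWriteOptions` and `CloudOptions` implement `Default`, that the function is reachable at `polars_io::partition::write_partitioned_dataset`, and that the `cloud` and `parquet` features are enabled.

use std::path::Path;

use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::parquet::write::ParquetWriteOptions;
use polars_io::partition::write_partitioned_dataset;

fn write_partitioned_to_cloud(df: &mut DataFrame) -> PolarsResult<()> {
    // Parquet options applied to every partition file (defaults here; tune
    // compression, statistics, etc. as needed).
    let write_options = ParquetWriteOptions::default();

    // Assumed: default CloudOptions with credentials resolved from the
    // environment; with_max_retries is the builder shown in the diff above.
    let cloud_options = CloudOptions::default().with_max_retries(2);

    // Hypothetical partition column.
    let partition_by: Vec<PlSmallStr> = vec!["region".into()];

    write_partitioned_dataset(
        df,
        // With this change the base path may be a cloud URL.
        Path::new("s3://my-bucket/my-dataset"),
        partition_by,
        &write_options,
        Some(&cloud_options),
        4_294_967_296, // target chunk size in bytes (illustrative)
    )
}
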
8 changes: 7 additions & 1 deletion crates/polars-io/src/shared.rs
@@ -5,6 +5,7 @@ use arrow::array::new_empty_array;
 use arrow::record_batch::RecordBatch;
 use polars_core::prelude::*;
 
+use crate::cloud::CloudOptions;
 use crate::options::RowIndex;
 #[cfg(any(feature = "ipc", feature = "avro", feature = "ipc_streaming",))]
 use crate::predicates::PhysicalIoExpr;
@@ -41,7 +42,12 @@
 }
 
 pub trait WriteDataFrameToFile {
-    fn write_df_to_file<W: std::io::Write>(&self, df: DataFrame, file: W) -> PolarsResult<()>;
+    fn write_df_to_file(
+        &self,
+        df: &mut DataFrame,
+        path: &str,
+        cloud_options: Option<&CloudOptions>,
+    ) -> PolarsResult<()>;
 }
 
 pub trait ArrowReader {
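
To make the new trait shape concrete, here is a minimal sketch of what a further implementation inside polars-io might look like. The `CsvPartitionWriteOptions` type is hypothetical, and the sketch assumes the `csv` feature so that `CsvWriter` is available via the crate prelude; it is not part of the PR.

use polars_core::prelude::*;

use crate::cloud::CloudOptions;
use crate::prelude::CsvWriter;
use crate::utils::file::try_get_writeable;
use crate::{SerWriter, WriteDataFrameToFile};

/// Hypothetical options type, for illustration only.
pub struct CsvPartitionWriteOptions {
    pub include_header: bool,
}

impl WriteDataFrameToFile for CsvPartitionWriteOptions {
    fn write_df_to_file(
        &self,
        df: &mut DataFrame,
        path: &str,
        cloud_options: Option<&CloudOptions>,
    ) -> PolarsResult<()> {
        // Same pattern as the parquet/IPC impls above: resolve the path
        // (local or cloud URL) into a writer, then stream the frame into it.
        let f = try_get_writeable(path, cloud_options)?;
        CsvWriter::new(f)
            .include_header(self.include_header)
            .finish(df)?;
        Ok(())
    }
}
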
38 changes: 19 additions & 19 deletions crates/polars-python/src/dataframe/io.rs
@@ -432,8 +432,24 @@ impl PyDataFrame {
 
         let compression = parse_parquet_compression(compression, compression_level)?;
 
+        #[cfg(feature = "cloud")]
+        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
+            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
+            Some(
+                cloud_options
+                    .with_max_retries(retries)
+                    .with_credential_provider(
+                        credential_provider.map(PlCredentialProvider::from_python_func_object),
+                    ),
+            )
+        } else {
+            None
+        };
+
+        #[cfg(not(feature = "cloud"))]
+        let cloud_options = None;
+
         if let Some(partition_by) = partition_by {
-            // TODO: Support cloud
             let path = py_f.extract::<String>(py)?;
 
             py.allow_threads(|| {
@@ -447,8 +463,9 @@ impl PyDataFrame {
                 write_partitioned_dataset(
                     &mut self.df,
                     std::path::Path::new(path.as_str()),
-                    partition_by.as_slice(),
+                    partition_by.into_iter().map(|x| x.into()).collect(),
                     &write_options,
+                    cloud_options.as_ref(),
                     partition_chunk_size_bytes,
                 )
                 .map_err(PyPolarsErr::from)
@@ -457,23 +474,6 @@ impl PyDataFrame {
             return Ok(());
         };
 
-        #[cfg(feature = "cloud")]
-        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
-            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
-            Some(
-                cloud_options
-                    .with_max_retries(retries)
-                    .with_credential_provider(
-                        credential_provider.map(PlCredentialProvider::from_python_func_object),
-                    ),
-            )
-        } else {
-            None
-        };
-
-        #[cfg(not(feature = "cloud"))]
-        let cloud_options = None;
-
         let f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;
 
         py.allow_threads(|| {
