From 5c00dbc28d8febfe9a520c17685c2d38879a10a6 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Fri, 15 Nov 2024 10:16:24 +0800 Subject: [PATCH] [FEAT] Lance writes for swordfish (#3299) This PR implements Lance writes for swordfish. The scaffolding for writes was merged in: https://github.com/Eventual-Inc/Daft/pull/2992, and so this one simply adds the lance writes functionality. Co-authored-by: Colin Ho --- src/daft-local-execution/src/pipeline.rs | 17 ++++ src/daft-local-execution/src/sinks/write.rs | 2 + src/daft-local-plan/src/lib.rs | 2 + src/daft-local-plan/src/plan.rs | 31 ++++++- src/daft-local-plan/src/translate.rs | 25 +++-- src/daft-writers/src/catalog.rs | 2 +- src/daft-writers/src/lance.rs | 92 +++++++++++++++++++ src/daft-writers/src/lib.rs | 7 +- src/daft-writers/src/physical.rs | 4 +- .../src/{python.rs => pyarrow.rs} | 0 tests/io/lancedb/test_lancedb_writes.py | 9 +- 11 files changed, 170 insertions(+), 21 deletions(-) create mode 100644 src/daft-writers/src/lance.rs rename src/daft-writers/src/{python.rs => pyarrow.rs} (100%) diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs index 83aa411e09..98767b239e 100644 --- a/src/daft-local-execution/src/pipeline.rs +++ b/src/daft-local-execution/src/pipeline.rs @@ -531,6 +531,23 @@ pub fn physical_plan_to_pipeline( ); BlockingSinkNode::new(Arc::new(write_sink), child_node).boxed() } + #[cfg(feature = "python")] + LocalPhysicalPlan::LanceWrite(daft_local_plan::LanceWrite { + input, + lance_info, + file_schema, + .. 
+ }) => { + let child_node = physical_plan_to_pipeline(input, psets, cfg)?; + let writer_factory = daft_writers::make_lance_writer_factory(lance_info.clone()); + let write_sink = WriteSink::new( + WriteFormat::Lance, + writer_factory, + None, + file_schema.clone(), + ); + BlockingSinkNode::new(Arc::new(write_sink), child_node).boxed() + } }; Ok(out) diff --git a/src/daft-local-execution/src/sinks/write.rs b/src/daft-local-execution/src/sinks/write.rs index 4a0975bff3..96a2ccd843 100644 --- a/src/daft-local-execution/src/sinks/write.rs +++ b/src/daft-local-execution/src/sinks/write.rs @@ -24,6 +24,7 @@ pub enum WriteFormat { PartitionedIceberg, Deltalake, PartitionedDeltalake, + Lance, } struct WriteState { @@ -114,6 +115,7 @@ impl BlockingSink for WriteSink { WriteFormat::PartitionedIceberg => "PartitionedIcebergSink", WriteFormat::Deltalake => "DeltalakeSink", WriteFormat::PartitionedDeltalake => "PartitionedDeltalakeSink", + WriteFormat::Lance => "LanceSink", } } diff --git a/src/daft-local-plan/src/lib.rs b/src/daft-local-plan/src/lib.rs index 67d1eecb2a..77fd75e708 100644 --- a/src/daft-local-plan/src/lib.rs +++ b/src/daft-local-plan/src/lib.rs @@ -4,6 +4,8 @@ mod translate; #[cfg(feature = "python")] pub use plan::CatalogWrite; +#[cfg(feature = "python")] +pub use plan::LanceWrite; pub use plan::{ ActorPoolProject, Concat, EmptyScan, Explode, Filter, HashAggregate, HashJoin, InMemoryScan, Limit, LocalPhysicalPlan, LocalPhysicalPlanRef, PhysicalScan, PhysicalWrite, Pivot, Project, diff --git a/src/daft-local-plan/src/plan.rs b/src/daft-local-plan/src/plan.rs index d0c14a4985..cf1cd91a97 100644 --- a/src/daft-local-plan/src/plan.rs +++ b/src/daft-local-plan/src/plan.rs @@ -40,8 +40,8 @@ pub enum LocalPhysicalPlan { // TabularWriteCsv(TabularWriteCsv), #[cfg(feature = "python")] CatalogWrite(CatalogWrite), - // #[cfg(feature = "python")] - // LanceWrite(LanceWrite), + #[cfg(feature = "python")] + LanceWrite(LanceWrite), } impl LocalPhysicalPlan { @@ -324,6 
+324,23 @@ impl LocalPhysicalPlan {
         .arced()
     }
 
+    #[cfg(feature = "python")]
+    pub(crate) fn lance_write(
+        input: LocalPhysicalPlanRef,
+        lance_info: daft_logical_plan::LanceCatalogInfo,
+        data_schema: SchemaRef,
+        file_schema: SchemaRef,
+    ) -> LocalPhysicalPlanRef {
+        Self::LanceWrite(LanceWrite {
+            input,
+            lance_info,
+            data_schema,
+            file_schema,
+            plan_stats: PlanStats {},
+        })
+        .arced()
+    }
+
     pub fn schema(&self) -> &SchemaRef {
         match self {
             Self::PhysicalScan(PhysicalScan { schema, .. })
@@ -504,5 +521,15 @@ pub struct CatalogWrite {
     pub plan_stats: PlanStats,
 }
 
+#[cfg(feature = "python")]
+#[derive(Debug)]
+pub struct LanceWrite {
+    pub input: LocalPhysicalPlanRef,
+    pub lance_info: daft_logical_plan::LanceCatalogInfo,
+    pub data_schema: SchemaRef,
+    pub file_schema: SchemaRef,
+    pub plan_stats: PlanStats,
+}
+
 #[derive(Debug)]
 pub struct PlanStats {}
diff --git a/src/daft-local-plan/src/translate.rs b/src/daft-local-plan/src/translate.rs
index b04bea550e..de77ce5d70 100644
--- a/src/daft-local-plan/src/translate.rs
+++ b/src/daft-local-plan/src/translate.rs
@@ -172,12 +172,25 @@ pub fn translate(plan: &LogicalPlanRef) -> DaftResult<LocalPhysicalPlanRef> {
                         info.clone(),
                     )),
                     #[cfg(feature = "python")]
-                    SinkInfo::CatalogInfo(info) => Ok(LocalPhysicalPlan::catalog_write(
-                        input,
-                        info.catalog.clone(),
-                        data_schema,
-                        sink.schema.clone(),
-                    )),
+                    SinkInfo::CatalogInfo(info) => match &info.catalog {
+                        daft_logical_plan::CatalogType::DeltaLake(..)
+                        | daft_logical_plan::CatalogType::Iceberg(..) => {
+                            Ok(LocalPhysicalPlan::catalog_write(
+                                input,
+                                info.catalog.clone(),
+                                data_schema,
+                                sink.schema.clone(),
+                            ))
+                        }
+                        daft_logical_plan::CatalogType::Lance(info) => {
+                            Ok(LocalPhysicalPlan::lance_write(
+                                input,
+                                info.clone(),
+                                data_schema,
+                                sink.schema.clone(),
+                            ))
+                        }
+                    },
                 }
             }
             LogicalPlan::Explode(explode) => {
diff --git a/src/daft-writers/src/catalog.rs b/src/daft-writers/src/catalog.rs
index 10ca700e27..1520e2ad8f 100644
--- a/src/daft-writers/src/catalog.rs
+++ b/src/daft-writers/src/catalog.rs
@@ -5,7 +5,7 @@ use daft_logical_plan::{CatalogType, DeltaLakeCatalogInfo, IcebergCatalogInfo};
 use daft_micropartition::MicroPartition;
 use daft_table::Table;
 
-use crate::{python::PyArrowWriter, FileWriter, WriterFactory};
+use crate::{pyarrow::PyArrowWriter, FileWriter, WriterFactory};
 
 /// CatalogWriterFactory is a factory for creating Catalog writers, i.e. iceberg, delta writers.
 pub struct CatalogWriterFactory {
diff --git a/src/daft-writers/src/lance.rs b/src/daft-writers/src/lance.rs
new file mode 100644
index 0000000000..66387a67af
--- /dev/null
+++ b/src/daft-writers/src/lance.rs
@@ -0,0 +1,92 @@
+use std::sync::Arc;
+
+use common_error::DaftResult;
+use daft_logical_plan::LanceCatalogInfo;
+use daft_micropartition::{python::PyMicroPartition, MicroPartition};
+use daft_table::{python::PyTable, Table};
+use pyo3::{types::PyAnyMethods, Python};
+
+use crate::{FileWriter, WriterFactory};
+
+pub struct LanceWriter {
+    is_closed: bool,
+    lance_info: LanceCatalogInfo,
+    results: Vec<Table>,
+}
+
+impl LanceWriter {
+    pub fn new(lance_info: LanceCatalogInfo) -> Self {
+        Self {
+            is_closed: false,
+            lance_info,
+            results: vec![],
+        }
+    }
+}
+
+impl FileWriter for LanceWriter {
+    type Input = Arc<MicroPartition>;
+    type Result = Vec<Table>;
+
+    fn write(&mut self, data: &Self::Input) -> DaftResult<()> {
+        assert!(!self.is_closed, "Cannot write to a closed LanceWriter");
+        Python::with_gil(|py| {
+            let py_micropartition = py
+                .import_bound(pyo3::intern!(py, "daft.table"))?
+                .getattr(pyo3::intern!(py, "MicroPartition"))?
+                .getattr(pyo3::intern!(py, "_from_pymicropartition"))?
+                .call1((PyMicroPartition::from(data.clone()),))?;
+            let written_fragments: PyTable = py
+                .import_bound(pyo3::intern!(py, "daft.table.table_io"))?
+                .getattr(pyo3::intern!(py, "write_lance"))?
+                .call1((
+                    py_micropartition,
+                    &self.lance_info.path,
+                    &self.lance_info.mode,
+                    self.lance_info
+                        .io_config
+                        .as_ref()
+                        .map(|cfg| daft_io::python::IOConfig {
+                            config: cfg.clone(),
+                        }),
+                    &self.lance_info.kwargs,
+                ))?
+                .getattr(pyo3::intern!(py, "to_table"))?
+                .call0()?
+                .getattr(pyo3::intern!(py, "_table"))?
+                .extract()?;
+            self.results.push(written_fragments.into());
+            Ok(())
+        })
+    }
+
+    fn close(&mut self) -> DaftResult<Self::Result> {
+        self.is_closed = true;
+        Ok(std::mem::take(&mut self.results))
+    }
+}
+
+pub fn make_lance_writer_factory(
+    lance_info: LanceCatalogInfo,
+) -> Arc<dyn WriterFactory<Input = Arc<MicroPartition>, Result = Vec<Table>>> {
+    Arc::new(LanceWriterFactory { lance_info })
+}
+
+pub struct LanceWriterFactory {
+    pub lance_info: LanceCatalogInfo,
+}
+
+impl WriterFactory for LanceWriterFactory {
+    type Input = Arc<MicroPartition>;
+
+    type Result = Vec<Table>;
+
+    fn create_writer(
+        &self,
+        _file_idx: usize,
+        _partition_values: Option<&Table>,
+    ) -> DaftResult<Box<dyn FileWriter<Input = Self::Input, Result = Self::Result>>> {
+        let writer = LanceWriter::new(self.lance_info.clone());
+        Ok(Box::new(writer))
+    }
+}
diff --git a/src/daft-writers/src/lib.rs b/src/daft-writers/src/lib.rs
index cc369415f8..4ef30c6ddb 100644
--- a/src/daft-writers/src/lib.rs
+++ b/src/daft-writers/src/lib.rs
@@ -11,7 +11,9 @@ mod test;
 #[cfg(feature = "python")]
 mod catalog;
 #[cfg(feature = "python")]
-mod python;
+mod lance;
+#[cfg(feature = "python")]
+mod pyarrow;
 
 use std::{cmp::min, sync::Arc};
 
@@ -25,9 +27,10 @@ use daft_logical_plan::OutputFileInfo;
 use daft_micropartition::MicroPartition;
 use daft_table::Table;
 use file::TargetFileSizeWriterFactory;
+#[cfg(feature = "python")]
+pub use lance::make_lance_writer_factory;
 use partition::PartitionedWriterFactory;
 use physical::PhysicalWriterFactory;
-
 /// This trait is used to abstract the writing of data to a file.
 /// The `Input` type is the type of data that will be written to the file.
 /// The `Result` type is the type of the result that will be returned when the file is closed.
diff --git a/src/daft-writers/src/physical.rs b/src/daft-writers/src/physical.rs
index fbfad4482d..fb5e3ca5d6 100644
--- a/src/daft-writers/src/physical.rs
+++ b/src/daft-writers/src/physical.rs
@@ -59,7 +59,7 @@ pub fn create_pyarrow_file_writer(
 ) -> DaftResult<Box<dyn FileWriter<Input = Arc<MicroPartition>, Result = Option<Table>>>> {
     match format {
         #[cfg(feature = "python")]
-        FileFormat::Parquet => Ok(Box::new(crate::python::PyArrowWriter::new_parquet_writer(
+        FileFormat::Parquet => Ok(Box::new(crate::pyarrow::PyArrowWriter::new_parquet_writer(
             root_dir,
             file_idx,
             compression,
@@ -67,7 +67,7 @@ pub fn create_pyarrow_file_writer(
             partition,
         )?)),
         #[cfg(feature = "python")]
-        FileFormat::Csv => Ok(Box::new(crate::python::PyArrowWriter::new_csv_writer(
+        FileFormat::Csv => Ok(Box::new(crate::pyarrow::PyArrowWriter::new_csv_writer(
             root_dir, file_idx, io_config, partition,
         )?)),
         _ => Err(DaftError::ComputeError(
diff --git a/src/daft-writers/src/python.rs b/src/daft-writers/src/pyarrow.rs
similarity index 100%
rename from src/daft-writers/src/python.rs
rename to src/daft-writers/src/pyarrow.rs
diff --git a/tests/io/lancedb/test_lancedb_writes.py b/tests/io/lancedb/test_lancedb_writes.py
index 7fefb8f92e..e9f1ce6fd7 100644
--- a/tests/io/lancedb/test_lancedb_writes.py
+++ b/tests/io/lancedb/test_lancedb_writes.py
@@ -4,12 +4,6 @@
 import pytest
 
 import daft
-from tests.conftest import get_tests_daft_runner_name
-
-native_executor_skip = pytest.mark.skipif(
-    get_tests_daft_runner_name() == "native",
-    reason="Native executor fails for these tests",
-)
 
 TABLE_NAME = "my_table"
 data = {
@@ -20,10 +14,9 @@
 PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < (8, 0, 0)
 PY_LE_3_9_0 = sys.version_info < (3, 9)
 
-py_version_and_arrow_skip = pytest.mark.skipif(
+pytestmark = pytest.mark.skipif(
     PYARROW_LE_8_0_0 or PY_LE_3_9_0, reason="lance only supported if pyarrow >= 8.0.0 and python >= 3.9.0"
 )
-pytestmark = [native_executor_skip, py_version_and_arrow_skip]
 
 
 @pytest.fixture(scope="function")