[FEAT] Lance writes for swordfish (#3299)
This PR implements Lance writes for swordfish.

The scaffolding for writes was merged in #2992, so this PR simply adds the Lance write functionality.
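
At a high level, swordfish write sinks pair a writer factory with a blocking sink: the sink drains all of its input into writers created by the factory, then closes them and surfaces the writers' results. The sketch below is a minimal, self-contained mirror of that pattern; the trait shapes and the `CountingWriter`/`run_sink` names are simplified stand-ins for illustration, not the actual daft-writers APIs.

```rust
// Simplified stand-ins for the writer abstractions used by write sinks.
// A writer consumes input batches; closing it returns metadata about
// what was written.
trait FileWriter {
    type Input;
    type Result;
    fn write(&mut self, data: &Self::Input) -> Result<(), String>;
    fn close(&mut self) -> Result<Self::Result, String>;
}

trait WriterFactory {
    type Input;
    type Result;
    fn create_writer(&self) -> Box<dyn FileWriter<Input = Self::Input, Result = Self::Result>>;
}

// Toy writer that just counts rows. The real Lance writer instead calls
// into Python and collects the written fragment metadata as tables.
struct CountingWriter {
    rows: usize,
}

impl FileWriter for CountingWriter {
    type Input = Vec<i64>;
    type Result = usize;

    fn write(&mut self, data: &Self::Input) -> Result<(), String> {
        self.rows += data.len();
        Ok(())
    }

    fn close(&mut self) -> Result<usize, String> {
        Ok(self.rows)
    }
}

struct CountingFactory;

impl WriterFactory for CountingFactory {
    type Input = Vec<i64>;
    type Result = usize;

    fn create_writer(&self) -> Box<dyn FileWriter<Input = Vec<i64>, Result = usize>> {
        Box::new(CountingWriter { rows: 0 })
    }
}

// The sink side of the pattern: drain all input into a writer, then close
// it and surface the result.
fn run_sink(
    factory: &dyn WriterFactory<Input = Vec<i64>, Result = usize>,
    inputs: Vec<Vec<i64>>,
) -> Result<usize, String> {
    let mut writer = factory.create_writer();
    for batch in &inputs {
        writer.write(batch)?;
    }
    writer.close()
}

fn main() {
    let total = run_sink(&CountingFactory, vec![vec![1, 2], vec![3]]).unwrap();
    assert_eq!(total, 3);
}
```

The Lance writer added in this PR follows the same shape, except its result is a `Vec<Table>` describing the written Lance fragments.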

Co-authored-by: Colin Ho <[email protected]>
colin-ho and Colin Ho authored Nov 15, 2024
1 parent 88bf83f commit 5c00dbc
Showing 11 changed files with 170 additions and 21 deletions.
17 changes: 17 additions & 0 deletions src/daft-local-execution/src/pipeline.rs
@@ -531,6 +531,23 @@ pub fn physical_plan_to_pipeline(
             );
             BlockingSinkNode::new(Arc::new(write_sink), child_node).boxed()
         }
+        #[cfg(feature = "python")]
+        LocalPhysicalPlan::LanceWrite(daft_local_plan::LanceWrite {
+            input,
+            lance_info,
+            file_schema,
+            ..
+        }) => {
+            let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
+            let writer_factory = daft_writers::make_lance_writer_factory(lance_info.clone());
+            let write_sink = WriteSink::new(
+                WriteFormat::Lance,
+                writer_factory,
+                None,
+                file_schema.clone(),
+            );
+            BlockingSinkNode::new(Arc::new(write_sink), child_node).boxed()
+        }
     };
 
     Ok(out)
2 changes: 2 additions & 0 deletions src/daft-local-execution/src/sinks/write.rs
@@ -24,6 +24,7 @@ pub enum WriteFormat {
     PartitionedIceberg,
     Deltalake,
     PartitionedDeltalake,
+    Lance,
 }
 
 struct WriteState {
@@ -114,6 +115,7 @@ impl BlockingSink for WriteSink {
             WriteFormat::PartitionedIceberg => "PartitionedIcebergSink",
             WriteFormat::Deltalake => "DeltalakeSink",
             WriteFormat::PartitionedDeltalake => "PartitionedDeltalakeSink",
+            WriteFormat::Lance => "LanceSink",
         }
     }
 
2 changes: 2 additions & 0 deletions src/daft-local-plan/src/lib.rs
@@ -4,6 +4,8 @@ mod translate;
 
 #[cfg(feature = "python")]
 pub use plan::CatalogWrite;
+#[cfg(feature = "python")]
+pub use plan::LanceWrite;
 pub use plan::{
     ActorPoolProject, Concat, EmptyScan, Explode, Filter, HashAggregate, HashJoin, InMemoryScan,
     Limit, LocalPhysicalPlan, LocalPhysicalPlanRef, PhysicalScan, PhysicalWrite, Pivot, Project,
31 changes: 29 additions & 2 deletions src/daft-local-plan/src/plan.rs
@@ -40,8 +40,8 @@ pub enum LocalPhysicalPlan {
     // TabularWriteCsv(TabularWriteCsv),
     #[cfg(feature = "python")]
     CatalogWrite(CatalogWrite),
-    // #[cfg(feature = "python")]
-    // LanceWrite(LanceWrite),
+    #[cfg(feature = "python")]
+    LanceWrite(LanceWrite),
 }
 
 impl LocalPhysicalPlan {
@@ -324,6 +324,23 @@ impl LocalPhysicalPlan {
         .arced()
     }
 
+    #[cfg(feature = "python")]
+    pub(crate) fn lance_write(
+        input: LocalPhysicalPlanRef,
+        lance_info: daft_logical_plan::LanceCatalogInfo,
+        data_schema: SchemaRef,
+        file_schema: SchemaRef,
+    ) -> LocalPhysicalPlanRef {
+        Self::LanceWrite(LanceWrite {
+            input,
+            lance_info,
+            data_schema,
+            file_schema,
+            plan_stats: PlanStats {},
+        })
+        .arced()
+    }
+
     pub fn schema(&self) -> &SchemaRef {
         match self {
             Self::PhysicalScan(PhysicalScan { schema, .. })
@@ -504,5 +521,15 @@ pub struct CatalogWrite {
     pub plan_stats: PlanStats,
 }
 
+#[cfg(feature = "python")]
+#[derive(Debug)]
+pub struct LanceWrite {
+    pub input: LocalPhysicalPlanRef,
+    pub lance_info: daft_logical_plan::LanceCatalogInfo,
+    pub data_schema: SchemaRef,
+    pub file_schema: SchemaRef,
+    pub plan_stats: PlanStats,
+}
+
 #[derive(Debug)]
 pub struct PlanStats {}
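
One pattern worth noting above: every `LocalPhysicalPlan` constructor, including the new `lance_write`, ends with `.arced()`, returning a `LocalPhysicalPlanRef` so plan trees are immutable and cheap to share. A self-contained toy version of that pattern, with hypothetical `Plan`/`PlanRef` stand-ins rather than the real types:

```rust
use std::sync::Arc;

// Toy mirror of the plan-node pattern: each constructor builds a variant
// and immediately wraps it in an Arc, so parents hold cheap shared handles.
type PlanRef = Arc<Plan>;

#[derive(Debug)]
enum Plan {
    Scan { table: String },
    Write { input: PlanRef, path: String },
}

impl Plan {
    fn arced(self) -> PlanRef {
        Arc::new(self)
    }

    fn scan(table: impl Into<String>) -> PlanRef {
        Self::Scan { table: table.into() }.arced()
    }

    fn write(input: PlanRef, path: impl Into<String>) -> PlanRef {
        Self::Write { input, path: path.into() }.arced()
    }
}

fn main() {
    let scan = Plan::scan("my_table");
    // Two writes can share the same child without copying the subtree.
    let a = Plan::write(scan.clone(), "/tmp/a.lance");
    let b = Plan::write(scan, "/tmp/b.lance");
    println!("{a:?}\n{b:?}");
}
```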
25 changes: 19 additions & 6 deletions src/daft-local-plan/src/translate.rs
@@ -172,12 +172,25 @@ pub fn translate(plan: &LogicalPlanRef) -> DaftResult<LocalPhysicalPlanRef> {
                     info.clone(),
                 )),
                 #[cfg(feature = "python")]
-                SinkInfo::CatalogInfo(info) => Ok(LocalPhysicalPlan::catalog_write(
-                    input,
-                    info.catalog.clone(),
-                    data_schema,
-                    sink.schema.clone(),
-                )),
+                SinkInfo::CatalogInfo(info) => match &info.catalog {
+                    daft_logical_plan::CatalogType::DeltaLake(..)
+                    | daft_logical_plan::CatalogType::Iceberg(..) => {
+                        Ok(LocalPhysicalPlan::catalog_write(
+                            input,
+                            info.catalog.clone(),
+                            data_schema,
+                            sink.schema.clone(),
+                        ))
+                    }
+                    daft_logical_plan::CatalogType::Lance(info) => {
+                        Ok(LocalPhysicalPlan::lance_write(
+                            input,
+                            info.clone(),
+                            data_schema,
+                            sink.schema.clone(),
+                        ))
+                    }
+                },
             }
         }
         LogicalPlan::Explode(explode) => {
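
The `translate.rs` change is purely a routing decision: Iceberg and Delta Lake keep flowing through the generic `catalog_write` constructor, while Lance is dispatched to the dedicated `lance_write` node. A toy sketch of that dispatch shape, using stand-in types and return values rather than the real ones:

```rust
// Stand-in for the catalog routing added in translate.rs: two catalog
// types share one plan constructor, the third gets its own.
enum CatalogType {
    Iceberg,
    DeltaLake,
    Lance,
}

fn plan_node_for(catalog: &CatalogType) -> &'static str {
    match catalog {
        CatalogType::Iceberg | CatalogType::DeltaLake => "catalog_write",
        CatalogType::Lance => "lance_write",
    }
}

fn main() {
    assert_eq!(plan_node_for(&CatalogType::Lance), "lance_write");
}
```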
2 changes: 1 addition & 1 deletion src/daft-writers/src/catalog.rs
@@ -5,7 +5,7 @@ use daft_logical_plan::{CatalogType, DeltaLakeCatalogInfo, IcebergCatalogInfo};
 use daft_micropartition::MicroPartition;
 use daft_table::Table;
 
-use crate::{python::PyArrowWriter, FileWriter, WriterFactory};
+use crate::{pyarrow::PyArrowWriter, FileWriter, WriterFactory};
 
 /// CatalogWriterFactory is a factory for creating Catalog writers, i.e. iceberg, delta writers.
 pub struct CatalogWriterFactory {
92 changes: 92 additions & 0 deletions src/daft-writers/src/lance.rs
@@ -0,0 +1,92 @@
+use std::sync::Arc;
+
+use common_error::DaftResult;
+use daft_logical_plan::LanceCatalogInfo;
+use daft_micropartition::{python::PyMicroPartition, MicroPartition};
+use daft_table::{python::PyTable, Table};
+use pyo3::{types::PyAnyMethods, Python};
+
+use crate::{FileWriter, WriterFactory};
+
+pub struct LanceWriter {
+    is_closed: bool,
+    lance_info: LanceCatalogInfo,
+    results: Vec<Table>,
+}
+
+impl LanceWriter {
+    pub fn new(lance_info: LanceCatalogInfo) -> Self {
+        Self {
+            is_closed: false,
+            lance_info,
+            results: vec![],
+        }
+    }
+}
+
+impl FileWriter for LanceWriter {
+    type Input = Arc<MicroPartition>;
+    type Result = Vec<Table>;
+
+    fn write(&mut self, data: &Self::Input) -> DaftResult<()> {
+        assert!(!self.is_closed, "Cannot write to a closed LanceWriter");
+        Python::with_gil(|py| {
+            let py_micropartition = py
+                .import_bound(pyo3::intern!(py, "daft.table"))?
+                .getattr(pyo3::intern!(py, "MicroPartition"))?
+                .getattr(pyo3::intern!(py, "_from_pymicropartition"))?
+                .call1((PyMicroPartition::from(data.clone()),))?;
+            let written_fragments: PyTable = py
+                .import_bound(pyo3::intern!(py, "daft.table.table_io"))?
+                .getattr(pyo3::intern!(py, "write_lance"))?
+                .call1((
+                    py_micropartition,
+                    &self.lance_info.path,
+                    &self.lance_info.mode,
+                    self.lance_info
+                        .io_config
+                        .as_ref()
+                        .map(|cfg| daft_io::python::IOConfig {
+                            config: cfg.clone(),
+                        }),
+                    &self.lance_info.kwargs,
+                ))?
+                .getattr(pyo3::intern!(py, "to_table"))?
+                .call0()?
+                .getattr(pyo3::intern!(py, "_table"))?
+                .extract()?;
+            self.results.push(written_fragments.into());
+            Ok(())
+        })
+    }
+
+    fn close(&mut self) -> DaftResult<Self::Result> {
+        self.is_closed = true;
+        Ok(std::mem::take(&mut self.results))
+    }
+}
+
+pub fn make_lance_writer_factory(
+    lance_info: LanceCatalogInfo,
+) -> Arc<dyn WriterFactory<Input = Arc<MicroPartition>, Result = Vec<Table>>> {
+    Arc::new(LanceWriterFactory { lance_info })
+}
+
+pub struct LanceWriterFactory {
+    pub lance_info: LanceCatalogInfo,
+}
+
+impl WriterFactory for LanceWriterFactory {
+    type Input = Arc<MicroPartition>;
+
+    type Result = Vec<Table>;
+
+    fn create_writer(
+        &self,
+        _file_idx: usize,
+        _partition_values: Option<&Table>,
+    ) -> DaftResult<Box<dyn FileWriter<Input = Self::Input, Result = Self::Result>>> {
+        let writer = LanceWriter::new(self.lance_info.clone());
+        Ok(Box::new(writer))
+    }
+}
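
For orientation, here is a hedged sketch of how code inside this crate could drive the factory above end to end. It uses only names introduced in this diff; the `write_one` helper itself is hypothetical, and error handling follows the crate's `DaftResult` convention.

```rust
use std::sync::Arc;

use common_error::DaftResult;
use daft_logical_plan::LanceCatalogInfo;
use daft_micropartition::MicroPartition;
use daft_table::Table;

use crate::lance::make_lance_writer_factory;

// Hypothetical helper: write one micropartition to Lance and return the
// fragment metadata tables collected by the writer.
fn write_one(info: LanceCatalogInfo, part: Arc<MicroPartition>) -> DaftResult<Vec<Table>> {
    let factory = make_lance_writer_factory(info);
    // The Lance factory ignores the file index and partition values
    // (note the underscored parameters in `create_writer` above).
    let mut writer = factory.create_writer(0, None)?;
    writer.write(&part)?;
    writer.close()
}
```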
7 changes: 5 additions & 2 deletions src/daft-writers/src/lib.rs
@@ -11,7 +11,9 @@ mod test;
 #[cfg(feature = "python")]
 mod catalog;
 #[cfg(feature = "python")]
-mod python;
+mod lance;
+#[cfg(feature = "python")]
+mod pyarrow;
 
 use std::{cmp::min, sync::Arc};
 
@@ -25,9 +27,10 @@ use daft_logical_plan::OutputFileInfo;
 use daft_micropartition::MicroPartition;
 use daft_table::Table;
 use file::TargetFileSizeWriterFactory;
+#[cfg(feature = "python")]
+pub use lance::make_lance_writer_factory;
 use partition::PartitionedWriterFactory;
 use physical::PhysicalWriterFactory;
 
 /// This trait is used to abstract the writing of data to a file.
 /// The `Input` type is the type of data that will be written to the file.
 /// The `Result` type is the type of the result that will be returned when the file is closed.
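
The trait doc above is the design choice that keeps this PR small: `Input` and `Result` are associated types, so each backend fixes its own result shape behind one interface. The pyarrow-based per-file writers return `Option<Table>` (see `physical.rs` below), while the Lance writer returns a `Vec<Table>` of fragments. A minimal stand-in illustrating the two result shapes (`String` substitutes for `Table`):

```rust
// Minimal stand-in: one trait, two impls with different Result shapes.
trait Writer {
    type Result;
    fn close(self) -> Self::Result;
}

// Stand-in for a per-file pyarrow writer: at most one metadata table.
struct PerFileWriter;

impl Writer for PerFileWriter {
    type Result = Option<String>; // Option<Table> in the real code
    fn close(self) -> Self::Result {
        Some("one file's metadata".to_string())
    }
}

// Stand-in for the Lance writer: one table per written fragment.
struct FragmentWriter;

impl Writer for FragmentWriter {
    type Result = Vec<String>; // Vec<Table> in the real code
    fn close(self) -> Self::Result {
        vec!["fragment 0".to_string(), "fragment 1".to_string()]
    }
}

fn main() {
    let one: Option<String> = PerFileWriter.close();
    let many: Vec<String> = FragmentWriter.close();
    println!("{one:?} / {} fragments", many.len());
}
```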
4 changes: 2 additions & 2 deletions src/daft-writers/src/physical.rs
@@ -59,15 +59,15 @@ pub fn create_pyarrow_file_writer(
 ) -> DaftResult<Box<dyn FileWriter<Input = Arc<MicroPartition>, Result = Option<Table>>>> {
     match format {
         #[cfg(feature = "python")]
-        FileFormat::Parquet => Ok(Box::new(crate::python::PyArrowWriter::new_parquet_writer(
+        FileFormat::Parquet => Ok(Box::new(crate::pyarrow::PyArrowWriter::new_parquet_writer(
             root_dir,
             file_idx,
             compression,
             io_config,
             partition,
         )?)),
         #[cfg(feature = "python")]
-        FileFormat::Csv => Ok(Box::new(crate::python::PyArrowWriter::new_csv_writer(
+        FileFormat::Csv => Ok(Box::new(crate::pyarrow::PyArrowWriter::new_csv_writer(
             root_dir, file_idx, io_config, partition,
         )?)),
         _ => Err(DaftError::ComputeError(
src/daft-writers/src/{python.rs → pyarrow.rs}
File renamed without changes.
9 changes: 1 addition & 8 deletions tests/io/lancedb/test_lancedb_writes.py
@@ -4,12 +4,6 @@
 import pytest
 
 import daft
-from tests.conftest import get_tests_daft_runner_name
-
-native_executor_skip = pytest.mark.skipif(
-    get_tests_daft_runner_name() == "native",
-    reason="Native executor fails for these tests",
-)
 
 TABLE_NAME = "my_table"
 data = {
@@ -20,10 +14,9 @@
 
 PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < (8, 0, 0)
 PY_LE_3_9_0 = sys.version_info < (3, 9)
-py_version_and_arrow_skip = pytest.mark.skipif(
+pytestmark = pytest.mark.skipif(
     PYARROW_LE_8_0_0 or PY_LE_3_9_0, reason="lance only supported if pyarrow >= 8.0.0 and python >= 3.9.0"
 )
-pytestmark = [native_executor_skip, py_version_and_arrow_skip]
 
 
 @pytest.fixture(scope="function")
