Skip to content

Commit

Permalink
refactor: allow InMemory to take in non python based entries (#3554)
Browse files Browse the repository at this point in the history
  • Loading branch information
universalmind303 authored Dec 18, 2024
1 parent 07752b8 commit 6602502
Show file tree
Hide file tree
Showing 34 changed files with 783 additions and 547 deletions.
35 changes: 26 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 5 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ common-daft-config = {path = "src/common/daft-config", default-features = false}
common-display = {path = "src/common/display", default-features = false}
common-file-formats = {path = "src/common/file-formats", default-features = false}
common-hashable-float-wrapper = {path = "src/common/hashable-float-wrapper", default-features = false}
common-partitioning = {path = "src/common/partitioning", default-features = false}
common-resource-request = {path = "src/common/resource-request", default-features = false}
common-runtime = {path = "src/common/runtime", default-features = false}
common-scan-info = {path = "src/common/scan-info", default-features = false}
Expand Down Expand Up @@ -47,18 +48,12 @@ sysinfo = {workspace = true}
# maturin will turn this on
python = [
"common-daft-config/python",
"common-daft-config/python",
"common-daft-config/python",
"common-display/python",
"common-display/python",
"common-display/python",
"common-resource-request/python",
"common-resource-request/python",
"common-partitioning/python",
"common-resource-request/python",
"common-file-formats/python",
"common-scan-info/python",
"common-system-info/python",
"common-system-info/python",
"common-system-info/python",
"daft-catalog-python-catalog/python",
"daft-catalog/python",
"daft-connect/python",
Expand All @@ -80,7 +75,6 @@ python = [
"daft-scheduler/python",
"daft-sql/python",
"daft-stats/python",
"daft-stats/python",
"daft-table/python",
"daft-writers/python",
"dep:daft-catalog-python-catalog",
Expand Down Expand Up @@ -177,7 +171,8 @@ members = [
"src/daft-connect",
"src/parquet2",
# "src/spark-connect-script",
"src/generated/spark-connect"
"src/generated/spark-connect",
"src/common/partitioning"
]

[workspace.dependencies]
Expand Down
17 changes: 17 additions & 0 deletions src/common/partitioning/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[dependencies]
common-py-serde = {path = "../py-serde", optional = true}
pyo3 = {workspace = true, optional = true}
common-error.workspace = true
futures.workspace = true
serde.workspace = true

[features]
python = ["dep:pyo3", "common-error/python", "common-py-serde"]

[lints]
workspace = true

[package]
name = "common-partitioning"
edition.workspace = true
version.workspace = true
167 changes: 167 additions & 0 deletions src/common/partitioning/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
use std::{any::Any, sync::Arc};

use common_error::DaftResult;
use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};
#[cfg(feature = "python")]
use {
common_py_serde::{deserialize_py_object, serialize_py_object},
pyo3::PyObject,
};

/// Common trait interface for dataset partitioning, defined in this shared crate to avoid circular dependencies.
/// Acts as a forward declaration for concrete partition implementations. _(Specifically the `MicroPartition` type defined in `daft-micropartition`)_
pub trait Partition: std::fmt::Debug + Send + Sync {
fn as_any(&self) -> &dyn Any;
fn size_bytes(&self) -> DaftResult<Option<usize>>;
}

impl<T> Partition for Arc<T>
where
T: Partition,
{
fn as_any(&self) -> &dyn Any {
(**self).as_any()
}
fn size_bytes(&self) -> DaftResult<Option<usize>> {
(**self).size_bytes()
}
}

/// An Arc'd reference to a [`Partition`]
pub type PartitionRef = Arc<dyn Partition>;

/// Key used to identify a partition
pub type PartitionId = usize;

/// ported over from `daft/runners/partitioning.py`
// TODO: port over the rest of the functionality
#[derive(Debug, Clone)]
pub struct PartitionMetadata {
pub num_rows: usize,
pub size_bytes: usize,
}

/// A partition set is a collection of partitions.
/// It is up to the implementation to decide how to store and manage the partition batches.
/// For example, an in memory partition set could likely be stored as `HashMap<PartitionId, PartitionBatchRef<T>>`.
///
/// It is important to note that the methods do not take `&mut self` but instead take `&self`.
/// So it is up to the implementation to manage any interior mutability.
pub trait PartitionSet<T: Partition>: std::fmt::Debug + Send + Sync {
/// Merge all micropartitions into a single micropartition
fn get_merged_partitions(&self) -> DaftResult<PartitionRef>;
/// Get a preview of the micropartitions
fn get_preview_partitions(&self, num_rows: usize) -> DaftResult<Vec<T>>;
/// Number of partitions
fn num_partitions(&self) -> usize;
fn len(&self) -> usize;
/// Check if the partition set is empty
fn is_empty(&self) -> bool {
self.len() == 0
}
/// Size of the partition set in bytes
fn size_bytes(&self) -> DaftResult<usize>;
/// Check if a partition exists
fn has_partition(&self, idx: &PartitionId) -> bool;
/// Delete a partition
fn delete_partition(&self, idx: &PartitionId) -> DaftResult<()>;
/// Set a partition
fn set_partition(&self, idx: PartitionId, part: &T) -> DaftResult<()>;
/// Get a partition
fn get_partition(&self, idx: &PartitionId) -> DaftResult<T>;
/// Consume the partition set and return a stream of partitions
fn to_partition_stream(&self) -> BoxStream<'static, DaftResult<T>>;
fn metadata(&self) -> PartitionMetadata;
}

impl<P, PS> PartitionSet<P> for Arc<PS>
where
P: Partition + Clone,
PS: PartitionSet<P> + Clone,
{
fn get_merged_partitions(&self) -> DaftResult<PartitionRef> {
PS::get_merged_partitions(self)
}

fn get_preview_partitions(&self, num_rows: usize) -> DaftResult<Vec<P>> {
PS::get_preview_partitions(self, num_rows)
}

fn num_partitions(&self) -> usize {
PS::num_partitions(self)
}

fn len(&self) -> usize {
PS::len(self)
}

fn size_bytes(&self) -> DaftResult<usize> {
PS::size_bytes(self)
}

fn has_partition(&self, idx: &PartitionId) -> bool {
PS::has_partition(self, idx)
}

fn delete_partition(&self, idx: &PartitionId) -> DaftResult<()> {
PS::delete_partition(self, idx)
}

fn set_partition(&self, idx: PartitionId, part: &P) -> DaftResult<()> {
PS::set_partition(self, idx, part)
}

fn get_partition(&self, idx: &PartitionId) -> DaftResult<P> {
PS::get_partition(self, idx)
}

fn to_partition_stream(&self) -> BoxStream<'static, DaftResult<P>> {
PS::to_partition_stream(self)
}

fn metadata(&self) -> PartitionMetadata {
PS::metadata(self)
}
}

pub type PartitionSetRef<T> = Arc<dyn PartitionSet<T>>;

pub trait PartitionSetCache<P: Partition, PS: PartitionSet<P>>:
std::fmt::Debug + Send + Sync
{
fn get_partition_set(&self, key: &str) -> Option<PartitionSetRef<P>>;
fn get_all_partition_sets(&self) -> Vec<PartitionSetRef<P>>;
fn put_partition_set(&self, key: &str, partition_set: &PS);
fn rm_partition_set(&self, key: &str);
fn clear(&self);
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PartitionCacheEntry {
#[serde(
serialize_with = "serialize_py_object",
deserialize_with = "deserialize_py_object"
)]
#[cfg(feature = "python")]
/// in python, the partition cache is a weakvalue dictionary, so it will store the entry as long as this reference exists.
Python(PyObject),

Rust {
key: String,
#[serde(skip)]
/// We don't ever actually reference the value, we're just holding it to ensure the partition set is kept alive.
///
/// It's only wrapped in an `Option` to satisfy serde Deserialize. We skip (de)serializing, but serde still complains if it's not an Option.
value: Option<Arc<dyn Any + Send + Sync + 'static>>,
},
}

impl PartitionCacheEntry {
pub fn new_rust<T: Any + Send + Sync + 'static>(key: String, value: Arc<T>) -> Self {
Self::Rust {
key,
value: Some(value),
}
}
}
1 change: 1 addition & 0 deletions src/daft-connect/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::session::Session;
mod config;
mod err;
mod op;

mod session;
mod translation;
pub mod util;
Expand Down
Loading

0 comments on commit 6602502

Please sign in to comment.