Skip to content

Commit

Permalink
[FEAT] Anonymous Scan Operator (#1526)
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 authored Oct 25, 2023
1 parent f547b86 commit a122029
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 8 deletions.
90 changes: 90 additions & 0 deletions src/daft-scan/src/anonymous.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::fmt::Display;

use common_error::DaftResult;
use daft_core::schema::SchemaRef;

use crate::{DataFileSource, FileType, ScanOperator, ScanOperatorRef, ScanTask};
/// Scan operator over an explicit list of file paths that all share one
/// file type and one schema.
///
/// Projection and limit pushdowns are recorded on the operator and copied
/// into every scan task it produces.
#[derive(Debug)]
pub struct AnonymousScanOperator {
/// Schema handed back by `schema()` and used by `select` to validate column names.
schema: SchemaRef,
/// File type stamped onto the source of every generated scan task.
file_type: FileType,
/// Paths to scan; each path yields one scan task and counts as one partition.
files: Vec<String>,
/// Column names recorded by `select`; `None` until `select` is called.
columns_to_select: Option<Vec<String>>,
/// Limit recorded by `limit`; `None` until `limit` is called.
limit: Option<usize>,
}

impl AnonymousScanOperator {
    /// Builds an operator over `files`, all of which are read as `file_type`
    /// and described by `schema`.
    ///
    /// The new operator carries no pushdowns: both the column selection and
    /// the limit start out as `None`.
    pub fn new(schema: SchemaRef, file_type: FileType, files: Vec<String>) -> Self {
        // Nothing has been pushed down into a freshly constructed operator.
        let (columns_to_select, limit) = (None, None);
        Self {
            files,
            file_type,
            schema,
            columns_to_select,
            limit,
        }
    }
}

impl Display for AnonymousScanOperator {
    /// Renders the operator using its pretty-printed (`{:#?}`) `Debug` form.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{self:#?}"))
    }
}

impl ScanOperator for AnonymousScanOperator {
    /// Returns a clone of the schema handle supplied at construction.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// This operator reports no partitioning keys.
    fn partitioning_keys(&self) -> &[daft_core::datatypes::Field] {
        &[]
    }

    /// Reports one partition per file path.
    fn num_partitions(&self) -> common_error::DaftResult<usize> {
        Ok(self.files.len())
    }

    /// Records `columns` as the projection to apply when scanning.
    ///
    /// # Errors
    ///
    /// Returns `DaftError::FieldNotFound` if any requested column cannot be
    /// looked up in the operator's schema.
    fn select(self: Box<Self>, columns: &[&str]) -> common_error::DaftResult<ScanOperatorRef> {
        for c in columns {
            if self.schema.get_field(c).is_err() {
                // Report the schema that was actually searched; the previously
                // recorded projection says nothing about why the lookup failed.
                return Err(common_error::DaftError::FieldNotFound(format!(
                    "{c} not found in {:?}",
                    self.schema
                )));
            }
        }
        let mut to_rtn = self;
        to_rtn.columns_to_select = Some(columns.iter().map(|s| s.to_string()).collect());
        Ok(to_rtn)
    }

    /// Records `num` as the limit to attach to every scan task.
    fn limit(self: Box<Self>, num: usize) -> DaftResult<ScanOperatorRef> {
        let mut to_rtn = self;
        to_rtn.limit = Some(num);
        Ok(to_rtn)
    }

    /// Leaves the operator untouched and returns `false` alongside it: the
    /// predicate is ignored, not absorbed.
    fn filter(self: Box<Self>, _predicate: &daft_dsl::Expr) -> DaftResult<(bool, ScanOperatorRef)> {
        Ok((false, self))
    }

    /// Consumes the operator and yields one scan task per file path, each
    /// carrying the recorded column selection and limit.
    fn to_scan_tasks(
        self: Box<Self>,
    ) -> DaftResult<Box<dyn Iterator<Item = DaftResult<crate::ScanTask>>>> {
        // The operator is owned here, so take its fields apart instead of
        // cloning the whole path list just to iterate over it.
        let Self {
            file_type,
            files,
            columns_to_select,
            limit,
            ..
        } = *self;
        let iter = files.into_iter().map(move |path| {
            let source = DataFileSource::AnonymousDataFile {
                file_type,
                path,
                metadata: None,
                partition_spec: None,
                statistics: None,
            };
            Ok(ScanTask {
                source,
                columns: columns_to_select.clone(),
                limit,
            })
        });
        Ok(Box::new(iter))
    }
}
51 changes: 43 additions & 8 deletions src/daft-scan/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,58 @@
use common_error::DaftResult;
use std::{
fmt::{Debug, Display},
str::FromStr,
};

use common_error::{DaftError, DaftResult};
use daft_core::{datatypes::Field, schema::SchemaRef};
use daft_dsl::Expr;
use daft_stats::{PartitionSpec, TableMetadata, TableStatistics};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
mod anonymous;
#[cfg(feature = "python")]
pub mod python;
#[cfg(feature = "python")]
pub use python::register_modules;

/// File formats that a scanned data file can be tagged with.
///
/// A value can be parsed from its name through the `FromStr` implementation,
/// which ignores ASCII case and surrounding whitespace.
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub enum FileType {
/// Parsed from the name "parquet".
Parquet,
/// Parsed from the name "avro".
Avro,
/// Parsed from the name "orc".
Orc,
/// Parsed from the name "csv".
Csv,
}

impl FromStr for FileType {
type Err = DaftError;

fn from_str(file_type: &str) -> DaftResult<Self> {
use FileType::*;
if file_type.trim().eq_ignore_ascii_case("parquet") {
Ok(Parquet)
} else if file_type.trim().eq_ignore_ascii_case("avro") {
Ok(Avro)
} else if file_type.trim().eq_ignore_ascii_case("orc") {
Ok(Orc)
} else if file_type.trim().eq_ignore_ascii_case("csv") {
Ok(Csv)
} else {
Err(DaftError::TypeError(format!(
"FileType {} not supported!",
file_type
)))
}
}
}

#[derive(Serialize, Deserialize)]
pub enum DataFileSource {
AnonymousDataFile {
file_type: FileType,
path: String,
metadata: Option<TableMetadata>,
partition_spec: Option<PartitionSpec>,
statistics: Option<TableStatistics>,
},
CatalogDataFile {
file_type: FileType,
Expand All @@ -34,17 +69,17 @@ pub struct ScanTask {
source: DataFileSource,
columns: Option<Vec<String>>,
limit: Option<usize>,
filter: Option<Expr>,
}

pub trait ScanOperator {
pub trait ScanOperator: Send + Display {
fn schema(&self) -> SchemaRef;
fn partitioning_keys(&self) -> &[Field];
fn partition_spec(&self) -> Option<&PartitionSpec>;
fn num_partitions(&self) -> DaftResult<usize>;
fn filter(self: Box<Self>, predicate: &Expr) -> DaftResult<Box<Self>>;
fn select(self: Box<Self>, columns: &[&str]) -> DaftResult<Box<Self>>;
fn limit(self: Box<Self>, num: usize) -> DaftResult<Box<Self>>;

// also returns a bool to indicate if the scan operator can "absorb" the predicate
fn filter(self: Box<Self>, predicate: &Expr) -> DaftResult<(bool, ScanOperatorRef)>;
fn select(self: Box<Self>, columns: &[&str]) -> DaftResult<ScanOperatorRef>;
fn limit(self: Box<Self>, num: usize) -> DaftResult<ScanOperatorRef>;
fn to_scan_tasks(self: Box<Self>)
-> DaftResult<Box<dyn Iterator<Item = DaftResult<ScanTask>>>>;
}
Expand Down
46 changes: 46 additions & 0 deletions src/daft-scan/src/python.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use pyo3::prelude::*;

pub mod pylib {
use pyo3::prelude::*;
use std::str::FromStr;

use daft_core::python::schema::PySchema;

use pyo3::pyclass;

use crate::anonymous::AnonymousScanOperator;
use crate::FileType;
use crate::ScanOperatorRef;

#[pyclass(module = "daft.daft", frozen)]
pub(crate) struct ScanOperator {
scan_op: ScanOperatorRef,
}

#[pymethods]
impl ScanOperator {
pub fn __repr__(&self) -> PyResult<String> {
Ok(format!("{}", self.scan_op))
}

#[staticmethod]
pub fn anonymous_scan(
schema: PySchema,
file_type: &str,
files: Vec<String>,
) -> PyResult<Self> {
let schema = schema.schema;
let operator = Box::new(AnonymousScanOperator::new(
schema,
FileType::from_str(file_type)?,
files,
));
Ok(ScanOperator { scan_op: operator })
}
}
}

/// Adds the scan-operator Python class to the `parent` module.
pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
    // `add_class` already yields `PyResult<()>`, so its result is returned as-is.
    parent.add_class::<pylib::ScanOperator>()
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub mod pylib {
daft_csv::register_modules(_py, m)?;
daft_plan::register_modules(_py, m)?;
daft_micropartition::register_modules(_py, m)?;
daft_scan::register_modules(_py, m)?;

m.add_wrapped(wrap_pyfunction!(version))?;
m.add_wrapped(wrap_pyfunction!(build_type))?;
Expand Down

0 comments on commit a122029

Please sign in to comment.