Commit 553f87a

Merge branch 'main' into distinct-on-implementation

gruuya committed Oct 30, 2023
2 parents deff43f + 9b45967

Showing 132 changed files with 4,294 additions and 2,592 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/dev.yml
@@ -41,9 +41,9 @@ jobs:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
-     - uses: actions/setup-node@v3
+     - uses: actions/setup-node@v4
        with:
-         node-version: "14"
+         node-version: "20"
      - name: Prettier check
        run: |
          # if you encounter error, rerun the command below and commit the changes
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
@@ -525,9 +525,9 @@ jobs:
        uses: ./.github/actions/setup-builder
        with:
          rust-version: stable
-     - uses: actions/setup-node@v3
+     - uses: actions/setup-node@v4
        with:
-         node-version: "14"
+         node-version: "20"
      - name: Check if configs.md has been modified
        run: |
          # If you encounter an error, run './dev/update_config_docs.sh' and commit
1 change: 1 addition & 0 deletions Cargo.toml
@@ -32,6 +32,7 @@ members = [
     "datafusion/substrait",
     "datafusion/wasmtest",
     "datafusion-examples",
+    "docs",
     "test-utils",
     "benchmarks",
 ]
2 changes: 2 additions & 0 deletions README.md
@@ -47,6 +47,7 @@ Default features:
 - `compression`: reading files compressed with `xz2`, `bzip2`, `flate2`, and `zstd`
 - `crypto_expressions`: cryptographic functions such as `md5` and `sha256`
 - `encoding_expressions`: `encode` and `decode` functions
+- `parquet`: support for reading the [Apache Parquet] format
 - `regex_expressions`: regular expression functions, such as `regexp_match`
 - `unicode_expressions`: Include unicode aware functions such as `character_length`

@@ -59,6 +60,7 @@ Optional features:
 - `simd`: enable arrow-rs's manual `SIMD` kernels (requires Rust `nightly`)

 [apache avro]: https://avro.apache.org/
+[apache parquet]: https://parquet.apache.org/

 ## Rust Version Compatibility
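The new default `parquet` feature gates DataFusion's parquet readers. A minimal sketch of what it enables (the file name is illustrative):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read_parquet is only compiled in when the `parquet` feature is enabled.
    let df = ctx
        .read_parquet("data.parquet", ParquetReadOptions::default())
        .await?;
    df.show().await?;
    Ok(())
}
```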
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -34,7 +34,7 @@ async-trait = "0.1.41"
 aws-config = "0.55"
 aws-credential-types = "0.55"
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { path = "../datafusion/core", version = "32.0.0", features = ["avro", "crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression"] }
+datafusion = { path = "../datafusion/core", version = "32.0.0", features = ["avro", "crypto_expressions", "encoding_expressions", "parquet", "regex_expressions", "unicode_expressions", "compression"] }
 dirs = "4.0.0"
 env_logger = "0.9"
 mimalloc = { version = "0.1", default-features = false }
4 changes: 2 additions & 2 deletions datafusion-examples/Cargo.toml
@@ -20,9 +20,9 @@ name = "datafusion-examples"
 description = "DataFusion usage examples"
 keywords = ["arrow", "query", "sql"]
 publish = false
-readme = "README.md"
 version = { workspace = true }
 edition = { workspace = true }
+readme = { workspace = true }
 homepage = { workspace = true }
 repository = { workspace = true }
 license = { workspace = true }
@@ -36,7 +36,7 @@ arrow-schema = { workspace = true }
 async-trait = "0.1.41"
 bytes = "1.4"
 dashmap = "5.4"
-datafusion = { path = "../datafusion/core" }
+datafusion = { path = "../datafusion/core", features = ["avro"] }
 datafusion-common = { path = "../datafusion/common" }
 datafusion-expr = { path = "../datafusion/expr" }
 datafusion-optimizer = { path = "../datafusion/optimizer" }
6 changes: 1 addition & 5 deletions datafusion-examples/examples/custom_datasource.rs
@@ -32,7 +32,7 @@ use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::{
     project_schema, DisplayAs, DisplayFormatType, ExecutionPlan,
-    SendableRecordBatchStream, Statistics,
+    SendableRecordBatchStream,
 };
 use datafusion::prelude::*;
 use datafusion_expr::{Expr, LogicalPlanBuilder};
@@ -270,8 +270,4 @@ impl ExecutionPlan for CustomExec {
             None,
         )?))
     }
-
-    fn statistics(&self) -> Result<Statistics> {
-        Ok(Statistics::new_unknown(&self.schema()))
-    }
 }
5 changes: 2 additions & 3 deletions datafusion/common/Cargo.toml
@@ -19,9 +19,9 @@
 name = "datafusion-common"
 description = "Common functionality for DataFusion query engine"
 keywords = ["arrow", "query", "sql"]
-readme = "README.md"
 version = { workspace = true }
 edition = { workspace = true }
+readme = { workspace = true }
 homepage = { workspace = true }
 repository = { workspace = true }
 license = { workspace = true }
@@ -35,8 +35,7 @@ path = "src/lib.rs"
 [features]
 avro = ["apache-avro"]
 backtrace = []
-default = ["parquet"]
-pyarrow = ["pyo3", "arrow/pyarrow"]
+pyarrow = ["pyo3", "arrow/pyarrow", "parquet"]

 [dependencies]
 ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
32 changes: 26 additions & 6 deletions datafusion/common/src/config.rs
@@ -377,12 +377,32 @@ config_namespace! {
         pub bloom_filter_ndv: Option<u64>, default = None

         /// Controls whether DataFusion will attempt to speed up writing
-        /// large parquet files by first writing multiple smaller files
-        /// and then stitching them together into a single large file.
-        /// This will result in faster write speeds, but higher memory usage.
-        /// Also currently unsupported are bloom filters and column indexes
-        /// when single_file_parallelism is enabled.
-        pub allow_single_file_parallelism: bool, default = false
+        /// parquet files by serializing them in parallel. Each column
+        /// in each row group in each output file is serialized in parallel,
+        /// leveraging a maximum possible core count of n_files * n_row_groups * n_columns.
+        pub allow_single_file_parallelism: bool, default = true
+
+        /// By default the parallel parquet writer is tuned for minimum
+        /// memory usage in a streaming execution plan. You may see
+        /// a performance benefit when writing large parquet files
+        /// by increasing maximum_parallel_row_group_writers and
+        /// maximum_buffered_record_batches_per_stream if your system
+        /// has idle cores and can tolerate additional memory usage.
+        /// Boosting these values is likely worthwhile when
+        /// writing out already in-memory data, such as from a cached
+        /// data frame.
+        pub maximum_parallel_row_group_writers: usize, default = 1
+
+        /// By default the parallel parquet writer is tuned for minimum
+        /// memory usage in a streaming execution plan. You may see
+        /// a performance benefit when writing large parquet files
+        /// by increasing maximum_parallel_row_group_writers and
+        /// maximum_buffered_record_batches_per_stream if your system
+        /// has idle cores and can tolerate additional memory usage.
+        /// Boosting these values is likely worthwhile when
+        /// writing out already in-memory data, such as from a cached
+        /// data frame.
+        pub maximum_buffered_record_batches_per_stream: usize, default = 2

     }
 }
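Because these knobs are declared in the `execution.parquet` config namespace alongside the options above, they should be reachable through the usual runtime `SET` path. A minimal sketch, with purely illustrative values:

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Illustrative values, not recommendations: trade extra buffering
    // memory for more write parallelism when spare cores are available.
    ctx.sql("SET datafusion.execution.parquet.maximum_parallel_row_group_writers = 4")
        .await?;
    ctx.sql("SET datafusion.execution.parquet.maximum_buffered_record_batches_per_stream = 32")
        .await?;

    // ... register tables and write parquet output as usual ...
    Ok(())
}
```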
99 changes: 99 additions & 0 deletions datafusion/common/src/dfschema.rs
@@ -391,11 +391,33 @@ impl DFSchema {
         })
     }

+    /// Returns true if the two schemas have the same qualified named
+    /// fields with logically equivalent data types. Returns false otherwise.
+    ///
+    /// Use [`DFSchema::equivalent_names_and_types`] for stricter semantic type
+    /// equivalence checking.
+    pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
+        if self.fields().len() != other.fields().len() {
+            return false;
+        }
+        let self_fields = self.fields().iter();
+        let other_fields = other.fields().iter();
+        self_fields.zip(other_fields).all(|(f1, f2)| {
+            f1.qualifier() == f2.qualifier()
+                && f1.name() == f2.name()
+                && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
+        })
+    }
+
     /// Returns true if the two schemas have the same qualified named
     /// fields with the same data types. Returns false otherwise.
     ///
     /// This is a specialized version of Eq that ignores differences
     /// in nullability and metadata.
+    ///
+    /// Use [`DFSchema::logically_equivalent_names_and_types`] for a weaker
+    /// logical type checking, which for example would consider a dictionary
+    /// encoded UTF8 array to be equivalent to a plain UTF8 array.
     pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
         if self.fields().len() != other.fields().len() {
             return false;
@@ -409,6 +431,46 @@ impl DFSchema {
         })
     }

+    /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
+    /// than datatype_is_semantically_equal in that a Dictionary<K,V> type is logically
+    /// equal to a plain V type, but not semantically equal. Dictionary<K1, V1> is also
+    /// logically equal to Dictionary<K2, V1>.
+    fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
+        // check nested fields
+        match (dt1, dt2) {
+            (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
+                v1.as_ref() == v2.as_ref()
+            }
+            (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
+            (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
+            (DataType::List(f1), DataType::List(f2))
+            | (DataType::LargeList(f1), DataType::LargeList(f2))
+            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _))
+            | (DataType::Map(f1, _), DataType::Map(f2, _)) => {
+                Self::field_is_logically_equal(f1, f2)
+            }
+            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
+                let iter1 = fields1.iter();
+                let iter2 = fields2.iter();
+                fields1.len() == fields2.len() &&
+                // all fields have to be the same
+                iter1
+                    .zip(iter2)
+                    .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
+            }
+            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
+                let iter1 = fields1.iter();
+                let iter2 = fields2.iter();
+                fields1.len() == fields2.len() &&
+                // all fields have to be the same
+                iter1
+                    .zip(iter2)
+                    .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
+            }
+            _ => dt1 == dt2,
+        }
+    }
+
     /// Returns true if two [`DataType`]s are semantically equal (same
     /// name and type), ignoring both metadata and nullability.
     ///
@@ -444,10 +506,23 @@ impl DFSchema {
                 .zip(iter2)
                 .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2))
             }
+            (
+                DataType::Decimal128(_l_precision, _l_scale),
+                DataType::Decimal128(_r_precision, _r_scale),
+            ) => true,
+            (
+                DataType::Decimal256(_l_precision, _l_scale),
+                DataType::Decimal256(_r_precision, _r_scale),
+            ) => true,
             _ => dt1 == dt2,
         }
     }

+    fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
+        f1.name() == f2.name()
+            && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
+    }
+
     fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
         f1.name() == f2.name()
             && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
@@ -778,6 +853,13 @@ pub trait SchemaExt {
     ///
     /// It works the same as [`DFSchema::equivalent_names_and_types`].
     fn equivalent_names_and_types(&self, other: &Self) -> bool;
+
+    /// Returns true if the two schemas have the same qualified named
+    /// fields with logically equivalent data types. Returns false otherwise.
+    ///
+    /// Use [`DFSchema::equivalent_names_and_types`] for stricter semantic type
+    /// equivalence checking.
+    fn logically_equivalent_names_and_types(&self, other: &Self) -> bool;
 }

 impl SchemaExt for Schema {
@@ -797,6 +879,23 @@ impl SchemaExt for Schema {
             )
         })
     }
+
+    fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
+        if self.fields().len() != other.fields().len() {
+            return false;
+        }
+
+        self.fields()
+            .iter()
+            .zip(other.fields().iter())
+            .all(|(f1, f2)| {
+                f1.name() == f2.name()
+                    && DFSchema::datatype_is_logically_equal(
+                        f1.data_type(),
+                        f2.data_type(),
+                    )
+            })
+    }
 }

 #[cfg(test)]
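A short sketch of the distinction the new API draws, assuming `SchemaExt` is re-exported from the `datafusion_common` crate root like the other `dfschema` items:

```rust
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::SchemaExt; // assumed re-export location

fn main() {
    let plain = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
    let dict = Schema::new(vec![Field::new(
        "a",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        true,
    )]);

    // Dictionary-encoded Utf8 is logically interchangeable with plain Utf8...
    assert!(plain.logically_equivalent_names_and_types(&dict));
    // ...but the stricter semantic check still tells them apart.
    assert!(!plain.equivalent_names_and_types(&dict));
}
```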
17 changes: 17 additions & 0 deletions datafusion/common/src/functional_dependencies.rs
@@ -558,4 +558,21 @@ mod tests {
         assert_eq!(iter.next(), Some(&Constraint::Unique(vec![20])));
         assert_eq!(iter.next(), None);
     }
+
+    #[test]
+    fn test_get_updated_id_keys() {
+        let fund_dependencies =
+            FunctionalDependencies::new(vec![FunctionalDependence::new(
+                vec![1],
+                vec![0, 1, 2],
+                true,
+            )]);
+        let res = fund_dependencies.project_functional_dependencies(&[1, 2], 2);
+        let expected = FunctionalDependencies::new(vec![FunctionalDependence::new(
+            vec![0],
+            vec![0, 1],
+            true,
+        )]);
+        assert_eq!(res, expected);
+    }
 }
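For reference: the projection keeps source columns 1 and 2, which become output columns 0 and 1, so the dependency {1} → {0, 1, 2} survives as {0} → {0, 1} — the determinant is re-indexed and the un-projected column 0 is dropped from the dependent set.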
1 change: 1 addition & 0 deletions datafusion/common/src/test_util.rs
@@ -180,6 +180,7 @@ pub fn arrow_test_data() -> String {
 /// let filename = format!("{}/binary.parquet", testdata);
 /// assert!(std::path::PathBuf::from(filename).exists());
 /// ```
+#[cfg(feature = "parquet")]
 pub fn parquet_test_data() -> String {
     match get_data_dir("PARQUET_TEST_DATA", "../../parquet-testing/data") {
         Ok(pb) => pb.display().to_string(),
10 changes: 5 additions & 5 deletions datafusion/core/Cargo.toml
@@ -39,11 +39,12 @@ avro = ["apache-avro", "num-traits", "datafusion-common/avro"]
 backtrace = ["datafusion-common/backtrace"]
 compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression"]
 crypto_expressions = ["datafusion-physical-expr/crypto_expressions", "datafusion-optimizer/crypto_expressions"]
-default = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression"]
+default = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression", "parquet"]
 encoding_expressions = ["datafusion-physical-expr/encoding_expressions"]
 # Used for testing ONLY: causes all values to hash to the same value (test for collisions)
 force_hash_collisions = []
-pyarrow = ["datafusion-common/pyarrow"]
+parquet = ["datafusion-common/parquet", "dep:parquet"]
+pyarrow = ["datafusion-common/pyarrow", "parquet"]
 regex_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion-optimizer/regex_expressions"]
 serde = ["arrow-schema/serde"]
 simd = ["arrow/simd"]
@@ -61,7 +62,7 @@ bytes = "1.4"
 bzip2 = { version = "0.4.3", optional = true }
 chrono = { workspace = true }
 dashmap = "5.4.0"
-datafusion-common = { path = "../common", version = "32.0.0", features = ["parquet", "object_store"] }
+datafusion-common = { path = "../common", version = "32.0.0", features = ["object_store"], default-features = false }
 datafusion-execution = { path = "../execution", version = "32.0.0" }
 datafusion-expr = { path = "../expr", version = "32.0.0" }
 datafusion-optimizer = { path = "../optimizer", version = "32.0.0", default-features = false }
@@ -80,7 +81,7 @@ num-traits = { version = "0.2", optional = true }
 num_cpus = "1.13.0"
 object_store = "0.7.0"
 parking_lot = "0.12"
-parquet = { workspace = true }
+parquet = { workspace = true, optional = true }
 percent-encoding = "2.2.0"
 pin-project-lite = "^0.2.7"
 rand = "0.8"
@@ -93,7 +94,6 @@ uuid = { version = "1.0", features = ["v4"] }
 xz2 = { version = "0.1", optional = true }
 zstd = { version = "0.13", optional = true, default-features = false }

-
 [dev-dependencies]
 async-trait = "0.1.53"
 bigdecimal = "0.4.1"
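One practical consequence of this feature gating: `parquet` is still in the default feature set, but the dependency itself is now optional (`dep:parquet`), so downstream crates that depend on `datafusion` with `default-features = false` must now list `parquet` explicitly to keep the parquet readers and writers compiled in.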