Merge pull request #736 from splitgraph/datafusion-43-upgrade
Upgrade to DataFusion 43
gruuya authored Nov 25, 2024
2 parents d24aecd + 6297d45 · commit e019ffc
Showing 32 changed files with 511 additions and 453 deletions.
669 changes: 355 additions & 314 deletions Cargo.lock

Large diffs are not rendered by default.

54 changes: 21 additions & 33 deletions Cargo.toml
@@ -2,51 +2,40 @@
 members = ["clade", "object_store_factory"]
 
 [workspace.dependencies]
-arrow = { version = "52.2.0", features = ["test_utils"] }
-arrow-buffer = "52.2.0"
-arrow-csv = "52.2.0"
-arrow-flight = "52.2.0"
+arrow = { version = "53.2.0", features = ["test_utils"] }
+arrow-buffer = "53.2.0"
+arrow-csv = "53.2.0"
+arrow-flight = "53.2.0"
 # For the JSON format support
 # https://github.com/apache/arrow-rs/pull/2868
 # https://github.com/apache/arrow-rs/pull/2724
-arrow-integration-test = "52.2.0"
-arrow-row = "52.2.0"
-arrow-schema = "52.2.0"
+arrow-integration-test = "53.2.0"
+arrow-row = "53.2.0"
+arrow-schema = "53.2.0"
 async-trait = "0.1.83"
 
-datafusion = { version = "41.0.0", features = ["backtrace"] }
-datafusion-common = "41.0.0"
-datafusion-expr = "41.0.0"
-datafusion-functions-nested = "41.0.0"
+datafusion = { version = "43.0.0", features = ["backtrace"] }
+datafusion-common = "43.0.0"
+datafusion-expr = "43.0.0"
+datafusion-functions-nested = "43.0.0"
 
 futures = "0.3"
 
 itertools = ">=0.10.0"
-object_store = { version = "0.10.2", features = ["aws", "azure", "gcp"] }
-prost = "0.12.6"
+object_store = { version = "0.11", features = ["aws", "azure", "gcp"] }
+prost = "0.13"
 
 serde = "1.0.213"
 serde_json = "1.0.132"
 
 tempfile = "3"
 tokio = { version = "1.40", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
 tonic = "0.12"
 tracing = { version = "0.1", features = ["log"] }
 tracing-log = "0.2"
 tracing-subscriber = { version = "0.3.18", features = ["json", "env-filter"] }
 url = "2.5"
 
-[patch.crates-io]
-arrow = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
-arrow-array = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
-arrow-buffer = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
-arrow-csv = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
-arrow-data = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
-arrow-flight = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
-arrow-integration-test = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
-arrow-ipc = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
-arrow-row = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
-arrow-schema = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
 
 [package]
 name = "seafowl"
 build = "build.rs"
@@ -95,8 +84,8 @@
 clap = { version = "4.5.21", features = [ "derive" ] }
 config = "0.14.0"
 
 # PG wire protocol support
-convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-41-upgrade", optional = true }
-convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-41-upgrade", optional = true }
+convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-43-upgrade", optional = true }
+convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-43-upgrade", optional = true }
 
 dashmap = "6.1.0"
@@ -107,8 +96,7 @@
 datafusion-functions-nested = { workspace = true }
 
 datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true }
 
-# pr-2975-backport, pick up https://github.com/delta-io/delta-rs/pull/2975
-deltalake = { git = "https://github.com/splitgraph/delta-rs", branch = "pr-2975-with-arrow-pr-6729-backport", features = ["datafusion"] }
+deltalake = { git = "https://github.com/splitgraph/delta-rs", branch = "fix-decimal-stat-overflow", features = ["datafusion"] }
 
 futures = "0.3"
 hex = ">=0.4.0"
@@ -118,7 +106,7 @@
 lazy_static = ">=1.4.0"
 metrics = { version = "0.23.0" }
 metrics-exporter-prometheus = { version = "0.15.3" }
 moka = { version = "0.12.5", default-features = false, features = ["future", "atomic64", "quanta"] }
-object_store = { version = "0.10.2", features = ["aws", "azure", "gcp"] }
+object_store = { workspace = true }
 object_store_factory = { path = "object_store_factory" }
 percent-encoding = "2.2.0"
 prost = { workspace = true }
@@ -135,15 +123,15 @@
 rustyline = "14.0"
 serde = { workspace = true }
 serde_json = { workspace = true }
 sha2 = ">=0.10.1"
-sqlparser = { version = "0.49", features = ["visitor"] }
+sqlparser = { version = "0.51", features = ["visitor"] }
 sqlx = { version = "0.7.1", features = [ "runtime-tokio-rustls", "sqlite", "any", "uuid" ] }
 strum = ">=0.24"
 strum_macros = ">=0.24"
 tempfile = "3"
 thiserror = "2"
 tokio = { workspace = true }
 tokio-graceful-shutdown = { version = "0.15" }
-tonic = { version = "0.11.0", optional = true }
+tonic = { workspace = true, optional = true }
 tower = "0.5"
 tracing = { workspace = true }
 tracing-log = "0.2"
@@ -165,7 +153,7 @@
 aws-credential-types = { version = "1.2.1", features = ["hardcoded-credentials"] }
 aws-sdk-sts = { version = "1.46.0", features = ["behavior-version-latest"] }
 rstest = "*"
 serial_test = "3"
-tonic-reflection = "0.11"
+tonic-reflection = "0.12"
 wiremock = "0.6"
 
 [build-dependencies]
4 changes: 2 additions & 2 deletions clade/Cargo.toml
@@ -7,7 +7,7 @@ license = "Apache-2.0"
 
 [dependencies]
 arrow-flight = { workspace = true }
 prost = { workspace = true }
-tonic = "0.11"
+tonic = { workspace = true }
 
 [build-dependencies]
-tonic-build = "0.11"
+tonic-build = "0.12"
2 changes: 1 addition & 1 deletion clade/build.rs
@@ -8,7 +8,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
         .build_server(true)
         .build_client(true)
         .type_attribute("clade.sync.ColumnDescriptor", "#[derive(Eq, Hash)]")
-        .compile(&["proto/schema.proto", "proto/sync.proto"], &["proto"])?;
+        .compile_protos(&["proto/schema.proto", "proto/sync.proto"], &["proto"])?;
 
     Ok(())
 }
2 changes: 1 addition & 1 deletion datafusion_remote_tables/Cargo.toml
@@ -19,7 +19,7 @@
 arrow-schema = { workspace = true }
 async-trait = { workspace = true }
 
 # Remote query execution for a variety of DBs
-connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-41-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
+connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-43-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
 
 datafusion = { workspace = true }
 datafusion-common = { workspace = true }
1 change: 1 addition & 0 deletions datafusion_remote_tables/src/factory.rs
@@ -9,6 +9,7 @@ use std::ops::Deref;
 use std::sync::Arc;
 
 /// Factory for creating remote tables
+#[derive(Debug)]
 pub struct RemoteTableFactory {}
 
 #[async_trait]
1 change: 1 addition & 0 deletions datafusion_remote_tables/src/provider.rs
@@ -23,6 +23,7 @@ use tokio::task;
 use tracing::debug;
 
 // Implementation of a remote table, capable of querying Postgres, MySQL, SQLite, etc...
+#[derive(Debug)]
 pub struct RemoteTable {
     // We manually escape the field names during scans, but expect the user to escape the table name
     // appropriately in the remote table definition
2 changes: 1 addition & 1 deletion src/catalog/empty.rs
@@ -9,7 +9,7 @@ use clade::schema::ListSchemaResponse;
 
 use super::CatalogError;
 
-#[derive(Clone)]
+#[derive(Debug, Clone)]
 pub struct EmptyStore {}
 
 #[async_trait]
2 changes: 1 addition & 1 deletion src/catalog/external.rs
@@ -8,7 +8,7 @@ use tonic::transport::{channel::Channel, Endpoint, Error};
 use tonic::Request;
 
 // An external store, facilitated via a remote clade server implementation
-#[derive(Clone)]
+#[derive(Debug, Clone)]
 pub struct ExternalStore {
     client: SchemaStoreServiceClient<Channel>,
 }
2 changes: 1 addition & 1 deletion src/catalog/memory.rs
@@ -4,7 +4,7 @@ use crate::catalog::{
 use crate::repository::interface::AllDatabaseFunctionsResult;
 use clade::schema::ListSchemaResponse;
 
-#[derive(Clone)]
+#[derive(Debug, Clone)]
 pub struct MemoryStore {
     pub schemas: ListSchemaResponse,
 }
2 changes: 1 addition & 1 deletion src/catalog/metastore.rs
@@ -34,7 +34,7 @@ type LocationAndOptions = (String, HashMap<String, String>);
 // This is the main entrypoint to all individual catalogs for various objects types.
 // The intention is to make it extensible and de-coupled from the underlying metastore
 // persistence mechanism (such as the presently used `Repository`).
-#[derive(Clone)]
+#[derive(Debug, Clone)]
 pub struct Metastore {
     pub catalogs: Arc<dyn CatalogStore>,
     pub schemas: Arc<dyn SchemaStore>,
9 changes: 5 additions & 4 deletions src/catalog/mod.rs
@@ -8,6 +8,7 @@
 use arrow_schema::Schema;
 use async_trait::async_trait;
 use clade::schema::ListSchemaResponse;
 use datafusion_common::DataFusionError;
+use std::fmt::Debug;
 use tonic::Status;
 use uuid::Uuid;
@@ -149,7 +150,7 @@ impl From<serde_json::Error> for CreateFunctionError {
 pub type CatalogResult<T> = Result<T, CatalogError>;
 
 #[async_trait]
-pub trait CatalogStore: Sync + Send {
+pub trait CatalogStore: Debug + Sync + Send {
     async fn create(&self, _name: &str) -> CatalogResult<()> {
         not_impl()
     }
@@ -164,7 +165,7 @@ pub trait CatalogStore: Sync + Send {
 }
 
 #[async_trait]
-pub trait SchemaStore: Sync + Send {
+pub trait SchemaStore: Debug + Sync + Send {
     async fn create(&self, _catalog_name: &str, _schema_name: &str) -> CatalogResult<()> {
         not_impl()
     }
@@ -187,7 +188,7 @@ pub trait SchemaStore: Sync + Send {
 }
 
 #[async_trait]
-pub trait TableStore: Sync + Send {
+pub trait TableStore: Debug + Sync + Send {
     async fn create(
         &self,
         _catalog_name: &str,
@@ -275,7 +276,7 @@ pub trait TableStore: Sync + Send {
 }
 
 #[async_trait]
-pub trait FunctionStore: Sync + Send {
+pub trait FunctionStore: Debug + Sync + Send {
     async fn create(
         &self,
         _catalog_name: &str,
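The new `Debug` supertraits above (and the matching `#[derive(Debug)]` additions on the store and remote-table structs in the other files) follow from DataFusion 43 requiring `Debug` on pluggable trait objects such as `TableProvider`. A minimal sketch of the pattern, with hypothetical `Store`/`InMemory` names standing in for Seafowl's actual types:

```rust
use std::fmt::Debug;
use std::sync::Arc;

// Declaring `Debug` as a supertrait is what allows a trait object like
// `Arc<dyn Store>` to be formatted with `{:?}`.
trait Store: Debug + Send + Sync {
    fn name(&self) -> &str;
}

#[derive(Debug)]
struct InMemory;

impl Store for InMemory {
    fn name(&self) -> &str {
        "memory"
    }
}

fn main() {
    let store: Arc<dyn Store> = Arc::new(InMemory);
    println!("{store:?} backs the {} catalog", store.name());
}
```

Without the supertrait, a containing struct like `Metastore` (whose fields are `Arc<dyn CatalogStore>` and friends) could not itself `#[derive(Debug)]`.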
1 change: 1 addition & 0 deletions src/catalog/repository.rs
@@ -21,6 +21,7 @@ use crate::repository::interface::{
 use crate::wasm_udf::data_types::CreateFunctionDetails;
 
 // The native catalog implementation for Seafowl.
+#[derive(Debug)]
 pub struct RepositoryStore {
     pub repository: Arc<dyn Repository>,
 }
2 changes: 1 addition & 1 deletion src/config/context.rs
@@ -151,7 +151,7 @@ pub async fn build_context(cfg: schema::SeafowlConfig) -> Result<SeafowlContext>
         .with_information_schema(true)
         .with_default_catalog_and_schema(DEFAULT_DB, DEFAULT_SCHEMA);
 
-    let runtime_env = RuntimeEnv::new(runtime_config)?;
+    let runtime_env = RuntimeEnv::try_new(runtime_config)?;
     let state = build_state_with_table_factories(session_config, Arc::new(runtime_env));
     let context = SessionContext::new_with_state(state);
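This hunk tracks DataFusion 43's move to a fallible `RuntimeEnv::try_new`, which deprecates the old `RuntimeEnv::new`. A self-contained sketch of the migrated construction path — the config values are placeholders rather than Seafowl's, and the project's `build_state_with_table_factories` helper is elided in favor of the stock `SessionStateBuilder`:

```rust
use std::sync::Arc;

use datafusion::execution::context::SessionContext;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::SessionConfig;

fn build_context() -> datafusion::error::Result<SessionContext> {
    let session_config = SessionConfig::new().with_information_schema(true);
    // Fallible constructor in DataFusion 43; `RuntimeEnv::new` still
    // exists but is the deprecated spelling of the same thing.
    let runtime_env = RuntimeEnv::try_new(RuntimeConfig::new())?;
    let state = SessionStateBuilder::new()
        .with_config(session_config)
        .with_runtime_env(Arc::new(runtime_env))
        .with_default_features()
        .build();
    Ok(SessionContext::new_with_state(state))
}
```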
4 changes: 2 additions & 2 deletions src/context/delta.rs
@@ -582,7 +582,7 @@ mod tests {
     vec![
         (
             PART_0_FILE_NAME.to_string(),
-            1298,
+            1164,
             true,
             true,
             json!({
@@ -606,7 +606,7 @@
         ),
         (
             PART_1_FILE_NAME.to_string(),
-            1313,
+            1176,
             true,
             true,
             json!({
8 changes: 4 additions & 4 deletions src/context/logical.rs
@@ -246,9 +246,9 @@ impl SeafowlContext {
                 })),
             }))
         },
-        Statement::Truncate { table: false, table_name, partitions, .. } => {
+        Statement::Truncate { table: false, table_names, partitions, .. } => {
             let table_name = if partitions.is_none() {
-                Some(table_name.to_string())
+                Some(table_names[0].to_string())
             } else {
                 None
             };
@@ -268,10 +268,10 @@ impl SeafowlContext {
                 })),
             }))
         }
-        Statement::Truncate { table: true, table_name, .. } => {
+        Statement::Truncate { table: true, table_names, .. } => {
             Ok(LogicalPlan::Extension(Extension {
                 node: Arc::new(SeafowlExtensionNode::Truncate(Truncate {
-                    table_name: table_name.to_string(),
+                    table_name: table_names[0].to_string(),
                     output_schema: Arc::new(DFSchema::empty())
                 })),
             }))
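The `table_name` → `table_names` rename tracks sqlparser 0.51, where `Statement::Truncate` holds a list of targets (PostgreSQL allows truncating several tables in one statement); indexing `[0]` preserves the old single-table behaviour. A hedged sketch of the 0.51 AST shape this code matches on:

```rust
use sqlparser::ast::Statement;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;

fn main() {
    let sql = "TRUNCATE TABLE staging.events";
    let statement = Parser::parse_sql(&GenericDialect {}, sql)
        .expect("valid SQL")
        .remove(0);
    // sqlparser 0.51 models TRUNCATE over multiple tables, so the
    // single-table case reads the first (and only) entry.
    if let Statement::Truncate { table_names, .. } = statement {
        assert_eq!(table_names[0].to_string(), "staging.events");
    }
}
```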
10 changes: 8 additions & 2 deletions src/context/physical.rs
@@ -37,6 +37,7 @@ use datafusion::{
     physical_plan::{ExecutionPlan, SendableRecordBatchStream},
     sql::TableReference,
 };
+use datafusion_common::config::TableParquetOptions;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{Column as ColumnExpr, ResolvedTableReference, SchemaReference};
 use datafusion_expr::logical_plan::{
@@ -190,7 +191,7 @@ impl SeafowlContext {
         }
         LogicalPlan::Dml(DmlStatement {
             table_name,
-            op: WriteOp::InsertInto,
+            op: WriteOp::Insert(_),
             input,
             ..
         }) => {
@@ -941,7 +942,12 @@ impl SeafowlContext {
         let table_path = ListingTableUrl::parse(file_path)?;
         let file_format: Arc<dyn FileFormat> = match file_type {
             "csv" => Arc::new(CsvFormat::default().with_has_header(has_header)),
-            "parquet" => Arc::new(ParquetFormat::default()),
+            "parquet" => {
+                // TODO: We can remove this once delta-rs supports Utf8View
+                let mut options = TableParquetOptions::default();
+                options.global.schema_force_view_types = false;
+                Arc::new(ParquetFormat::default().with_options(options))
+            }
             _ => {
                 return Err(Error::Plan(format!(
                     "File type {file_type:?} not supported!"
(Diffs for the remaining changed files were not rendered.)
