diff --git a/clade/proto/sync.proto b/clade/proto/sync.proto index 9f61daa1..49423237 100644 --- a/clade/proto/sync.proto +++ b/clade/proto/sync.proto @@ -38,6 +38,9 @@ message DataSyncCommand { // Monotonically-increasing transaction number. // Only specified in the last message of a transaction optional uint64 sequence_number = 5; + + // Table format + schema.TableFormat format = 6; } message DataSyncResponse { diff --git a/src/frontend/flight/handler.rs b/src/frontend/flight/handler.rs index 32a00a5e..03a5743b 100644 --- a/src/frontend/flight/handler.rs +++ b/src/frontend/flight/handler.rs @@ -4,6 +4,7 @@ use arrow::record_batch::RecordBatch; use arrow_flight::sql::metadata::{SqlInfoData, SqlInfoDataBuilder}; use arrow_flight::sql::{ProstMessageExt, SqlInfo, TicketStatementQuery}; use arrow_flight::{FlightDescriptor, FlightEndpoint, FlightInfo, Ticket}; +use clade::schema::TableFormat; use clade::sync::{DataSyncCommand, DataSyncResponse}; use dashmap::DashMap; use datafusion::common::Result; @@ -22,7 +23,7 @@ use url::Url; use crate::context::SeafowlContext; use crate::sync::schema::SyncSchema; use crate::sync::writer::SeafowlDataSyncWriter; -use crate::sync::SyncResult; +use crate::sync::{SyncError, SyncResult}; lazy_static! { pub static ref SEAFOWL_SQL_DATA: SqlInfoData = { @@ -162,6 +163,10 @@ impl SeafowlFlightHandler { }); } + if cmd.format != TableFormat::Delta as i32 { + return Err(SyncError::NotImplemented); + } + let log_store = match cmd.store { None => self .context diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 89d45b4c..3255b681 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -22,6 +22,9 @@ pub enum SyncError { #[error("Invalid sync message: {reason}")] InvalidMessage { reason: String }, + #[error("Not implemented")] + NotImplemented, + #[error(transparent)] ArrowError(#[from] arrow_schema::ArrowError), diff --git a/tests/flight/sync.rs b/tests/flight/sync.rs index 18759021..4c3494bb 100644 --- a/tests/flight/sync.rs +++ b/tests/flight/sync.rs @@ -1,6 +1,6 @@ use crate::fixtures::minio_options; use crate::flight::*; -use clade::schema::StorageLocation; +use clade::schema::{StorageLocation, TableFormat}; use clade::sync::{ColumnDescriptor, ColumnRole}; use deltalake::DeltaTable; use std::collections::HashMap; @@ -118,6 +118,7 @@ async fn test_sync_happy_path() -> std::result::Result<(), Box std::result::Result<(), Box