Skip to content

Commit

Permalink
Merge pull request #18 from synnada-ai/metadata-flags
Browse files Browse the repository at this point in the history
Metadata Flags for RecordBatch
  • Loading branch information
berkaysynnada authored Dec 18, 2024
2 parents f5b51ff + 8583d55 commit 25a4f7c
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 0 deletions.
2 changes: 2 additions & 0 deletions arrow-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ pub use array::*;
mod record_batch;
pub use record_batch::{
RecordBatch, RecordBatchIterator, RecordBatchOptions, RecordBatchReader, RecordBatchWriter,
CHECKPOINT_MESSAGE, INTERMEDIATE_NODE_GENERATED_WATERMARK, NORMAL_RECORD_BATCH,
SOURCE_GENERATED_WATERMARK,
};

mod arithmetic;
Expand Down
79 changes: 79 additions & 0 deletions arrow-array/src/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,27 @@ use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaBuilder, SchemaRef
use std::ops::Index;
use std::sync::Arc;

/// THIS CONSTANT IS ARAS ONLY
pub const NORMAL_RECORD_BATCH: u8 = 0;
/// THIS CONSTANT IS ARAS ONLY
///
/// Whether the watermark is [`SOURCE_GENERATED_WATERMARK`] or
/// [`INTERMEDIATE_NODE_GENERATED_WATERMARK`] is not be important
/// for the watermark algorithms. They are seperated because of
/// testing purposes. Once the watermark infrastructure is solid
/// and complete, then we will unify them.
pub const SOURCE_GENERATED_WATERMARK: u8 = 1;
/// THIS CONSTANT IS ARAS ONLY
///
/// Whether the watermark is [`SOURCE_GENERATED_WATERMARK`] or
/// [`INTERMEDIATE_NODE_GENERATED_WATERMARK`] is not be important
/// for the watermark algorithms. They are seperated because of
/// testing purposes. Once the watermark infrastructure is solid
/// and complete, then we will unify them.
pub const INTERMEDIATE_NODE_GENERATED_WATERMARK: u8 = 2;
/// THIS CONSTANT IS ARAS ONLY
pub const CHECKPOINT_MESSAGE: u8 = 3;

/// Trait for types that can read `RecordBatch`'s.
///
/// To create from an iterator, see [RecordBatchIterator].
Expand Down Expand Up @@ -213,6 +234,28 @@ pub struct RecordBatch {
///
/// This is stored separately from the columns to handle the case of no columns
row_count: usize,

/// THIS MEMBER IS ARAS ONLY
///
/// This tag is used to store metadata flags for the record batch. This
/// should include information that do not make a material change in the
/// schema, but still needs to be carried with. Carrying such information
/// as part of schema metadata results in "schema mismatch" issues, so
/// this attribute should be used in such cases. Typical downstream use
/// cases include specifying whether the record batch is a proper data
/// batch, or a watermark batch, or a control message like a checkpoint
/// barrier.
///
/// The flags define the type of data or message represented by the record batch.
/// The following values are currently defined:
///
/// - `0`: Normal record batch data.
/// - `1`: A source-generated watermark.
/// - `2`: An intermediate node-generated watermark.
/// - `3`: A checkpoint message.
///
/// Additional flag values may be defined in the future to support new use cases.
metadata_flags: u8,
}

impl RecordBatch {
Expand Down Expand Up @@ -272,6 +315,7 @@ impl RecordBatch {
schema,
columns,
row_count: 0,
metadata_flags: 0,
}
}

Expand Down Expand Up @@ -345,6 +389,7 @@ impl RecordBatch {
schema,
columns,
row_count,
metadata_flags: options.metadata_flags,
})
}

Expand All @@ -364,6 +409,7 @@ impl RecordBatch {
schema,
columns: self.columns,
row_count: self.row_count,
metadata_flags: self.metadata_flags,
})
}

Expand Down Expand Up @@ -399,6 +445,7 @@ impl RecordBatch {
&RecordBatchOptions {
match_field_names: true,
row_count: Some(self.row_count),
metadata_flags: self.metadata_flags,
},
)
}
Expand Down Expand Up @@ -521,6 +568,7 @@ impl RecordBatch {
schema: self.schema.clone(),
columns,
row_count: length,
metadata_flags: self.metadata_flags,
}
}

Expand Down Expand Up @@ -629,6 +677,21 @@ impl RecordBatch {
.map(|array| array.get_array_memory_size())
.sum()
}

/// THIS METHOD IS ARAS ONLY
///
/// Gets the metadata_flags of RecordBatch
pub fn metadata_flags(&self) -> u8 {
self.metadata_flags
}

/// THIS METHOD IS ARAS ONLY
///
/// Sets the metadata_flags of RecordBatch and returns self
pub fn with_metadata_flags(mut self, metadata_flags: u8) -> Self {
self.metadata_flags = metadata_flags;
self
}
}

/// Options that control the behaviour used when creating a [`RecordBatch`].
Expand All @@ -640,6 +703,11 @@ pub struct RecordBatchOptions {

/// Optional row count, useful for specifying a row count for a RecordBatch with no columns
pub row_count: Option<usize>,

/// THIS MEMBER IS ARAS ONLY
///
/// This tag is used to store metadata flags for the record batch.
pub metadata_flags: u8,
}

impl RecordBatchOptions {
Expand All @@ -648,6 +716,7 @@ impl RecordBatchOptions {
Self {
match_field_names: true,
row_count: None,
metadata_flags: 0,
}
}
/// Sets the row_count of RecordBatchOptions and returns self
Expand All @@ -660,6 +729,13 @@ impl RecordBatchOptions {
self.match_field_names = match_field_names;
self
}
/// THIS METHOD IS ARAS ONLY
///
/// Sets the metadata_flags of RecordBatchOptions and returns self
pub fn with_metadata_flags(mut self, metadata_flags: u8) -> Self {
self.metadata_flags = metadata_flags;
self
}
}
impl Default for RecordBatchOptions {
fn default() -> Self {
Expand All @@ -680,6 +756,7 @@ impl From<StructArray> for RecordBatch {
schema: Arc::new(Schema::new(fields)),
row_count,
columns,
metadata_flags: 0,
}
}
}
Expand Down Expand Up @@ -991,6 +1068,7 @@ mod tests {
let options = RecordBatchOptions {
match_field_names: false,
row_count: None,
metadata_flags: 0,
};
let batch = RecordBatch::try_new_with_options(schema, vec![a], &options);
assert!(batch.is_ok());
Expand Down Expand Up @@ -1235,6 +1313,7 @@ mod tests {
&RecordBatchOptions {
match_field_names: true,
row_count: Some(3),
metadata_flags: 0,
},
)
.expect("valid conversion");
Expand Down

0 comments on commit 25a4f7c

Please sign in to comment.