Skip to content

Commit

Permalink
[ENH] Add rust message id conversion for pulsar
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Dec 15, 2023
1 parent 7cfac3c commit 38e983c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
27 changes: 27 additions & 0 deletions rust/worker/src/ingest/message_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// mirrors chromadb/utils/messageid.py
use num_bigint::BigInt;
use pulsar::proto::MessageIdData;

use crate::types::SeqId;

pub(crate) fn pulsar_to_int(message_id: &MessageIdData) -> SeqId {
let ledger_id = message_id.ledger_id;
let entry_id = message_id.entry_id;
let batch_index = message_id.batch_index.unwrap_or(0);
let partition = message_id.partition.unwrap_or(0);

let mut ledger_id = BigInt::from(ledger_id);
let mut entry_id = BigInt::from(entry_id);
let mut batch_index = BigInt::from(batch_index);
let mut partition = BigInt::from(partition);

// Convert to offset binary encoding to preserve ordering semantics when encoded
// see https://en.wikipedia.org/wiki/Offset_binary
ledger_id = ledger_id + BigInt::from(2).pow(63);
entry_id = entry_id + BigInt::from(2).pow(63);
batch_index = batch_index + BigInt::from(2).pow(31);
partition = partition + BigInt::from(2).pow(31);

let res = ledger_id << 128 | entry_id << 96 | batch_index << 64 | partition;
res
}
1 change: 1 addition & 0 deletions rust/worker/src/ingest/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub(crate) mod config;
mod ingest;
mod message_id;

// Re-export the ingest provider for use in the worker
pub(crate) use ingest::*;

0 comments on commit 38e983c

Please sign in to comment.