[ENH] Add rust message id conversion for pulsar (#1531)
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
	 - Adds a conversion from a Pulsar message ID to our `SeqId`, based on the Python version. I think we may need to revisit this encoding, since it appears to reduce the usable range of the individual fields, unless I am thinking about it incorrectly. For now I duplicated what we had; we can revisit it later.
 - New functionality
	 - /
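
For readers unfamiliar with the offset binary step this conversion relies on, here is a minimal sketch using only the standard library. The helper names `offset_binary_i32` and `offset_binary_i64` are hypothetical and not part of this PR, and the sketch assumes signed inputs; it only illustrates why adding 2^31 (or 2^63) keeps the ordering of signed values once they are read as unsigned.

```rust
/// Hypothetical helper: maps a signed 32-bit value to offset binary.
/// Flipping the sign bit is equivalent to adding 2^31 modulo 2^32.
fn offset_binary_i32(x: i32) -> u32 {
    (x as u32) ^ (1u32 << 31)
}

/// Hypothetical helper: the same idea for 64-bit values (adds 2^63 modulo 2^64).
fn offset_binary_i64(x: i64) -> u64 {
    (x as u64) ^ (1u64 << 63)
}

fn main() {
    // Signed order is preserved after the mapping.
    let values = [i32::MIN, -1, 0, 1, i32::MAX];
    for pair in values.windows(2) {
        assert!(offset_binary_i32(pair[0]) < offset_binary_i32(pair[1]));
    }
    assert!(offset_binary_i64(-1) < offset_binary_i64(0));
}
```

The smallest signed value maps to 0 and the largest to the maximum unsigned value, so the encoded field still needs the full 32 or 64 bits.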

## Test plan
*How are these changes tested?*

- [x] Tests pass locally with `cargo test`

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs repository](https://github.com/chroma-core/docs)?*
HammadB authored Jan 16, 2024
1 parent fc7b3ea commit 0ab9885
Showing 2 changed files with 28 additions and 0 deletions.
27 changes: 27 additions & 0 deletions rust/worker/src/ingest/message_id.rs
@@ -0,0 +1,27 @@
// mirrors chromadb/utils/messageid.py
use num_bigint::BigInt;
use pulsar::proto::MessageIdData;

use crate::types::SeqId;

pub(crate) fn pulsar_to_int(message_id: &MessageIdData) -> SeqId {
    let ledger_id = message_id.ledger_id;
    let entry_id = message_id.entry_id;
    let batch_index = message_id.batch_index.unwrap_or(0);
    let partition = message_id.partition.unwrap_or(0);

    let mut ledger_id = BigInt::from(ledger_id);
    let mut entry_id = BigInt::from(entry_id);
    let mut batch_index = BigInt::from(batch_index);
    let mut partition = BigInt::from(partition);

    // Convert to offset binary encoding to preserve ordering semantics when encoded
    // see https://en.wikipedia.org/wiki/Offset_binary
    ledger_id = ledger_id + BigInt::from(2).pow(63);
    entry_id = entry_id + BigInt::from(2).pow(63);
    batch_index = batch_index + BigInt::from(2).pow(31);
    partition = partition + BigInt::from(2).pow(31);

    let res = ledger_id << 128 | entry_id << 96 | batch_index << 64 | partition;
    res
}
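
A note on the bit layout above: after the offset, a non-negative `entry_id` has bit 63 set, so shifting it left by 96 places its upper bits at positions 128 and above, which is also where `ledger_id << 128` begins. The sketch below is one way to check a layout with disjoint ranges. It uses only the standard library instead of `BigInt`, assumes signed 64-bit ledger and entry IDs and signed 32-bit batch index and partition, and the helper `pack_key` is hypothetical, not part of this commit.

```rust
/// Hypothetical helper, not part of this commit: packs the four fields into
/// disjoint byte ranges of a 24-byte big-endian key (192 bits in total).
fn pack_key(ledger_id: i64, entry_id: i64, batch_index: i32, partition: i32) -> [u8; 24] {
    // Offset binary: flip the sign bit so signed order matches unsigned order.
    let ledger = (ledger_id as u64) ^ (1u64 << 63);
    let entry = (entry_id as u64) ^ (1u64 << 63);
    let batch = (batch_index as u32) ^ (1u32 << 31);
    let part = (partition as u32) ^ (1u32 << 31);

    let mut key = [0u8; 24];
    key[0..8].copy_from_slice(&ledger.to_be_bytes()); // bits 128..192
    key[8..16].copy_from_slice(&entry.to_be_bytes()); // bits 64..128
    key[16..20].copy_from_slice(&batch.to_be_bytes()); // bits 32..64
    key[20..24].copy_from_slice(&part.to_be_bytes()); // bits 0..32
    key
}

fn main() {
    // Byte arrays compare lexicographically; for big-endian bytes this
    // matches the numeric order of the 192-bit value.
    let a = pack_key(1, 5, 0, 0);
    let b = pack_key(1, 6, 0, 0);
    let c = pack_key(2, 0, 0, 0);
    assert!(a < b && b < c);
}
```

With disjoint ranges, a change in a lower-priority field can never affect the bits of a higher-priority one, so the key sorts by ledger, then entry, then batch index, then partition.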
1 change: 1 addition & 0 deletions rust/worker/src/ingest/mod.rs
@@ -1,5 +1,6 @@
pub(crate) mod config;
mod ingest;
mod message_id;

// Re-export the ingest provider for use in the worker
pub(crate) use ingest::*;
