Skip to content

Commit

Permalink
Add support for check_and_mutate_row
Browse files Browse the repository at this point in the history
  • Loading branch information
jneem committed Jul 1, 2024
1 parent 9f57230 commit b5c3040
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 4 deletions.
29 changes: 26 additions & 3 deletions src/bigtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ pub mod api {

use api::bigtable::v2;

pub use api::bigtable::v2::ReadRowsRequest;

pub use api::bigtable::v2::{RowRange, RowSet};
pub use api::bigtable::v2::{CheckAndMutateRowRequest, ReadRowsRequest, RowRange, RowSet};

fn bound_to_start_key(bound: Bound<&Bytes>) -> Option<v2::row_range::StartKey> {
use v2::row_range::StartKey;
Expand Down Expand Up @@ -615,6 +613,31 @@ where
}
}

/// Performs a conditional mutation on a single row, returning whether the filter matched.
pub async fn check_and_mutate_row(
&mut self,
request: CheckAndMutateRowRequest,
) -> Result<bool, MutateRowsError> {
let mut retry = self.retry.new_operation();

let response = loop {
match self.inner.check_and_mutate_row(request.clone()).await {
Ok(resp) => {
break resp.into_inner();
}
Err(e) => {
if let Some(sleep) = retry.check_retry(&(), &e) {
sleep.await;
} else {
return Err(e.into());
}
}
}
};

Ok(response.predicate_matched)
}

/// Performs a mutation request for a single row.
///
/// Note that the table name in `request` needs to be
Expand Down
67 changes: 66 additions & 1 deletion tests/bigtable_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@ mod bigtable_client_tests {
self,
admin::Rule,
api,
api::bigtable::v2,
emulator::{Emulator, EmulatorClient},
ReadRowsRequest,
mutation::{
self,
v2::{row_filter, RowFilter},
},
CheckAndMutateRowRequest, ReadRowsRequest,
};

#[tokio::test]
Expand Down Expand Up @@ -163,4 +168,64 @@ mod bigtable_client_tests {
row.most_recent_cells().map(|c| c.value).collect::<Vec<_>>()
);
}

#[tokio::test]
async fn conditional_mutate() {
let table_name = "test-table";
let (_emulator, mut client) = default_client(table_name).await;

client
.set_row_data(
table_name,
"fam1".into(),
"row1-key",
[("col1", "data1"), ("col2", "data2")],
)
.await
.unwrap();

let mut mutation = v2::Mutation::default();
mutation.mutation = Some(
mutation::SetCell::new(
"fam1".to_owned(),
Bytes::from_static(b"col1"),
Bytes::from_static(b"new"),
)
.with_timestamp(-1)
.into(),
);

let mut req = CheckAndMutateRowRequest::default();
let mut filter = RowFilter::default();
filter.filter = Some(row_filter::Filter::ValueRegexFilter(Bytes::from_static(
b"data3",
)));
req.table_name = client.fully_qualified_table_name(table_name);
req.row_key = Bytes::from_static(b"row1-key");
req.predicate_filter = Some(filter.into());
req.true_mutations = vec![mutation];

// This filter shouldn't match.
assert!(!client.check_and_mutate_row(req.clone()).await.unwrap());
let row = client
.read_one_row(table_name, "row1-key")
.await
.unwrap()
.unwrap();
assert_eq!("data1", row.families[0].columns[0].cells[0].value);
assert_eq!("data2", row.families[0].columns[1].cells[0].value);

req.predicate_filter.as_mut().unwrap().filter = Some(row_filter::Filter::ValueRegexFilter(
Bytes::from_static(b"data2"),
));
// This filter should match.
assert!(client.check_and_mutate_row(req.clone()).await.unwrap());
let row = client
.read_one_row(table_name, "row1-key")
.await
.unwrap()
.unwrap();
assert_eq!("new", row.families[0].columns[0].cells[0].value);
assert_eq!("data2", row.families[0].columns[1].cells[0].value);
}
}

0 comments on commit b5c3040

Please sign in to comment.