diff --git a/src/bigtable/mod.rs b/src/bigtable/mod.rs index 97fdba9..24dc002 100644 --- a/src/bigtable/mod.rs +++ b/src/bigtable/mod.rs @@ -66,9 +66,7 @@ pub mod api { use api::bigtable::v2; -pub use api::bigtable::v2::ReadRowsRequest; - -pub use api::bigtable::v2::{RowRange, RowSet}; +pub use api::bigtable::v2::{CheckAndMutateRowRequest, ReadRowsRequest, RowRange, RowSet}; fn bound_to_start_key(bound: Bound<&Bytes>) -> Option { use v2::row_range::StartKey; @@ -615,6 +613,31 @@ where } } + /// Performs a conditional mutation on a single row, returning whether the filter matched. + pub async fn check_and_mutate_row( + &mut self, + request: CheckAndMutateRowRequest, + ) -> Result { + let mut retry = self.retry.new_operation(); + + let response = loop { + match self.inner.check_and_mutate_row(request.clone()).await { + Ok(resp) => { + break resp.into_inner(); + } + Err(e) => { + if let Some(sleep) = retry.check_retry(&(), &e) { + sleep.await; + } else { + return Err(e.into()); + } + } + } + }; + + Ok(response.predicate_matched) + } + /// Performs a mutation request for a single row. /// /// Note that the table name in `request` needs to be diff --git a/tests/bigtable_client.rs b/tests/bigtable_client.rs index a80b5b9..e7366f5 100644 --- a/tests/bigtable_client.rs +++ b/tests/bigtable_client.rs @@ -6,8 +6,13 @@ mod bigtable_client_tests { self, admin::Rule, api, + api::bigtable::v2, emulator::{Emulator, EmulatorClient}, - ReadRowsRequest, + mutation::{ + self, + v2::{row_filter, RowFilter}, + }, + CheckAndMutateRowRequest, ReadRowsRequest, }; #[tokio::test] @@ -163,4 +168,64 @@ mod bigtable_client_tests { row.most_recent_cells().map(|c| c.value).collect::>() ); } + + #[tokio::test] + async fn conditional_mutate() { + let table_name = "test-table"; + let (_emulator, mut client) = default_client(table_name).await; + + client + .set_row_data( + table_name, + "fam1".into(), + "row1-key", + [("col1", "data1"), ("col2", "data2")], + ) + .await + .unwrap(); + + let mut mutation = v2::Mutation::default(); + mutation.mutation = Some( + mutation::SetCell::new( + "fam1".to_owned(), + Bytes::from_static(b"col1"), + Bytes::from_static(b"new"), + ) + .with_timestamp(-1) + .into(), + ); + + let mut req = CheckAndMutateRowRequest::default(); + let mut filter = RowFilter::default(); + filter.filter = Some(row_filter::Filter::ValueRegexFilter(Bytes::from_static( + b"data3", + ))); + req.table_name = client.fully_qualified_table_name(table_name); + req.row_key = Bytes::from_static(b"row1-key"); + req.predicate_filter = Some(filter.into()); + req.true_mutations = vec![mutation]; + + // This filter shouldn't match. + assert!(!client.check_and_mutate_row(req.clone()).await.unwrap()); + let row = client + .read_one_row(table_name, "row1-key") + .await + .unwrap() + .unwrap(); + assert_eq!("data1", row.families[0].columns[0].cells[0].value); + assert_eq!("data2", row.families[0].columns[1].cells[0].value); + + req.predicate_filter.as_mut().unwrap().filter = Some(row_filter::Filter::ValueRegexFilter( + Bytes::from_static(b"data2"), + )); + // This filter should match. + assert!(client.check_and_mutate_row(req.clone()).await.unwrap()); + let row = client + .read_one_row(table_name, "row1-key") + .await + .unwrap() + .unwrap(); + assert_eq!("new", row.families[0].columns[0].cells[0].value); + assert_eq!("data2", row.families[0].columns[1].cells[0].value); + } }