diff --git a/src/db/query.rs b/src/db/query.rs index 94b40c4..3c34ba2 100644 --- a/src/db/query.rs +++ b/src/db/query.rs @@ -28,6 +28,11 @@ pub trait FirestoreQuerySupport { params: FirestoreQueryParams, ) -> FirestoreResult>>; + async fn stream_query_doc_with_metadata<'b>( + &self, + params: FirestoreQueryParams, + ) -> FirestoreResult>>>; + async fn query_obj(&self, params: FirestoreQueryParams) -> FirestoreResult> where for<'de> T: Deserialize<'de>; @@ -47,6 +52,14 @@ pub trait FirestoreQuerySupport { for<'de> T: Deserialize<'de>, T: Send + 'b; + async fn stream_query_obj_with_metadata<'b, T>( + &self, + params: FirestoreQueryParams, + ) -> FirestoreResult>>> + where + for<'de> T: Deserialize<'de>, + T: Send + 'b; + fn stream_partition_cursors_with_errors( &self, params: FirestorePartitionQueryParams, @@ -99,7 +112,8 @@ impl FirestoreDb { params: FirestoreQueryParams, retries: usize, span: Span, - ) -> BoxFuture>>> { + ) -> BoxFuture>>>> + { async move { let query_request = self.create_query_request(params.clone())?; let begin_query_utc: DateTime = Utc::now(); @@ -112,7 +126,11 @@ impl FirestoreDb { .await { Ok(query_response) => { - let query_stream = query_response.into_inner().map_err(|e| e.into()).boxed(); + let query_stream = query_response + .into_inner() + .map_err(|e| e.into()) + .map(|r| r.and_then(|r| r.try_into())) + .boxed(); let end_query_utc: DateTime = Utc::now(); let query_duration = end_query_utc.signed_duration_since(begin_query_utc); @@ -291,6 +309,22 @@ impl FirestoreQuerySupport for FirestoreDb { }))) } + async fn stream_query_doc_with_metadata<'b>( + &self, + params: FirestoreQueryParams, + ) -> FirestoreResult>>> { + let collection_str = params.collection_id.to_string(); + + let span = span!( + Level::DEBUG, + "Firestore Streaming Query with Metadata", + "/firestore/collection_name" = collection_str.as_str(), + "/firestore/response_time" = field::Empty + ); + + self.stream_query_doc_with_retries(params, 0, span).await + } + async fn query_obj(&self, params: FirestoreQueryParams) -> FirestoreResult> where for<'de> T: Deserialize<'de>, @@ -340,6 +374,28 @@ impl FirestoreQuerySupport for FirestoreDb { }))) } + async fn stream_query_obj_with_metadata<'b, T>( + &self, + params: FirestoreQueryParams, + ) -> FirestoreResult>>> + where + for<'de> T: Deserialize<'de>, + T: Send + 'b, + { + let res_stream = self.stream_query_doc_with_metadata(params).await?; + Ok(Box::pin(res_stream.map(|res| { + res.and_then(|with_meta| { + Ok(FirestoreWithMetadata { + document: with_meta + .document + .map(|document| Self::deserialize_doc_to::(&document)) + .transpose()?, + metadata: with_meta.metadata, + }) + }) + }))) + } + fn stream_partition_cursors_with_errors( &self, params: FirestorePartitionQueryParams, diff --git a/src/firestore_meta_doc.rs b/src/firestore_meta_doc.rs new file mode 100644 index 0000000..3579c10 --- /dev/null +++ b/src/firestore_meta_doc.rs @@ -0,0 +1,96 @@ +use crate::errors::FirestoreError; +use crate::timestamp_utils::{from_duration, from_timestamp}; +use crate::FirestoreTransactionId; +use chrono::{DateTime, Duration, Utc}; +use gcloud_sdk::google::firestore::v1::{Document, ExplainMetrics, RunQueryResponse}; +use rsb_derive::Builder; +use std::collections::BTreeMap; + +#[derive(Debug, PartialEq, Clone)] +pub struct FirestoreWithMetadata { + pub document: Option, + pub metadata: FirestoreDocumentMetadata, +} + +#[derive(Debug, PartialEq, Clone, Builder)] +pub struct FirestoreDocumentMetadata { + pub transaction_id: Option, + pub read_time: Option>, + pub skipped_results: usize, + pub explain_metrics: Option, +} + +#[derive(Debug, PartialEq, Clone, Builder)] +pub struct FirestoreExplainMetrics { + pub plan_summary: Option, + pub execution_stats: Option, +} + +pub type FirestoreDynamicStruct = BTreeMap; + +#[derive(Debug, PartialEq, Clone, Builder)] +pub struct FirestorePlanSummary { + pub indexes_used: Vec, +} + +#[derive(Debug, PartialEq, Clone, Builder)] +pub struct FirestoreExecutionStats { + pub results_returned: usize, + pub execution_duration: Option, + pub read_operations: usize, + pub debug_stats: Option, +} + +impl TryFrom for FirestoreWithMetadata { + type Error = FirestoreError; + + fn try_from(value: RunQueryResponse) -> Result { + Ok(FirestoreWithMetadata { + document: value.document, + metadata: FirestoreDocumentMetadata { + transaction_id: Some(value.transaction), + read_time: value.read_time.map(from_timestamp).transpose()?, + skipped_results: value.skipped_results as usize, + explain_metrics: value.explain_metrics.map(|v| v.try_into()).transpose()?, + }, + }) + } +} + +impl TryFrom for FirestoreExplainMetrics { + type Error = FirestoreError; + + fn try_from(value: ExplainMetrics) -> Result { + Ok(FirestoreExplainMetrics { + plan_summary: value.plan_summary.map(|v| v.try_into()).transpose()?, + execution_stats: value.execution_stats.map(|v| v.try_into()).transpose()?, + }) + } +} + +impl TryFrom for FirestorePlanSummary { + type Error = FirestoreError; + + fn try_from( + value: gcloud_sdk::google::firestore::v1::PlanSummary, + ) -> Result { + Ok(FirestorePlanSummary { + indexes_used: value.indexes_used.into_iter().map(|v| v.fields).collect(), + }) + } +} + +impl TryFrom for FirestoreExecutionStats { + type Error = FirestoreError; + + fn try_from( + value: gcloud_sdk::google::firestore::v1::ExecutionStats, + ) -> Result { + Ok(FirestoreExecutionStats { + results_returned: value.results_returned as usize, + execution_duration: value.execution_duration.map(from_duration), + read_operations: value.read_operations as usize, + debug_stats: value.debug_stats.map(|v| v.fields), + }) + } +} diff --git a/src/lib.rs b/src/lib.rs index 2c14727..6d7db74 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -126,12 +126,15 @@ pub mod errors; mod firestore_value; + pub use firestore_value::*; mod db; + pub use db::*; mod firestore_serde; + pub use firestore_serde::*; mod struct_path_macro; @@ -147,10 +150,16 @@ pub type FirestoreResult = std::result::Result; pub type FirestoreDocument = gcloud_sdk::google::firestore::v1::Document; +mod firestore_meta_doc; + +pub use firestore_meta_doc::*; + mod firestore_document_functions; + pub use firestore_document_functions::*; mod fluent_api; + pub use fluent_api::*; pub extern crate struct_path; diff --git a/src/timestamp_utils.rs b/src/timestamp_utils.rs index 8dc6256..a725f8f 100644 --- a/src/timestamp_utils.rs +++ b/src/timestamp_utils.rs @@ -20,3 +20,8 @@ pub fn to_timestamp(dt: DateTime) -> gcloud_sdk::prost_types::Timestamp { nanos: dt.nanosecond() as i32, } } + +pub fn from_duration(duration: gcloud_sdk::prost_types::Duration) -> chrono::Duration { + chrono::Duration::seconds(duration.seconds) + + chrono::Duration::nanoseconds(duration.nanos.into()) +}