Skip to content

Commit

Permalink
_with_metadata support functions
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence committed Apr 2, 2024
1 parent af22734 commit c4066dc
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 2 deletions.
60 changes: 58 additions & 2 deletions src/db/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ pub trait FirestoreQuerySupport {
params: FirestoreQueryParams,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<Document>>>;

async fn stream_query_doc_with_metadata<'b>(
&self,
params: FirestoreQueryParams,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<FirestoreWithMetadata<FirestoreDocument>>>>;

async fn query_obj<T>(&self, params: FirestoreQueryParams) -> FirestoreResult<Vec<T>>
where
for<'de> T: Deserialize<'de>;
Expand All @@ -47,6 +52,14 @@ pub trait FirestoreQuerySupport {
for<'de> T: Deserialize<'de>,
T: Send + 'b;

async fn stream_query_obj_with_metadata<'b, T>(
&self,
params: FirestoreQueryParams,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<FirestoreWithMetadata<T>>>>
where
for<'de> T: Deserialize<'de>,
T: Send + 'b;

fn stream_partition_cursors_with_errors(
&self,
params: FirestorePartitionQueryParams,
Expand Down Expand Up @@ -99,7 +112,8 @@ impl FirestoreDb {
params: FirestoreQueryParams,
retries: usize,
span: Span,
) -> BoxFuture<FirestoreResult<BoxStream<'b, FirestoreResult<RunQueryResponse>>>> {
) -> BoxFuture<FirestoreResult<BoxStream<'b, FirestoreResult<FirestoreWithMetadata<Document>>>>>
{
async move {
let query_request = self.create_query_request(params.clone())?;
let begin_query_utc: DateTime<Utc> = Utc::now();
Expand All @@ -112,7 +126,11 @@ impl FirestoreDb {
.await
{
Ok(query_response) => {
let query_stream = query_response.into_inner().map_err(|e| e.into()).boxed();
let query_stream = query_response
.into_inner()
.map_err(|e| e.into())
.map(|r| r.and_then(|r| r.try_into()))
.boxed();

let end_query_utc: DateTime<Utc> = Utc::now();
let query_duration = end_query_utc.signed_duration_since(begin_query_utc);
Expand Down Expand Up @@ -291,6 +309,22 @@ impl FirestoreQuerySupport for FirestoreDb {
})))
}

async fn stream_query_doc_with_metadata<'b>(
&self,
params: FirestoreQueryParams,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<FirestoreWithMetadata<Document>>>> {
let collection_str = params.collection_id.to_string();

let span = span!(
Level::DEBUG,
"Firestore Streaming Query with Metadata",
"/firestore/collection_name" = collection_str.as_str(),
"/firestore/response_time" = field::Empty
);

self.stream_query_doc_with_retries(params, 0, span).await
}

async fn query_obj<T>(&self, params: FirestoreQueryParams) -> FirestoreResult<Vec<T>>
where
for<'de> T: Deserialize<'de>,
Expand Down Expand Up @@ -340,6 +374,28 @@ impl FirestoreQuerySupport for FirestoreDb {
})))
}

async fn stream_query_obj_with_metadata<'b, T>(
&self,
params: FirestoreQueryParams,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<FirestoreWithMetadata<T>>>>
where
for<'de> T: Deserialize<'de>,
T: Send + 'b,
{
let res_stream = self.stream_query_doc_with_metadata(params).await?;
Ok(Box::pin(res_stream.map(|res| {
res.and_then(|with_meta| {
Ok(FirestoreWithMetadata {
document: with_meta
.document
.map(|document| Self::deserialize_doc_to::<T>(&document))
.transpose()?,
metadata: with_meta.metadata,
})
})
})))
}

fn stream_partition_cursors_with_errors(
&self,
params: FirestorePartitionQueryParams,
Expand Down
96 changes: 96 additions & 0 deletions src/firestore_meta_doc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use crate::errors::FirestoreError;
use crate::timestamp_utils::{from_duration, from_timestamp};
use crate::FirestoreTransactionId;
use chrono::{DateTime, Duration, Utc};
use gcloud_sdk::google::firestore::v1::{Document, ExplainMetrics, RunQueryResponse};
use rsb_derive::Builder;
use std::collections::BTreeMap;

#[derive(Debug, PartialEq, Clone)]
pub struct FirestoreWithMetadata<T> {
pub document: Option<T>,
pub metadata: FirestoreDocumentMetadata,
}

#[derive(Debug, PartialEq, Clone, Builder)]
pub struct FirestoreDocumentMetadata {
pub transaction_id: Option<FirestoreTransactionId>,
pub read_time: Option<DateTime<Utc>>,
pub skipped_results: usize,
pub explain_metrics: Option<FirestoreExplainMetrics>,
}

#[derive(Debug, PartialEq, Clone, Builder)]
pub struct FirestoreExplainMetrics {
pub plan_summary: Option<FirestorePlanSummary>,
pub execution_stats: Option<FirestoreExecutionStats>,
}

pub type FirestoreDynamicStruct = BTreeMap<String, gcloud_sdk::prost_types::Value>;

#[derive(Debug, PartialEq, Clone, Builder)]
pub struct FirestorePlanSummary {
pub indexes_used: Vec<FirestoreDynamicStruct>,
}

#[derive(Debug, PartialEq, Clone, Builder)]
pub struct FirestoreExecutionStats {
pub results_returned: usize,
pub execution_duration: Option<Duration>,
pub read_operations: usize,
pub debug_stats: Option<FirestoreDynamicStruct>,
}

impl TryFrom<RunQueryResponse> for FirestoreWithMetadata<Document> {
type Error = FirestoreError;

fn try_from(value: RunQueryResponse) -> Result<Self, Self::Error> {
Ok(FirestoreWithMetadata {
document: value.document,
metadata: FirestoreDocumentMetadata {
transaction_id: Some(value.transaction),
read_time: value.read_time.map(from_timestamp).transpose()?,
skipped_results: value.skipped_results as usize,
explain_metrics: value.explain_metrics.map(|v| v.try_into()).transpose()?,
},
})
}
}

impl TryFrom<ExplainMetrics> for FirestoreExplainMetrics {
type Error = FirestoreError;

fn try_from(value: ExplainMetrics) -> Result<Self, Self::Error> {
Ok(FirestoreExplainMetrics {
plan_summary: value.plan_summary.map(|v| v.try_into()).transpose()?,
execution_stats: value.execution_stats.map(|v| v.try_into()).transpose()?,
})
}
}

impl TryFrom<gcloud_sdk::google::firestore::v1::PlanSummary> for FirestorePlanSummary {
type Error = FirestoreError;

fn try_from(
value: gcloud_sdk::google::firestore::v1::PlanSummary,
) -> Result<Self, Self::Error> {
Ok(FirestorePlanSummary {
indexes_used: value.indexes_used.into_iter().map(|v| v.fields).collect(),
})
}
}

impl TryFrom<gcloud_sdk::google::firestore::v1::ExecutionStats> for FirestoreExecutionStats {
type Error = FirestoreError;

fn try_from(
value: gcloud_sdk::google::firestore::v1::ExecutionStats,
) -> Result<Self, Self::Error> {
Ok(FirestoreExecutionStats {
results_returned: value.results_returned as usize,
execution_duration: value.execution_duration.map(from_duration),
read_operations: value.read_operations as usize,
debug_stats: value.debug_stats.map(|v| v.fields),
})
}
}
9 changes: 9 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,15 @@

pub mod errors;
mod firestore_value;

pub use firestore_value::*;

mod db;

pub use db::*;

mod firestore_serde;

pub use firestore_serde::*;

mod struct_path_macro;
Expand All @@ -147,10 +150,16 @@ pub type FirestoreResult<T> = std::result::Result<T, FirestoreError>;

pub type FirestoreDocument = gcloud_sdk::google::firestore::v1::Document;

mod firestore_meta_doc;

pub use firestore_meta_doc::*;

mod firestore_document_functions;

pub use firestore_document_functions::*;

mod fluent_api;

pub use fluent_api::*;

pub extern crate struct_path;
Expand Down
5 changes: 5 additions & 0 deletions src/timestamp_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,8 @@ pub fn to_timestamp(dt: DateTime<Utc>) -> gcloud_sdk::prost_types::Timestamp {
nanos: dt.nanosecond() as i32,
}
}

pub fn from_duration(duration: gcloud_sdk::prost_types::Duration) -> chrono::Duration {
chrono::Duration::seconds(duration.seconds)
+ chrono::Duration::nanoseconds(duration.nanos.into())
}

0 comments on commit c4066dc

Please sign in to comment.