Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make result summary available behind a feature flag #199

Merged
merged 14 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
[alias]
xtask = "run --package xtask --"
ff = "hack --package neo4rs --each-feature --exclude-features unstable-serde-packstream-format,unstable-bolt-protocol-impl-v2,unstable-result-summary"
2 changes: 1 addition & 1 deletion .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ env:
RUST_LOG: debug
CARGO_TERM_COLOR: always
MSRV: 1.75.0
HACK: hack --package neo4rs --each-feature --exclude-features unstable-serde-packstream-format,unstable-bolt-protocol-impl-v2,unstable-streaming-summary
HACK: hack --package neo4rs --each-feature --exclude-features unstable-serde-packstream-format,unstable-bolt-protocol-impl-v2,unstable-result-summary

jobs:
check:
Expand Down
6 changes: 3 additions & 3 deletions lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ rust-version = "1.75.0"

[features]
json = ["serde_json"]
unstable-v1 = ["unstable-bolt-protocol-impl-v2", "unstable-streaming-summary"]
unstable-v1 = ["unstable-bolt-protocol-impl-v2", "unstable-result-summary"]
unstable-serde-packstream-format = []
unstable-streaming-summary = ["unstable-serde-packstream-format"]
unstable-result-summary = ["unstable-serde-packstream-format"]
unstable-bolt-protocol-impl-v2 = [
"unstable-serde-packstream-format",
"unstable-streaming-summary",
"unstable-result-summary",
"dep:nav-types",
"dep:time",
]
Expand Down
86 changes: 86 additions & 0 deletions lib/include/result_summary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
{
use ::futures::TryStreamExt as _;

use neo4rs::summary::{Type, Counters, ResultSummary};

#[allow(dead_code)]
#[derive(Debug, PartialEq, serde::Deserialize)]
struct N {
prop: String,
}

fn assert_item(n: N) {
assert_eq!(n.prop, "frobnicate");
}

fn assert_summary(summary: &ResultSummary) {
assert!(summary.available_after().is_some());
assert!(summary.consumed_after().is_some());
assert!(summary.db().is_some());
assert_eq!(summary.query_type(), Type::ReadWrite);
assert_eq!(summary.stats(), &Counters { nodes_created: 1, properties_set: 1, labels_added: 1, ..Default::default()});
}

//
// next_or_summary

let mut stream = graph
.execute(query("CREATE (n:Node {prop: 'frobnicate'}) RETURN n"))
.await
.unwrap();

let Ok(Some(row)) = stream.next_or_summary().await else { panic!() };
assert!(row.row().is_some());
assert!(row.summary().is_none());

assert_item(row.row().unwrap().to().unwrap());

let Ok(Some(row)) = stream.next_or_summary().await else { panic!() };
assert!(row.row().is_none());
assert!(row.summary().is_some());

assert_summary(row.summary().unwrap());


//
// as_items

let mut stream = graph
.execute(query("CREATE (n:Node {prop: 'frobnicate'}) RETURN n"))
.await
.unwrap();

let items = stream.as_items::<N>()
.try_collect::<Vec<_>>()
.await
.unwrap();

for item in items {
match item {
RowItem::Row(row) => assert_item(row),
RowItem::Summary(summary) => assert_summary(&summary),
}
}


//
// into_stream + finish

let mut stream = graph
.execute(query("CREATE (n:Node {prop: 'frobnicate'}) RETURN n"))
.await
.unwrap();

let items = stream.into_stream_as::<N>()
.try_collect::<Vec<_>>()
.await
.unwrap();

let Ok(Some(summary)) = stream.finish().await else { panic!() };

for item in items {
assert_item(item);
}

assert_summary(&summary);
}
6 changes: 3 additions & 3 deletions lib/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl Graph {
}

/// Runs a query on the configured database using a connection from the connection pool,
/// It doesn't return any [`RowStream`] as the `run` abstraction discards any stream.
/// It doesn't return any [`DetachedRowStream`] as the `run` abstraction discards any stream.
///
/// This operation retires the query on certain failures.
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
Expand All @@ -86,7 +86,7 @@ impl Graph {
}

/// Runs a query on the provided database using a connection from the connection pool.
/// It doesn't return any [`RowStream`] as the `run` abstraction discards any stream.
/// It doesn't return any [`DetachedRowStream`] as the `run` abstraction discards any stream.
///
/// This operation retires the query on certain failures.
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
Expand Down Expand Up @@ -127,7 +127,7 @@ impl Graph {
self.impl_execute_on(self.config.db.clone(), q).await
}

/// Executes a query on the provided database and returns a [`DetaRowStream`]
/// Executes a query on the provided database and returns a [`DetachedRowStream`]
///
/// This operation retires the query on certain failures.
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
Expand Down
35 changes: 33 additions & 2 deletions lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,37 @@
//!
//! ```
//!
#![cfg_attr(
feature = "unstable-result-summary",
doc = r##"### Streaming summary

To get access to the result summary after streaming a [`RowStream`], you can use the [`RowStream::next_or_summary`] method.
Alternatively, you can use one of the [`RowStream::as_row_items`], [`RowStream::as_items`], or [`RowStream::column_to_items`]
methods to get the result as a stream of [`RowItem`], whis an enum of either the row or the summary.
The last option is to use one of the [`RowStream::into_stream`], [`RowStream::into_stream_as`], or [`RowStream::column_into_stream`] methods
and after the stream is consumed, call [`RowStream::finish`] to get the summary.

```no_run
use neo4rs::*;

#[tokio::main]
async fn main() {
let uri = "127.0.0.1:7687";
let user = "neo4j";
let pass = "neo";
let graph = Graph::new(uri, user, pass).await.unwrap();

"##
)]
#![cfg_attr(feature="unstable-result-summary", doc = include_str!("../include/result_summary.rs"))]
#![cfg_attr(
feature = "unstable-result-summary",
doc = r"
}
```

"
)]
//! ### Rollback a transaction
//! ```no_run
//! use neo4rs::*;
Expand Down Expand Up @@ -436,7 +467,7 @@ mod pool;
mod query;
mod row;
mod stream;
#[cfg(feature = "unstable-streaming-summary")]
#[cfg(feature = "unstable-result-summary")]
pub mod summary;
mod txn;
mod types;
Expand All @@ -450,7 +481,7 @@ pub use crate::errors::{
pub use crate::graph::{query, Graph};
pub use crate::query::Query;
pub use crate::row::{Node, Path, Point2D, Point3D, Relation, Row, UnboundedRelation};
pub use crate::stream::RowStream;
pub use crate::stream::{DetachedRowStream, RowItem, RowStream};
pub use crate::txn::Txn;
pub use crate::types::serde::{
DeError, EndNodeId, Id, Indices, Keys, Labels, Nodes, Offset, Relationships, StartNodeId,
Expand Down
10 changes: 9 additions & 1 deletion lib/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,18 @@ impl BoltRequest {
feature = "unstable-bolt-protocol-impl-v2",
deprecated(since = "0.9.0", note = "Use `crate::bolt::Discard` instead.")
)]
pub fn discard() -> BoltRequest {
pub fn discard_all() -> BoltRequest {
BoltRequest::Discard(discard::Discard::default())
}

#[cfg_attr(
feature = "unstable-bolt-protocol-impl-v2",
deprecated(since = "0.9.0", note = "Use `crate::bolt::Discard` instead.")
)]
pub fn discard_all_for(query_id: i64) -> BoltRequest {
BoltRequest::Discard(discard::Discard::new(-1, query_id))
}

pub fn begin(db: Option<&str>) -> BoltRequest {
let extra = db.into_iter().map(|db| ("db".into(), db.into())).collect();
let begin = Begin::new(extra);
Expand Down
14 changes: 12 additions & 2 deletions lib/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl Query {

#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
{
match connection.send_recv(BoltRequest::discard()).await {
match connection.send_recv(BoltRequest::discard_all()).await {
Ok(BoltResponse::Success(_)) => Ok(()),
otherwise => wrap_error(otherwise, "DISCARD"),
}
Expand All @@ -117,7 +117,17 @@ impl Query {
Self::try_request(request, connection).await.map(|success| {
let fields: BoltList = success.get("fields").unwrap_or_default();
let qid: i64 = success.get("qid").unwrap_or(-1);
RowStream::new(qid, fields, fetch_size)

#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
{
let available: i64 = success.get("t_first").unwrap_or(-1);
RowStream::new(qid, available, fields, fetch_size)
}

#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
{
RowStream::new(qid, fields, fetch_size)
}
})
}

Expand Down
Loading