From 08ce2e24eb6505bb4bc00d202266a37eac007550 Mon Sep 17 00:00:00 2001 From: Daniel Boll <43689101+Daniel-Boll@users.noreply.github.com> Date: Sat, 28 Sep 2024 20:20:34 -0300 Subject: [PATCH] feat(tracing): add query tracing for the session in the `executeWithTracing` method (#43) closes #42 Signed-off-by: Daniel Boll --- Cargo.toml | 3 +- examples/tracing.mts | 18 +++++ index.d.ts | 1 + rust-toolchain.toml | 2 + src/session/scylla_session.rs | 137 +++++++++++++++++++++++++++++++++- src/types/mod.rs | 1 + src/types/tracing.rs | 79 ++++++++++++++++++++ src/types/uuid.rs | 2 +- 8 files changed, 237 insertions(+), 6 deletions(-) create mode 100644 examples/tracing.mts create mode 100644 rust-toolchain.toml create mode 100644 src/types/tracing.rs diff --git a/Cargo.toml b/Cargo.toml index 341ab60..9d1ef98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,10 +24,11 @@ scylla = { version = "0.13.1", features = [ ] } uuid = { version = "1.4.1", features = ["serde", "v4", "fast-rng"] } serde_json = "1.0" +serde = { version = "1.0", features = ["derive"] } openssl = { version = "0.10", features = ["vendored"] } [build-dependencies] napi-build = "2.0.1" [profile.release] -lto = true +lto = true \ No newline at end of file diff --git a/examples/tracing.mts b/examples/tracing.mts new file mode 100644 index 0000000..74ce275 --- /dev/null +++ b/examples/tracing.mts @@ -0,0 +1,18 @@ +import { Cluster } from "../index.js"; + +const nodes = process.env.CLUSTER_NODES?.split(",") ?? ["127.0.0.1:9042"]; + +console.log(`Connecting to ${nodes}`); + +const cluster = new Cluster({ nodes }); +const session = await cluster.connect(); + +const { tracing } = await session.executeWithTracing( + "SELECT * FROM system_schema.scylla_tables", + [], + // { + // prepare: true, + // }, +); + +console.log(tracing); diff --git a/index.d.ts b/index.d.ts index 4f8002d..d96ad44 100644 --- a/index.d.ts +++ b/index.d.ts @@ -157,6 +157,7 @@ export class Metrics { export class ScyllaSession { metrics(): Metrics getClusterData(): Promise + executeWithTracing(query: string | Query | PreparedStatement, parameters?: Array> | undefined | null, options?: QueryOptions | undefined | null): Promise /** * Sends a query to the database and receives a response.\ * Returns only a single page of results, to receive multiple pages use (TODO: Not implemented yet) diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..5d56faf --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,2 @@ +[toolchain] +channel = "nightly" diff --git a/src/session/scylla_session.rs b/src/session/scylla_session.rs index 5b9e215..9e535bf 100644 --- a/src/session/scylla_session.rs +++ b/src/session/scylla_session.rs @@ -6,8 +6,11 @@ use crate::query::batch_statement::ScyllaBatchStatement; use crate::query::scylla_prepared_statement::PreparedStatement; use crate::query::scylla_query::Query; use crate::types::uuid::Uuid; -use napi::bindgen_prelude::{Either3, Either4}; use napi::Either; +use napi::bindgen_prelude::{Either3, Either4}; +use serde_json::json; + +use scylla::statement::query::Query as ScyllaQuery; use super::metrics; use super::topology::ScyllaClusterData; @@ -45,6 +48,64 @@ impl ScyllaSession { cluster_data.into() } + #[allow(clippy::type_complexity)] + #[napi] + pub async fn execute_with_tracing( + &self, + query: Either3, + parameters: Option< + Vec>>>, + >, + options: Option, + ) -> napi::Result { + let values = QueryParameter::parser(parameters.clone()).ok_or_else(|| { + napi::Error::new( + napi::Status::InvalidArg, + format!( + "Something went wrong with your query parameters. {:?}", + parameters + ), + ) + })?; + + let should_prepare = options.map_or(false, |options| options.prepare.unwrap_or(false)); + + match query { + Either3::A(ref query_str) if should_prepare => { + let mut prepared = self.session.prepare(query_str.clone()).await.map_err(|e| { + napi::Error::new( + napi::Status::InvalidArg, + format!( + "Something went wrong preparing your statement. - [{}]\n{}", + query_str, e + ), + ) + })?; + prepared.set_tracing(true); + self.execute_prepared(&prepared, values, query_str).await + } + Either3::A(query_str) => { + let mut query = ScyllaQuery::new(query_str); + query.set_tracing(true); + self.execute_query(Either::B(query), values).await + } + Either3::B(query_ref) => { + let mut query = query_ref.query.clone(); + query.set_tracing(true); + + self.execute_query(Either::B(query), values).await + } + Either3::C(prepared_ref) => { + let mut prepared = prepared_ref.prepared.clone(); + prepared.set_tracing(true); + + self + .execute_prepared(&prepared, values, prepared_ref.prepared.get_statement()) + .await + } + } + } + /// Sends a query to the database and receives a response.\ /// Returns only a single page of results, to receive multiple pages use (TODO: Not implemented yet) /// @@ -110,6 +171,18 @@ impl ScyllaSession { .await } } + .map_err(|e| { + napi::Error::new( + napi::Status::InvalidArg, + format!("Something went wrong with your query. - \n{}", e), // TODO: handle different queries here + ) + })? + .get("result") + .cloned() + .ok_or(napi::Error::new( + napi::Status::InvalidArg, + r#"Something went wrong with your query."#.to_string(), // TODO: handle different queries here + )) } // Helper method to handle prepared statements @@ -128,7 +201,36 @@ impl ScyllaSession { ), ) })?; - Ok(QueryResult::parser(query_result)) + + let tracing = if let Some(tracing_id) = query_result.tracing_id { + Some(crate::types::tracing::TracingInfo::from( + self + .session + .get_tracing_info(&tracing_id) + .await + .map_err(|e| { + napi::Error::new( + napi::Status::InvalidArg, + format!( + "Something went wrong with your tracing info. - [{}]\n{}", + query, e + ), + ) + })?, + )) + } else { + None + }; + + let result = QueryResult::parser(query_result); + + dbg!(result.clone()); + dbg!(tracing.clone()); + + Ok(json!({ + "result": result, + "tracing": tracing + })) } // Helper method to handle direct queries @@ -142,7 +244,7 @@ impl ScyllaSession { Either::B(query_ref) => self.session.query(query_ref.clone(), values).await, } .map_err(|e| { - let query_str = match query { + let query_str = match query.clone() { Either::A(query_str) => query_str, Either::B(query_ref) => query_ref.contents.clone(), }; @@ -155,7 +257,34 @@ impl ScyllaSession { ) })?; - Ok(QueryResult::parser(query_result)) + let tracing_info = if let Some(tracing_id) = query_result.tracing_id { + Some(crate::types::tracing::TracingInfo::from( + self + .session + .get_tracing_info(&tracing_id) + .await + .map_err(|e| { + napi::Error::new( + napi::Status::InvalidArg, + format!( + "Something went wrong with your tracing info. - [{}]\n{}", + match query { + Either::A(query_str) => query_str, + Either::B(query_ref) => query_ref.contents.clone(), + }, + e + ), + ) + })?, + )) + } else { + None + }; + + Ok(json!({ + "result": QueryResult::parser(query_result), + "tracing": tracing_info + })) } #[allow(clippy::type_complexity)] diff --git a/src/types/mod.rs b/src/types/mod.rs index c8d27ed..1622fdc 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1 +1,2 @@ +pub mod tracing; pub mod uuid; diff --git a/src/types/tracing.rs b/src/types/tracing.rs new file mode 100644 index 0000000..fcffd5d --- /dev/null +++ b/src/types/tracing.rs @@ -0,0 +1,79 @@ +use std::{collections::HashMap, net::IpAddr}; + +use serde::Serialize; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CqlTimestampWrapper(pub scylla::frame::value::CqlTimestamp); +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CqlTimeuuidWrapper(pub scylla::frame::value::CqlTimeuuid); + +impl Serialize for CqlTimestampWrapper { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_i64(self.0.0) + } +} + +impl Serialize for CqlTimeuuidWrapper { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(format!("{}", self.0).as_str()) + } +} + +/// Tracing info retrieved from `system_traces.sessions` +/// with all events from `system_traces.events` +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct TracingInfo { + pub client: Option, + pub command: Option, + pub coordinator: Option, + pub duration: Option, + pub parameters: Option>, + pub request: Option, + /// started_at is a timestamp - time since unix epoch + pub started_at: Option, + + pub events: Vec, +} + +/// A single event happening during a traced query +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct TracingEvent { + pub event_id: CqlTimeuuidWrapper, + pub activity: Option, + pub source: Option, + pub source_elapsed: Option, + pub thread: Option, +} + +impl From for TracingInfo { + fn from(info: scylla::tracing::TracingInfo) -> Self { + Self { + client: info.client, + command: info.command, + coordinator: info.coordinator, + duration: info.duration, + parameters: info.parameters, + request: info.request, + started_at: info.started_at.map(CqlTimestampWrapper), + events: info.events.into_iter().map(TracingEvent::from).collect(), + } + } +} + +impl From for TracingEvent { + fn from(event: scylla::tracing::TracingEvent) -> Self { + Self { + event_id: CqlTimeuuidWrapper(event.event_id), + activity: event.activity, + source: event.source, + source_elapsed: event.source_elapsed, + thread: event.thread, + } + } +} diff --git a/src/types/uuid.rs b/src/types/uuid.rs index abe453b..35350b1 100644 --- a/src/types/uuid.rs +++ b/src/types/uuid.rs @@ -1,7 +1,7 @@ use napi::Result; #[napi()] -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct Uuid { pub(crate) uuid: uuid::Uuid, }