Skip to content

Commit

Permalink
feat(tracing): add query tracing for the session in the `executeWithT…
Browse files Browse the repository at this point in the history
…racing` method (#43)

closes #42

Signed-off-by: Daniel Boll <[email protected]>
  • Loading branch information
Daniel-Boll authored Sep 28, 2024
1 parent a5d1252 commit 08ce2e2
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 6 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ scylla = { version = "0.13.1", features = [
] }
uuid = { version = "1.4.1", features = ["serde", "v4", "fast-rng"] }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
openssl = { version = "0.10", features = ["vendored"] }

[build-dependencies]
napi-build = "2.0.1"

[profile.release]
lto = true
lto = true
18 changes: 18 additions & 0 deletions examples/tracing.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Cluster } from "../index.js";

const nodes = process.env.CLUSTER_NODES?.split(",") ?? ["127.0.0.1:9042"];

console.log(`Connecting to ${nodes}`);

const cluster = new Cluster({ nodes });
const session = await cluster.connect();

const { tracing } = await session.executeWithTracing(
"SELECT * FROM system_schema.scylla_tables",
[],
// {
// prepare: true,
// },
);

console.log(tracing);
1 change: 1 addition & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ export class Metrics {
export class ScyllaSession {
metrics(): Metrics
getClusterData(): Promise<ScyllaClusterData>
executeWithTracing(query: string | Query | PreparedStatement, parameters?: Array<number | string | Uuid | Record<string, number | string | Uuid>> | undefined | null, options?: QueryOptions | undefined | null): Promise<any>
/**
* Sends a query to the database and receives a response.\
* Returns only a single page of results, to receive multiple pages use (TODO: Not implemented yet)
Expand Down
2 changes: 2 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[toolchain]
channel = "nightly"
137 changes: 133 additions & 4 deletions src/session/scylla_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ use crate::query::batch_statement::ScyllaBatchStatement;
use crate::query::scylla_prepared_statement::PreparedStatement;
use crate::query::scylla_query::Query;
use crate::types::uuid::Uuid;
use napi::bindgen_prelude::{Either3, Either4};
use napi::Either;
use napi::bindgen_prelude::{Either3, Either4};
use serde_json::json;

use scylla::statement::query::Query as ScyllaQuery;

use super::metrics;
use super::topology::ScyllaClusterData;
Expand Down Expand Up @@ -45,6 +48,64 @@ impl ScyllaSession {
cluster_data.into()
}

#[allow(clippy::type_complexity)]
#[napi]
pub async fn execute_with_tracing(
&self,
query: Either3<String, &Query, &PreparedStatement>,
parameters: Option<
Vec<Either4<u32, String, &Uuid, HashMap<String, Either3<u32, String, &Uuid>>>>,
>,
options: Option<QueryOptions>,
) -> napi::Result<serde_json::Value> {
let values = QueryParameter::parser(parameters.clone()).ok_or_else(|| {
napi::Error::new(
napi::Status::InvalidArg,
format!(
"Something went wrong with your query parameters. {:?}",
parameters
),
)
})?;

let should_prepare = options.map_or(false, |options| options.prepare.unwrap_or(false));

match query {
Either3::A(ref query_str) if should_prepare => {
let mut prepared = self.session.prepare(query_str.clone()).await.map_err(|e| {
napi::Error::new(
napi::Status::InvalidArg,
format!(
"Something went wrong preparing your statement. - [{}]\n{}",
query_str, e
),
)
})?;
prepared.set_tracing(true);
self.execute_prepared(&prepared, values, query_str).await
}
Either3::A(query_str) => {
let mut query = ScyllaQuery::new(query_str);
query.set_tracing(true);
self.execute_query(Either::B(query), values).await
}
Either3::B(query_ref) => {
let mut query = query_ref.query.clone();
query.set_tracing(true);

self.execute_query(Either::B(query), values).await
}
Either3::C(prepared_ref) => {
let mut prepared = prepared_ref.prepared.clone();
prepared.set_tracing(true);

self
.execute_prepared(&prepared, values, prepared_ref.prepared.get_statement())
.await
}
}
}

/// Sends a query to the database and receives a response.\
/// Returns only a single page of results, to receive multiple pages use (TODO: Not implemented yet)
///
Expand Down Expand Up @@ -110,6 +171,18 @@ impl ScyllaSession {
.await
}
}
.map_err(|e| {
napi::Error::new(
napi::Status::InvalidArg,
format!("Something went wrong with your query. - \n{}", e), // TODO: handle different queries here
)
})?
.get("result")
.cloned()
.ok_or(napi::Error::new(
napi::Status::InvalidArg,
r#"Something went wrong with your query."#.to_string(), // TODO: handle different queries here
))
}

// Helper method to handle prepared statements
Expand All @@ -128,7 +201,36 @@ impl ScyllaSession {
),
)
})?;
Ok(QueryResult::parser(query_result))

let tracing = if let Some(tracing_id) = query_result.tracing_id {
Some(crate::types::tracing::TracingInfo::from(
self
.session
.get_tracing_info(&tracing_id)
.await
.map_err(|e| {
napi::Error::new(
napi::Status::InvalidArg,
format!(
"Something went wrong with your tracing info. - [{}]\n{}",
query, e
),
)
})?,
))
} else {
None
};

let result = QueryResult::parser(query_result);

dbg!(result.clone());
dbg!(tracing.clone());

Ok(json!({
"result": result,
"tracing": tracing
}))
}

// Helper method to handle direct queries
Expand All @@ -142,7 +244,7 @@ impl ScyllaSession {
Either::B(query_ref) => self.session.query(query_ref.clone(), values).await,
}
.map_err(|e| {
let query_str = match query {
let query_str = match query.clone() {
Either::A(query_str) => query_str,
Either::B(query_ref) => query_ref.contents.clone(),
};
Expand All @@ -155,7 +257,34 @@ impl ScyllaSession {
)
})?;

Ok(QueryResult::parser(query_result))
let tracing_info = if let Some(tracing_id) = query_result.tracing_id {
Some(crate::types::tracing::TracingInfo::from(
self
.session
.get_tracing_info(&tracing_id)
.await
.map_err(|e| {
napi::Error::new(
napi::Status::InvalidArg,
format!(
"Something went wrong with your tracing info. - [{}]\n{}",
match query {
Either::A(query_str) => query_str,
Either::B(query_ref) => query_ref.contents.clone(),
},
e
),
)
})?,
))
} else {
None
};

Ok(json!({
"result": QueryResult::parser(query_result),
"tracing": tracing_info
}))
}

#[allow(clippy::type_complexity)]
Expand Down
1 change: 1 addition & 0 deletions src/types/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod tracing;
pub mod uuid;
79 changes: 79 additions & 0 deletions src/types/tracing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::{collections::HashMap, net::IpAddr};

use serde::Serialize;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CqlTimestampWrapper(pub scylla::frame::value::CqlTimestamp);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CqlTimeuuidWrapper(pub scylla::frame::value::CqlTimeuuid);

impl Serialize for CqlTimestampWrapper {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_i64(self.0.0)
}
}

impl Serialize for CqlTimeuuidWrapper {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(format!("{}", self.0).as_str())
}
}

/// Tracing info retrieved from `system_traces.sessions`
/// with all events from `system_traces.events`
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct TracingInfo {
pub client: Option<IpAddr>,
pub command: Option<String>,
pub coordinator: Option<IpAddr>,
pub duration: Option<i32>,
pub parameters: Option<HashMap<String, String>>,
pub request: Option<String>,
/// started_at is a timestamp - time since unix epoch
pub started_at: Option<CqlTimestampWrapper>,

pub events: Vec<TracingEvent>,
}

/// A single event happening during a traced query
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct TracingEvent {
pub event_id: CqlTimeuuidWrapper,
pub activity: Option<String>,
pub source: Option<IpAddr>,
pub source_elapsed: Option<i32>,
pub thread: Option<String>,
}

impl From<scylla::tracing::TracingInfo> for TracingInfo {
fn from(info: scylla::tracing::TracingInfo) -> Self {
Self {
client: info.client,
command: info.command,
coordinator: info.coordinator,
duration: info.duration,
parameters: info.parameters,
request: info.request,
started_at: info.started_at.map(CqlTimestampWrapper),
events: info.events.into_iter().map(TracingEvent::from).collect(),
}
}
}

impl From<scylla::tracing::TracingEvent> for TracingEvent {
fn from(event: scylla::tracing::TracingEvent) -> Self {
Self {
event_id: CqlTimeuuidWrapper(event.event_id),
activity: event.activity,
source: event.source,
source_elapsed: event.source_elapsed,
thread: event.thread,
}
}
}
2 changes: 1 addition & 1 deletion src/types/uuid.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use napi::Result;

#[napi()]
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Uuid {
pub(crate) uuid: uuid::Uuid,
}
Expand Down

0 comments on commit 08ce2e2

Please sign in to comment.