Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add query options and improve query execution #39

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions examples/basic.mts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Cluster } from "../index.js"
import { Cluster } from "../index.js";

const nodes = process.env.CLUSTER_NODES?.split(",") ?? ["127.0.0.1:9042"];

Expand All @@ -7,15 +7,25 @@ console.log(`Connecting to ${nodes}`);
const cluster = new Cluster({ nodes });
const session = await cluster.connect();

await session.execute("CREATE KEYSPACE IF NOT EXISTS basic WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
await session.execute(
"CREATE KEYSPACE IF NOT EXISTS basic WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }",
);
await session.useKeyspace("basic");

await session.execute("CREATE TABLE IF NOT EXISTS basic (a int, b int, c text, primary key (a, b))");
await session.execute(
"CREATE TABLE IF NOT EXISTS basic (a int, b int, c text, primary key (a, b))",
);

await session.execute("INSERT INTO basic (a, b, c) VALUES (1, 2, 'abc')");
await session.execute("INSERT INTO basic (a, b, c) VALUES (?, ?, ?)", [3, 4, "def"]);

const prepared = await session.prepare("INSERT INTO basic (a, b, c) VALUES (?, 7, ?)");
await session.execute("INSERT INTO basic (a, b, c) VALUES (?, ?, ?)", [
3,
4,
"def",
]);

const prepared = await session.prepare(
"INSERT INTO basic (a, b, c) VALUES (?, 7, ?)",
);
await session.execute(prepared, [42, "I'm prepared!"]);
await session.execute(prepared, [43, "I'm prepared 2!"]);
await session.execute(prepared, [44, "I'm prepared 3!"]);
Expand All @@ -25,7 +35,7 @@ interface RowData {
b: number;
c: string;
}
const result = await session.execute<RowData>("SELECT a, b, c FROM basic");
const result = await session.execute("SELECT a, b, c FROM basic");
console.log(result);

const metrics = session.metrics();
Expand All @@ -34,4 +44,4 @@ console.log(`Iter queries requested: ${metrics.getQueriesIterNum()}`);
console.log(`Errors occurred: ${metrics.getErrorsNum()}`);
console.log(`Iter errors occurred: ${metrics.getErrorsIterNum()}`);
console.log(`Average latency: ${metrics.getLatencyAvgMs()}`);
console.log(`99.9 latency percentile: ${metrics.getLatencyPercentileMs(99.9)}`);
console.log(`99.9 latency percentile: ${metrics.getLatencyPercentileMs(99.9)}`);
38 changes: 38 additions & 0 deletions examples/prepared.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { Cluster } from "../index.js";

const nodes = process.env.CLUSTER_NODES?.split(",") ?? ["127.0.0.1:9042"];

console.log(`Connecting to ${nodes}`);

const cluster = new Cluster({ nodes });
const session = await cluster.connect();

await session.execute(
"CREATE KEYSPACE IF NOT EXISTS prepared WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }",
);
await session.useKeyspace("prepared");

await session.execute(
"CREATE TABLE IF NOT EXISTS prepared (a int, b int, c text, primary key (a, b))",
);

const prepared = await session.prepare(
"INSERT INTO basic (a, b, c) VALUES (?, 7, ?)",
);
await session.execute(prepared, [42, "I'm prepared!"]);
await session.execute(prepared, [43, "I'm prepared 2!"]);
await session.execute(prepared, [44, "I'm prepared 3!"]);

await session.execute(
"INSERT INTO basic (a, b, c) VALUES (?, 7, ?)",
[45, "I'm also prepared"],
{ prepare: true },
);

const metrics = session.metrics();
console.log(`Queries requested: ${metrics.getQueriesNum()}`);
console.log(`Iter queries requested: ${metrics.getQueriesIterNum()}`);
console.log(`Errors occurred: ${metrics.getErrorsNum()}`);
console.log(`Iter errors occurred: ${metrics.getErrorsIterNum()}`);
console.log(`Average latency: ${metrics.getLatencyAvgMs()}`);
console.log(`99.9 latency percentile: ${metrics.getLatencyPercentileMs(99.9)}`);
21 changes: 20 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ export const enum VerifyMode {
None = 0,
Peer = 1
}
export interface QueryOptions {
prepare?: boolean
}
export interface ScyllaKeyspace {
strategy: ScyllaStrategy
tables: Record<string, ScyllaTable>
Expand Down Expand Up @@ -154,7 +157,23 @@ export class Metrics {
export class ScyllaSession {
metrics(): Metrics
getClusterData(): Promise<ScyllaClusterData>
execute(query: string | Query | PreparedStatement, parameters?: Array<number | string | Uuid | Record<string, number | string | Uuid>> | undefined | null): Promise<any>
/**
* Sends a query to the database and receives a response.\
* Returns only a single page of results, to receive multiple pages use (TODO: Not implemented yet)
*
* This is the easiest way to make a query, but performance is worse than that of prepared queries.
*
* It is discouraged to use this method with non-empty values argument. In such case, query first needs to be prepared (on a single connection), so
* driver will perform 2 round trips instead of 1. Please use `PreparedStatement` object or `{ prepared: true }` option instead.
*
* # Notes
*
* ## UDT
* Order of fields in the object must match the order of fields as defined in the UDT. The
* driver does not check it by itself, so incorrect data will be written if the order is
* wrong.
*/
execute(query: string | Query | PreparedStatement, parameters?: Array<number | string | Uuid | Record<string, number | string | Uuid>> | undefined | null, options?: QueryOptions | undefined | null): Promise<any>
query(scyllaQuery: Query, parameters?: Array<number | string | Uuid | Record<string, number | string | Uuid>> | undefined | null): Promise<any>
prepare(query: string): Promise<PreparedStatement>
/**
Expand Down
112 changes: 98 additions & 14 deletions src/session/scylla_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@ use crate::query::scylla_prepared_statement::PreparedStatement;
use crate::query::scylla_query::Query;
use crate::types::uuid::Uuid;
use napi::bindgen_prelude::{Either3, Either4};
use napi::Either;

use super::metrics;
use super::topology::ScyllaClusterData;

#[napi(object)]
pub struct QueryOptions {
pub prepare: Option<bool>,
}

#[napi]
pub struct ScyllaSession {
session: scylla::Session,
Expand Down Expand Up @@ -39,6 +45,20 @@ impl ScyllaSession {
cluster_data.into()
}

/// Sends a query to the database and receives a response.\
/// Returns only a single page of results, to receive multiple pages use (TODO: Not implemented yet)
///
/// This is the easiest way to make a query, but performance is worse than that of prepared queries.
///
/// It is discouraged to use this method with non-empty values argument. In such case, query first needs to be prepared (on a single connection), so
/// driver will perform 2 round trips instead of 1. Please use `PreparedStatement` object or `{ prepared: true }` option instead.
///
/// # Notes
///
/// ## UDT
/// Order of fields in the object must match the order of fields as defined in the UDT. The
/// driver does not check it by itself, so incorrect data will be written if the order is
/// wrong.
#[allow(clippy::type_complexity)]
#[napi]
pub async fn execute(
Expand All @@ -47,27 +67,91 @@ impl ScyllaSession {
parameters: Option<
Vec<Either4<u32, String, &Uuid, HashMap<String, Either3<u32, String, &Uuid>>>>,
>,
options: Option<QueryOptions>,
) -> napi::Result<serde_json::Value> {
let values = QueryParameter::parser(parameters.clone()).ok_or(napi::Error::new(
napi::Status::InvalidArg,
format!("Something went wrong with your query parameters. {parameters:?}"),
))?;
let values = QueryParameter::parser(parameters.clone()).ok_or_else(|| {
napi::Error::new(
napi::Status::InvalidArg,
format!(
"Something went wrong with your query parameters. {:?}",
parameters
),
)
})?;

let should_prepare = options.map_or(false, |options| options.prepare.unwrap_or(false));

let query_result = match query.clone() {
Either3::A(query) => self.session.query(query, values).await,
Either3::B(query) => self.session.query(query.query.clone(), values).await,
Either3::C(prepared) => self.session.execute(&prepared.prepared, values).await,
match query {
Either3::A(ref query_str) if should_prepare => {
let prepared = self.session.prepare(query_str.clone()).await.map_err(|e| {
napi::Error::new(
napi::Status::InvalidArg,
format!(
"Something went wrong preparing your statement. - [{}]\n{}",
query_str, e
),
)
})?;
self.execute_prepared(&prepared, values, query_str).await
}
Either3::A(query_str) => self.execute_query(Either::A(query_str), values).await,
Either3::B(query_ref) => {
self
.execute_query(Either::B(query_ref.query.clone()), values)
.await
}
Either3::C(prepared_ref) => {
self
.execute_prepared(
&prepared_ref.prepared,
values,
prepared_ref.prepared.get_statement(),
)
.await
}
}
}

// Helper method to handle prepared statements
async fn execute_prepared(
&self,
prepared: &scylla::prepared_statement::PreparedStatement,
values: QueryParameter<'_>,
query: &str,
) -> napi::Result<serde_json::Value> {
let query_result = self.session.execute(prepared, values).await.map_err(|e| {
napi::Error::new(
napi::Status::InvalidArg,
format!(
"Something went wrong with your prepared statement. - [{}]\n{}",
query, e
),
)
})?;
Ok(QueryResult::parser(query_result))
}

// Helper method to handle direct queries
async fn execute_query(
&self,
query: Either<String, scylla::query::Query>,
values: QueryParameter<'_>,
) -> napi::Result<serde_json::Value> {
let query_result = match &query {
Either::A(query_str) => self.session.query(query_str.clone(), values).await,
Either::B(query_ref) => self.session.query(query_ref.clone(), values).await,
}
.map_err(|e| {
let query = match query {
Either3::A(query) => query,
Either3::B(query) => query.query.contents.clone(),
Either3::C(prepared) => prepared.prepared.get_statement().to_string(),
let query_str = match query {
Either::A(query_str) => query_str,
Either::B(query_ref) => query_ref.contents.clone(),
};

napi::Error::new(
napi::Status::InvalidArg,
format!("Something went wrong with your query. - [{query}] - {parameters:?}\n{e}"),
format!(
"Something went wrong with your query. - [{}]\n{}",
query_str, e
),
)
})?;

Expand Down
Loading