Skip to content

Commit

Permalink
chore: TPCC use PrepareStatement on Test
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 26, 2024
1 parent 460c981 commit a068efa
Show file tree
Hide file tree
Showing 11 changed files with 507 additions and 134 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ Order-Status : 0.492 (0.575)
Delivery : 6.109 (6.473)
Stock-Level : 0.001 (0.001)
<TpmC>
89.9205557572134 Tpmc
98 Tpmc
```
#### PG Wire Service
run `cargo run --features="net"` to start server
Expand Down
2 changes: 1 addition & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::utils::lru::SharedLruCache;
use ahash::HashMap;
use parking_lot::lock_api::{ArcRwLockReadGuard, ArcRwLockWriteGuard};
use parking_lot::{RawRwLock, RwLock};
use sqlparser::ast::Statement;
use std::cell::RefCell;
use std::hash::RandomState;
use std::marker::PhantomData;
Expand All @@ -35,6 +34,7 @@ pub(crate) type ScalaFunctions = HashMap<FunctionSummary, Arc<dyn ScalarFunction
pub(crate) type TableFunctions = HashMap<FunctionSummary, Arc<dyn TableFunctionImpl>>;

pub type Args = Vec<(&'static str, DataValue)>;
pub type Statement = sqlparser::ast::Statement;

#[allow(dead_code)]
pub(crate) enum MetaDataLock {
Expand Down
14 changes: 14 additions & 0 deletions src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,20 @@ impl LogicalType {
(LogicalType::Integer, _) | (_, LogicalType::UInteger) => Ok(LogicalType::Bigint),
(LogicalType::Smallint, _) | (_, LogicalType::USmallint) => Ok(LogicalType::Integer),
(LogicalType::Tinyint, _) | (_, LogicalType::UTinyint) => Ok(LogicalType::Smallint),
(
LogicalType::Decimal(precision_0, scale_0),
LogicalType::Decimal(precision_1, scale_1),
) => {
let fn_option = |num_0: &Option<u8>, num_1: &Option<u8>| match (num_0, num_1) {
(Some(num_0), Some(num_1)) => Some(*cmp::max(num_0, num_1)),
(Some(num), None) | (None, Some(num)) => Some(*num),
(None, None) => None,
};
Ok(LogicalType::Decimal(
fn_option(precision_0, precision_1),
fn_option(scale_0, scale_1),
))
}
_ => Err(DatabaseError::Incomparable(left.clone(), right.clone())),
}
}
Expand Down
37 changes: 37 additions & 0 deletions src/types/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,7 @@ impl_scalar!(u8, UInt8);
impl_scalar!(u16, UInt16);
impl_scalar!(u32, UInt32);
impl_scalar!(u64, UInt64);
impl_scalar!(Decimal, Decimal);

impl From<String> for DataValue {
fn from(value: String) -> Self {
Expand All @@ -1517,6 +1518,42 @@ impl From<Option<String>> for DataValue {
}
}

impl From<&NaiveDate> for DataValue {
fn from(value: &NaiveDate) -> Self {
DataValue::Date32(Some(value.num_days_from_ce()))
}
}

impl From<Option<&NaiveDate>> for DataValue {
fn from(value: Option<&NaiveDate>) -> Self {
DataValue::Date32(value.map(|d| d.num_days_from_ce()))
}
}

impl From<&NaiveDateTime> for DataValue {
fn from(value: &NaiveDateTime) -> Self {
DataValue::Date64(Some(value.and_utc().timestamp()))
}
}

impl From<Option<&NaiveDateTime>> for DataValue {
fn from(value: Option<&NaiveDateTime>) -> Self {
DataValue::Date64(value.map(|d| d.and_utc().timestamp()))
}
}

impl From<&NaiveTime> for DataValue {
fn from(value: &NaiveTime) -> Self {
DataValue::Time(Some(value.num_seconds_from_midnight()))
}
}

impl From<Option<&NaiveTime>> for DataValue {
fn from(value: Option<&NaiveTime>) -> Self {
DataValue::Time(value.map(|d| d.num_seconds_from_midnight()))
}
}

impl From<&sqlparser::ast::Value> for DataValue {
fn from(v: &sqlparser::ast::Value) -> Self {
match v {
Expand Down
36 changes: 18 additions & 18 deletions tpcc/src/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ run `cargo run -p tpcc --release` to run tpcc
- YMTC PC411-1024GB-B
- Tips: TPCC currently only supports single thread
```shell
|New-Order| sc: 1084 lt: 0 fl: 11
|Payment| sc: 1062 lt: 0 fl: 0
|Order-Status| sc: 102 lt: 4 fl: 36
|Delivery| sc: 107 lt: 0 fl: 0
|Stock-Level| sc: 106 lt: 0 fl: 0
in 723 sec.
|New-Order| sc: 1182 lt: 0 fl: 13
|Payment| sc: 1155 lt: 0 fl: 0
|Order-Status| sc: 115 lt: 1 fl: 29
|Delivery| sc: 114 lt: 2 fl: 0
|Stock-Level| sc: 115 lt: 0 fl: 0
in 720 sec.
<Constraint Check> (all must be [OK])
[transaction percentage]
Payment: 43.0% (>=43.0%) [Ok]
Expand All @@ -21,23 +21,23 @@ in 723 sec.
[response time (at least 90%% passed)]
New-Order: 100.0 [OK]
Payment: 100.0 [OK]
Order-Status: 96.2 [OK]
Delivery: 100.0 [OK]
Order-Status: 99.1 [OK]
Delivery: 98.3 [OK]
Stock-Level: 100.0 [OK]
New-Order Total: 1084
Payment Total: 1062
Order-Status Total: 106
Delivery Total: 107
Stock-Level Total: 106
New-Order Total: 1182
Payment Total: 1155
Order-Status Total: 116
Delivery Total: 116
Stock-Level Total: 115

<90th Percentile RT (MaxRT)>
New-Order : 0.005 (0.007)
Payment : 0.084 (0.141)
Order-Status : 0.492 (0.575)
Delivery : 6.109 (6.473)
New-Order : 0.003 (0.011)
Payment : 0.078 (0.470)
Order-Status : 0.227 (0.240)
Delivery : 5.439 (27.702)
Stock-Level : 0.001 (0.001)
<TpmC>
89.9205557572134 Tpmc
98 Tpmc
```

## Refer to
Expand Down
91 changes: 71 additions & 20 deletions tpcc/src/delivery.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::load::DIST_PER_WARE;
use crate::{TpccArgs, TpccError, TpccTest, TpccTransaction};
use chrono::Utc;
use fnck_sql::db::DBTransaction;
use fnck_sql::db::{DBTransaction, Statement};
use fnck_sql::storage::Storage;
use fnck_sql::types::value::DataValue;
use rand::prelude::ThreadRng;
use rand::Rng;

Expand All @@ -24,37 +25,86 @@ pub(crate) struct DeliveryTest;
impl<S: Storage> TpccTransaction<S> for Delivery {
type Args = DeliveryArgs;

fn run(tx: &mut DBTransaction<S>, args: &Self::Args) -> Result<(), TpccError> {
let now = Utc::now().format("%Y-%m-%d %H:%M:%S").to_string();
fn run(
tx: &mut DBTransaction<S>,
args: &Self::Args,
statements: &[Statement],
) -> Result<(), TpccError> {
let now = Utc::now().naive_utc();

for d_id in 1..DIST_PER_WARE + 1 {
// "SELECT COALESCE(MIN(no_o_id),0) FROM new_orders WHERE no_d_id = ? AND no_w_id = ?"
let (_, tuple) = tx.run(format!("SELECT COALESCE(MIN(no_o_id),0) FROM new_orders WHERE no_d_id = {} AND no_w_id = {}", d_id, args.w_id))?;
let no_o_id = tuple[0].values[0].i32().unwrap();
let (_, tuples) = tx.execute(
&statements[0],
vec![
("?1", DataValue::Int8(Some(d_id as i8))),
("?2", DataValue::Int16(Some(args.w_id as i16))),
],
)?;
let no_o_id = tuples[0].values[0].i32().unwrap();

if no_o_id == 0 {
continue;
}
// "DELETE FROM new_orders WHERE no_o_id = ? AND no_d_id = ? AND no_w_id = ?"
let _ = tx.run(format!(
"DELETE FROM new_orders WHERE no_o_id = {} AND no_d_id = {} AND no_w_id = {}",
no_o_id, d_id, args.w_id
))?;
let (_, tuples) = tx.execute(
&statements[1],
vec![
("?1", DataValue::Int32(Some(no_o_id))),
("?2", DataValue::Int8(Some(d_id as i8))),
("?3", DataValue::Int16(Some(args.w_id as i16))),
],
)?;
// "SELECT o_c_id FROM orders WHERE o_id = ? AND o_d_id = ? AND o_w_id = ?"
let (_, tuple) = tx.run(format!(
"SELECT o_c_id FROM orders WHERE o_id = {} AND o_d_id = {} AND o_w_id = {}",
no_o_id, d_id, args.w_id
))?;
let c_id = tuple[0].values[0].i32().unwrap();
let (_, tuples) = tx.execute(
&statements[2],
vec![
("?1", DataValue::Int32(Some(no_o_id))),
("?2", DataValue::Int8(Some(d_id as i8))),
("?3", DataValue::Int16(Some(args.w_id as i16))),
],
)?;
let c_id = tuples[0].values[0].i32().unwrap();
// "UPDATE orders SET o_carrier_id = ? WHERE o_id = ? AND o_d_id = ? AND o_w_id = ?"
let _ = tx.run(format!("UPDATE orders SET o_carrier_id = {} WHERE o_id = {} AND o_d_id = {} AND o_w_id = {}", args.o_carrier_id, no_o_id, d_id, args.w_id))?;
let (_, tuples) = tx.execute(
&statements[3],
vec![
("?1", DataValue::Int8(Some(args.o_carrier_id as i8))),
("?2", DataValue::Int32(Some(no_o_id))),
("?3", DataValue::Int8(Some(d_id as i8))),
("?4", DataValue::Int16(Some(args.w_id as i16))),
],
)?;
// "UPDATE order_line SET ol_delivery_d = ? WHERE ol_o_id = ? AND ol_d_id = ? AND ol_w_id = ?"
let _ = tx.run(format!("UPDATE order_line SET ol_delivery_d = '{}' WHERE ol_o_id = {} AND ol_d_id = {} AND ol_w_id = {}", now, no_o_id, d_id, args.w_id))?;
let (_, tuples) = tx.execute(
&statements[4],
vec![
("?1", DataValue::from(&now)),
("?2", DataValue::Int32(Some(no_o_id))),
("?3", DataValue::Int8(Some(d_id as i8))),
("?4", DataValue::Int16(Some(args.w_id as i16))),
],
)?;
// "SELECT SUM(ol_amount) FROM order_line WHERE ol_o_id = ? AND ol_d_id = ? AND ol_w_id = ?"
let (_, tuple) = tx.run(format!("SELECT SUM(ol_amount) FROM order_line WHERE ol_o_id = {} AND ol_d_id = {} AND ol_w_id = {}", no_o_id, d_id, args.w_id))?;
let ol_total = tuple[0].values[0].decimal().unwrap();
let (_, tuples) = tx.execute(
&statements[5],
vec![
("?1", DataValue::Int32(Some(no_o_id))),
("?2", DataValue::Int8(Some(d_id as i8))),
("?3", DataValue::Int16(Some(args.w_id as i16))),
],
)?;
let ol_total = tuples[0].values[0].decimal().unwrap();
// "UPDATE customer SET c_balance = c_balance + ? , c_delivery_cnt = c_delivery_cnt + 1 WHERE c_id = ? AND c_d_id = ? AND c_w_id = ?"
let _ = tx.run(format!("UPDATE customer SET c_balance = c_balance + {} , c_delivery_cnt = c_delivery_cnt + 1 WHERE c_id = {} AND c_d_id = {} AND c_w_id = {}", ol_total, c_id, d_id, args.w_id))?;
let (_, tuples) = tx.execute(
&statements[6],
vec![
("?1", DataValue::Decimal(Some(ol_total))),
("?2", DataValue::Int32(Some(c_id))),
("?3", DataValue::Int8(Some(d_id as i8))),
("?4", DataValue::Int16(Some(args.w_id as i16))),
],
)?;
}

Ok(())
Expand All @@ -72,12 +122,13 @@ impl<S: Storage> TpccTest<S> for DeliveryTest {
tx: &mut DBTransaction<S>,
num_ware: usize,
_: &TpccArgs,
statements: &[Statement],
) -> Result<(), TpccError> {
let w_id = rng.gen_range(0..num_ware) + 1;
let o_carrier_id = rng.gen_range(1..10);

let args = DeliveryArgs::new(w_id, o_carrier_id);
Delivery::run(tx, &args)?;
Delivery::run(tx, &args, statements)?;

Ok(())
}
Expand Down
63 changes: 60 additions & 3 deletions tpcc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::rt_hist::RtHist;
use crate::slev::SlevTest;
use crate::utils::SeqGen;
use clap::Parser;
use fnck_sql::db::{DBTransaction, DataBaseBuilder};
use fnck_sql::db::{DBTransaction, DataBaseBuilder, Statement};
use fnck_sql::errors::DatabaseError;
use fnck_sql::storage::Storage;
use rand::prelude::ThreadRng;
Expand Down Expand Up @@ -35,7 +35,11 @@ pub(crate) const RT_LIMITS: [Duration; 5] = [
pub(crate) trait TpccTransaction<S: Storage> {
type Args;

fn run(tx: &mut DBTransaction<S>, args: &Self::Args) -> Result<(), TpccError>;
fn run(
tx: &mut DBTransaction<S>,
args: &Self::Args,
statements: &[Statement],
) -> Result<(), TpccError>;
}

pub(crate) trait TpccTest<S: Storage> {
Expand All @@ -47,6 +51,7 @@ pub(crate) trait TpccTest<S: Storage> {
tx: &mut DBTransaction<S>,
num_ware: usize,
args: &TpccArgs,
statements: &[Statement],
) -> Result<(), TpccError>;
}

Expand Down Expand Up @@ -81,6 +86,56 @@ fn main() -> Result<(), TpccError> {
Load::load_custs(&mut rng, &database, args.num_ware)?;
Load::load_ord(&mut rng, &database, args.num_ware)?;

let test_statements = vec![
vec![
database.prepare("SELECT c.c_discount, c.c_last, c.c_credit, w.w_tax FROM customer AS c JOIN warehouse AS w ON c.c_w_id = w_id AND w.w_id = ?1 AND c.c_w_id = ?2 AND c.c_d_id = ?3 AND c.c_id = ?4")?,
database.prepare("SELECT c_discount, c_last, c_credit FROM customer WHERE c_w_id = ?1 AND c_d_id = ?2 AND c_id = ?3")?,
database.prepare("SELECT w_tax FROM warehouse WHERE w_id = ?1")?,
database.prepare("SELECT d_next_o_id, d_tax FROM district WHERE d_id = ?1 AND d_w_id = ?2")?,
database.prepare("UPDATE district SET d_next_o_id = ?1 + 1 WHERE d_id = ?2 AND d_w_id = ?3")?,
database.prepare("INSERT INTO orders (o_id, o_d_id, o_w_id, o_c_id, o_entry_d, o_ol_cnt, o_all_local) VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7)")?,
database.prepare("INSERT INTO new_orders (no_o_id, no_d_id, no_w_id) VALUES (?1,?2,?3)")?,
database.prepare("SELECT i_price, i_name, i_data FROM item WHERE i_id = ?1")?,
database.prepare("SELECT s_quantity, s_data, s_dist_01, s_dist_02, s_dist_03, s_dist_04, s_dist_05, s_dist_06, s_dist_07, s_dist_08, s_dist_09, s_dist_10 FROM stock WHERE s_i_id = ?1 AND s_w_id = ?2")?,
database.prepare("UPDATE stock SET s_quantity = ?1 WHERE s_i_id = ?2 AND s_w_id = ?3")?,
database.prepare("INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number, ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_dist_info) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)")?,
],
vec![
database.prepare("UPDATE warehouse SET w_ytd = w_ytd + ?1 WHERE w_id = ?2")?,
database.prepare("SELECT w_street_1, w_street_2, w_city, w_state, w_zip, w_name FROM warehouse WHERE w_id = ?1")?,
database.prepare("UPDATE district SET d_ytd = d_ytd + ?1 WHERE d_w_id = ?2 AND d_id = ?3")?,
database.prepare("SELECT d_street_1, d_street_2, d_city, d_state, d_zip, d_name FROM district WHERE d_w_id = ?1 AND d_id = ?2")?,
database.prepare("SELECT count(c_id) FROM customer WHERE c_w_id = ?1 AND c_d_id = ?2 AND c_last = ?3")?,
database.prepare("SELECT c_id FROM customer WHERE c_w_id = ?1 AND c_d_id = ?2 AND c_last = ?3 ORDER BY c_first")?,
database.prepare("SELECT c_first, c_middle, c_last, c_street_1, c_street_2, c_city, c_state, c_zip, c_phone, c_credit, c_credit_lim, c_discount, c_balance, c_since FROM customer WHERE c_w_id = ?1 AND c_d_id = ?2 AND c_id = ?3")?,
database.prepare("SELECT c_data FROM customer WHERE c_w_id = ?1 AND c_d_id = ?2 AND c_id = ?3")?,
database.prepare("UPDATE customer SET c_balance = ?1, c_data = ?2 WHERE c_w_id = ?3 AND c_d_id = ?4 AND c_id = ?5")?,
database.prepare("UPDATE customer SET c_balance = ?1 WHERE c_w_id = ?2 AND c_d_id = ?3 AND c_id = ?4")?,
database.prepare("INSERT OVERWRITE history(h_c_d_id, h_c_w_id, h_c_id, h_d_id, h_w_id, h_date, h_amount, h_data) VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)")?,
],
vec![
database.prepare("SELECT count(c_id) FROM customer WHERE c_w_id = ?1 AND c_d_id = ?2 AND c_last = ?3")?,
database.prepare("SELECT c_balance, c_first, c_middle, c_last FROM customer WHERE c_w_id = ?1 AND c_d_id = ?2 AND c_last = ?3 ORDER BY c_first")?,
database.prepare("SELECT c_balance, c_first, c_middle, c_last FROM customer WHERE c_w_id = ?1 AND c_d_id = ?2 AND c_id = ?3")?,
database.prepare("SELECT o_id, o_entry_d, COALESCE(o_carrier_id,0) FROM orders WHERE o_w_id = ?1 AND o_d_id = ?2 AND o_c_id = ?3 AND o_id = (SELECT MAX(o_id) FROM orders WHERE o_w_id = ?4 AND o_d_id = ?5 AND o_c_id = ?6)")?,
database.prepare("SELECT ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_delivery_d FROM order_line WHERE ol_w_id = ?1 AND ol_d_id = ?2 AND ol_o_id = ?3")?
],
vec![
database.prepare("SELECT COALESCE(MIN(no_o_id),0) FROM new_orders WHERE no_d_id = ?1 AND no_w_id = ?2")?,
database.prepare("DELETE FROM new_orders WHERE no_o_id = ?1 AND no_d_id = ?2 AND no_w_id = ?3")?,
database.prepare("SELECT o_c_id FROM orders WHERE o_id = ?1 AND o_d_id = ?2 AND o_w_id = ?3")?,
database.prepare("UPDATE orders SET o_carrier_id = ?1 WHERE o_id = ?2 AND o_d_id = ?3 AND o_w_id = ?4")?,
database.prepare("UPDATE order_line SET ol_delivery_d = ?1 WHERE ol_o_id = ?2 AND ol_d_id = ?3 AND ol_w_id = ?4")?,
database.prepare("SELECT SUM(ol_amount) FROM order_line WHERE ol_o_id = ?1 AND ol_d_id = ?2 AND ol_w_id = ?3")?,
database.prepare("UPDATE customer SET c_balance = c_balance + ?1 , c_delivery_cnt = c_delivery_cnt + 1 WHERE c_id = ?2 AND c_d_id = ?3 AND c_w_id = ?4")?,
],
vec![
database.prepare("SELECT d_next_o_id FROM district WHERE d_id = ?1 AND d_w_id = ?2")?,
database.prepare("SELECT DISTINCT ol_i_id FROM order_line WHERE ol_w_id = ?1 AND ol_d_id = ?2 AND ol_o_id < ?3 AND ol_o_id >= (?4 - 20)")?,
database.prepare("SELECT count(*) FROM stock WHERE s_w_id = ?1 AND s_i_id = ?2 AND s_quantity < ?3")?,
],
];

let mut rt_hist = RtHist::new();
let mut success = [0usize; 5];
let mut late = [0usize; 5];
Expand All @@ -102,13 +157,15 @@ fn main() -> Result<(), TpccError> {
while tpcc_start.elapsed() < duration {
let i = seq_gen.get();
let tpcc_test = &tests[i];
let statement = &test_statements[i];

let mut is_succeed = false;
for j in 0..args.max_retry + 1 {
let transaction_start = Instant::now();
let mut tx = database.new_transaction()?;

if let Err(err) = tpcc_test.do_transaction(&mut rng, &mut tx, args.num_ware, &tpcc_args)
if let Err(err) =
tpcc_test.do_transaction(&mut rng, &mut tx, args.num_ware, &tpcc_args, &statement)
{
failure[i] += 1;
eprintln!(
Expand Down
Loading

0 comments on commit a068efa

Please sign in to comment.