Skip to content

Commit

Permalink
fix byname query
Browse files Browse the repository at this point in the history
  • Loading branch information
cdouglas committed Jul 12, 2024
1 parent 6dc35e6 commit 7e7d80c
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 39 deletions.
12 changes: 7 additions & 5 deletions incremental_transactions/tpcc/sql/byname.sql
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
CREATE VIEW cust_agg AS
SELECT ARRAY_AGG((c_id + c_w_id + c_d_id) ORDER BY c_first) AS cust_array
SELECT c_w_id, c_d_id, ARRAY_AGG(c_id ORDER BY c_first) AS cust_array
FROM (SELECT c.c_id, c.c_w_id, c.c_d_id, c.c_first
FROM customer AS c,
transaction_parameters AS t
WHERE c.c_last = t.c_last
AND c.c_d_id = t.c_d_id
AND c.c_w_id = t.c_w_id
ORDER BY c_first);
AND c.c_w_id = t.c_w_id)
GROUP BY c_w_id, c_d_id;

CREATE VIEW cust_byname AS
SELECT c.c_first, c.c_middle, c.c_id,
SELECT c.c_first, c.c_middle, c.c_last, c.c_id, c.c_w_id, c.c_d_id,
c.c_street_1, c.c_street_2, c.c_city, c.c_state, c.c_zip,
c.c_phone, c.c_credit, c.c_credit_lim,
c.c_discount, c.c_balance, c.c_since
FROM customer as c,
cust_agg as a,
transaction_parameters as t
WHERE (c.c_id + c.c_w_id + c.c_d_id) = a.cust_array[(ARRAY_LENGTH(a.cust_array) / 2) + 1];
WHERE c.c_w_id = a.c_w_id
AND c.c_d_id = a.c_d_id
AND c.c_id = a.cust_array[(ARRAY_LENGTH(a.cust_array) / 2) + 1];
184 changes: 150 additions & 34 deletions incremental_transactions/tpcc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ mod test {

#[test]
#[allow(unused_variables)]
fn test_byname_sql() {
fn test_byname_sql_inc() {
let cconf = CircuitConfig::with_workers(1);
let (mut circuit, handles) = byname_sql::circuit(cconf).unwrap();
let (mut circuit, handles) = byname_sql_incremental::circuit(cconf).unwrap();
let (
in_warehouse_static,
in_warehouse,
Expand All @@ -88,8 +88,8 @@ mod test {
(
(
Some(i), // c_id
Some(1), // c_d_id
Some(1), // c_w_id
Some(10), // c_d_id
Some(20), // c_w_id
Some(i.to_string()), // c_first
None,
Some("Public".to_string()), // c_last
Expand Down Expand Up @@ -118,27 +118,143 @@ mod test {
in_customer.append(&mut customers);

circuit.step().unwrap();
let byname = out_cust_byname.consolidate();
let agg = out_cust_agg.consolidate();
let byname = out_cust_byname.consolidate();

println!("Loaded customers (nothing to join, should be empty):");
byname
.iter()
.for_each(|(tup, _, z_weight)| println!("{:?}: {:?}", tup.0.unwrap(), z_weight));
println!("cust_agg:");
agg.iter().for_each(|(tup, _, z_weight)| {
println!(
"{:?}: {:?}",
tup.0
.iter()
.map(|x| match x {
Some(value) => format!("{:?}, ", value),
None => "_,".to_string(),
})
.collect::<String>(),
z_weight
// agg.iter().for_each(|(tup, _, z_weight)| {
// println!(
// "{:?}: {:?}",
// tup.0
// .iter()
// .map(|x| match x {
// Some(value) => format!("{:?}, ", value),
// None => "_,".to_string(),
// })
// .collect::<String>(),
// z_weight
// )
// });

in_transaction_parameters.push(
(
Some(4344), // txn_id
None, // w_id
None, // d_id
None, // c_id
Some(20), // c_w_id
Some(10), // c_w_id
Some("Public".to_string()), // c_last
None, // h_amount
None, // h_date
None, // datetime_
)
});
.into(),
1,
);
circuit.step().unwrap();
let agg = out_cust_agg.consolidate(); // .merge(&agg);
let byname = out_cust_byname.consolidate(); // .merge(&byname);
println!("Result");
byname
.iter()
// .for_each(|(tup, _, z_weight)| println!("{:?}: {:?}", tup, z_weight));
.for_each(|(tup, _, z_weight)| println!("{:?} {:?}", tup, z_weight));
// .for_each(|(tup, _, z_weight)| println!("{:?} {:?} {:?} {:?} {:?} {:?}", tup.0.unwrap(), tup.2.unwrap(), tup.3.unwrap(), tup.4.unwrap(), tup.5.unwrap(), z_weight));
println!("cust_agg:");
agg.iter().for_each(|(tup, _, z_weight)| println!("{:?}: {:?}", tup.0.unwrap(), z_weight));
// println!("cust_agg:");
// agg.iter().for_each(|(tup, _, z_weight)| {
// println!(
// "{:?}: {:?}",
// tup.0
// .iter()
// .map(|x| match x {
// Some(value) => format!("{:?}, ", value),
// None => "_,".to_string(),
// })
// .collect::<String>(),
// z_weight
// )
// });
}

#[test]
#[allow(unused_variables)]
fn test_byname_sql() {
let cconf = CircuitConfig::with_workers(1);
let (mut circuit, handles) = byname_sql::circuit(cconf).unwrap();
let (
in_warehouse_static,
in_warehouse,
in_district_static,
in_district_next_id,
in_district_ytd,
in_customer,
in_transaction_parameters,
out_cust_agg,
out_cust_byname,
) = handles;
let mut customers: Vec<_> = (0..5)
.into_iter()
.map(|i| {
(
(
Some(i), // c_id
Some(1), // c_d_id
Some(1), // c_w_id
Some(i.to_string()), // c_first
None,
Some("Public".to_string()), // c_last
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
)
.into(),
1,
)
.into()
})
.collect();
in_customer.append(&mut customers);

// circuit.step().unwrap();
// let byname = out_cust_byname.consolidate();
// let agg = out_cust_agg.consolidate();

// println!("Loaded customers (nothing to join, should be empty):");
// byname
// .iter()
// .for_each(|(tup, _, z_weight)| println!("{:?}: {:?}", tup.0.unwrap(), z_weight));
// println!("cust_agg:");
// agg.iter().for_each(|(tup, _, z_weight)| {
// println!(
// "{:?}: {:?}",
// tup.0
// .iter()
// .map(|x| match x {
// Some(value) => format!("{:?}, ", value),
// None => "_,".to_string(),
// })
// .collect::<String>(),
// z_weight
// )
// });

in_transaction_parameters.push(
(
Expand All @@ -157,25 +273,25 @@ mod test {
1,
);
circuit.step().unwrap();
let byname = out_cust_byname.consolidate().merge(&byname);
let agg = out_cust_agg.consolidate().merge(&agg);
let byname = out_cust_byname.consolidate(); // .merge(&byname);
let agg = out_cust_agg.consolidate(); // .merge(&agg);
println!("Result");
byname
.iter()
.for_each(|(tup, _, z_weight)| println!("{:?}: {:?}", tup.0.unwrap(), z_weight));
println!("cust_agg:");
agg.iter().for_each(|(tup, _, z_weight)| {
println!(
"{:?}: {:?}",
tup.0
.iter()
.map(|x| match x {
Some(value) => format!("{:?}, ", value),
None => "_,".to_string(),
})
.collect::<String>(),
z_weight
)
});
// println!("cust_agg:");
// agg.iter().for_each(|(tup, _, z_weight)| {
// println!(
// "{:?}: {:?}",
// tup.0
// .iter()
// .map(|x| match x {
// Some(value) => format!("{:?}, ", value),
// None => "_,".to_string(),
// })
// .collect::<String>(),
// z_weight
// )
// });
}
}

0 comments on commit 7e7d80c

Please sign in to comment.