Skip to content

Commit

Permalink
add crud
Browse files Browse the repository at this point in the history
  • Loading branch information
jjcfrancisco committed Jul 7, 2024
1 parent 631a73a commit e8865b1
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 81 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ shapefile = "0.6.0"
postgres-protocol = "0.6.6"
postgres = { version = "0.19.7", features = ["with-serde_json-1"] }
geo = "0.28.0"
wkb = "0.7.1"
bytes = "1.6.0"
86 changes: 86 additions & 0 deletions src/pg/binary_copy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use crate::Result;
use bytes::BytesMut;
use postgres::types::to_sql_checked;
use postgres::types::{IsNull, ToSql, Type};
use std::error::Error;
use postgres::Statement;

use postgres::binary_copy::BinaryCopyInWriter;
use postgres::CopyInWriter;

use crate::utils::{NewTableTypes, Rows};
use crate::pg::crud::create_connection;

#[derive(Debug)]
pub struct Wkb {
pub geometry: Vec<u8>,
}

impl ToSql for Wkb {
fn to_sql(
&self,
_: &Type,
out: &mut BytesMut,
) -> std::result::Result<IsNull, Box<dyn Error + Send + Sync>> {
out.extend_from_slice(&self.geometry);
Ok(IsNull::No)
}

fn accepts(ty: &Type) -> bool {
ty.name() == "geometry"
}

to_sql_checked!();
}

pub fn infer_geom_type(stmt: Statement) -> Result<Type> {
let column = stmt.columns().get(0).expect("Failed to get columns");
Ok(column.type_().clone())
}

pub fn insert_rows<'a>(
rows: &Rows,
config: &Vec<NewTableTypes>,
geom_type: Type,
uri: &str,
schema: &Option<String>,
table: &str,
) -> Result<()> {
// Create connection
let mut client = create_connection(uri)?;

// Prepare types for binary copy
let mut types: Vec<Type> = Vec::new();
for column in config.iter() {
types.push(column.data_type.clone());
}
types.push(geom_type);

// Binary copy in writer
let mut query = String::from("COPY ");
if let Some(schema) = schema {
query.push_str(&format!("{}.{}", schema, table));
} else {
query.push_str(&table);
}
query.push_str(" (");
for column in config.iter() {
query.push_str(&format!("{},", column.column_name));
}
query.push_str("geom) FROM stdin BINARY");
let writer: CopyInWriter = client.copy_in(&query)?;

let mut writer = BinaryCopyInWriter::new(writer, &types);

println!("{:?}", types);

for row in rows.rows.iter() {
// Transform row into vector of ToSql

// writer
// .write(&[&row])
// .expect("Failed to insert row into database");
}

Ok(())
}
58 changes: 58 additions & 0 deletions src/pg/crud.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use crate::Result;
use postgres::types::Type;
use postgres::Statement;

use postgres::{Client, NoTls};

use crate::utils::NewTableTypes;

pub fn create_connection(uri: &str) -> Result<Client> {
let client = Client::connect(uri, NoTls)?;
Ok(client)
}

pub fn create_table(
table_name: &str,
schema_name: &Option<String>,
config: &Vec<NewTableTypes>,
uri: &str,
) -> Result<Statement> {
let mut query = String::from("CREATE TABLE IF NOT EXISTS ");
if let Some(schema) = schema_name {
query.push_str(&format!("{}.{} ", schema, table_name));
} else {
query.push_str(&table_name);
}
query.push_str("(");
for column in config.iter() {
match column.data_type {
Type::INT8 => {
query.push_str(&format!("{} INT,", column.column_name));
}
Type::FLOAT8 => {
query.push_str(&format!("{} DOUBLE,", column.column_name));
}
Type::TEXT => {
query.push_str(&format!("{} TEXT,", column.column_name));
}
Type::BOOL => {
query.push_str(&format!("{} BOOL,", column.column_name));
}
_ => println!("Type currently not supported"),
}
}
query.push_str("geom Geometry(Geometry, 4326)");
query.push_str(");");
println!("{}", query);

let mut client = create_connection(uri)?;
client.execute(&query, &[])?;

let stmt = if let Some(schema) = schema_name {
client.prepare(&format!("SELECT geom FROM {}.{}", schema, table_name))?
} else {
client.prepare(&format!("SELECT geom FROM {}", table_name))?
};

Ok(stmt)
}
66 changes: 2 additions & 64 deletions src/pg/mod.rs
Original file line number Diff line number Diff line change
@@ -1,64 +1,2 @@
use crate::Result;
use postgres::Statement;
use postgres::types::Type;

use postgres::{Client, NoTls, CopyInWriter};

use crate::utils::NewTableTypes;

pub fn create_connection(uri: &str) -> Result<Client> {
let client = Client::connect(uri, NoTls)?;
Ok(client)
}

// pub fn create_table(client: &mut Client) -> Result<Statement> {
// client.execute("CREATE TABLE IF NOT EXISTS pio (
// id INT,
// properties JSONB,
// geometry geometry);", &[])?;
//
// let stmt = client.prepare("SELECT geometry FROM pio")?;
// Ok(stmt)
// }

pub fn infer_geom_type(stmt: Statement) -> Result<Type> {
let column = stmt.columns().get(0).expect("Failed to get columns");
Ok(column.type_().clone())
}

pub fn create_binary_writer<'a>(client: &'a mut Client) -> Result<CopyInWriter<'a>> {
let writer:CopyInWriter = client.copy_in("COPY pio (id, properties, geometry) FROM stdin BINARY")?;
Ok(writer)
}

pub fn create_table(table_name: &str, schema_name: Option<String>, config: &Vec<NewTableTypes>, client: &mut Client) -> Result<()> {
let mut query = String::from("CREATE TABLE IF NOT EXISTS ");
if let Some(schema) = schema_name {
query.push_str(&format!("{}.{} ", schema, table_name));
} else {
query.push_str(&table_name);
}
query.push_str("(");
for column in config.iter() {
match column.data_type {
Type::INT8 => {
query.push_str(&format!(" {} INT,", column.column_name));
},
Type::FLOAT8 => {
query.push_str(&format!(" {} DOUBLE,", column.column_name));
},
Type::TEXT => {
query.push_str(&format!(" {} TEXT,", column.column_name));
},
Type::BOOL => {
query.push_str(&format!(" {} BOOL,", column.column_name));
},
_ => println!("Type currently not supported"),
}
}
query.push_str("geom GEOMETRY");
query.push_str(");");
println!("{}", query);
client.execute(&query, &[])?;
Ok(())
}
pub mod binary_copy;
pub mod crud;
11 changes: 6 additions & 5 deletions src/utils/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use crate::Result;
use crate::utils::{determine_file_type, FileType};
use crate::utils::validate::validate_args;
use crate::utils::shp::{read_shapefile, determine_data_types};
use crate::pg::{create_connection, create_table};
use crate::pg::crud::create_table;
use crate::pg::binary_copy::{infer_geom_type, insert_rows};

use clap::Parser;

Expand All @@ -27,7 +28,7 @@ pub fn run() -> Result<()> {
validate_args(&args)?;

let file_type = determine_file_type(&args.input)?;
let data = match file_type {
let rows = match file_type {
FileType::Shapefile => {
read_shapefile(&args.input)?
}
Expand All @@ -36,9 +37,9 @@ pub fn run() -> Result<()> {
}
};
let config = determine_data_types(&args.input)?;
let mut client = create_connection(&args.uri)?;
create_table(&args.table, args.schema, &config, &mut client)?;
// Insert data into table using Rows
let stmt = create_table(&args.table, &args.schema, &config, &args.uri)?;
let geom_type = infer_geom_type(stmt)?;
insert_rows(&rows, &config, geom_type, &args.uri, &args.schema, &args.table)?;

Ok(())
}
15 changes: 7 additions & 8 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::Result;
use std::path::Path;
use postgres::types::Type;
use geo::Coord;
use geo::geometry::Geometry;
use shapefile::Shape;
use crate::pg::binary_copy::Wkb;

pub mod cli;
pub mod shp;
Expand All @@ -15,10 +15,12 @@ pub struct NewTableTypes {
pub data_type: Type,
}

#[derive(Debug)]
pub struct Row {
pub columns: Vec<AcceptedTypes>,
}

#[derive(Debug)]
pub struct Rows {
pub rows: Vec<Row>,
}
Expand All @@ -42,13 +44,14 @@ impl Rows {
}

// Enum to hold accepted data types
enum AcceptedTypes {
#[derive(Debug)]
pub enum AcceptedTypes {
Int(i64),
Float(f64),
// Float(f64),
Double(f64),
Text(String),
Bool(bool),
Geometry(Geometry<f64>),
Geometry(Wkb),
}

// Create enum of supported file types
Expand All @@ -71,10 +74,6 @@ fn determine_file_type(input_file: &str) -> Result<FileType> {
}
}

fn determine_column_types() -> Result<()> {
Ok(())
}

pub fn to_geo(shape: &Shape) -> Result<geo::Geometry<f64>> {
match shape {
Shape::Point(p) => Ok(geo::Point::new(p.x, p.y).into()),
Expand Down
9 changes: 5 additions & 4 deletions src/utils/shp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use crate::Result;
use postgres::types::Type;
use shapefile::dbase::FieldValue;

use crate::pg::binary_copy::Wkb;
use crate::utils::to_geo;
use crate::utils::{AcceptedTypes, NewTableTypes, Row, Rows};
use wkb::geom_to_wkb;

pub fn determine_data_types(file_path: &str) -> Result<Vec<NewTableTypes>> {
let mut table_config: Vec<NewTableTypes> = Vec::new();
Expand Down Expand Up @@ -57,7 +59,6 @@ pub fn determine_data_types(file_path: &str) -> Result<Vec<NewTableTypes>> {
}

pub fn read_shapefile(file_path: &str) -> Result<Rows> {
// Create a new vector that can hold any data type from below
let mut rows = Rows::new();
let mut reader = shapefile::Reader::from_path(file_path)?;
for shape_record in reader.iter_shapes_and_records() {
Expand Down Expand Up @@ -95,9 +96,9 @@ pub fn read_shapefile(file_path: &str) -> Result<Rows> {
}
}

// Transform shape to geometry then push
let geometry = to_geo(&shape)?;
row.add(AcceptedTypes::Geometry(geometry));
let geom = to_geo(&shape)?;
let wkb = geom_to_wkb(&geom).expect("Failed to insert node into database");
row.add(AcceptedTypes::Geometry(Wkb { geometry: wkb }));
rows.add(row);
}

Expand Down

0 comments on commit e8865b1

Please sign in to comment.