diff --git a/Cargo.toml b/Cargo.toml index dff17cc..5a0fa63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,3 +9,5 @@ shapefile = "0.6.0" postgres-protocol = "0.6.6" postgres = { version = "0.19.7", features = ["with-serde_json-1"] } geo = "0.28.0" +wkb = "0.7.1" +bytes = "1.6.0" diff --git a/src/pg/binary_copy.rs b/src/pg/binary_copy.rs new file mode 100644 index 0000000..af2a42f --- /dev/null +++ b/src/pg/binary_copy.rs @@ -0,0 +1,86 @@ +use crate::Result; +use bytes::BytesMut; +use postgres::types::to_sql_checked; +use postgres::types::{IsNull, ToSql, Type}; +use std::error::Error; +use postgres::Statement; + +use postgres::binary_copy::BinaryCopyInWriter; +use postgres::CopyInWriter; + +use crate::utils::{NewTableTypes, Rows}; +use crate::pg::crud::create_connection; + +#[derive(Debug)] +pub struct Wkb { + pub geometry: Vec, +} + +impl ToSql for Wkb { + fn to_sql( + &self, + _: &Type, + out: &mut BytesMut, + ) -> std::result::Result> { + out.extend_from_slice(&self.geometry); + Ok(IsNull::No) + } + + fn accepts(ty: &Type) -> bool { + ty.name() == "geometry" + } + + to_sql_checked!(); +} + +pub fn infer_geom_type(stmt: Statement) -> Result { + let column = stmt.columns().get(0).expect("Failed to get columns"); + Ok(column.type_().clone()) +} + +pub fn insert_rows<'a>( + rows: &Rows, + config: &Vec, + geom_type: Type, + uri: &str, + schema: &Option, + table: &str, +) -> Result<()> { + // Create connection + let mut client = create_connection(uri)?; + + // Prepare types for binary copy + let mut types: Vec = Vec::new(); + for column in config.iter() { + types.push(column.data_type.clone()); + } + types.push(geom_type); + + // Binary copy in writer + let mut query = String::from("COPY "); + if let Some(schema) = schema { + query.push_str(&format!("{}.{}", schema, table)); + } else { + query.push_str(&table); + } + query.push_str(" ("); + for column in config.iter() { + query.push_str(&format!("{},", column.column_name)); + } + query.push_str("geom) FROM stdin BINARY"); + let writer: CopyInWriter = client.copy_in(&query)?; + + let mut writer = BinaryCopyInWriter::new(writer, &types); + + println!("{:?}", types); + + for row in rows.rows.iter() { + // Transform row into vector of ToSql + + // writer + // .write(&[&row]) + // .expect("Failed to insert row into database"); + } + + Ok(()) +} diff --git a/src/pg/crud.rs b/src/pg/crud.rs new file mode 100644 index 0000000..e7fe457 --- /dev/null +++ b/src/pg/crud.rs @@ -0,0 +1,58 @@ +use crate::Result; +use postgres::types::Type; +use postgres::Statement; + +use postgres::{Client, NoTls}; + +use crate::utils::NewTableTypes; + +pub fn create_connection(uri: &str) -> Result { + let client = Client::connect(uri, NoTls)?; + Ok(client) +} + +pub fn create_table( + table_name: &str, + schema_name: &Option, + config: &Vec, + uri: &str, +) -> Result { + let mut query = String::from("CREATE TABLE IF NOT EXISTS "); + if let Some(schema) = schema_name { + query.push_str(&format!("{}.{} ", schema, table_name)); + } else { + query.push_str(&table_name); + } + query.push_str("("); + for column in config.iter() { + match column.data_type { + Type::INT8 => { + query.push_str(&format!("{} INT,", column.column_name)); + } + Type::FLOAT8 => { + query.push_str(&format!("{} DOUBLE,", column.column_name)); + } + Type::TEXT => { + query.push_str(&format!("{} TEXT,", column.column_name)); + } + Type::BOOL => { + query.push_str(&format!("{} BOOL,", column.column_name)); + } + _ => println!("Type currently not supported"), + } + } + query.push_str("geom Geometry(Geometry, 4326)"); + query.push_str(");"); + println!("{}", query); + + let mut client = create_connection(uri)?; + client.execute(&query, &[])?; + + let stmt = if let Some(schema) = schema_name { + client.prepare(&format!("SELECT geom FROM {}.{}", schema, table_name))? + } else { + client.prepare(&format!("SELECT geom FROM {}", table_name))? + }; + + Ok(stmt) +} diff --git a/src/pg/mod.rs b/src/pg/mod.rs index eef24f8..fc340e4 100644 --- a/src/pg/mod.rs +++ b/src/pg/mod.rs @@ -1,64 +1,2 @@ -use crate::Result; -use postgres::Statement; -use postgres::types::Type; - -use postgres::{Client, NoTls, CopyInWriter}; - -use crate::utils::NewTableTypes; - -pub fn create_connection(uri: &str) -> Result { - let client = Client::connect(uri, NoTls)?; - Ok(client) -} - -// pub fn create_table(client: &mut Client) -> Result { -// client.execute("CREATE TABLE IF NOT EXISTS pio ( -// id INT, -// properties JSONB, -// geometry geometry);", &[])?; -// -// let stmt = client.prepare("SELECT geometry FROM pio")?; -// Ok(stmt) -// } - -pub fn infer_geom_type(stmt: Statement) -> Result { - let column = stmt.columns().get(0).expect("Failed to get columns"); - Ok(column.type_().clone()) -} - -pub fn create_binary_writer<'a>(client: &'a mut Client) -> Result> { - let writer:CopyInWriter = client.copy_in("COPY pio (id, properties, geometry) FROM stdin BINARY")?; - Ok(writer) -} - -pub fn create_table(table_name: &str, schema_name: Option, config: &Vec, client: &mut Client) -> Result<()> { - let mut query = String::from("CREATE TABLE IF NOT EXISTS "); - if let Some(schema) = schema_name { - query.push_str(&format!("{}.{} ", schema, table_name)); - } else { - query.push_str(&table_name); - } - query.push_str("("); - for column in config.iter() { - match column.data_type { - Type::INT8 => { - query.push_str(&format!(" {} INT,", column.column_name)); - }, - Type::FLOAT8 => { - query.push_str(&format!(" {} DOUBLE,", column.column_name)); - }, - Type::TEXT => { - query.push_str(&format!(" {} TEXT,", column.column_name)); - }, - Type::BOOL => { - query.push_str(&format!(" {} BOOL,", column.column_name)); - }, - _ => println!("Type currently not supported"), - } - } - query.push_str("geom GEOMETRY"); - query.push_str(");"); - println!("{}", query); - client.execute(&query, &[])?; - Ok(()) -} +pub mod binary_copy; +pub mod crud; diff --git a/src/utils/cli.rs b/src/utils/cli.rs index 20464dd..c96a1ee 100644 --- a/src/utils/cli.rs +++ b/src/utils/cli.rs @@ -2,7 +2,8 @@ use crate::Result; use crate::utils::{determine_file_type, FileType}; use crate::utils::validate::validate_args; use crate::utils::shp::{read_shapefile, determine_data_types}; -use crate::pg::{create_connection, create_table}; +use crate::pg::crud::create_table; +use crate::pg::binary_copy::{infer_geom_type, insert_rows}; use clap::Parser; @@ -27,7 +28,7 @@ pub fn run() -> Result<()> { validate_args(&args)?; let file_type = determine_file_type(&args.input)?; - let data = match file_type { + let rows = match file_type { FileType::Shapefile => { read_shapefile(&args.input)? } @@ -36,9 +37,9 @@ pub fn run() -> Result<()> { } }; let config = determine_data_types(&args.input)?; - let mut client = create_connection(&args.uri)?; - create_table(&args.table, args.schema, &config, &mut client)?; - // Insert data into table using Rows + let stmt = create_table(&args.table, &args.schema, &config, &args.uri)?; + let geom_type = infer_geom_type(stmt)?; + insert_rows(&rows, &config, geom_type, &args.uri, &args.schema, &args.table)?; Ok(()) } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 945b1d5..caf9b8f 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -2,8 +2,8 @@ use crate::Result; use std::path::Path; use postgres::types::Type; use geo::Coord; -use geo::geometry::Geometry; use shapefile::Shape; +use crate::pg::binary_copy::Wkb; pub mod cli; pub mod shp; @@ -15,10 +15,12 @@ pub struct NewTableTypes { pub data_type: Type, } +#[derive(Debug)] pub struct Row { pub columns: Vec, } +#[derive(Debug)] pub struct Rows { pub rows: Vec, } @@ -42,13 +44,14 @@ impl Rows { } // Enum to hold accepted data types -enum AcceptedTypes { +#[derive(Debug)] +pub enum AcceptedTypes { Int(i64), - Float(f64), + // Float(f64), Double(f64), Text(String), Bool(bool), - Geometry(Geometry), + Geometry(Wkb), } // Create enum of supported file types @@ -71,10 +74,6 @@ fn determine_file_type(input_file: &str) -> Result { } } -fn determine_column_types() -> Result<()> { - Ok(()) -} - pub fn to_geo(shape: &Shape) -> Result> { match shape { Shape::Point(p) => Ok(geo::Point::new(p.x, p.y).into()), diff --git a/src/utils/shp.rs b/src/utils/shp.rs index f7236d3..d9ddfd4 100644 --- a/src/utils/shp.rs +++ b/src/utils/shp.rs @@ -2,8 +2,10 @@ use crate::Result; use postgres::types::Type; use shapefile::dbase::FieldValue; +use crate::pg::binary_copy::Wkb; use crate::utils::to_geo; use crate::utils::{AcceptedTypes, NewTableTypes, Row, Rows}; +use wkb::geom_to_wkb; pub fn determine_data_types(file_path: &str) -> Result> { let mut table_config: Vec = Vec::new(); @@ -57,7 +59,6 @@ pub fn determine_data_types(file_path: &str) -> Result> { } pub fn read_shapefile(file_path: &str) -> Result { - // Create a new vector that can hold any data type from below let mut rows = Rows::new(); let mut reader = shapefile::Reader::from_path(file_path)?; for shape_record in reader.iter_shapes_and_records() { @@ -95,9 +96,9 @@ pub fn read_shapefile(file_path: &str) -> Result { } } - // Transform shape to geometry then push - let geometry = to_geo(&shape)?; - row.add(AcceptedTypes::Geometry(geometry)); + let geom = to_geo(&shape)?; + let wkb = geom_to_wkb(&geom).expect("Failed to insert node into database"); + row.add(AcceptedTypes::Geometry(Wkb { geometry: wkb })); rows.add(row); }