Skip to content

Commit

Permalink
Optionally remove existing geocode columns
Browse files Browse the repository at this point in the history
If the geocoder would introduce columns that already exist in the file,
we do one of two things:

1. By default, we exit with an error.
2. If the user passes `--replace`, we strip any column with the same
   name as a geocoding output column, and add the output columns
   normally.

This is fairly tricky code, and I wrote it for a deadline while tired.
So please review it carefully and treat it with caution, and don't use
it as an example of "what to do".
  • Loading branch information
emk committed Nov 18, 2019
1 parent e50bb34 commit ad6a506
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 12 deletions.
63 changes: 62 additions & 1 deletion src/addresses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@
use csv::StringRecord;
use failure::{format_err, ResultExt};
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, collections::HashMap, fs::File, path::Path};
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
fs::File,
path::Path,
};

use crate::structure::Structure;
use crate::Result;

/// An address record that we can pass to SmartyStreets.
Expand Down Expand Up @@ -176,6 +182,61 @@ impl<Key: Default + Eq> AddressColumnSpec<Key> {
pub fn get(&self, prefix: &str) -> Option<&AddressColumnKeys<Key>> {
    // Keyed lookup: delegate straight to the per-prefix table and hand back
    // whatever it reports for `prefix`.
    let columns_by_prefix = &self.address_columns_by_prefix;
    columns_by_prefix.get(prefix)
}

/// Find the positions of every column in `header` whose name collides with
/// a column that geocoding would add.
///
/// The output column names are gathered from `structure` for each prefix
/// in this spec, then `header` is scanned from left to right, so the
/// returned indices are always in ascending order.
///
/// # Errors
///
/// Fails with `duplicate column name` if two geocoding output columns
/// would share a name, and propagates any error reported by
/// `structure.output_column_names`.
pub fn column_indices_to_remove(
    &self,
    structure: &Structure,
    header: &StringRecord,
) -> Result<Vec<usize>> {
    // Collect every output column name across all prefixes, refusing to
    // continue if the same name shows up twice.
    let mut generated_names: HashSet<String> = HashSet::new();
    for prefix in self.prefixes() {
        for name in structure.output_column_names(prefix)? {
            if generated_names.contains(&name) {
                return Err(format_err!("duplicate column name {:?}", name));
            }
            generated_names.insert(name);
        }
    }

    // Keep the position of each header field that clashes with one of the
    // generated names.
    let clashing_indices = header
        .iter()
        .enumerate()
        .filter(|(_, col)| generated_names.contains(*col))
        .map(|(i, _)| i)
        .collect();
    Ok(clashing_indices)
}
}

#[test]
fn find_columns_to_remove() {
    use std::iter::FromIterator;

    // A spec with two prefixes, `home` and `work`.
    let spec_json = r#"{
"home": {
"house_number_and_street": ["home_number", "home_street"],
"city": "home_city",
"state": "home_state",
"postcode": "home_zip"
},
"work": {
"address": "work_address"
}
}"#;
    let spec: AddressColumnSpec<String> = serde_json::from_str(spec_json).unwrap();
    let structure = Structure::complete().unwrap();

    // The test expects the two `*_addressee` columns to be flagged for
    // removal and `existing` to be left alone.
    let header =
        StringRecord::from_iter(&["existing", "home_addressee", "work_addressee"]);
    let expected = vec![1, 2];
    let actual = spec.column_indices_to_remove(&structure, &header).unwrap();
    assert_eq!(actual, expected);
}

impl AddressColumnSpec<String> {
Expand Down
72 changes: 66 additions & 6 deletions src/geocoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use failure::{format_err, ResultExt};
use futures::{compat::Future01CompatExt, future, FutureExt, TryFutureExt};
use hyper::Client;
use hyper_tls::HttpsConnector;
use log::{debug, error, trace};
use std::{cmp::max, io, sync::Arc, thread::sleep, time::Duration};
use log::{debug, error, trace, warn};
use std::{
cmp::max, io, iter::FromIterator, sync::Arc, thread::sleep, time::Duration,
};
use tokio::{
prelude::*,
sync::mpsc::{self, Receiver, Sender},
Expand Down Expand Up @@ -63,6 +65,7 @@ enum Message {
pub async fn geocode_stdio(
spec: AddressColumnSpec<String>,
match_strategy: MatchStrategy,
replace_existing_columns: bool,
structure: Structure,
) -> Result<()> {
// Set up bounded channels for communication between the sync and async
Expand All @@ -73,7 +76,7 @@ pub async fn geocode_stdio(
// Hook up our inputs and outputs, which are synchronous functions running
// in their own threads.
let read_fut = run_sync_fn_in_background("read CSV".to_owned(), move || {
read_csv_from_stdin(spec, structure, in_tx)
read_csv_from_stdin(spec, structure, replace_existing_columns, in_tx)
});
let write_fut = run_sync_fn_in_background("write CSV".to_owned(), move || {
write_csv_to_stdout(out_rx)
Expand Down Expand Up @@ -149,15 +152,56 @@ pub async fn geocode_stdio(
fn read_csv_from_stdin(
spec: AddressColumnSpec<String>,
structure: Structure,
replace_existing_columns: bool,
mut tx: Sender<Message>,
) -> Result<()> {
// Open up our CSV file and get the headers.
let stdin = io::stdin();
let mut rdr = csv::Reader::from_reader(stdin.lock());
let in_headers = rdr.headers()?.to_owned();
let mut in_headers = rdr.headers()?.to_owned();
debug!("input headers: {:?}", in_headers);

// Look for duplicate input columns, and decide what to do.
let column_indices_to_remove =
spec.column_indices_to_remove(&structure, &in_headers)?;
let remove_column_flags = if column_indices_to_remove.is_empty() {
// No columns to remove!
None
} else {
// We have to remove columns, so make a human-readable list...
let mut removed_names = vec![];
for i in &column_indices_to_remove {
removed_names.push(&in_headers[*i]);
}
let removed_names = removed_names.join(" ");

// And decide whether we're actually allowed to remove them or not.
if replace_existing_columns {
warn!("removing input columns: {}", removed_names);

// Build the vector of bools specifying whether columns should
// stay or go.
let mut flags = vec![false; in_headers.len()];
for i in column_indices_to_remove {
flags[i] = true;
}
Some(flags)
} else {
return Err(format_err!(
"input columns would conflict with geocoding columns: {}",
removed_names,
));
}
};

// Remove any duplicate columns from our input headers.
if let Some(remove_column_flags) = &remove_column_flags {
in_headers = remove_columns(&in_headers, &remove_column_flags);
}

// Convert our column spec from using header names to header indices.
//
// This needs to use "post-removal" indices!
let spec = spec.convert_to_indices_using_headers(&in_headers)?;

// Decide how big to make our chunks. We want to geocode no more
Expand All @@ -183,7 +227,11 @@ fn read_csv_from_stdin(
let mut sent_chunk = false;
let mut rows = Vec::with_capacity(chunk_size);
for row in rdr.records() {
let row = row?;
let mut row = row?;
if let Some(remove_column_flags) = &remove_column_flags {
// Strip out any duplicate columns.
row = remove_columns(&row, remove_column_flags);
}
rows.push(row);
if rows.len() >= chunk_size {
trace!("sending {} input rows", rows.len());
Expand Down Expand Up @@ -346,7 +394,7 @@ async fn geocode_chunk(
};
trace!("geocoded {} addresses", addresses_len);

// Add address information to our , output rows.
// Add address information to our output rows.
for geocoded_for_prefix in geocoded.chunks(chunk.rows.len()) {
assert_eq!(geocoded_for_prefix.len(), chunk.rows.len());
for (response, row) in geocoded_for_prefix.iter().zip(&mut chunk.rows) {
Expand All @@ -362,3 +410,15 @@ async fn geocode_chunk(
}
Ok(chunk)
}

fn remove_columns(row: &StringRecord, remove_column_flags: &[bool]) -> StringRecord {
StringRecord::from_iter(row.iter().zip(remove_column_flags).filter_map(
|(value, &remove)| {
if remove {
None
} else {
Some(value.to_owned())
}
},
))
}
7 changes: 6 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ struct Opt {
#[structopt(long = "match", default_value = "strict")]
match_strategy: MatchStrategy,

/// Replace any existing columns with the same name as our geocoding
/// columns.
#[structopt(long = "replace")]
replace: bool,

/// A JSON file describing what columns to geocode.
#[structopt(long = "spec")]
spec_path: PathBuf,
Expand All @@ -53,7 +58,7 @@ fn run() -> Result<()> {
let structure = Structure::complete()?;

// Call our geocoder asynchronously.
let geocode_fut = geocode_stdio(spec, opt.match_strategy, structure);
let geocode_fut = geocode_stdio(spec, opt.match_strategy, opt.replace, structure);

// Pass our future to our async runtime.
let mut runtime =
Expand Down
26 changes: 22 additions & 4 deletions src/structure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,35 @@ impl Structure {
Ok(structure)
}

/// Compute the output column name for one entry of our
/// [`structure::Structure`]: the column `prefix`, an underscore, and the
/// final element of `path`.
///
/// # Panics
///
/// Panics if `path` is empty, because that should be impossible.
fn column_name(&self, prefix: &str, path: &[&str]) -> String {
    let leaf = path
        .last()
        .expect("should always have at least one path element");

    // Assemble `<prefix>_<leaf>` by hand.
    let mut name = String::with_capacity(prefix.len() + leaf.len() + 1);
    name.push_str(prefix);
    name.push('_');
    name.push_str(leaf);
    name
}

/// List the name of every column this structure would append to a CSV
/// file for the given `prefix`.
///
/// Names are produced by `column_name` and appear in the order in which
/// `traverse` visits the structure. Any error from `traverse` is returned
/// to the caller.
pub fn output_column_names(&self, prefix: &str) -> Result<Vec<String>> {
    let mut names = Vec::new();
    self.traverse(|path| {
        names.push(self.column_name(prefix, path));
        Ok(())
    })?;
    Ok(names)
}

/// Add the column names specified in this `Structure` to a CSV header row.
///
/// One field is appended to `header` for each path visited by `traverse`,
/// named by `column_name` so that the header always agrees with
/// `output_column_names` for the same `prefix`. Any error from `traverse`
/// is returned to the caller.
pub fn add_header_columns(
    &self,
    prefix: &str,
    header: &mut StringRecord,
) -> Result<()> {
    self.traverse(|path| {
        // Push exactly one field per path; the name comes from the shared
        // helper rather than being formatted inline here.
        header.push_field(&self.column_name(prefix, path));
        Ok(())
    })
}
Expand Down

0 comments on commit ad6a506

Please sign in to comment.