From 6ea1eb92f4db5fcd742d00d9770328931e6086f1 Mon Sep 17 00:00:00 2001
From: Myf Ma
Date: Mon, 21 Apr 2014 16:16:30 -0400
Subject: [PATCH] splitting file into lga level csv ref #2

---
Reviewer revision of csv_split.js. The payload below differs from the
original commit, and the blob id on its index line was not regenerated.
 * CSV cells now double embedded double quotes instead of emitting them raw.
 * output/ is resolved next to the script and created when it is missing.
 * A grouping column that is absent from the header is reported as an error.
 * Per-group write streams are ended once the input has been consumed.

 .gitignore   |   3 ++
 csv_split.js | 103 +++++++++++++++++++++++++++++++++++++++++++++++++++
 package.json |  11 ++++++
 3 files changed, 117 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 csv_split.js
 create mode 100644 package.json

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..b31ffc2
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+node_modules
+raw_data
+output
diff --git a/csv_split.js b/csv_split.js
new file mode 100644
index 0000000..d149996
--- /dev/null
+++ b/csv_split.js
@@ -0,0 +1,103 @@
+// Splits one large CSV export into one CSV file per distinct value of a
+// grouping column, repeating the header row at the top of every output file.
+var fs = require("fs"),
+    through = require("through"),
+    csv = require("csv-streamify"),
+    path = require("path"),
+    out_dir = "output",
+    write_streams = {},  // group value -> open fs.WriteStream
+    header = '',         // replaced by the parsed header row (an array)
+    group_sequence;      // index of the grouping column within the header
+
+
+// Resolves a path relative to the directory that contains this script.
+var abs_resolve = function(relative_location) {
+    return path.resolve(__dirname, relative_location);
+};
+
+// Output is written next to this script, like the input, rather than
+// relative to the current working directory. fs.createWriteStream() does
+// not create missing directories, so create the folder up front.
+var out_path = abs_resolve(out_dir);
+if (!fs.existsSync(out_path)) {
+    fs.mkdirSync(out_path);
+}
+
+// Serialises an array of cell values as one CSV line. Every cell is wrapped
+// in double quotes and embedded double quotes are doubled (RFC 4180), so a
+// cell that contains quotes still reads back as a single field.
+var write_csv_line = function(arr) {
+    var quoted_arr = arr.map(function(item) {
+        return '"' + String(item).replace(/"/g, '""') + '"';
+    });
+    return quoted_arr.join(',') + '\n';
+};
+
+
+// Tags each parsed row with the value of its grouping column so that
+// drain() can route it to the matching output file.
+var split = through(function(data) {
+    var name = data[group_sequence];
+    var write_obj = {"name" : name,
+        "data" : data
+    };
+    this.queue(write_obj);
+});
+
+// Appends one tagged row to the CSV file of its group. The first row of a
+// group opens the file and writes the header row before the data row.
+var drain = function(data) {
+    var name = data.name;
+    var ws = write_streams[name];
+    if (!ws) {
+        ws = fs.createWriteStream(path.resolve(out_path, name + ".csv"));
+        write_streams[name] = ws;
+        ws.write(write_csv_line(header));
+    }
+    ws.write(write_csv_line(data.data));
+};
+
+// Ends every per-group write stream so buffered rows are flushed to disk.
+var close_all = function() {
+    Object.keys(write_streams).forEach(function(name) {
+        write_streams[name].end();
+    });
+};
+
+// Builds the CSV parser stream. The first parsed row is stored as the header
+// and the position of `group_name` in it becomes the grouping column.
+// NOTE(review): assumes parser.lineNo is still 0 when the header row becomes
+// readable -- confirm against the csv-streamify version in use.
+var parse = function(group_name) {
+    var parser = csv({objectMode: true});
+    parser.on('readable', function() {
+        if (parser.lineNo === 0) {
+            var first_row = parser.read();
+            if (!first_row) {
+                // Nothing buffered yet; wait for the next 'readable'.
+                return;
+            }
+            header = first_row;
+            group_sequence = header.indexOf(group_name);
+            if (group_sequence === -1) {
+                // Without this check every row would be filed under
+                // "undefined.csv".
+                parser.emit('error', new Error(
+                    'column "' + group_name + '" not found in CSV header'));
+            }
+        }
+    });
+    return parser;
+};
+
+
+
+var read_file = "raw_data/education_mopup_2014_04_11_12_56_16.csv";
+var read_stream = fs.createReadStream(abs_resolve(read_file));
+read_stream
+    .pipe(parse('lga'))
+    .pipe(split)
+    .on('data', function(data){
+        drain(data);
+    })
+    .on('end', close_all);
diff --git a/package.json b/package.json
new file mode 100644
index 0000000..6f9be9b
--- /dev/null
+++ b/package.json
@@ -0,0 +1,11 @@
+{
+    "name" : "mopup_stream",
+    "main" : "data.js",
+    "dependencies" : {
+        "event-stream": "*",
+        "readable-stream": "*",
+        "csv-streamify": "*",
+        "JSONStream": "*",
+        "through" : "*"
+    }
+}