Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BAG extraction app for data-modules #2

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
0c71cae
First commit
Jan 25, 2016
84f67d3
Merge branch 'master' of https://github.com/reinvantveer/histograph-d…
Jan 25, 2016
c85fda1
Test suite with training wheels. Does only buildings so far
Feb 4, 2016
f07d3ec
Hehe, uncomment previously run tests
Feb 4, 2016
ea19721
Working directory extraction test!
Feb 8, 2016
559b57d
Code cleanup, better unzip test
Feb 8, 2016
de558a7
Re-enable full directory test, refactor download step, add convert step
Feb 8, 2016
078cf78
Working multithreaded xml conversion :rocket:
Feb 22, 2016
32da887
Prune duplicate code, move tests onto buildingsextractor module. Add …
Feb 22, 2016
7a21cd0
Use request-progress to inform user of progress. Still a stub.
Feb 22, 2016
7aec472
Create mock out test requests, create code and test for getting down…
Mar 4, 2016
90c3ac4
Working unzip phase, closes #2
Mar 5, 2016
b993434
Work on address extraction
Mar 5, 2016
b157880
Finished work on address extraction
Mar 6, 2016
5c44378
Refactor - split off geometry helper functions to separate module
Mar 6, 2016
26fb99d
Create public spaces extractor, closes #5
Mar 6, 2016
d2ce01e
Create worker code for public spaces extraction
Mar 6, 2016
d794e90
Refactor main bag.js to use a jobs data factory mapFilesToJobs, move …
Mar 13, 2016
3b45f4f
Re-include failing test
Mar 13, 2016
3aeb384
Refactor to separate helper files and test files
Mar 14, 2016
d3bdc58
Fully running tests suite
Apr 1, 2016
a36fa84
Light refactoring
Apr 1, 2016
bde3717
Extract places, some refactoring
Apr 1, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/test/BAG_Amstelveen_2011feb01.zip
/test/extract/data-bag
/coverage
/test/buildings.ndjson
/test/unzip
/error.log
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
# data-bag
Extracts Histograph data from the Base Administration for Buildings and Addresses (BAG)

# Todo:
- Extract placenames from set
11 changes: 11 additions & 0 deletions bag.dataset.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"id": "bag",
"title": "Basisregistratie Adressen en Gebouwen",
"license": "http://creativecommons.org/publicdomain/mark/1.0/deed.nl",
"description": "Service Adressen, gevuld met relevante objecten uit de Basisregistratie Adressen en Gebouwen (BAG), beheerd door het Kadaster.",
"author": "Kadaster",
"website": "http://bag.kadaster.nl",
"edits": "",
"editor": "Rein van t Veer",
"creationDate": ""
}
279 changes: 279 additions & 0 deletions bag.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
'use strict';

const fs = require('fs');
const path = require('path');
const Promise = require('bluebird');
const request = require('request');
const progress = require('request-progress');
const yauzl = require('yauzl');
const mkdirp = require('mkdirp');
const highland = require('highland');
const sax = require('sax');
const saxpath = require('saxpath');
const xml2js = require('xml2js');

const workerFarm = require('worker-farm');
const NUM_CPUS = require('os').cpus().length;
// Worker-farm tuning: one worker process per CPU core, each handling a
// single extraction call at a time (the XML conversion is CPU-bound).
const FARM_OPTIONS = {
  maxConcurrentWorkers: require('os').cpus().length, // NOTE(review): duplicates NUM_CPUS above
  maxCallsPerWorker: Infinity,
  maxConcurrentCallsPerWorker: 1
};

// One worker farm per BAG object type; each exposes extractFromFile(input,
// pitsFile, relationsFile, callback) — see the helpers/ modules.
const buildingsworkers = workerFarm(
  FARM_OPTIONS,
  require.resolve('./helpers/buildingsextractor.js'),
  ['extractFromFile']
);

const addressworkers = workerFarm(
  FARM_OPTIONS,
  require.resolve('./helpers/addressesextractor.js'),
  ['extractFromFile']
);

const publicSpacesWorkers = workerFarm(
  FARM_OPTIONS,
  require.resolve('./helpers/publicspacesextractor.js'),
  ['extractFromFile']
);

const placesWorkers = workerFarm(
  FARM_OPTIONS,
  require.resolve('./helpers/placesextractor.js'),
  ['extractFromFile']
);

/**
 * Reads the BAG Atom feed and resolves with the advertised download size
 * in bytes (the `length` attribute on the first entry's first link).
 *
 * @param {string} atomURL - URL of the Atom feed describing the download.
 * @returns {Promise<number>} resolves with the download size in bytes.
 */
function extractDownloadSize(atomURL) {
  return new Promise((resolve, reject) => {
    request(atomURL, (err, response, body) => {
      if (err) return reject(err);
      if (!response) return reject(new Error(`No response returned from request to ${atomURL}`));
      if (response.statusCode !== 200) {
        return reject(new Error(`Unexpected request to ${atomURL} response status ${response.statusCode}`));
      }
      if (!body) return reject(new Error(`The request to ${atomURL} did not return a response body`));

      return new xml2js.Parser().parseString(body, (error, result) => {
        if (error) return reject(new Error(`Error parsing body ${body} \n ${error.stack}`));
        const advertisedLength = result.feed.entry[0].link[0].$.length;
        console.log(`Length: ${JSON.stringify(advertisedLength, null, 2)}`);
        return resolve(parseInt(advertisedLength, 10));
      });
    });
  });
}


/**
 * Step 1: download the BAG data file advertised by the Atom feed.
 *
 * @param {Object} config - needs feedURL, baseDownloadUrl and datafilename.
 * @param {string} dir - directory to store the downloaded zipfile in.
 * @param {Object} writer - unused here; part of the shared step signature.
 * @param {Function} callback - node-style callback; called with no argument
 *   on success, with the error on failure.
 */
function download(config, dir, writer, callback) {
  console.log(`Downloading ${config.baseDownloadUrl}...`);
  return extractDownloadSize(config.feedURL)
    .then(size => downloadDataFile(config.baseDownloadUrl, config.datafilename, dir, size))
    .then((fullPath) => {
      console.log(`${new Date()} download of ${fullPath} complete!`);
      // BUG FIX: the callback must be invoked to signal completion;
      // previously the function returned the callback uncalled, so the
      // step never reported success.
      return callback();
    })
    .catch(error => {
      console.error(`${new Date()} Download failed due to ${error}`);
      return callback(error);
    });
}
/**
 * Downloads baseURL + filename into dir, logging throttled progress
 * percentages against the expected total size.
 *
 * @param {string} baseURL - base URL of the download service.
 * @param {string} filename - file name to append to baseURL and to save as.
 * @param {string} dir - directory to save the file in.
 * @param {number} size - expected total size in bytes (for the progress %).
 * @returns {Promise<string>} resolves with the full path of the saved file.
 */
function downloadDataFile(baseURL, filename, dir, size) {
  return new Promise((resolve, reject) => {
    const fullZipFilePath = path.join(dir, filename);
    console.log(`Getting ${baseURL + filename}:`);
    console.log(`Total size: ${size}`);

    progress(request
      .get(baseURL + filename), {
        throttle: 2000,
        delay: 1000
      })
      .on('progress', state => {
        console.log(`Download progress: ${((state.size.transferred / size) * 100).toFixed(0)}%`);
      })
      .on('error', err => reject(err))
      .on('end', () => {
        console.log('Download progress: 100%');
        resolve(fullZipFilePath);
      })
      // BUG FIX: the response body was never written anywhere — the promise
      // resolved with a path that did not exist on disk. Pipe the download
      // into the target file.
      .pipe(fs.createWriteStream(fullZipFilePath));
  });
}

/**
 * Recursively extracts a zipfile into extractdir. Nested .zip entries are
 * extracted in place (into the same extractdir) before reading continues.
 *
 * @param {string} zipfilename - path of the zip archive to extract.
 * @param {string} extractdir - directory to extract into.
 * @returns {Promise} resolves when every entry (including nested zips) is done.
 */
function extractZipfile(zipfilename, extractdir) {
  return new Promise((resolve, reject) => {
    console.log('extractdir: ', extractdir, '\n');
    mkdirp(extractdir);

    console.log('zipfilename: ', zipfilename, '\n');
    yauzl.open(zipfilename, { lazyEntries: true }, (err, zipfile) => {
      if (err) return reject(err);

      zipfile.readEntry();

      zipfile.on('entry', entry => {
        if (/\/$/.test(entry.fileName)) {
          // Directory file names end with '/': create the directory inside
          // the extraction target and move on to the next entry.
          // BUG FIX: this used to mkdirp(entry.fileName) relative to the cwd
          // AND fall through to openReadStream() on the directory entry.
          return mkdirp(path.join(extractdir, entry.fileName), dirError => {
            if (dirError) return reject(dirError);
            return zipfile.readEntry();
          });
        }

        // file entry
        return zipfile.openReadStream(entry, (streamErr, readStream) => {
          if (streamErr) {
            console.log(`Error reading ${entry.fileName}`);
            return reject(streamErr);
          }

          // Ensure the parent directory exists inside the extraction target.
          // BUG FIX: was mkdirp(path.dirname(entry.fileName)), which created
          // the directory relative to the cwd while the file itself was
          // written under extractdir.
          return mkdirp(path.join(extractdir, path.dirname(entry.fileName)), dirErr => {
            if (dirErr) return reject(dirErr);
            readStream.pipe(fs.createWriteStream(path.join(extractdir, entry.fileName)));
            readStream.on('end', () => {
              if (entry.fileName.slice(-4) === '.zip') {
                // Nested archive: recurse before continuing with the
                // next entry of the outer zipfile.
                extractZipfile(path.join(extractdir, entry.fileName), extractdir)
                  .then(() => {
                    console.log(`Extracted subzip ${entry.fileName}`);
                    zipfile.readEntry();
                  });
              } else {
                zipfile.readEntry();
              }
            });

            readStream.on('error', readErr => reject(readErr));
          });
        });
      });

      zipfile.on('end', () => resolve());
    });
  });
}

/**
 * Step 2: unzip the downloaded BAG archive (and its nested zipfiles).
 *
 * @param {Object} config - needs datafilename (the zip saved by download).
 * @param {string} dir - directory holding the zipfile; also extraction target.
 * @param {Object} writer - unused here; part of the shared step signature.
 * @param {Function} callback - node-style callback; called with no argument
 *   on success, with the error on failure.
 */
function unzip(config, dir, writer, callback) {
  console.log('WARNING, make sure you have at least 45 Gb of free disk space for extraction, or press Ctrl-c to abort.');
  console.log('The unzip phase itself can take up to an hour and will extract about 4.000 XML files.');
  console.log('Since the zipfile consists of sub-zipfiles of unknown size, there cannot be given an estimation of remaining time.');
  console.log('The process will appear to be frozen for quite some time, especially on the ***PND***.zip file.');
  console.log('However, this will at least spare you the logging of about 4000 file names.');
  return extractZipfile(path.join(dir, config.datafilename), dir)
    .then(() => {
      console.log(`${new Date()} extraction complete!`);
      // BUG FIX: the callback must be invoked to signal completion;
      // previously the function returned the callback uncalled, so the
      // step never reported success.
      return callback();
    })
    .catch(error => {
      console.error(`${new Date()} Extraction failed due to ${error}`);
      return callback(error);
    });
}

/**
 * Promisified recursive directory creation (mkdir -p).
 *
 * @param {string} dirPath - directory to create, parents included.
 * @returns {Promise} resolves once the directory exists, rejects on failure.
 */
function mkdir(dirPath) {
  // BUG FIX: the parameter used to be named `path`, shadowing the `path`
  // module required at the top of this file; also, reject() was not
  // followed by a return, so resolve() ran on the error path too.
  return new Promise((resolve, reject) => {
    mkdirp(dirPath, err => {
      if (err) {
        console.log(`Error during directory creation: ${err}`);
        return reject(err);
      }
      return resolve();
    });
  });
}


/**
 * Builds one conversion job per BAG XML file found in `dir`.
 * The object type is read from characters 4-6 of the file name (PND = panden,
 * NUM = nummeraanduidingen, OPR = openbare ruimten, WPL = woonplaatsen);
 * .xml files of any other type are skipped.
 *
 * @param {string} dir - directory containing the extracted XML files.
 * @param {string} extractDir - directory the ndjson output files go to.
 * @returns {Array<Object>} jobs with converter, inputFile, outputPITsFile
 *   and outputRelationsFile keys.
 */
function mapFilesToJobs(dir, extractDir) {
  const fileTypes = {
    PND: {
      converter: buildingsworkers,
      outputPITsFile: 'pand.pits.ndjson',
      outputRelationsFile: 'pand.relations.ndjson'
    },
    NUM: {
      converter: addressworkers,
      outputPITsFile: 'adres.pits.ndjson',
      outputRelationsFile: 'adres.relations.ndjson'
    },
    OPR: {
      converter: publicSpacesWorkers,
      outputPITsFile: 'openbareruimte.pits.ndjson',
      outputRelationsFile: 'openbareruimte.relations.ndjson'
    },
    WPL: {
      converter: placesWorkers,
      outputPITsFile: 'woonplaats.pits.ndjson',
      outputRelationsFile: 'woonplaats.relations.ndjson'
    }
  };

  const jobs = [];
  for (const file of fs.readdirSync(dir)) {
    if (file.slice(-4) !== '.xml') continue;
    const typeInfo = fileTypes[file.slice(4, 7)];
    if (!typeInfo) continue;
    jobs.push({
      converter: typeInfo.converter,
      inputFile: path.resolve(path.join(dir, file)),
      outputPITsFile: path.resolve(path.join(extractDir, typeInfo.outputPITsFile)),
      outputRelationsFile: path.resolve(path.join(extractDir, typeInfo.outputRelationsFile))
    });
  }
  return jobs;
}

/**
 * Step 3: convert the extracted XML files into ndjson PITs/relations files,
 * running the per-file extractor workers in parallel via a highland stream.
 *
 * @param {Object} config - needs config.data.generatedDataDir for the output.
 * @param {string} dir - directory holding the extracted XML files.
 * @param {Object} writer - unused here; part of the shared step signature.
 * @param {Function} callback - node-style callback(err, results).
 */
function convert(config, dir, writer, callback) {
  const extractDir = path.join(config.data.generatedDataDir, 'data-bag');
  console.log('WARNING, make sure you have at least 45 Gb of free disk space for conversion, or press Ctrl-c to abort.');
  const jobs = mapFilesToJobs(dir, extractDir);

  mkdir(extractDir)
    .then(() => {
      const jobStream = highland(jobs);

      jobStream
        // Each job becomes a single-element stream wrapping the worker promise.
        .map(job => {
          console.log(`Processing ${job.inputFile} to output to ${job.outputPITsFile} and ${job.outputRelationsFile}`);
          return highland(wrapJob(job.converter.extractFromFile, job.inputFile, job.outputPITsFile, job.outputRelationsFile));
        })
        // Run the wrapped jobs in parallel, leaving one CPU core free.
        .parallel(NUM_CPUS - 1)
        // Log and swallow stream errors so one bad file does not abort the run.
        .errors(err => {
          fs.appendFileSync(path.join(__dirname, 'error.log'), JSON.stringify(err));
          return console.log('Stream threw error. Wrote error to error.log.');
        })
        // Collect all worker results, then report completion.
        .toArray(result => {
          console.log('Done processing all files!');
          return callback(null, result)
        });

    })
    .catch(err => callback(err, null));
}

/**
 * Adapts a node-style extractor function (which reports through an
 * (err, result) callback) to a Promise.
 *
 * @param {Function} jobFunction - extractor taking (source, pits, relations, cb).
 * @param {string} sourceFile - input XML file path.
 * @param {string} pitsFile - output ndjson path for PITs.
 * @param {string} relationsFile - output ndjson path for relations.
 * @returns {Promise} resolves with the extractor's result, rejects on error.
 */
function wrapJob(jobFunction, sourceFile, pitsFile, relationsFile) {
  return new Promise((resolve, reject) => {
    jobFunction(sourceFile, pitsFile, relationsFile, (err, result) => {
      if (err) {
        reject(err);
        return;
      }
      resolve(result);
    });
  });
}

// Public API: the individual phases are exported for testing; `steps`
// lists them in pipeline order for the data-modules runner.
module.exports = {
  download,
  extractDownloadSize,
  downloadDataFile,
  unzip,
  extractZipfile,
  convert,
  mapFilesToJobs,
  mkdir,
  steps: [
    download,
    unzip,
    convert
  ]
};
73 changes: 73 additions & 0 deletions helpers/addressesextractor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
'use strict';
const xml2js = require('xml2js');
const fs = require('fs');
const sax = require('sax');
const saxpath = require('saxpath');
const highland = require('highland');
const writer = require('./bagwriter.js');

// Public API of the addresses extractor. `url` doubles as the URI prefix
// for generated PITs and relations (see extractFromFile below).
module.exports = {
  title: 'BAG',
  url: 'http://bag.kadaster.nl',
  extractFromFile: extractFromFile
};

/**
 * Streams bag_LVC:Nummeraanduiding (address) elements out of a BAG XML file,
 * collects them as PIT nodes plus openbare-ruimte relations, and writes both
 * to ndjson via the shared bag writer when the stream ends.
 *
 * @param {string} inputFileName - BAG XML file to read.
 * @param {string} outputPITsFile - ndjson file the address PITs go to.
 * @param {string} outputRelationsFile - ndjson file the relations go to.
 * @param {Function} callback - node-style callback(err, result).
 */
function extractFromFile(inputFileName, outputPITsFile, outputRelationsFile, callback) {
  console.log(`Processing ${inputFileName}`);
  const nodes = [];
  const edges = [];
  const parser = new xml2js.Parser();
  const strict = true;

  const saxStream = sax.createStream(strict);
  fs.createReadStream(inputFileName, { encoding: 'utf8' })
    .pipe(saxStream);

  // Stream matching elements instead of loading the whole (large) file.
  const streamer = new saxpath.SaXPath(saxStream, '//bag_LVC:Nummeraanduiding');

  streamer.on('match', xml => {
    parser.parseString(xml, (err, result) => {
      if (err) {
        console.error(`Error parsing xml element ${xml} \n ${err.stack}`);
        return callback(err);
      }

      const address = result['bag_LVC:Nummeraanduiding'];
      const uri = module.exports.url + '/nummeraanduiding/' + address['bag_LVC:identificatie'][0];
      const validity = address['bag_LVC:tijdvakgeldigheid'][0];

      nodes.push({
        uri: uri,
        id: address['bag_LVC:identificatie'][0],
        huisnummer: address['bag_LVC:huisnummer'] ?
          address['bag_LVC:huisnummer'][0] : null,
        // BUG FIX: huisletter used to push the whole xml2js array instead of
        // its first element, unlike every other field here.
        huisletter: address['bag_LVC:huisletter'] ?
          address['bag_LVC:huisletter'][0] : null,
        postcode: address['bag_LVC:postcode'] ?
          address['bag_LVC:postcode'][0] : null,
        startDate: validity['bagtype:begindatumTijdvakGeldigheid'] ?
          validity['bagtype:begindatumTijdvakGeldigheid'][0] : null,
        endDate: validity['bagtype:einddatumTijdvakGeldigheid'] ?
          validity['bagtype:einddatumTijdvakGeldigheid'][0] : null
      });

      if (address['bag_LVC:gerelateerdeOpenbareRuimte']) {
        edges.push({
          from: uri,
          // BUG FIX: identificatie is an xml2js array; take its first element
          // so the URI does not embed a stringified array.
          to: module.exports.url + '/openbareruimte/' + address['bag_LVC:gerelateerdeOpenbareRuimte'][0]['bag_LVC:identificatie'][0],
          type: 'hg:related'
        });
      }
    });
  });

  // BUG FIX: must be a regular function, not an arrow — sax emits 'error'
  // with the stream as `this`; the arrow version left `this` bound to the
  // module scope so `this._parser` was undefined and recovery threw.
  saxStream.on('error', function (err) {
    console.error(`saxStream threw error ${err.stack}`);

    // clear the error and resume parsing
    this._parser.error = null;
    this._parser.resume();
  });

  saxStream.on('end', () => writer.write(nodes, edges, outputPITsFile, outputRelationsFile)
    .then(result => callback(null, result))
    .catch(err => callback(err))
  );

}
Loading