diff --git a/updater/payload.json.sample b/updater/payload.json.sample index 8e47792..c98c417 100755 --- a/updater/payload.json.sample +++ b/updater/payload.json.sample @@ -1,9 +1,10 @@ { "esURL": "http://esurl.com", "logLevel": "info", - "groupings": [ - {"pattern": "2016", "mapboxAccount": "astrodigital", "mapboxID": "sentinel_meta_2016"}, - {"pattern": "2015", "mapboxAccount": "astrodigital", "mapboxID": "sentinel_meta_2015"} + "types": [ + {"baseName": "sentinel_meta_test", "startYear": 2015, "type": "sentinel2"}, + {"baseName": "landsat_meta_test", "startYear": 2013, "type": "landsat8"} ], + "mapboxAccount": "account_name", "mapboxToken": "write_token" } diff --git a/updater/updater.js b/updater/updater.js index 6a9620f..df1de84 100755 --- a/updater/updater.js +++ b/updater/updater.js @@ -6,8 +6,8 @@ import elasticsearch from 'elasticsearch'; import iron_worker from 'iron_worker'; // eslint-disable-line camelcase -import { crossTest, warpArray } from './warp.js'; -import { series } from 'async'; +import { crossTest, warpArray } from './warp.js'; // eslint-disable-line +import { parallelLimit } from 'async'; import { writeFileSync } from 'fs'; import upload from 'mapbox-upload'; import moment from 'moment'; @@ -22,8 +22,10 @@ let winston = new (Winston.Logger)({ }); const params = iron_worker.params() || {}; -const groupings = params.groupings || []; +const types = params.types || []; const mapboxToken = params.mapboxToken; +const mapboxAccount = params.mapboxAccount; +const taskLimit = params.taskLimit || 1; const esURL = params.esURL || ''; const queryLimit = params.queryLimit || 1000; @@ -57,12 +59,24 @@ export function zp (n, c) { */ export function doTheThing () { // Grab all daytime items given our regex pattern and return the geojson - let buildGeoJSON = function (pattern, skipToday = false, cb) { + let buildGeoJSON = function (pattern, type, cb) { + // Build query based on type + let query = `date:${pattern}`; + if (type === 'landsat8') { + // Add a filter for day/night + query = `${query} AND dayOrNight:'DAY'`; + } else if (type === 'sentinel2') { + // Nothing extra needed + } else { + winston.error('Unknown data search type'); + process.exit(1); + } + const searchExec = function searchExec (from, callback) { es.search({ index: 'sat-api', - type: 'sentinel2', - q: `date:[${pattern}-01-01 TO ${pattern}-12-31]`, + type: type, + q: query, from: from, size: queryLimit }, callback); @@ -78,19 +92,44 @@ export function doTheThing () { rs.on('data', (d) => { d = d._source; - // Make sure we have all needed fields - if (d.cloud_coverage === undefined || !d.date || !d.tile_geometry || - d.utm_zone === undefined || !d.latitude_band || !d.grid_square || !d.path) { - return; + // Make sure we have all needed fields, dependent on type + if (type === 'landsat8') { + if (d.cloud_coverage === undefined || !d.date || !d.data_geometry || + d.path === undefined || d.row === undefined) { + return; + } + } else if (type === 'sentinel2') { + if (d.cloud_coverage === undefined || !d.date || !d.tile_geometry || + d.utm_zone === undefined || !d.latitude_band || !d.grid_square || !d.path) { + return; + } + } + + // Get properties dependent on type + let geometry; + let scene; + let date; + let year; + if (type === 'landsat8') { + geometry = d.data_geometry; + scene = `${d.sceneID.substring(1, 2)}${d.sceneID.substring(18, 19)}${d.sceneID.substring(20, 21)}${zp(d.path, 3)}${zp(d.row, 3)}`; + date = Number(d.sceneID.substring(13, 16)); + year = d.sceneID.substring(11, 13); + } else if (type === 'sentinel2') { + geometry = d.tile_geometry; + scene = `${zp(d.utm_zone, 2)}${d.latitude_band}${d.grid_square}${d.path.slice(-1)}`; + date = Number(moment(d.date, 'YYYY-MM-DD').format('DDD')); + year = d.date.substring(2, 4); } const feature = { type: 'Feature', - geometry: d.tile_geometry, + geometry: geometry, properties: { c: d.cloud_coverage, - d: Number(moment(d.date, 'YYYY-MM-DD').format('DDD')), - s: `${zp(d.utm_zone, 2)}${d.latitude_band}${d.grid_square}${d.path.slice(-1)}` + d: date, + s: scene, + y: year } }; @@ -99,7 +138,6 @@ export function doTheThing () { count++; if (count % 1000 === 0) { winston.info(`Processed ${count} records.`); - // cb(geojson); } }); rs.on('error', (e) => { @@ -112,8 +150,17 @@ export function doTheThing () { const coordArray = f.geometry.coordinates[0]; const crosses = crossTest(coordArray); if (crosses) { - f.geometry.coordinates[0] = warpArray(coordArray); + // TODO: temporary, delete anything crossing + delete geojson.features[f]; + // f.geometry.coordinates[0] = warpArray(coordArray); } + // TODO: remove after testing + f.geometry.coordinates[0].forEach((c) => { + if (c[0] >= 180 || c[0] <= -180) { + console.log('Out of bounds!'); + console.log(JSON.stringify(f)); + } + }); } winston.verbose('World wrapping complete'); cb(geojson); @@ -121,10 +168,27 @@ export function doTheThing () { }; // Build up task groups + let groupings = []; + types.forEach((t) => { + // Starting from Jan 1 of the start year and go to end of month of current + // month. + let date = moment([t.startYear, 0, 1]); + groupings.push({ + pattern: `[${date.startOf('month').format('YYYY-MM-DD')} TO ${moment().endOf('month').format('YYYY-MM-DD')}]`, + mapboxID: t.baseName, + type: t.type + }); + }); let groups = groupings.map((g) => { return function (done) { - winston.info(`Running for grouping ${g.pattern} and uploading to ${g.mapboxID}`); - buildGeoJSON(g.pattern, g.skipToday, (geojson) => { + winston.info(`Running for grouping ${g.type} matching ${g.pattern} and uploading to ${g.mapboxID}`); + buildGeoJSON(g.pattern, g.type, (geojson) => { + // If geojson is empty, we can exit now + if (geojson.features.length === 0) { + winston.info(`No features found for ${g.type} matching ${g.pattern}`); + return done(null); + } + // Save it to disk so we can upload to Mapbox, we're already blocked // here so just do it sync let filename = `${g.mapboxID}.geojson`; @@ -132,12 +196,12 @@ export function doTheThing () { writeFileSync(filename, JSON.stringify(geojson)); // We have the geojson, upload to Mapbox - winston.info(`Started uploading to ${g.mapboxAccount}.${g.mapboxID}`); + winston.info(`Started uploading to ${mapboxAccount}.${g.mapboxID}`); let progress = upload({ file: join(__dirname, `/${filename}`), - account: g.mapboxAccount, + account: mapboxAccount, accesstoken: mapboxToken, - mapid: `${g.mapboxAccount}.${g.mapboxID}` + mapid: `${mapboxAccount}.${g.mapboxID}` }); progress.once('error', (err) => { @@ -157,7 +221,7 @@ export function doTheThing () { }); // Run in a series - series(groups, (err, results) => { + parallelLimit(groups, taskLimit, (err, results) => { if (err) { winston.error('Exiting with an error'); winston.error(err);