Skip to content

Commit

Permalink
wip: Integrate predictions based on machine learning models (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
claustres committed Oct 29, 2021
1 parent 6397c5f commit b27dbc4
Showing 1 changed file with 126 additions and 0 deletions.
126 changes: 126 additions & 0 deletions jobfile-predictions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
const krawler = require('@kalisio/krawler')
const hooks = krawler.hooks
const moment = require('moment')
const _ = require('lodash')

// Configuration (values read from process.env fall back to the defaults below)
// MongoDB connection URL used by the connectMongo hook
const dbUrl = process.env.DB_URL || 'mongodb://127.0.0.1:27017/kano'
// Base URL of the prediction service, the model name is appended to it for each task
const baseUrl = process.env.PREDICT_URL || 'http://localhost:5000/predict'
// Always parse with an explicit radix so that values like '0x10' are not read as hexadecimal,
// unset, non-numeric or zero values fall back to the default
// Used as expireAfterSeconds of the index on time
const ttl = Number.parseInt(process.env.TTL, 10) || (7 * 24 * 60 * 60) // duration in seconds
// Passed as the timeout option of the job
const timeout = Number.parseInt(process.env.TIMEOUT, 10) || (30 * 60 * 1000) // duration in milliseconds
// Target MongoDB collection of the generated features
const collection = 'hubeau-observations'
// Names of the models to request, one task is generated per model
const models = ['Y1422030', 'Y1232010']

// Custom hook factory: the returned hook fills hook.data.tasks with one task per entry of models
// - options.baseUrl: URL the model name is appended to in order to build each task URL
const generateTasks = (options) => (hook) => {
  const tasks = models.map(model => ({
    // Task identifier is the model name prefixed by '#' and suffixed by '01'
    id: `#${model}01`,
    options: {
      url: `${options.baseUrl}/${model}`
    }
  }))
  hook.data.tasks = tasks
  return hook
}
// Register the custom hook under the 'generateTasks' name, which is the key used
// in the hooks.jobs.before section of the job definition below
hooks.registerHook('generateTasks', generateTasks)

module.exports = {
id: 'hubeau-predictions',
store: 'fs',
options: {
workersLimit: 2,
faultTolerant: true,
timeout: timeout
},
taskTemplate: {
id: 'hubeau/predictions/<%= taskId %>',
type: 'http'
},
hooks: {
tasks: {
before: {
},
after: {
readJson: {},
writeJsonMemory: {
hook: 'writeJson',
key: '<%= id %>',
store: 'memory'
},
apply: {
function: (item) => {
const predictionFeature = item.data
const times = _.get(predictionFeature, 'forecastTime.H', [])
const values = _.get(predictionFeature, 'properties.H', [])
let features = []
_.forEach(times, (time, index) => {
// Use prediction feature as a template
let feature = _.pick(predictionFeature, ['type', 'geometry', 'runTime', 'properties.code_station'])
_.set(feature, 'time', time)
_.set(feature, 'properties.HP', values[index])
features.push(feature)
})
if (features.length > 0) console.log('Found ' + features.length + ' new predictions on station ' + _.get(predictionFeature, 'properties.code_station'))
item.data = features
}
},
writeMongoCollection: {
chunkSize: 256,
collection,
transform: { unitMapping: { time: { asDate: 'utc' }, runTime: { asDate: 'utc' } } }
},
clearData: {}
}
},
jobs: {
before: {
createStores: [{
id: 'memory'
}, {
id: 'fs',
options: {
path: __dirname
}
}],
connectMongo: {
url: dbUrl,
// Required so that client is forwarded from job to tasks
clientPath: 'taskTemplate.client'
},
createMongoCollection: {
clientPath: 'taskTemplate.client',
collection,
indices: [
[{ time: 1 }, { expireAfterSeconds: ttl }], // days in s
{ 'properties.code_station': 1 },
[{ 'properties.code_station': 1, time: -1 }, { background: true }],
[{ 'properties.code_station': 1, 'properties.H': 1, time: -1 }, { background: true }],
{ 'properties.H': 1 },
{ geometry: '2dsphere' }
],
},
generateTasks: {
baseUrl,
}
},
after: {
disconnectMongo: {
clientPath: 'taskTemplate.client'
},
removeStores: ['memory', 'fs']
},
error: {
disconnectMongo: {
clientPath: 'taskTemplate.client'
},
removeStores: ['memory', 'fs']
}
}
}
}

0 comments on commit b27dbc4

Please sign in to comment.