-
Notifications
You must be signed in to change notification settings - Fork 2
/
jobfile-hydro-predictions.js
134 lines (130 loc) · 4.05 KB
/
jobfile-hydro-predictions.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import _ from 'lodash'
import fs from 'fs'
import path from 'path'
import { fileURLToPath } from 'url'
import { hooks } from '@kalisio/krawler'
// Local filesystem directory where task outputs are written
const outputDir = './output'
// Configuration, each entry overridable through an environment variable
const dbUrl = process.env.DB_URL || 'mongodb://127.0.0.1:27017/kano'
const baseUrl = process.env.PREDIKT_URL || 'http://localhost:5000/predict'
const modelsPath = process.env.PREDIKT_MODELS_PATH ||
  path.join('..', 'predikt', 'models', 'output', 'water_level_rnn', 'multiple', '24H')
// Record time-to-live in the database, in seconds (defaults to one week)
const ttl = parseInt(process.env.TTL, 10) || (60 * 60 * 24 * 7)
// HTTP task timeout, in milliseconds (defaults to 30 minutes)
const timeout = parseInt(process.env.TIMEOUT, 10) || (1000 * 60 * 30)
// Target property name: could be HP_RNN or HP_XGB depending on the underlying prediction model
const variable = process.env.VARIABLE || 'HP'
// MongoDB collection receiving the generated prediction features
const collection = 'hubeau-hydro-observations'
// Discover the available models: each sub-directory of the models path
// holds the model for one station (the directory name is the station code).
// Dirent-based listing matches the lstat semantics (symlinks are not followed).
const models = fs.readdirSync(modelsPath, { withFileTypes: true })
  .filter((entry) => entry.isDirectory())
  .map((entry) => entry.name)
// Custom hook generating one HTTP task per available model:
// each task queries the prediction service endpoint for a single station,
// using the model directory name (station code) in the URL and '#<code>' as task id.
const generateTasks = (options) => (hook) => {
  hook.data.tasks = models.map((model) => ({
    id: `#${model}`,
    options: {
      url: `${options.baseUrl}/${model}`
    }
  }))
  return hook
}
hooks.registerHook('generateTasks', generateTasks)
// Krawler job definition: fetch water-level predictions for every station,
// explode them into per-time GeoJSON features and persist them into MongoDB.
export default {
  id: 'hubeau-hydro-predictions',
  store: 'fs',
  options: {
    workersLimit: 2,
    // Do not abort the whole job when a single station prediction fails
    faultTolerant: true,
    timeout
  },
  taskTemplate: {
    id: 'predictions/<%= taskId %>',
    type: 'http'
  },
  hooks: {
    tasks: {
      before: {
      },
      after: {
        // Parse the HTTP response body as JSON
        readJson: {},
        writeJsonMemory: {
          hook: 'writeJson',
          key: '<%= id %>',
          store: 'memory'
        },
        apply: {
          // Explode the prediction feature into one feature per forecast time,
          // each carrying its predicted value under `properties.${variable}`
          function: (item) => {
            const predictionFeature = item.data
            const times = _.get(predictionFeature, 'forecastTime.H', [])
            const values = _.get(predictionFeature, 'properties.H', [])
            const features = _.map(times, (time, index) => {
              // Use the prediction feature as a template for each time step
              const feature = _.pick(predictionFeature, ['type', 'geometry', 'runTime', 'properties.code_station'])
              _.set(feature, 'time', time)
              // values is assumed to be aligned index-by-index with times
              _.set(feature, `properties.${variable}`, values[index])
              return feature
            })
            if (features.length > 0) {
              console.log(`Found ${features.length} new predictions on station ${_.get(predictionFeature, 'properties.code_station')}`)
            }
            item.data = features
          }
        },
        writeMongoCollection: {
          chunkSize: 256,
          collection,
          // Convert ISO time strings to native Date objects (stored as UTC)
          transform: { unitMapping: { time: { asDate: 'utc' }, runTime: { asDate: 'utc' } } }
        },
        clearData: {}
      }
    },
    jobs: {
      before: {
        createStores: [{
          id: 'memory'
        }, {
          id: 'fs',
          options: {
            path: outputDir
          }
        }],
        connectMongo: {
          url: dbUrl,
          // Required so that client is forwarded from job to tasks
          clientPath: 'taskTemplate.client'
        },
        createMongoCollection: {
          clientPath: 'taskTemplate.client',
          collection,
          indices: [
            // Auto-expire records after the configured TTL (seconds)
            [{ time: 1 }, { expireAfterSeconds: ttl }],
            { 'properties.code_station': 1 },
            [{ 'properties.code_station': 1, time: -1 }, { background: true }],
            // NOTE(review): these indices target 'properties.H' while this job writes
            // `properties.${variable}` (default 'HP'); presumably they serve the
            // observations stored in the same shared collection — confirm
            [{ 'properties.code_station': 1, 'properties.H': 1, time: -1 }, { background: true }],
            { 'properties.H': 1 },
            { geometry: '2dsphere' }
          ]
        },
        generateTasks: {
          baseUrl
        }
      },
      after: {
        disconnectMongo: {
          clientPath: 'taskTemplate.client'
        },
        removeStores: ['memory', 'fs']
      },
      error: {
        disconnectMongo: {
          clientPath: 'taskTemplate.client'
        },
        removeStores: ['memory', 'fs']
      }
    }
  }
}