forked from arithmetric/lambda-stash
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
86 lines (77 loc) · 2.51 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
var _ = require('lodash');
exports.handler = function(config, event, context, callback) {
var taskNames = [];
var eventType = '';
if (event.hasOwnProperty('Records') && event.Records.length &&
event.Records[0].eventSource === 'aws:s3') {
config.S3 = {
srcBucket: event.Records[0].s3.bucket.name,
srcKey: event.Records[0].s3.object.key
};
eventType = 'S3';
taskNames.push('getS3Object');
console.log('Handling event for s3://' + config.S3.srcBucket + '/' +
config.S3.srcKey);
} else if (event.hasOwnProperty('awslogs') &&
event.awslogs.hasOwnProperty('data')) {
config.data = event.awslogs.data;
eventType = 'CloudWatch';
taskNames.push('decodeBase64');
taskNames.push('decompressGzip');
taskNames.push('parseJson');
console.log('Handling event for CloudWatch logs');
}
var currentMapping;
if (config.mappings) {
_.some(config.mappings, function(item) {
if (item.type === eventType ||
(config.S3 && item.bucket === config.S3.srcBucket)) {
currentMapping = item;
console.log('Selected mapping for S3 event:', item);
if (item.hasOwnProperty('processors')) {
taskNames = taskNames.concat(item.processors);
}
config = _.merge({}, config, item);
return true;
}
});
delete config.mappings;
}
if (!currentMapping) {
console.log('Event did not match any mappings.');
return callback(null, 'Event did not match any mappings.');
}
console.log('Running ' + taskNames.length + ' handlers with config:', config);
var tasks = [];
var processor;
_.some(taskNames, function(taskName) {
if (_.isFunction(taskName)) {
tasks.push(taskName);
return false;
}
try {
processor = require('./handlers/' + taskName);
} catch (err) {
context.fail(err);
return true;
}
if (processor.hasOwnProperty('process')) {
tasks.push(processor.process);
}
});
console.log('Starting to run processor tasks...');
Promise.series(tasks, config)
.then(function(/* config */) {
console.log('Successfully shipped data!');
callback(null, 'Successfully shipped data!');
})
.catch(function(err) {
console.log('Error occurred while preparing to ship data:', err);
context.fail('Error occurred while preparing to ship data');
});
};
Promise.series = function(promises, initValue) {
return promises.reduce(function(chain, promise) {
return chain.then(promise);
}, Promise.resolve(initValue));
};