From 5a852092f92a76cabed64711d9b61c8d96cc95a3 Mon Sep 17 00:00:00 2001 From: Renat Zubairov Date: Fri, 16 Dec 2016 17:45:01 +0100 Subject: [PATCH] Now emitting data --- lib/triggers/consume.js | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/triggers/consume.js b/lib/triggers/consume.js index fc43941..653377d 100644 --- a/lib/triggers/consume.js +++ b/lib/triggers/consume.js @@ -17,9 +17,10 @@ function processAction(msg, cfg) { const amqpURI = cfg.amqpURI; const amqpExchange = cfg.topic; const queueName = `eio_consumer_${process.env.ELASTICIO_TASK_ID}_${process.env.ELASTICIO_USER_ID}`; - const keys = (cfg.bindingKeys || '#').split(',').map((s)=>s.trim()); + const keys = (cfg.bindingKeys || '#').split(',').map((s) => s.trim()); const consumer = (msg) => { - console.log('consuming message %s in generator', JSON.stringify(msg.content.toString())); + console.log('Have got message %s', JSON.stringify(msg.content.toString())); + this.emit('data', eioUtils.newMessageWithBody({})); }; co(function*() { console.log('Connecting to amqpURI=%s', amqpURI); @@ -38,7 +39,7 @@ function processAction(msg, cfg) { autoDelete: true }); - for(let key of keys) { + for (let key of keys) { console.log(`Binding queue to exchange queue=${queueName} exchange=${amqpExchange} bindingKey=${key}`); yield channel.bindQueue(queueName, amqpExchange, key); }