Skip to content

LuoShiXi/egg-nodekafka

Repository files navigation

egg-nodekafka

NPM version build status Test coverage David deps Known Vulnerabilities npm download

近期不断完善, 目前仅支持 kafka: Producer 和 KafkaClient.

Install

$ npm i egg-nodekafka --save

Usage

// {app_root}/config/plugin.js
exports.kafka = {
  enable: true,
  package: 'egg-nodekafka',
};

Configuration

// {app_root}/config/config.default.js
exports.kafka = {
  client: {
    host: '127.0.0.1:9092, 127.0.0.1:9093', // 多个ip可用逗号分隔
  },
  agent: true, // 支持在agent中使用kafka对象,如agent.kafka(名称视具体配置)
  app: true, // 同上
};

see config/config.default.js for more detail.

Example

'use strict';

const Controller = require('egg').Controller;

class HomeController extends Controller {

  async index() {
    const { app, logger, ctx } = this;
    const producer = app.kafka.producer(); // 此句为实例化一个producer对象,若在定时任务中注意实例化多次的异常情况
    let result;
    try {
      result = await producer.sendAsync([{
        topic: 'topic1',
        messages: 'test' + new Date().toISOString(),
        partition: 0,
      }]); 
    } catch (error) {
      logger.error(error);
      ctx.status = 201;
      return;
    }
    logger.info(result, '-------------------------');
    ctx.status = 200;
    ctx.body = { result: 'success', value: JSON.stringify(result) };
  }
}

module.exports = HomeController;

sendAsync(payloads)

  • payloads: Array,array of ProduceRequest, ProduceRequest is a JSON object like:

  • 更多参考:https://github.com/SOHU-Co/kafka-node#sendpayloads-cb

  • 说明: 本方法已将kafka-node官方api中send(payloads, cb),回调函数进行封装,可直接 await producer.sendAsync(payloads) 进行生产,并可得到返回结果。

{
   topic: 'topicName',
   messages: ['message body'], // multi messages should be a array, single message can be just a string or a KeyedMessage instance
   key: 'theKey', // string or buffer, only needed when using keyed partitioner
   partition: 0, // default 0
   attributes: 2, // default: 0
   timestamp: Date.now() // <-- defaults to Date.now() (only available with kafka v0.10+)
}

目前插件支持的对象

  • producer
  • client

以上两个对象可以通过 app.kafka.producer(options: objectjson) 和 app.kafka.client 获取。

app.kafka.client 的options配置选项即为config.{$env}.js中exports.kafka的配置信息,options参数配置选项请查看 https://github.com/SOHU-Co/kafka-node#options ,配置方式如下:

// {app_root}/config/config.{$env}.js
exports.kafka = {
  client: {
    host: '127.0.0.1:9092, 127.0.0.1:9093', // 多个ip可用逗号分隔
    connectTimeout: 10000,
    requestTimeout: 30000,
    sslOptions: {},
    // 更多参考: https://github.com/SOHU-Co/kafka-node#options
  },
  agent: true, // 支持在agent中使用kafka对象,如agent.kafka(名称视具体配置)
  app: true, // 同上
};

Questions & Suggestions

Please open an issue here.

License

MIT