-
Notifications
You must be signed in to change notification settings - Fork 19
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
4 changed files
with
287 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
'use strict'; // eslint-disable-line | ||
|
||
const assert = require('assert'); | ||
const sinon = require('sinon'); | ||
const { EventEmitter } = require('events'); | ||
const MongoQueueProcessor = require('../../../extensions/mongoProcessor/MongoQueueProcessor'); | ||
const BackbeatConsumer = require('../../../lib/BackbeatConsumer'); | ||
|
||
describe('MongoQueueProcessor', () => { | ||
let mqp; | ||
|
||
beforeEach(() => { | ||
mqp = new MongoQueueProcessor( | ||
{ hosts: 'localhost:9092', site: 'test-site' }, | ||
{ topic: 'test-topic', groupId: 'test-group' }, | ||
{ logName: 's3-recordlog', replicaSetHosts: 'localhost:27018' }, | ||
{} | ||
); | ||
// eslint-disable-next-line no-console | ||
console.log('mqp', mqp.logger); | ||
// eslint-disable-next-line no-console | ||
console.log('mqp', mqp); | ||
}); | ||
|
||
afterEach(() => { | ||
sinon.restore(); | ||
}); | ||
|
||
describe('::start', () => { | ||
it('should log an error and exit if MongoDB connection fails', done => { | ||
const loggerErrorStub = sinon.stub(mqp.logger, 'error'); | ||
const loggerFatalStub = sinon.stub(mqp.logger, 'fatal'); | ||
const processExitStub = sinon.stub(process, 'exit'); | ||
|
||
sinon.stub(mqp._mongoClient, 'setup').callsFake(callback => { | ||
callback(new Error('Simulated connection failure')); | ||
}); | ||
|
||
mqp.start(); | ||
|
||
setTimeout(() => { | ||
assert(loggerErrorStub.calledOnce); | ||
assert(loggerErrorStub.calledWith('could not connect to MongoDB')); | ||
assert(loggerFatalStub.calledOnce); | ||
assert(loggerFatalStub.calledWith('error starting mongo queue processor')); | ||
assert(processExitStub.calledOnce); | ||
assert(processExitStub.calledWith(1)); | ||
done(); | ||
}, 100); | ||
}); | ||
|
||
it.skip('should initialize and start the Kafka consumer', done => { | ||
const consumerStub = sinon.stub(BackbeatConsumer.prototype, 'on'); | ||
const subscribeStub = sinon.stub(BackbeatConsumer.prototype, 'subscribe'); | ||
|
||
sinon.stub(mqp._mongoClient, 'setup').callsFake(callback => { | ||
callback(null); | ||
}); | ||
|
||
mqp.start(); | ||
|
||
setTimeout(() => { | ||
assert(consumerStub.calledTwice); | ||
assert(subscribeStub.calledOnce); | ||
done(); | ||
}, 100); | ||
}); | ||
}); | ||
|
||
describe('::stop', () => { | ||
it('should close the Kafka consumer if it exists', done => { | ||
mqp._consumer = new EventEmitter(); | ||
mqp._consumer.close = sinon.stub().callsFake(callback => { | ||
callback(); | ||
}); | ||
|
||
const loggerDebugStub = sinon.stub(mqp.logger, 'debug'); | ||
|
||
mqp.stop(() => { | ||
assert(loggerDebugStub.calledOnce); | ||
assert(loggerDebugStub.calledWith('closing kafka consumer')); | ||
assert(mqp._consumer.close.calledOnce); | ||
done(); | ||
}); | ||
}); | ||
|
||
it('should log a message if there is no Kafka consumer to close', done => { | ||
const loggerDebugStub = sinon.stub(mqp.logger, 'debug'); | ||
|
||
mqp.stop(() => { | ||
assert(loggerDebugStub.calledOnce); | ||
assert(loggerDebugStub.calledWith('no kafka consumer to close')); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
const assert = require('assert'); | ||
const sinon = require('sinon'); | ||
const { Logger } = require('werelogs'); | ||
const ZookeeperMock = require('zookeeper-mock'); | ||
const KafkaLogReader = require('../../../../lib/queuePopulator/KafkaLogReader'); | ||
|
||
class MockLogConsumer { | ||
constructor(params) { | ||
this.params = params || {}; | ||
} | ||
|
||
setup(callback) { | ||
process.nextTick(() => { | ||
if (this.params.setupError) { | ||
callback(this.params.setupError); | ||
} else { | ||
callback(null); | ||
} | ||
}); | ||
} | ||
} | ||
|
||
describe('KafkaLogReader', () => { | ||
let kafkaLogReader; | ||
let zkMock; | ||
let logger; | ||
|
||
beforeEach(() => { | ||
zkMock = new ZookeeperMock(); | ||
logger = new Logger('test:KafkaLogReader'); | ||
kafkaLogReader = new KafkaLogReader({ | ||
zkClient: zkMock.createClient('localhost:2181'), | ||
kafkaConfig: { hosts: 'localhost:9092' }, | ||
zkConfig: { connectionString: 'localhost:2181' }, | ||
qpKafkaConfig: { logName: 'test-log' }, | ||
logger, | ||
extensions: [], | ||
metricsHandler: {}, | ||
}); | ||
kafkaLogReader.logConsumer = new MockLogConsumer(); | ||
}); | ||
|
||
afterEach(() => { | ||
sinon.restore(); | ||
}); | ||
|
||
describe('constructor', () => { | ||
it('should initialize KafkaLogReader correctly', () => { | ||
assert(kafkaLogReader); | ||
assert(kafkaLogReader.logConsumer instanceof MockLogConsumer); | ||
assert.strictEqual(kafkaLogReader._kafkaConfig.hosts, 'localhost:9092'); | ||
assert.strictEqual(kafkaLogReader._kafkaConfig.logName, 'test-log'); | ||
}); | ||
}); | ||
|
||
describe('setup', () => { | ||
it('should setup log consumer successfully', done => { | ||
const setupStub = sinon.stub(kafkaLogReader.logConsumer, 'setup').callsFake(callback => callback(null)); | ||
kafkaLogReader.setup(err => { | ||
assert.ifError(err); | ||
assert(setupStub.calledOnce); | ||
done(); | ||
}); | ||
}); | ||
|
||
it('should handle log consumer setup error', done => { | ||
const setupStub = sinon.stub(kafkaLogReader.logConsumer, 'setup') | ||
.callsFake(callback => callback(new Error('setup error'))); | ||
kafkaLogReader.setup(err => { | ||
assert.strictEqual(err.message, 'setup error'); | ||
assert(setupStub.calledOnce); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
|
||
describe('getLogInfo', () => { | ||
it('should return log info', () => { | ||
const logInfo = kafkaLogReader.getLogInfo(); | ||
assert.deepStrictEqual(logInfo, { logName: 'test-log' }); | ||
}); | ||
}); | ||
|
||
describe('getMetricLabels', () => { | ||
it('should return metric labels', () => { | ||
const metricLabels = kafkaLogReader.getMetricLabels(); | ||
assert.deepStrictEqual(metricLabels, { | ||
logName: 'kafka-log', | ||
logId: 'test-log', | ||
}); | ||
}); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
const assert = require('assert'); | ||
const sinon = require('sinon'); | ||
const { Logger } = require('werelogs'); | ||
const ZookeeperMock = require('zookeeper-mock'); | ||
const MongoLogReader = require('../../../../lib/queuePopulator/MongoLogReader'); | ||
|
||
class MockLogConsumer { | ||
constructor(params) { | ||
this.params = params || {}; | ||
} | ||
|
||
connectMongo(callback) { | ||
process.nextTick(() => { | ||
if (this.params.connectMongoError) { | ||
callback(this.params.connectMongoError); | ||
} else { | ||
callback(null); | ||
} | ||
}); | ||
} | ||
} | ||
|
||
describe('MongoLogReader', () => { | ||
let mongoLogReader; | ||
let zkMock; | ||
let logger; | ||
|
||
beforeEach(() => { | ||
zkMock = new ZookeeperMock(); | ||
logger = new Logger('test:MongoLogReader'); | ||
mongoLogReader = new MongoLogReader({ | ||
zkClient: zkMock.createClient('localhost:2181'), | ||
kafkaConfig: { hosts: 'localhost:9092' }, | ||
zkConfig: { connectionString: 'localhost:2181' }, | ||
mongoConfig: { logName: 'test-log' }, | ||
logger, | ||
extensions: [], | ||
metricsHandler: {}, | ||
}); | ||
mongoLogReader.setLogConsumer(new MockLogConsumer()); | ||
}); | ||
|
||
afterEach(() => { | ||
sinon.restore(); | ||
}); | ||
|
||
describe('constructor', () => { | ||
it('should initialize MongoLogReader correctly', () => { | ||
assert(mongoLogReader); | ||
assert(mongoLogReader.logConsumer instanceof MockLogConsumer); | ||
assert.strictEqual(mongoLogReader._mongoConfig.logName, 'test-log'); | ||
}); | ||
}); | ||
|
||
describe('setup', () => { | ||
it('should setup MongoDB connection successfully', done => { | ||
const connectMongoStub = sinon.stub(mongoLogReader.logConsumer, 'connectMongo') | ||
.callsFake(callback => callback(null)); | ||
mongoLogReader.setup(err => { | ||
assert.ifError(err); | ||
assert(connectMongoStub.calledOnce); | ||
done(); | ||
}); | ||
}); | ||
|
||
it('should handle MongoDB connection error', done => { | ||
const connectMongoStub = sinon.stub(mongoLogReader.logConsumer, 'connectMongo') | ||
.callsFake(callback => callback(new Error('connection error'))); | ||
mongoLogReader.setup(err => { | ||
assert.strictEqual(err.message, 'connection error'); | ||
assert(connectMongoStub.calledOnce); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
|
||
describe('getLogInfo', () => { | ||
it('should return log info', () => { | ||
const logInfo = mongoLogReader.getLogInfo(); | ||
assert.deepStrictEqual(logInfo, { logName: 'test-log' }); | ||
}); | ||
}); | ||
|
||
describe('getMetricLabels', () => { | ||
it('should return metric labels', () => { | ||
const metricLabels = mongoLogReader.getMetricLabels(); | ||
assert.deepStrictEqual(metricLabels, { | ||
logName: 'mongo-log', | ||
logId: 'test-log', | ||
}); | ||
}); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters