diff --git a/tests/unit/ingestion/mongoQueueProcessor.spec.js b/tests/unit/ingestion/mongoQueueProcessor.spec.js new file mode 100644 index 000000000..4a01cab93 --- /dev/null +++ b/tests/unit/ingestion/mongoQueueProcessor.spec.js @@ -0,0 +1,97 @@ +'use strict'; // eslint-disable-line + +const assert = require('assert'); +const sinon = require('sinon'); +const { EventEmitter } = require('events'); +const MongoQueueProcessor = require('../../../extensions/mongoProcessor/MongoQueueProcessor'); +const BackbeatConsumer = require('../../../lib/BackbeatConsumer'); + +describe('MongoQueueProcessor', () => { + let mqp; + + beforeEach(() => { + mqp = new MongoQueueProcessor( + { hosts: 'localhost:9092', site: 'test-site' }, + { topic: 'test-topic', groupId: 'test-group' }, + { logName: 's3-recordlog', replicaSetHosts: 'localhost:27018' }, + {} + ); + // eslint-disable-next-line no-console + console.log('mqp', mqp.logger); + // eslint-disable-next-line no-console + console.log('mqp', mqp); + }); + + afterEach(() => { + sinon.restore(); + }); + + describe('::start', () => { + it('should log an error and exit if MongoDB connection fails', done => { + const loggerErrorStub = sinon.stub(mqp.logger, 'error'); + const loggerFatalStub = sinon.stub(mqp.logger, 'fatal'); + const processExitStub = sinon.stub(process, 'exit'); + + sinon.stub(mqp._mongoClient, 'setup').callsFake(callback => { + callback(new Error('Simulated connection failure')); + }); + + mqp.start(); + + setTimeout(() => { + assert(loggerErrorStub.calledOnce); + assert(loggerErrorStub.calledWith('could not connect to MongoDB')); + assert(loggerFatalStub.calledOnce); + assert(loggerFatalStub.calledWith('error starting mongo queue processor')); + assert(processExitStub.calledOnce); + assert(processExitStub.calledWith(1)); + done(); + }, 100); + }); + + it.skip('should initialize and start the Kafka consumer', done => { + const consumerStub = sinon.stub(BackbeatConsumer.prototype, 'on'); + const subscribeStub = sinon.stub(BackbeatConsumer.prototype, 'subscribe'); + + sinon.stub(mqp._mongoClient, 'setup').callsFake(callback => { + callback(null); + }); + + mqp.start(); + + setTimeout(() => { + assert(consumerStub.calledTwice); + assert(subscribeStub.calledOnce); + done(); + }, 100); + }); + }); + + describe('::stop', () => { + it('should close the Kafka consumer if it exists', done => { + mqp._consumer = new EventEmitter(); + mqp._consumer.close = sinon.stub().callsFake(callback => { + callback(); + }); + + const loggerDebugStub = sinon.stub(mqp.logger, 'debug'); + + mqp.stop(() => { + assert(loggerDebugStub.calledOnce); + assert(loggerDebugStub.calledWith('closing kafka consumer')); + assert(mqp._consumer.close.calledOnce); + done(); + }); + }); + + it('should log a message if there is no Kafka consumer to close', done => { + const loggerDebugStub = sinon.stub(mqp.logger, 'debug'); + + mqp.stop(() => { + assert(loggerDebugStub.calledOnce); + assert(loggerDebugStub.calledWith('no kafka consumer to close')); + done(); + }); + }); + }); +}); diff --git a/tests/unit/lib/queuePopulator/KafkaLogReader.spec.js b/tests/unit/lib/queuePopulator/KafkaLogReader.spec.js new file mode 100644 index 000000000..9ecf52eaa --- /dev/null +++ b/tests/unit/lib/queuePopulator/KafkaLogReader.spec.js @@ -0,0 +1,93 @@ +const assert = require('assert'); +const sinon = require('sinon'); +const { Logger } = require('werelogs'); +const ZookeeperMock = require('zookeeper-mock'); +const KafkaLogReader = require('../../../../lib/queuePopulator/KafkaLogReader'); + +class MockLogConsumer { + constructor(params) { + this.params = params || {}; + } + + setup(callback) { + process.nextTick(() => { + if (this.params.setupError) { + callback(this.params.setupError); + } else { + callback(null); + } + }); + } +} + +describe('KafkaLogReader', () => { + let kafkaLogReader; + let zkMock; + let logger; + + beforeEach(() => { + zkMock = new ZookeeperMock(); + logger = new Logger('test:KafkaLogReader'); + kafkaLogReader = new KafkaLogReader({ + zkClient: zkMock.createClient('localhost:2181'), + kafkaConfig: { hosts: 'localhost:9092' }, + zkConfig: { connectionString: 'localhost:2181' }, + qpKafkaConfig: { logName: 'test-log' }, + logger, + extensions: [], + metricsHandler: {}, + }); + kafkaLogReader.logConsumer = new MockLogConsumer(); + }); + + afterEach(() => { + sinon.restore(); + }); + + describe('constructor', () => { + it('should initialize KafkaLogReader correctly', () => { + assert(kafkaLogReader); + assert(kafkaLogReader.logConsumer instanceof MockLogConsumer); + assert.strictEqual(kafkaLogReader._kafkaConfig.hosts, 'localhost:9092'); + assert.strictEqual(kafkaLogReader._kafkaConfig.logName, 'test-log'); + }); + }); + + describe('setup', () => { + it('should setup log consumer successfully', done => { + const setupStub = sinon.stub(kafkaLogReader.logConsumer, 'setup').callsFake(callback => callback(null)); + kafkaLogReader.setup(err => { + assert.ifError(err); + assert(setupStub.calledOnce); + done(); + }); + }); + + it('should handle log consumer setup error', done => { + const setupStub = sinon.stub(kafkaLogReader.logConsumer, 'setup') + .callsFake(callback => callback(new Error('setup error'))); + kafkaLogReader.setup(err => { + assert.strictEqual(err.message, 'setup error'); + assert(setupStub.calledOnce); + done(); + }); + }); + }); + + describe('getLogInfo', () => { + it('should return log info', () => { + const logInfo = kafkaLogReader.getLogInfo(); + assert.deepStrictEqual(logInfo, { logName: 'test-log' }); + }); + }); + + describe('getMetricLabels', () => { + it('should return metric labels', () => { + const metricLabels = kafkaLogReader.getMetricLabels(); + assert.deepStrictEqual(metricLabels, { + logName: 'kafka-log', + logId: 'test-log', + }); + }); + }); +}); diff --git a/tests/unit/lib/queuePopulator/MongoLogReader.spec.js b/tests/unit/lib/queuePopulator/MongoLogReader.spec.js new file mode 100644 index 000000000..c2d42e5a0 --- /dev/null +++ b/tests/unit/lib/queuePopulator/MongoLogReader.spec.js @@ -0,0 +1,93 @@ +const assert = require('assert'); +const sinon = require('sinon'); +const { Logger } = require('werelogs'); +const ZookeeperMock = require('zookeeper-mock'); +const MongoLogReader = require('../../../../lib/queuePopulator/MongoLogReader'); + +class MockLogConsumer { + constructor(params) { + this.params = params || {}; + } + + connectMongo(callback) { + process.nextTick(() => { + if (this.params.connectMongoError) { + callback(this.params.connectMongoError); + } else { + callback(null); + } + }); + } +} + +describe('MongoLogReader', () => { + let mongoLogReader; + let zkMock; + let logger; + + beforeEach(() => { + zkMock = new ZookeeperMock(); + logger = new Logger('test:MongoLogReader'); + mongoLogReader = new MongoLogReader({ + zkClient: zkMock.createClient('localhost:2181'), + kafkaConfig: { hosts: 'localhost:9092' }, + zkConfig: { connectionString: 'localhost:2181' }, + mongoConfig: { logName: 'test-log' }, + logger, + extensions: [], + metricsHandler: {}, + }); + mongoLogReader.setLogConsumer(new MockLogConsumer()); + }); + + afterEach(() => { + sinon.restore(); + }); + + describe('constructor', () => { + it('should initialize MongoLogReader correctly', () => { + assert(mongoLogReader); + assert(mongoLogReader.logConsumer instanceof MockLogConsumer); + assert.strictEqual(mongoLogReader._mongoConfig.logName, 'test-log'); + }); + }); + + describe('setup', () => { + it('should setup MongoDB connection successfully', done => { + const connectMongoStub = sinon.stub(mongoLogReader.logConsumer, 'connectMongo') + .callsFake(callback => callback(null)); + mongoLogReader.setup(err => { + assert.ifError(err); + assert(connectMongoStub.calledOnce); + done(); + }); + }); + + it('should handle MongoDB connection error', done => { + const connectMongoStub = sinon.stub(mongoLogReader.logConsumer, 'connectMongo') + .callsFake(callback => callback(new Error('connection error'))); + mongoLogReader.setup(err => { + assert.strictEqual(err.message, 'connection error'); + assert(connectMongoStub.calledOnce); + done(); + }); + }); + }); + + describe('getLogInfo', () => { + it('should return log info', () => { + const logInfo = mongoLogReader.getLogInfo(); + assert.deepStrictEqual(logInfo, { logName: 'test-log' }); + }); + }); + + describe('getMetricLabels', () => { + it('should return metric labels', () => { + const metricLabels = mongoLogReader.getMetricLabels(); + assert.deepStrictEqual(metricLabels, { + logName: 'mongo-log', + logId: 'test-log', + }); + }); + }); +}); diff --git a/tests/unit/lib/util/circuitBreaker.spec.js b/tests/unit/lib/util/circuitBreaker.spec.js index 37c43469b..61d3f01d6 100644 --- a/tests/unit/lib/util/circuitBreaker.spec.js +++ b/tests/unit/lib/util/circuitBreaker.spec.js @@ -165,6 +165,10 @@ describe('updateCircuitBreakerConfigForImplicitOutputQueue', () => { }); describe('startCircuitBreakerMetricsExport', () => { + beforeEach(() => { + circuitBreakerGauge.reset(); + circuitBreakerCounter.reset(); + }); it('should export circuit breaker state and not increment counter', done => { const cb = { state: 1234, failedProbes: false }; startCircuitBreakerMetricsExport(cb, 'test', 10);