diff --git a/extensions/mongoProcessor/MongoQueueProcessor.js b/extensions/mongoProcessor/MongoQueueProcessor.js index 6811079f4..7561e36b4 100644 --- a/extensions/mongoProcessor/MongoQueueProcessor.js +++ b/extensions/mongoProcessor/MongoQueueProcessor.js @@ -1,7 +1,5 @@ 'use strict'; // eslint-disable-line -const async = require('async'); - const Logger = require('werelogs').Logger; const errors = require('arsenal').errors; const { replicationBackends, emptyFileMd5 } = require('arsenal').constants; @@ -15,16 +13,11 @@ const BackbeatConsumer = require('../../lib/BackbeatConsumer'); const QueueEntry = require('../../lib/models/QueueEntry'); const DeleteOpQueueEntry = require('../../lib/models/DeleteOpQueueEntry'); const ObjectQueueEntry = require('../../lib/models/ObjectQueueEntry'); -const MetricsProducer = require('../../lib/MetricsProducer'); -const { metricsExtension, metricsTypeCompleted, metricsTypePendingOnly } = - require('../ingestion/constants'); + const getContentType = require('./utils/contentTypeHelper'); const BucketMemState = require('./utils/BucketMemState'); const MongoProcessorMetrics = require('./MongoProcessorMetrics'); -// batch metrics by location and send to kafka metrics topic every 5 seconds -const METRIC_REPORT_INTERVAL_MS = process.env.CI === 'true' ? 1000 : 5000; - // TODO - ADD PREFIX BASED ON SOURCE // april 6, 2018 @@ -76,19 +69,6 @@ class MongoQueueProcessor { this._mongoClient = new MongoClient(this.mongoClientConfig); this._bucketMemState = new BucketMemState(Config); - // in-mem batch of metrics, we only track total entry count by location - // this._accruedMetrics = { zenko-location: 10 } - this._accruedMetrics = {}; - - setInterval(() => { - this._sendMetrics(); - }, METRIC_REPORT_INTERVAL_MS); - } - - _setupMetricsClients(cb) { - // Metrics Producer - this._mProducer = new MetricsProducer(this.kafkaConfig, this._mConfig); - this._mProducer.setupProducer(cb); } /** @@ -97,32 +77,15 @@ class MongoQueueProcessor { * @return {undefined} */ start() { - this.logger.info('starting mongo queue processor'); - async.series([ - next => this._setupMetricsClients(err => { - if (err) { - this.logger.error('error setting up metrics client', { - method: 'MongoQueueProcessor.start', - error: err, - }); - } - return next(err); - }), - next => this._mongoClient.setup(err => { - if (err) { - this.logger.error('could not connect to MongoDB', { - method: 'MongoQueueProcessor.start', - error: err.message, - }); - } - return next(err); - }), - ], error => { - if (error) { + this._mongoClient.setup(err => { + if (err) { + this.logger.error('could not connect to MongoDB', { + method: 'MongoQueueProcessor.start', + error: err.message, + }); this.logger.fatal('error starting mongo queue processor'); process.exit(1); } - this._bootstrapList = Config.getBootstrapList(); Config.on('bootstrap-list-update', () => { this._bootstrapList = Config.getBootstrapList(); @@ -164,32 +127,17 @@ class MongoQueueProcessor { * @return {undefined} */ stop(done) { - async.parallel([ - next => { - if (this._consumer) { - this.logger.debug('closing kafka consumer', { - method: 'MongoQueueProcessor.stop', - }); - return this._consumer.close(next); - } - this.logger.debug('no kafka consumer to close', { - method: 'MongoQueueProcessor.stop', - }); - return next(); - }, - next => { - if (this._mProducer) { - this.logger.debug('closing metrics producer', { - method: 'MongoQueueProcessor.stop', - }); - return this._mProducer.close(next); - } - this.logger.debug('no metrics producer to close', { - method: 'MongoQueueProcessor.stop', - }); - return next(); - }, - ], done); + if (this._consumer) { + this.logger.debug('closing kafka consumer', { + method: 'MongoQueueProcessor.stop', + }); + this._consumer.close(done); + } else { + this.logger.debug('no kafka consumer to close', { + method: 'MongoQueueProcessor.stop', + }); + done(); + } } _getZenkoObjectMetadata(log, entry, bucketInfo, done) { @@ -408,7 +356,6 @@ class MongoQueueProcessor { return this._mongoClient.deleteObject(bucket, key, options, log, err => { if (err) { - this._normalizePendingMetric(location); log.end().error('error deleting object metadata ' + 'from mongo', { bucket, @@ -418,7 +365,6 @@ class MongoQueueProcessor { }); return done(err); } - this._produceMetricCompletionEntry(location); log.end().info('object metadata deleted from mongo', { entry: sourceEntry.getLogInfo(), location, @@ -443,7 +389,6 @@ class MongoQueueProcessor { this._getZenkoObjectMetadata(log, sourceEntry, bucketInfo, (err, zenkoObjMd) => { if (err) { - this._normalizePendingMetric(location); log.end().error('error processing object queue entry', { method: 'MongoQueueProcessor._processObjectQueueEntry', entry: sourceEntry.getLogInfo(), @@ -454,7 +399,6 @@ class MongoQueueProcessor { const content = getContentType(sourceEntry, zenkoObjMd); if (content.length === 0) { - this._normalizePendingMetric(location); log.end().debug('skipping duplicate entry', { method: 'MongoQueueProcessor._processObjectQueueEntry', entry: sourceEntry.getLogInfo(), @@ -498,7 +442,6 @@ class MongoQueueProcessor { return this._mongoClient.putObject(bucket, key, objVal, params, this.logger, err => { if (err) { - this._normalizePendingMetric(location); log.end().error('error putting object metadata ' + 'to mongo', { bucket, @@ -509,7 +452,6 @@ class MongoQueueProcessor { }); return done(err); } - this._produceMetricCompletionEntry(location); log.end().info('object metadata put to mongo', { entry: sourceEntry.getLogInfo(), location, @@ -519,49 +461,6 @@ class MongoQueueProcessor { }); } - /** - * Send accrued metrics by location to kafka - * @return {undefined} - */ - _sendMetrics() { - Object.keys(this._accruedMetrics).forEach(loc => { - const count = this._accruedMetrics[loc]; - - // only report metrics if something has been recorded for location - if (count > 0) { - this._accruedMetrics[loc] = 0; - const metric = { [loc]: { ops: count } }; - this._mProducer.publishMetrics(metric, metricsTypeCompleted, - metricsExtension, () => {}); - } - }); - } - - /** - * Accrue metrics in-mem every METRIC_REPORT_INTERVAL_MS - * @param {string} location - zenko storage location name - * @return {undefined} - */ - _produceMetricCompletionEntry(location) { - if (this._accruedMetrics[location]) { - this._accruedMetrics[location] += 1; - } else { - this._accruedMetrics[location] = 1; - } - } - - /** - * For cases where we experience an error or skip an entry, we need to - * normalize pending metric. This means we will see pending metrics stuck - * above 0 and will need to bring those metrics down - * @param {string} location - location constraint name - * @return {undefined} - */ - _normalizePendingMetric(location) { - const metric = { [location]: { ops: 1 } }; - this._mProducer.publishMetrics(metric, metricsTypePendingOnly, - metricsExtension, () => {}); - } /** * Get bucket info in memoize state if exists, otherwise fetch from Mongo @@ -639,7 +538,6 @@ class MongoQueueProcessor { entryType: sourceEntry.constructor.name, method: 'MongoQueueProcessor.processKafkaEntry', }); - this._normalizePendingMetric(location); return process.nextTick(done); }); } diff --git a/extensions/replication/queueProcessor/QueueProcessor.js b/extensions/replication/queueProcessor/QueueProcessor.js index 7d59cb282..c7317ee92 100644 --- a/extensions/replication/queueProcessor/QueueProcessor.js +++ b/extensions/replication/queueProcessor/QueueProcessor.js @@ -28,7 +28,6 @@ const EchoBucket = require('../tasks/EchoBucket'); const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry'); const BucketQueueEntry = require('../../../lib/models/BucketQueueEntry'); const ActionQueueEntry = require('../../../lib/models/ActionQueueEntry'); -const MetricsProducer = require('../../../lib/MetricsProducer'); const libConstants = require('../../../lib/constants'); const { wrapCounterInc, wrapHistogramObserve } = require('../../../lib/util/metrics'); const { http: HttpAgent, https: HttpsAgent } = require('httpagent'); @@ -222,7 +221,6 @@ class QueueProcessor extends EventEmitter { this.replicationStatusProducer = null; this._consumer = null; this._dataMoverConsumer = null; - this._mProducer = null; this.site = site; this.mConfig = mConfig; this.serviceName = this.isReplayTopic ? @@ -695,7 +693,6 @@ class QueueProcessor extends EventEmitter { vaultclientCache: this.vaultclientCache, accountCredsCache: this.accountCredsCache, replicationStatusProducer: this.replicationStatusProducer, - mProducer: this._mProducer, logger: this.logger, site: this.site, consumer: this._consumer, @@ -722,16 +719,7 @@ class QueueProcessor extends EventEmitter { * @return {undefined} */ start(options) { - this._mProducer = new MetricsProducer(this.kafkaConfig, this.mConfig); return async.parallel([ - done => this._mProducer.setupProducer(err => { - if (err) { - this.logger.info('error setting up metrics producer', - { error: err.message }); - process.exit(1); - } - return done(); - }), done => this._setupProducer(err => { if (err) { this.logger.info('error setting up kafka producer', @@ -842,20 +830,6 @@ class QueueProcessor extends EventEmitter { }); return next(); }, - next => { - if (this._mProducer) { - this.logger.debug('closing metrics producer', { - method: 'QueueProcessor.stop', - site: this.site, - }); - return this._mProducer.close(next); - } - this.logger.debug('no metrics producer to close', { - method: 'QueueProcessor.stop', - site: this.site, - }); - return next(); - }, ], done); } diff --git a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js index 4c366473c..06f22fe06 100644 --- a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js +++ b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js @@ -16,7 +16,6 @@ const QueueEntry = require('../../../lib/models/QueueEntry'); const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry'); const FailedCRRProducer = require('../failedCRR/FailedCRRProducer'); const ReplayProducer = require('../replay/ReplayProducer'); -const MetricsProducer = require('../../../lib/MetricsProducer'); const { http: HttpAgent, https: HttpsAgent } = require('httpagent'); // StatsClient constant default for site metrics @@ -216,7 +215,6 @@ class ReplicationStatusProcessor { this.gcConfig = gcConfig; this._consumer = null; this._gcProducer = null; - this._mProducer = null; this.logger = new Logger('Backbeat:Replication:ReplicationStatusProcessor'); @@ -336,7 +334,6 @@ class ReplicationStatusProcessor { sourceHTTPAgent: this.sourceHTTPAgent, vaultclientCache: this.vaultclientCache, gcProducer: this._gcProducer, - mProducer: this._mProducer, statsClient: this._statsClient, failedCRRProducer: this._failedCRRProducer, replayProducers: this._ReplayProducers, @@ -379,11 +376,6 @@ class ReplicationStatusProcessor { done(); } }, - done => { - this._mProducer = new MetricsProducer(this.kafkaConfig, - this.mConfig); - this._mProducer.setupProducer(done); - }, done => { let consumerReady = false; this._consumer = new BackbeatConsumer({ @@ -434,18 +426,6 @@ class ReplicationStatusProcessor { method: 'ReplicationStatusProcessor.stop', }); return next(); - }, - next => { - if (this._mProducer) { - this.logger.debug('closing metrics producer', { - method: 'ReplicationStatusProcessor.stop', - }); - return this._mProducer.close(next); - } - this.logger.debug('no metrics producer to close', { - method: 'ReplicationStatusProcessor.stop', - }); - return next(); } ], done); } diff --git a/extensions/replication/tasks/CopyLocationTask.js b/extensions/replication/tasks/CopyLocationTask.js index 9d197f5f5..d17056c55 100644 --- a/extensions/replication/tasks/CopyLocationTask.js +++ b/extensions/replication/tasks/CopyLocationTask.js @@ -8,15 +8,12 @@ const BackbeatMetadataProxy = require('../../../lib/BackbeatMetadataProxy'); const BackbeatClient = require('../../../lib/clients/BackbeatClient'); const BackbeatTask = require('../../../lib/tasks/BackbeatTask'); const { LifecycleMetrics } = require('../../lifecycle/LifecycleMetrics'); -const ReplicationMetric = require('../ReplicationMetric'); const ReplicationMetrics = require('../ReplicationMetrics'); const { attachReqUids, TIMEOUT_MS } = require('../../../lib/clients/utils'); const { getAccountCredentials } = require('../../../lib/credentials/AccountCredentials'); const RoleCredentials = require('../../../lib/credentials/RoleCredentials'); -const { metricsExtension, metricsTypeQueued, metricsTypeCompleted } = - require('../constants'); const MPU_GCP_MAX_PARTS = 1024; @@ -41,10 +38,6 @@ class CopyLocationTask extends BackbeatTask { this.retryParams = retryParams; } } - this._replicationMetric = new ReplicationMetric() - .withProducer(this.mProducer.getProducer()) - .withSite(this.site) - .withExtension(metricsExtension); } _validateActionCredentials(actionEntry) { @@ -195,11 +188,6 @@ class CopyLocationTask extends BackbeatTask { _getAndPutObject(actionEntry, objMD, log, cb) { const objectLogger = this.logger.newRequestLogger(log.getUids()); - this._replicationMetric - .withEntry(actionEntry) - .withMetricType(metricsTypeQueued) - .withObjectSize(objMD.getContentLength()) - .publish(); this.retry({ actionDesc: 'stream object data', logFields: { entry: actionEntry.getLogInfo() }, @@ -335,11 +323,6 @@ class CopyLocationTask extends BackbeatTask { actionEntry.setSuccess({ location: data.location, }); - this._replicationMetric - .withEntry(actionEntry) - .withMetricType(metricsTypeCompleted) - .withObjectSize(size) - .publish(); return cb(null, data); }); } @@ -591,11 +574,6 @@ class CopyLocationTask extends BackbeatTask { } return doneOnce(err); } - this._replicationMetric - .withEntry(actionEntry) - .withMetricType(metricsTypeCompleted) - .withObjectSize(size) - .publish(); return doneOnce(null, data); }); } @@ -786,11 +764,6 @@ class CopyLocationTask extends BackbeatTask { if (err) { return cb(err); } - this._replicationMetric - .withEntry(actionEntry) - .withMetricType(metricsTypeQueued) - .withObjectSize(objMD.getContentLength()) - .publish(); return this._completeRangedMPU(actionEntry, objMD, uploadId, log, cb); }); diff --git a/extensions/replication/tasks/MultipleBackendTask.js b/extensions/replication/tasks/MultipleBackendTask.js index 2d98aa2e2..0f45df2c3 100644 --- a/extensions/replication/tasks/MultipleBackendTask.js +++ b/extensions/replication/tasks/MultipleBackendTask.js @@ -8,8 +8,6 @@ const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry'); const ReplicateObject = require('./ReplicateObject'); const { attachReqUids } = require('../../../lib/clients/utils'); -const getExtMetrics = require('../utils/getExtMetrics'); -const { metricsExtension, metricsTypeQueued } = require('../constants'); const MPU_GCP_MAX_PARTS = 1024; @@ -702,10 +700,6 @@ class MultipleBackendTask extends ReplicateObject { if (err) { return doneOnce(err); } - const extMetrics = getExtMetrics(this.site, - sourceEntry.getContentLength(), sourceEntry); - this.mProducer.publishMetrics(extMetrics, metricsTypeQueued, - metricsExtension, () => {}); return this._completeRangedMPU(sourceEntry, uploadId, log, doneOnce); }); @@ -713,10 +707,6 @@ class MultipleBackendTask extends ReplicateObject { _getAndPutObject(sourceEntry, log, cb) { const partLogger = this.logger.newRequestLogger(log.getUids()); - const extMetrics = getExtMetrics(this.site, - sourceEntry.getContentLength(), sourceEntry); - this.mProducer.publishMetrics(extMetrics, metricsTypeQueued, - metricsExtension, () => {}); if (BACKBEAT_INJECT_REPLICATION_ERROR_COPYOBJ) { if (Math.random() < BACKBEAT_INJECT_REPLICATION_ERROR_RATE) { diff --git a/extensions/replication/tasks/ReplicateObject.js b/extensions/replication/tasks/ReplicateObject.js index d0b926e1c..bb3edb7f9 100644 --- a/extensions/replication/tasks/ReplicateObject.js +++ b/extensions/replication/tasks/ReplicateObject.js @@ -10,11 +10,10 @@ const BackbeatMetadataProxy = require('../../../lib/BackbeatMetadataProxy'); const mapLimitWaitPendingIfError = require('../../../lib/util/mapLimitWaitPendingIfError'); const { attachReqUids, TIMEOUT_MS } = require('../../../lib/clients/utils'); -const getExtMetrics = require('../utils/getExtMetrics'); const BackbeatTask = require('../../../lib/tasks/BackbeatTask'); const { getAccountCredentials } = require('../../../lib/credentials/AccountCredentials'); const RoleCredentials = require('../../../lib/credentials/RoleCredentials'); -const { metricsExtension, metricsTypeQueued, metricsTypeCompleted, replicationStages } = require('../constants'); +const { replicationStages } = require('../constants'); const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry'); @@ -434,9 +433,6 @@ class ReplicateObject extends BackbeatTask { location: this.site, replicationContent: 'data', }); - const extMetrics = getExtMetrics(this.site, size, sourceEntry); - this.mProducer.publishMetrics(extMetrics, - metricsTypeCompleted, metricsExtension, () => {}); } _publishMetadataWriteMetrics(buffer, writeStartTime) { @@ -758,10 +754,6 @@ class ReplicateObject extends BackbeatTask { // Get data from source bucket and put it on the target bucket next => { if (!mdOnly) { - const extMetrics = getExtMetrics(this.site, - sourceEntry.getContentLength(), sourceEntry); - this.mProducer.publishMetrics(extMetrics, - metricsTypeQueued, metricsExtension, () => {}); return this._getAndPutData(sourceEntry, destEntry, log, next); } diff --git a/extensions/replication/tasks/UpdateReplicationStatus.js b/extensions/replication/tasks/UpdateReplicationStatus.js index cc326226c..bd88068ca 100644 --- a/extensions/replication/tasks/UpdateReplicationStatus.js +++ b/extensions/replication/tasks/UpdateReplicationStatus.js @@ -8,11 +8,6 @@ const ActionQueueEntry = require('../../../lib/models/ActionQueueEntry'); const BackbeatTask = require('../../../lib/tasks/BackbeatTask'); const BackbeatMetadataProxy = require('../../../lib/BackbeatMetadataProxy'); -const { - metricsExtension, - metricsTypeCompleted, - metricsTypeFailed, -} = require('../constants'); const { getSortedSetMember, getSortedSetKey, @@ -106,39 +101,6 @@ class UpdateReplicationStatus extends BackbeatTask { this.failedCRRProducer.publishFailedCRREntry(JSON.stringify(message)); } - /** - * Report CRR metrics - * @param {ObjectQueueEntry} sourceEntry - The original entry - * @param {ObjectQueueEntry} updatedSourceEntry - updated object entry - * @return {undefined} - */ - _reportMetrics(sourceEntry, updatedSourceEntry) { - const content = updatedSourceEntry.getReplicationContent(); - const contentLength = updatedSourceEntry.getContentLength(); - const bytes = content.includes('DATA') ? contentLength : 0; - const data = {}; - const site = sourceEntry.getSite(); - data[site] = { ops: 1, bytes }; - const status = sourceEntry.getReplicationSiteStatus(site); - // Report to MetricsProducer with completed/failed metrics. - if (status === 'COMPLETED' || status === 'FAILED') { - const entryType = status === 'COMPLETED' ? - metricsTypeCompleted : metricsTypeFailed; - - this.mProducer.publishMetrics(data, entryType, metricsExtension, - err => { - if (err) { - this.logger.trace('error occurred in publishing metrics', { - error: err, - method: 'UpdateReplicationStatus._reportMetrics', - }); - } - }); - // TODO: update ZenkoMetrics - } - return undefined; - } - /** * Get the appropriate source metadata for a non-versioned bucket. If the * object metadata has changed since we performed CRR, then we want to @@ -445,7 +407,6 @@ class UpdateReplicationStatus extends BackbeatTask { return done(err); } - this._reportMetrics(sourceEntry, updatedSourceEntry); return this._handleGarbageCollection( updatedSourceEntry, log, done); }); diff --git a/lib/MetricsConsumer.js b/lib/MetricsConsumer.js deleted file mode 100644 index 9fdc4efda..000000000 --- a/lib/MetricsConsumer.js +++ /dev/null @@ -1,234 +0,0 @@ -'use strict'; // eslint-disable-line strict - -const Logger = require('werelogs').Logger; -const { RedisClient, StatsModel } = require('arsenal').metrics; -const errors = require('arsenal').errors; - -const BackbeatConsumer = require('./BackbeatConsumer'); -const { - redisKeys: crrRedisKeys, - metricsExtension: crrExtension, -} = require('../extensions/replication/constants'); -const { - redisKeys: ingestionRedisKeys, - metricsExtension: ingestionExtension, -} = require('../extensions/ingestion/constants'); - -// StatsClient constant defaults for site metrics -const INTERVAL = 300; // 5 minutes; -const EXPIRY = 86400; // 24 hours - -// BackbeatConsumer constant defaults -const CONSUMER_FETCH_MAX_BYTES = 5000020; -const CONCURRENCY = 10; - -class MetricsConsumer { - /** - * @constructor - * @param {object} rConfig - redis ha configuration - * @param {string} rConfig.host - redis ha host - * @param {number} rConfig.port - redis ha port - * @param {object} mConfig - metrics configurations - * @param {string} mConfig.topic - metrics topic name - * @param {object} kafkaConfig - kafka configurations - * @param {string} kafkaConfig.hosts - kafka hosts - * as "host:port[/chroot]" - * @param {string} id - identifier used for filtering metrics entries - */ - constructor(rConfig, mConfig, kafkaConfig, id) { - this.mConfig = mConfig; - this.kafkaConfig = kafkaConfig; - this._id = id; - - this._consumer = null; - - this.logger = new Logger('Backbeat:MetricsConsumer'); - const redisClient = new RedisClient(rConfig, this.logger); - this._statsClient = new StatsModel(redisClient, INTERVAL, EXPIRY); - } - - /** - * List of valid "type" field values for metric kafka entries - * @param {string} type - type to check - * @return {boolean} true if type is a valid metric type - */ - static isValidMetricType(type) { - const validTypes = ['completed', 'failed', 'queued', 'pendingOnly']; - return validTypes.includes(type); - } - - start() { - let consumerReady = false; - const consumer = new BackbeatConsumer({ - kafka: { - hosts: this.kafkaConfig.hosts, - site: this.kafkaConfig.site, - }, - topic: this.mConfig.topic, - groupId: `${this.mConfig.groupIdPrefix}-${this._id}`, - concurrency: CONCURRENCY, - queueProcessor: this.processKafkaEntry.bind(this), - fetchMaxBytes: CONSUMER_FETCH_MAX_BYTES, - }); - consumer.on('error', () => { - if (!consumerReady) { - this.logger.fatal('error starting metrics consumer'); - process.exit(1); - } - }); - consumer.on('ready', () => { - consumerReady = true; - consumer.subscribe(); - this._consumer = consumer; - this.logger.info('metrics processor is ready to consume entries'); - }); - } - - _getRedisKeys(extension) { - switch (extension) { - case crrExtension: return crrRedisKeys; - case ingestionExtension: return ingestionRedisKeys; - default: - throw errors.InternalError.customizeDescription( - `${extension} is not a valid extension`); - } - } - - _reportPending(site, redisKeys, ops, bytes) { - if (ops > 0) { - this._sendRequest('incrementKey', site, redisKeys, 'opsPending', - ops); - } - if (ops < 0) { - this._sendRequest('decrementKey', site, redisKeys, 'opsPending', - Math.abs(ops)); - } - if (bytes > 0) { - this._sendRequest('incrementKey', site, redisKeys, 'bytesPending', - bytes); - } - if (bytes < 0) { - this._sendRequest('decrementKey', site, redisKeys, 'bytesPending', - Math.abs(bytes)); - } - } - - _sendSiteLevelRequests(data) { - const { type, site, ops, bytes, extension } = data; - let redisKeys; - try { - redisKeys = this._getRedisKeys(extension); - } catch (err) { - return this.logger.error('error consuming metric entry', { - method: 'MetricsConsumer._sendSiteLevelRequests', - site, - type, - }); - } - if (type === 'completed') { - // Pending metrics - this._reportPending(site, redisKeys, -ops, -bytes); - // Other metrics - this._sendRequest('reportNewRequest', site, redisKeys, 'opsDone', - ops); - this._sendRequest('reportNewRequest', site, redisKeys, 'bytesDone', - bytes); - } else if (type === 'failed') { - // Pending metrics - this._reportPending(site, redisKeys, -ops, -bytes); - // Other metrics - this._sendRequest('reportNewRequest', site, redisKeys, 'opsFail', - ops); - this._sendRequest('reportNewRequest', site, redisKeys, 'bytesFail', - bytes); - } else if (type === 'queued') { - // Pending metrics - this._reportPending(site, redisKeys, ops, bytes); - // Other metrics - this._sendRequest('reportNewRequest', site, redisKeys, 'ops', ops); - this._sendRequest('reportNewRequest', site, redisKeys, 'bytes', - bytes); - } else if (type === 'pendingOnly') { - this._reportPending(site, redisKeys, ops, bytes); - } - return undefined; - } - - _sendObjectLevelRequests(data) { - const { type, site, bytes, extension, - bucketName, objectKey, versionId } = data; - const redisKeys = this._getRedisKeys(extension); - if (type === 'completed') { - const key = `${site}:${bucketName}:${objectKey}:` + - `${versionId}:${redisKeys.objectBytesDone}`; - this._sendObjectRequest(key, bytes); - } else if (type === 'queued') { - const key = `${site}:${bucketName}:${objectKey}:` + - `${versionId}:${redisKeys.objectBytes}`; - this._sendObjectRequest(key, bytes); - } - return undefined; - } - - processKafkaEntry(kafkaEntry, done) { - const log = this.logger.newRequestLogger(); - let data; - try { - data = JSON.parse(kafkaEntry.value); - } catch (err) { - log.error('error processing metrics entry', { - method: 'MetricsConsumer.processKafkaEntry', - error: err, - }); - log.end(); - return done(); - } - /* - data = { - timestamp: 1509416671977, - ops: 5, - bytes: 195, - extension: 'crr', - type: 'queued' - } - */ - // filter metric entries by service, i.e. 'crr', 'ingestion' - if (this._id !== data.extension) { - return done(); - } - const isValidType = MetricsConsumer.isValidMetricType(data.type); - if (!isValidType) { - log.error('unknown type field encountered in metrics consumer', { - method: 'MetricsConsumer.processKafkaEntry', - dataType: data.type, - data, - }); - log.end(); - return done(); - } - if (data.bucketName && data.objectKey && data.versionId) { - this._sendObjectLevelRequests(data); - } else { - this._sendSiteLevelRequests(data); - } - log.end(); - return done(); - } - - _sendRequest(action, site, redisKeys, keyType, value) { - if (redisKeys[keyType]) { - this._statsClient[action](`${site}:${redisKeys[keyType]}`, - value || 0); - } - } - - _sendObjectRequest(key, value) { - this._statsClient.reportNewRequest(key, value); - } - - close(cb) { - this._consumer.close(cb); - } -} - -module.exports = MetricsConsumer; diff --git a/lib/MetricsProducer.js b/lib/MetricsProducer.js deleted file mode 100644 index 0493234b1..000000000 --- a/lib/MetricsProducer.js +++ /dev/null @@ -1,77 +0,0 @@ -'use strict'; // eslint-disable-line strict - -const async = require('async'); -const { Logger } = require('werelogs'); - -const BackbeatProducer = require('./BackbeatProducer'); -const MetricsModel = require('./models/MetricsModel'); - -class MetricsProducer { - /** - * @constructor - * @param {Object} kafkaConfig - kafka connection config - * @param {Object} mConfig - metrics configurations - */ - constructor(kafkaConfig, mConfig) { - this._kafkaConfig = kafkaConfig; - this._topic = mConfig.topic; - - this._producer = null; - this._log = new Logger('MetricsProducer'); - } - - setupProducer(done) { - const producer = new BackbeatProducer({ - kafka: { hosts: this._kafkaConfig.hosts }, - maxRequestSize: this._kafkaConfig.maxRequestSize, - topic: this._topic, - }); - producer.once('error', done); - producer.once('ready', () => { - producer.removeAllListeners('error'); - producer.on('error', err => { - this._log.error('error from backbeat producer', - { error: err }); - }); - this._producer = producer; - done(); - }); - } - - getProducer() { - return this._producer; - } - - /** - * @param {Object} extMetrics - an object where keys are all sites for a - * given extension and values are the metrics for the site - * (i.e. { my-site: { ops: 1, bytes: 124 }, awsbackend: { ... } } ) - * @param {String} type - type of metric (queueud or processed) - * @param {String} ext - extension (i.e. 'crr') - * @param {function} cb - callback - * @return {undefined} - */ - publishMetrics(extMetrics, type, ext, cb) { - async.each(Object.keys(extMetrics), (siteName, done) => { - const { ops, bytes, bucketName, objectKey, versionId } = - extMetrics[siteName]; - const message = new MetricsModel(ops, bytes, ext, type, - siteName, bucketName, objectKey, versionId).serialize(); - this._producer.send([{ message }], err => { - if (err) { - // Using trace here because errors are already logged in - // BackbeatProducer. This is to log to see source of caller - this._log.trace(`error publishing ${type} metrics for` + - `extension metrics ${ext}`, { error: err }); - } - done(); - }); - }, cb); - } - - close(cb) { - this._producer.close(cb); - } -} - -module.exports = MetricsProducer; diff --git a/lib/queuePopulator/BucketFileLogReader.js b/lib/queuePopulator/BucketFileLogReader.js index dcf5b7dad..34e1b4132 100644 --- a/lib/queuePopulator/BucketFileLogReader.js +++ b/lib/queuePopulator/BucketFileLogReader.js @@ -6,10 +6,10 @@ const LogReader = require('./LogReader'); class BucketFileLogReader extends LogReader { constructor(params) { const { zkClient, kafkaConfig, dmdConfig, logger, - extensions, metricsProducer, metricsHandler } = params; + extensions, metricsHandler } = params; super({ zkClient, kafkaConfig, logConsumer: null, logId: `bucketFile_${dmdConfig.logName}`, logger, extensions, - metricsProducer, metricsHandler }); + metricsHandler }); this._dmdConfig = dmdConfig; this._log = logger; diff --git a/lib/queuePopulator/IngestionPopulator.js b/lib/queuePopulator/IngestionPopulator.js index c077ce0ef..99cedc76a 100644 --- a/lib/queuePopulator/IngestionPopulator.js +++ b/lib/queuePopulator/IngestionPopulator.js @@ -9,9 +9,6 @@ const Logger = require('werelogs').Logger; const config = require('../Config'); const IngestionReader = require('./IngestionReader'); const BackbeatProducer = require('../BackbeatProducer'); -const MetricsConsumer = require('../MetricsConsumer'); -const MetricsProducer = require('../MetricsProducer'); -const { metricsExtension } = require('../../extensions/ingestion/constants'); const IngestionPopulatorMetrics = require('./IngestionPopulatorMetrics'); const { startCircuitBreakerMetricsExport, @@ -79,11 +76,6 @@ class IngestionPopulator { // shared producer across readers this._producer = null; - // metrics clients - this._mProducer = null; - this._mConsumer = null; - this._redis = null; - // all ingestion readers (including paused ones) // i.e.: { zenko-bucket-name: IngestionReader() } this._ingestionSources = {}; @@ -152,17 +144,6 @@ class IngestionPopulator { }); } - _setupMetricsClients(cb) { - // Metrics Consumer - this._mConsumer = new MetricsConsumer(this.rConfig, this.mConfig, - this.kafkaConfig, metricsExtension); - this._mConsumer.start(); - - // Metrics Producer - this._mProducer = new MetricsProducer(this.kafkaConfig, this.mConfig); - this._mProducer.setupProducer(cb); - } - _setupProducer(cb) { if (this._producer) { return process.nextTick(cb); @@ -580,7 +561,6 @@ class IngestionPopulator { logger: this.log, extensions: [this._extension], producer: this._producer, - metricsProducer: this._mProducer, qpConfig: this.qpConfig, s3Config: this.s3Config, }); @@ -662,30 +642,6 @@ class IngestionPopulator { */ close(done) { async.series([ - next => { - if (this._mProducer) { - this.log.debug('closing metrics producer', { - method: 'IngestionPopulator.close', - }); - return this._mProducer.close(next); - } - this.log.debug('no metrics producer to close', { - method: 'IngestionPopulator.close', - }); - return next(); - }, - next => { - if (this._mConsumer) { - this.log.debug('closing metrics consumer', { - method: 'IngestionPopulator.close', - }); - return this._mConsumer.close(next); - } - this.log.debug('no metrics consumer to close', { - method: 'IngestionPopulator.close', - }); - return next(); - }, next => { if (this._producer) { this.log.debug('closing producer', { diff --git a/lib/queuePopulator/IngestionReader.js b/lib/queuePopulator/IngestionReader.js index 14f8360c3..8a40e65c5 100644 --- a/lib/queuePopulator/IngestionReader.js +++ b/lib/queuePopulator/IngestionReader.js @@ -6,10 +6,6 @@ const VID_SEP = require('arsenal').versioning.VersioningConstants const IngestionProducer = require('./IngestionProducer'); const IngestionPopulatorMetrics = require('./IngestionPopulatorMetrics'); const LogReader = require('./LogReader'); -const { - metricsExtension, - metricsTypeQueued -} = require('../../extensions/ingestion/constants'); const { transformKey } = require('../util/entry'); function _isVersionedLogKey(key) { @@ -19,9 +15,9 @@ function _isVersionedLogKey(key) { class IngestionReader extends LogReader { constructor(params) { const { zkClient, ingestionConfig, kafkaConfig, bucketdConfig, qpConfig, - logger, extensions, producer, metricsProducer, s3Config } = params; + logger, extensions, producer, s3Config } = params; super({ zkClient, kafkaConfig, logConsumer: {}, logId: '', logger, - extensions, metricsProducer, zkMetricsHandler: IngestionPopulatorMetrics }); + extensions, zkMetricsHandler: IngestionPopulatorMetrics }); this._ingestionConfig = ingestionConfig; this.qpConfig = qpConfig; this.s3Config = s3Config; @@ -423,7 +419,6 @@ class IngestionReader extends LogReader { if (err) { return done(err); } - this._publishMetrics(); return done(); }); } @@ -446,18 +441,6 @@ class IngestionReader extends LogReader { ], done); } - _publishMetrics() { - // Ingestion extensions is a single IngestionQueuePopulatorExt - const extension = this._extensions[0]; - const location = this.getLocationConstraint(); - const metric = extension.getAndResetMetrics(this._targetZenkoBucket); - if (metric && metric.ops > 0) { - const value = { [location]: metric }; - this._mProducer.publishMetrics(value, metricsTypeQueued, - metricsExtension, () => {}); - } - } - /** * Bucket configs have user editable fields: credentials, endpoint diff --git a/lib/queuePopulator/KafkaLogReader.js b/lib/queuePopulator/KafkaLogReader.js index a5be6cae4..18a3631ce 100644 --- a/lib/queuePopulator/KafkaLogReader.js +++ b/lib/queuePopulator/KafkaLogReader.js @@ -12,14 +12,12 @@ class KafkaLogReader extends LogReader { * @param {Object} params.qpKafkaConfig - queue populator kafka configuration * @param {QueuePopulatorExtension[]} params.extensions - array of * queue populator extension modules - * @param {MetricsProducer} params.metricsProducer - instance of metrics - * producer * @param {MetricsHandler} params.metricsHandler - instance of metrics * handler */ constructor(params) { const { zkClient, kafkaConfig, zkConfig, qpKafkaConfig, - logger, extensions, metricsProducer, metricsHandler } = params; + logger, extensions, metricsHandler } = params; // conf contains global kafka and queuePoplator kafka configs const conf = { hosts: kafkaConfig.hosts, @@ -31,7 +29,7 @@ class KafkaLogReader extends LogReader { const logConsumer = new LogConsumer(conf, logger); super({ zkClient, kafkaConfig, zkConfig, logConsumer, logId: `kafka_${qpKafkaConfig.logName}`, logger, extensions, - metricsProducer, metricsHandler }); + metricsHandler }); this._kafkaConfig = conf; } diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js index b941582f8..4ab422929 100644 --- a/lib/queuePopulator/LogReader.js +++ b/lib/queuePopulator/LogReader.js @@ -4,11 +4,6 @@ const jsutil = require('arsenal').jsutil; const config = require('../Config'); const BackbeatProducer = require('../BackbeatProducer'); -const ReplicationQueuePopulator = - require('../../extensions/replication/ReplicationQueuePopulator'); - -const { metricsExtension, metricsTypeQueued } = - require('../../extensions/replication/constants'); const { transformKey } = require('../util/entry'); class LogReader { @@ -28,8 +23,6 @@ class LogReader { * @param {Logger} params.logger - logger object * @param {QueuePopulatorExtension[]} params.extensions - array of * queue populator extension modules - * @param {MetricsProducer} params.metricsProducer - instance of metrics - * producer * @param {MetricsHandler} params.metricsHandler - instance of metrics * handler * @param {ZkMetricsHandler} params.zkMetricsHandler - instance of zookeeper @@ -45,7 +38,6 @@ class LogReader { this.log = params.logger; this._producers = {}; this._extensions = params.extensions; - this._mProducer = params.metricsProducer; // internal variable to carry over a tailable cursor across batches this._openLog = null; @@ -675,17 +667,6 @@ class LogReader { if (err) { return done(err); } - // Find the CRR Class extension - const crrExtension = this._extensions.find(ext => ( - ext instanceof ReplicationQueuePopulator - )); - if (crrExtension) { - const extMetrics = crrExtension.getAndResetMetrics(); - if (Object.keys(extMetrics).length > 0) { - this._mProducer.publishMetrics(extMetrics, - metricsTypeQueued, metricsExtension, () => { }); - } - } return done(); }); } diff --git a/lib/queuePopulator/MongoLogReader.js b/lib/queuePopulator/MongoLogReader.js index ac9a93568..95081ed25 100644 --- a/lib/queuePopulator/MongoLogReader.js +++ b/lib/queuePopulator/MongoLogReader.js @@ -5,14 +5,14 @@ const LogReader = require('./LogReader'); class MongoLogReader extends LogReader { constructor(params) { const { zkClient, kafkaConfig, zkConfig, mongoConfig, - logger, extensions, metricsProducer, metricsHandler } = params; + logger, extensions, metricsHandler } = params; logger.info('initializing mongo log reader', { method: 'MongoLogReader.constructor', mongoConfig }); const logConsumer = new LogConsumer(mongoConfig, logger); super({ zkClient, kafkaConfig, zkConfig, logConsumer, logId: `mongo_${mongoConfig.logName}`, logger, extensions, - metricsProducer, metricsHandler }); + metricsHandler }); this._mongoConfig = mongoConfig; } diff --git a/lib/queuePopulator/QueuePopulator.js b/lib/queuePopulator/QueuePopulator.js index 7fdc07350..ac962864b 100644 --- a/lib/queuePopulator/QueuePopulator.js +++ b/lib/queuePopulator/QueuePopulator.js @@ -6,13 +6,10 @@ const { State: ZKState } = require('node-zookeeper-client'); const ProvisionDispatcher = require('../provisioning/ProvisionDispatcher'); const RaftLogReader = require('./RaftLogReader'); const BucketFileLogReader = require('./BucketFileLogReader'); -const MetricsProducer = require('../MetricsProducer'); -const MetricsConsumer = require('../MetricsConsumer'); const FailedCRRConsumer = require('../../extensions/replication/failedCRR/FailedCRRConsumer'); const MongoLogReader = require('./MongoLogReader'); const KafkaLogReader = require('./KafkaLogReader'); -const { metricsExtension } = require('../../extensions/replication/constants'); const NotificationConfigManager = require('../../extensions/notification/NotificationConfigManager'); const { ZenkoMetrics } = require('arsenal').metrics; @@ -171,9 +168,6 @@ class QueuePopulator { // list of updated log readers, if any this.logReadersUpdate = null; - // metrics clients - this._mProducer = null; - this._mConsumer = null; // bucket notification configuration manager this.bnConfigManager = null; this._loadedExtensions = []; @@ -189,15 +183,6 @@ class QueuePopulator { */ open(cb) { async.series([ - next => this._setupMetricsClients(err => { - if (err) { - this.log.error('error setting up metrics client', { - method: 'QueuePopulator.open', - error: err, - }); - } - return next(err); - }), next => this._setupFailedCRRClients(next), next => this._setupNotificationConfigManager(next), next => this._setupZookeeper(err => { @@ -235,17 +220,6 @@ class QueuePopulator { }); } - _setupMetricsClients(cb) { - // Metrics Consumer - this._mConsumer = new MetricsConsumer(this.rConfig, - this.mConfig, this.kafkaConfig, metricsExtension); - this._mConsumer.start(); - - // Metrics Producer - this._mProducer = new MetricsProducer(this.kafkaConfig, this.mConfig); - this._mProducer.setupProducer(cb); - } - /** * Set up and start the consumer for retrying failed CRR operations. * @param {Function} cb - The callback function @@ -273,30 +247,6 @@ class QueuePopulator { this._circuitBreaker.stop(); return next(); }, - next => { - if (this._mProducer) { - this.log.debug('closing metrics producer', { - method: 'QueuePopulator.close', - }); - return this._mProducer.close(next); - } - this.log.debug('no metrics producer to close', { - method: 'QueuePopulator.close', - }); - return next(); - }, - next => { - if (this._mConsumer) { - this.log.debug('closing metrics consumer', { - method: 'QueuePopulator.close', - }); - return this._mConsumer.close(next); - } - this.log.debug('no metrics consumer to close', { - method: 'QueuePopulator.close', - }); - return next(); - } ], cb); } @@ -317,7 +267,6 @@ class QueuePopulator { mongoConfig: this.qpConfig.mongo, logger: this.log, extensions: this._extensions, - metricsProducer: this._mProducer, metricsHandler, }), ]; @@ -334,7 +283,6 @@ class QueuePopulator { ), logger: this.log, extensions: this._extensions, - metricsProducer: this._mProducer, metricsHandler, }), ]; @@ -347,7 +295,6 @@ class QueuePopulator { dmdConfig: this.qpConfig.dmd, logger: this.log, extensions: this._extensions, - metricsProducer: this._mProducer, metricsHandler, }), ]; @@ -385,7 +332,6 @@ class QueuePopulator { raftId: token, logger: this.log, extensions: this._extensions, - metricsProducer: this._mProducer, metricsHandler, })); return undefined; diff --git a/lib/queuePopulator/RaftLogReader.js b/lib/queuePopulator/RaftLogReader.js index 24061e037..b96ec0105 100644 --- a/lib/queuePopulator/RaftLogReader.js +++ b/lib/queuePopulator/RaftLogReader.js @@ -7,7 +7,7 @@ const LogReader = require('./LogReader'); class RaftLogReader extends LogReader { constructor(params) { const { zkClient, kafkaConfig, bucketdConfig, httpsConfig, - raftId, logger, extensions, metricsProducer, metricsHandler } = params; + raftId, logger, extensions, metricsHandler } = params; const { host, port } = bucketdConfig; logger.info('initializing raft log reader', { method: 'RaftLogReader.constructor', @@ -25,7 +25,7 @@ class RaftLogReader extends LogReader { raftSession: raftId, logger }); super({ zkClient, kafkaConfig, logConsumer, logId: `raft_${raftId}`, - logger, extensions, metricsProducer, metricsHandler }); + logger, extensions, metricsHandler }); this.raftId = raftId; } diff --git a/tests/functional/ingestion/IngestionReader.js b/tests/functional/ingestion/IngestionReader.js index 892b1b0e6..67624d610 100644 --- a/tests/functional/ingestion/IngestionReader.js +++ b/tests/functional/ingestion/IngestionReader.js @@ -238,7 +238,6 @@ describe('ingestion reader tests with mock', function fD() { qpConfig: testConfig.queuePopulator, logger: dummyLogger, extensions: [ingestionQP], - metricsProducer: { publishMetrics: () => { } }, s3Config: testConfig.s3, producer, }); @@ -404,7 +403,6 @@ describe('ingestion reader tests with mock', function fD() { qpConfig: testConfig.queuePopulator, logger: dummyLogger, extensions: [ingestionQP], - metricsProducer: { publishMetrics: () => { } }, s3Config: testConfig.s3, producer, }); diff --git a/tests/functional/ingestion/MongoQueueProcessor.js b/tests/functional/ingestion/MongoQueueProcessor.js index e37abea75..f0167ad9e 100644 --- a/tests/functional/ingestion/MongoQueueProcessor.js +++ b/tests/functional/ingestion/MongoQueueProcessor.js @@ -148,12 +148,6 @@ class MongoQueueProcessorMock extends MongoQueueProcessor { start() { // mocks this._mongoClient = new MongoClientMock(); - this._mProducer = { - close: () => {}, - publishMetrics: (metric, type, ext) => { - this.addToMetricsStore({ metric, type, ext }); - }, - }; this._bootstrapList = bootstrapList; this._metricsStore = []; } @@ -162,19 +156,11 @@ class MongoQueueProcessorMock extends MongoQueueProcessor { return this._consumer.sendMockEntry(entry, cb); } - addToMetricsStore(obj) { - this._metricsStore.push(obj); - } - reset() { this._accruedMetrics = {}; this._mongoClient.reset(); } - resetMetricsStore() { - this._metricsStore = []; - } - getAdded() { return this._mongoClient.getAdded(); } @@ -182,10 +168,6 @@ class MongoQueueProcessorMock extends MongoQueueProcessor { getDeleted() { return this._mongoClient.getDeleted(); } - - getMetricsStore() { - return this._metricsStore; - } } describe('MongoQueueProcessor', function mqp() { @@ -282,31 +264,6 @@ describe('MongoQueueProcessor', function mqp() { }); describe('::_processObjectQueueEntry', () => { - function validateMetricReport(type, done) { - // only 2 types of metric type reports - assert(type === 'completed' || type === 'pendingOnly'); - - const expectedMetricStore = [{ - ext: 'ingestion', - metric: { - [LOCATION]: { ops: 1 }, - }, - type, - }]; - - const checker = setInterval(() => { - const ms = mqp.getMetricsStore(); - if (ms.length !== 0) { - clearInterval(checker); - assert.deepStrictEqual(expectedMetricStore, ms); - done(); - } - }, 1000); - } - - afterEach(() => { - mqp.resetMetricsStore(); - }); it('should save to mongo a new version entry and update fields', done => { @@ -367,7 +324,7 @@ describe('MongoQueueProcessor', function mqp() { assert.strictEqual(repInfo.storageType, 'aws_s3'); assert.strictEqual(repInfo.dataStoreVersionId, ''); - validateMetricReport('completed', done); + done(); }); }); @@ -432,7 +389,7 @@ describe('MongoQueueProcessor', function mqp() { assert.strictEqual(loc.dataStoreETag, `1:${contentMD5}`); assert.strictEqual(decode(loc.dataStoreVersionId), NEW_VERSION_ID); - validateMetricReport('completed', done); + done(); }); }); @@ -459,7 +416,7 @@ describe('MongoQueueProcessor', function mqp() { const added = mqp.getAdded(); assert.strictEqual(added.length, 0); - validateMetricReport('pendingOnly', done); + done(); }); }); @@ -486,7 +443,7 @@ describe('MongoQueueProcessor', function mqp() { assert.deepStrictEqual(objVal.replicationInfo.content, ['METADATA', 'DELETE_TAGGING']); - validateMetricReport('completed', done); + done(); }); }); @@ -513,8 +470,7 @@ describe('MongoQueueProcessor', function mqp() { assert.strictEqual(added.length, 1); assert.deepStrictEqual(objVal.replicationInfo.content, ['METADATA', 'PUT_TAGGING']); - - validateMetricReport('completed', done); + done(); }); }); @@ -541,7 +497,7 @@ describe('MongoQueueProcessor', function mqp() { const loc = objVal.location[0]; assert.strictEqual(decode(loc.dataStoreVersionId), nullVersionId); - validateMetricReport('completed', done); + done(); }); }); }); diff --git a/tests/unit/ingestion/mongoQueueProcessor.spec.js b/tests/unit/ingestion/mongoQueueProcessor.spec.js new file mode 100644 index 000000000..4a01cab93 --- /dev/null +++ b/tests/unit/ingestion/mongoQueueProcessor.spec.js @@ -0,0 +1,97 @@ +'use strict'; // eslint-disable-line + +const assert = require('assert'); +const sinon = require('sinon'); +const { EventEmitter } = require('events'); +const MongoQueueProcessor = require('../../../extensions/mongoProcessor/MongoQueueProcessor'); +const BackbeatConsumer = require('../../../lib/BackbeatConsumer'); + +describe('MongoQueueProcessor', () => { + let mqp; + + beforeEach(() => { + mqp = new MongoQueueProcessor( + { hosts: 'localhost:9092', site: 'test-site' }, + { topic: 'test-topic', groupId: 'test-group' }, + { logName: 's3-recordlog', replicaSetHosts: 'localhost:27018' }, + {} + ); + // eslint-disable-next-line no-console + console.log('mqp', mqp.logger); + // eslint-disable-next-line no-console + console.log('mqp', mqp); + }); + + afterEach(() => { + sinon.restore(); + }); + + describe('::start', () => { + it('should log an error and exit if MongoDB connection fails', done => { + const loggerErrorStub = sinon.stub(mqp.logger, 'error'); + const loggerFatalStub = sinon.stub(mqp.logger, 'fatal'); + const processExitStub = sinon.stub(process, 'exit'); + + sinon.stub(mqp._mongoClient, 'setup').callsFake(callback => { + callback(new Error('Simulated connection failure')); + }); + + mqp.start(); + + setTimeout(() => { + assert(loggerErrorStub.calledOnce); + assert(loggerErrorStub.calledWith('could not connect to MongoDB')); + assert(loggerFatalStub.calledOnce); + assert(loggerFatalStub.calledWith('error starting mongo queue processor')); + assert(processExitStub.calledOnce); + assert(processExitStub.calledWith(1)); + done(); + }, 100); + }); + + it.skip('should initialize and start the Kafka consumer', done => { + const consumerStub = sinon.stub(BackbeatConsumer.prototype, 'on'); + const subscribeStub = sinon.stub(BackbeatConsumer.prototype, 'subscribe'); + + sinon.stub(mqp._mongoClient, 'setup').callsFake(callback => { + callback(null); + }); + + mqp.start(); + + setTimeout(() => { + assert(consumerStub.calledTwice); + assert(subscribeStub.calledOnce); + done(); + }, 100); + }); + }); + + describe('::stop', () => { + it('should close the Kafka consumer if it exists', done => { + mqp._consumer = new EventEmitter(); + mqp._consumer.close = sinon.stub().callsFake(callback => { + callback(); + }); + + const loggerDebugStub = sinon.stub(mqp.logger, 'debug'); + + mqp.stop(() => { + assert(loggerDebugStub.calledOnce); + assert(loggerDebugStub.calledWith('closing kafka consumer')); + assert(mqp._consumer.close.calledOnce); + done(); + }); + }); + + it('should log a message if there is no Kafka consumer to close', done => { + const loggerDebugStub = sinon.stub(mqp.logger, 'debug'); + + mqp.stop(() => { + assert(loggerDebugStub.calledOnce); + assert(loggerDebugStub.calledWith('no kafka consumer to close')); + done(); + }); + }); + }); +}); diff --git a/tests/unit/lib/queuePopulator/KafkaLogReader.spec.js b/tests/unit/lib/queuePopulator/KafkaLogReader.spec.js new file mode 100644 index 000000000..9ecf52eaa --- /dev/null +++ b/tests/unit/lib/queuePopulator/KafkaLogReader.spec.js @@ -0,0 +1,93 @@ +const assert = require('assert'); +const sinon = require('sinon'); +const { Logger } = require('werelogs'); +const ZookeeperMock = require('zookeeper-mock'); +const KafkaLogReader = require('../../../../lib/queuePopulator/KafkaLogReader'); + +class MockLogConsumer { + constructor(params) { + this.params = params || {}; + } + + setup(callback) { + process.nextTick(() => { + if (this.params.setupError) { + callback(this.params.setupError); + } else { + callback(null); + } + }); + } +} + +describe('KafkaLogReader', () => { + let kafkaLogReader; + let zkMock; + let logger; + + beforeEach(() => { + zkMock = new ZookeeperMock(); + logger = new Logger('test:KafkaLogReader'); + kafkaLogReader = new KafkaLogReader({ + zkClient: zkMock.createClient('localhost:2181'), + kafkaConfig: { hosts: 'localhost:9092' }, + zkConfig: { connectionString: 'localhost:2181' }, + qpKafkaConfig: { logName: 'test-log' }, + logger, + extensions: [], + metricsHandler: {}, + }); + kafkaLogReader.logConsumer = new MockLogConsumer(); + }); + + afterEach(() => { + sinon.restore(); + }); + + describe('constructor', () => { + it('should initialize KafkaLogReader correctly', () => { + assert(kafkaLogReader); + assert(kafkaLogReader.logConsumer instanceof MockLogConsumer); + assert.strictEqual(kafkaLogReader._kafkaConfig.hosts, 'localhost:9092'); + assert.strictEqual(kafkaLogReader._kafkaConfig.logName, 'test-log'); + }); + }); + + describe('setup', () => { + it('should setup log consumer successfully', done => { + const setupStub = sinon.stub(kafkaLogReader.logConsumer, 'setup').callsFake(callback => callback(null)); + kafkaLogReader.setup(err => { + assert.ifError(err); + assert(setupStub.calledOnce); + done(); + }); + }); + + it('should handle log consumer setup error', done => { + const setupStub = sinon.stub(kafkaLogReader.logConsumer, 'setup') + .callsFake(callback => callback(new Error('setup error'))); + kafkaLogReader.setup(err => { + assert.strictEqual(err.message, 'setup error'); + assert(setupStub.calledOnce); + done(); + }); + }); + }); + + describe('getLogInfo', () => { + it('should return log info', () => { + const logInfo = kafkaLogReader.getLogInfo(); + assert.deepStrictEqual(logInfo, { logName: 'test-log' }); + }); + }); + + describe('getMetricLabels', () => { + it('should return metric labels', () => { + const metricLabels = kafkaLogReader.getMetricLabels(); + assert.deepStrictEqual(metricLabels, { + logName: 'kafka-log', + logId: 'test-log', + }); + }); + }); +}); diff --git a/tests/unit/lib/queuePopulator/MongoLogReader.spec.js b/tests/unit/lib/queuePopulator/MongoLogReader.spec.js new file mode 100644 index 000000000..c2d42e5a0 --- /dev/null +++ b/tests/unit/lib/queuePopulator/MongoLogReader.spec.js @@ -0,0 +1,93 @@ +const assert = require('assert'); +const sinon = require('sinon'); +const { Logger } = require('werelogs'); +const ZookeeperMock = require('zookeeper-mock'); +const MongoLogReader = require('../../../../lib/queuePopulator/MongoLogReader'); + +class MockLogConsumer { + constructor(params) { + this.params = params || {}; + } + + connectMongo(callback) { + process.nextTick(() => { + if (this.params.connectMongoError) { + callback(this.params.connectMongoError); + } else { + callback(null); + } + }); + } +} + +describe('MongoLogReader', () => { + let mongoLogReader; + let zkMock; + let logger; + + beforeEach(() => { + zkMock = new ZookeeperMock(); + logger = new Logger('test:MongoLogReader'); + mongoLogReader = new MongoLogReader({ + zkClient: zkMock.createClient('localhost:2181'), + kafkaConfig: { hosts: 'localhost:9092' }, + zkConfig: { connectionString: 'localhost:2181' }, + mongoConfig: { logName: 'test-log' }, + logger, + extensions: [], + metricsHandler: {}, + }); + mongoLogReader.setLogConsumer(new MockLogConsumer()); + }); + + afterEach(() => { + sinon.restore(); + }); + + describe('constructor', () => { + it('should initialize MongoLogReader correctly', () => { + assert(mongoLogReader); + assert(mongoLogReader.logConsumer instanceof MockLogConsumer); + assert.strictEqual(mongoLogReader._mongoConfig.logName, 'test-log'); + }); + }); + + describe('setup', () => { + it('should setup MongoDB connection successfully', done => { + const connectMongoStub = sinon.stub(mongoLogReader.logConsumer, 'connectMongo') + .callsFake(callback => callback(null)); + mongoLogReader.setup(err => { + assert.ifError(err); + assert(connectMongoStub.calledOnce); + done(); + }); + }); + + it('should handle MongoDB connection error', done => { + const connectMongoStub = sinon.stub(mongoLogReader.logConsumer, 'connectMongo') + .callsFake(callback => callback(new Error('connection error'))); + mongoLogReader.setup(err => { + assert.strictEqual(err.message, 'connection error'); + assert(connectMongoStub.calledOnce); + done(); + }); + }); + }); + + describe('getLogInfo', () => { + it('should return log info', () => { + const logInfo = mongoLogReader.getLogInfo(); + assert.deepStrictEqual(logInfo, { logName: 'test-log' }); + }); + }); + + describe('getMetricLabels', () => { + it('should return metric labels', () => { + const metricLabels = mongoLogReader.getMetricLabels(); + assert.deepStrictEqual(metricLabels, { + logName: 'mongo-log', + logId: 'test-log', + }); + }); + }); +}); diff --git a/tests/unit/lib/util/circuitBreaker.spec.js b/tests/unit/lib/util/circuitBreaker.spec.js index 37c43469b..61d3f01d6 100644 --- a/tests/unit/lib/util/circuitBreaker.spec.js +++ b/tests/unit/lib/util/circuitBreaker.spec.js @@ -165,6 +165,10 @@ describe('updateCircuitBreakerConfigForImplicitOutputQueue', () => { }); describe('startCircuitBreakerMetricsExport', () => { + beforeEach(() => { + circuitBreakerGauge.reset(); + circuitBreakerCounter.reset(); + }); it('should export circuit breaker state and not increment counter', done => { const cb = { state: 1234, failedProbes: false }; startCircuitBreakerMetricsExport(cb, 'test', 10);