-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improvement/bb 614 remove redis metrics #2588
base: development/9.0
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,7 +1,5 @@ | ||||||||||||||||||||
'use strict'; // eslint-disable-line | ||||||||||||||||||||
|
||||||||||||||||||||
const async = require('async'); | ||||||||||||||||||||
|
||||||||||||||||||||
const Logger = require('werelogs').Logger; | ||||||||||||||||||||
const errors = require('arsenal').errors; | ||||||||||||||||||||
const { replicationBackends, emptyFileMd5 } = require('arsenal').constants; | ||||||||||||||||||||
|
@@ -15,16 +13,11 @@ const BackbeatConsumer = require('../../lib/BackbeatConsumer'); | |||||||||||||||||||
const QueueEntry = require('../../lib/models/QueueEntry'); | ||||||||||||||||||||
const DeleteOpQueueEntry = require('../../lib/models/DeleteOpQueueEntry'); | ||||||||||||||||||||
const ObjectQueueEntry = require('../../lib/models/ObjectQueueEntry'); | ||||||||||||||||||||
const MetricsProducer = require('../../lib/MetricsProducer'); | ||||||||||||||||||||
const { metricsExtension, metricsTypeCompleted, metricsTypePendingOnly } = | ||||||||||||||||||||
require('../ingestion/constants'); | ||||||||||||||||||||
|
||||||||||||||||||||
const getContentType = require('./utils/contentTypeHelper'); | ||||||||||||||||||||
const BucketMemState = require('./utils/BucketMemState'); | ||||||||||||||||||||
const MongoProcessorMetrics = require('./MongoProcessorMetrics'); | ||||||||||||||||||||
|
||||||||||||||||||||
// batch metrics by location and send to kafka metrics topic every 5 seconds | ||||||||||||||||||||
const METRIC_REPORT_INTERVAL_MS = process.env.CI === 'true' ? 1000 : 5000; | ||||||||||||||||||||
|
||||||||||||||||||||
// TODO - ADD PREFIX BASED ON SOURCE | ||||||||||||||||||||
// april 6, 2018 | ||||||||||||||||||||
|
||||||||||||||||||||
|
@@ -76,19 +69,6 @@ class MongoQueueProcessor { | |||||||||||||||||||
this._mongoClient = new MongoClient(this.mongoClientConfig); | ||||||||||||||||||||
this._bucketMemState = new BucketMemState(Config); | ||||||||||||||||||||
|
||||||||||||||||||||
// in-mem batch of metrics, we only track total entry count by location | ||||||||||||||||||||
// this._accruedMetrics = { zenko-location: 10 } | ||||||||||||||||||||
this._accruedMetrics = {}; | ||||||||||||||||||||
|
||||||||||||||||||||
setInterval(() => { | ||||||||||||||||||||
this._sendMetrics(); | ||||||||||||||||||||
}, METRIC_REPORT_INTERVAL_MS); | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
_setupMetricsClients(cb) { | ||||||||||||||||||||
// Metrics Producer | ||||||||||||||||||||
this._mProducer = new MetricsProducer(this.kafkaConfig, this._mConfig); | ||||||||||||||||||||
this._mProducer.setupProducer(cb); | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
/** | ||||||||||||||||||||
|
@@ -97,32 +77,15 @@ class MongoQueueProcessor { | |||||||||||||||||||
* @return {undefined} | ||||||||||||||||||||
*/ | ||||||||||||||||||||
start() { | ||||||||||||||||||||
this.logger.info('starting mongo queue processor'); | ||||||||||||||||||||
async.series([ | ||||||||||||||||||||
next => this._setupMetricsClients(err => { | ||||||||||||||||||||
if (err) { | ||||||||||||||||||||
this.logger.error('error setting up metrics client', { | ||||||||||||||||||||
method: 'MongoQueueProcessor.start', | ||||||||||||||||||||
error: err, | ||||||||||||||||||||
}); | ||||||||||||||||||||
} | ||||||||||||||||||||
return next(err); | ||||||||||||||||||||
}), | ||||||||||||||||||||
next => this._mongoClient.setup(err => { | ||||||||||||||||||||
if (err) { | ||||||||||||||||||||
this.logger.error('could not connect to MongoDB', { | ||||||||||||||||||||
method: 'MongoQueueProcessor.start', | ||||||||||||||||||||
error: err.message, | ||||||||||||||||||||
}); | ||||||||||||||||||||
} | ||||||||||||||||||||
return next(err); | ||||||||||||||||||||
}), | ||||||||||||||||||||
], error => { | ||||||||||||||||||||
if (error) { | ||||||||||||||||||||
this._mongoClient.setup(err => { | ||||||||||||||||||||
if (err) { | ||||||||||||||||||||
this.logger.error('could not connect to MongoDB', { | ||||||||||||||||||||
method: 'MongoQueueProcessor.start', | ||||||||||||||||||||
error: err.message, | ||||||||||||||||||||
}); | ||||||||||||||||||||
this.logger.fatal('error starting mongo queue processor'); | ||||||||||||||||||||
Comment on lines
+82
to
86
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need to log twice I would say, just log the fatal message with all the info
Suggested change
|
||||||||||||||||||||
process.exit(1); | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
this._bootstrapList = Config.getBootstrapList(); | ||||||||||||||||||||
Config.on('bootstrap-list-update', () => { | ||||||||||||||||||||
this._bootstrapList = Config.getBootstrapList(); | ||||||||||||||||||||
|
@@ -164,32 +127,17 @@ class MongoQueueProcessor { | |||||||||||||||||||
* @return {undefined} | ||||||||||||||||||||
*/ | ||||||||||||||||||||
stop(done) { | ||||||||||||||||||||
async.parallel([ | ||||||||||||||||||||
next => { | ||||||||||||||||||||
if (this._consumer) { | ||||||||||||||||||||
this.logger.debug('closing kafka consumer', { | ||||||||||||||||||||
method: 'MongoQueueProcessor.stop', | ||||||||||||||||||||
}); | ||||||||||||||||||||
return this._consumer.close(next); | ||||||||||||||||||||
} | ||||||||||||||||||||
this.logger.debug('no kafka consumer to close', { | ||||||||||||||||||||
method: 'MongoQueueProcessor.stop', | ||||||||||||||||||||
}); | ||||||||||||||||||||
return next(); | ||||||||||||||||||||
}, | ||||||||||||||||||||
next => { | ||||||||||||||||||||
if (this._mProducer) { | ||||||||||||||||||||
this.logger.debug('closing metrics producer', { | ||||||||||||||||||||
method: 'MongoQueueProcessor.stop', | ||||||||||||||||||||
}); | ||||||||||||||||||||
return this._mProducer.close(next); | ||||||||||||||||||||
} | ||||||||||||||||||||
this.logger.debug('no metrics producer to close', { | ||||||||||||||||||||
method: 'MongoQueueProcessor.stop', | ||||||||||||||||||||
}); | ||||||||||||||||||||
return next(); | ||||||||||||||||||||
}, | ||||||||||||||||||||
], done); | ||||||||||||||||||||
if (this._consumer) { | ||||||||||||||||||||
this.logger.debug('closing kafka consumer', { | ||||||||||||||||||||
method: 'MongoQueueProcessor.stop', | ||||||||||||||||||||
}); | ||||||||||||||||||||
this._consumer.close(done); | ||||||||||||||||||||
} else { | ||||||||||||||||||||
this.logger.debug('no kafka consumer to close', { | ||||||||||||||||||||
method: 'MongoQueueProcessor.stop', | ||||||||||||||||||||
}); | ||||||||||||||||||||
done(); | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
_getZenkoObjectMetadata(log, entry, bucketInfo, done) { | ||||||||||||||||||||
|
@@ -408,7 +356,6 @@ class MongoQueueProcessor { | |||||||||||||||||||
return this._mongoClient.deleteObject(bucket, key, options, log, | ||||||||||||||||||||
err => { | ||||||||||||||||||||
if (err) { | ||||||||||||||||||||
this._normalizePendingMetric(location); | ||||||||||||||||||||
log.end().error('error deleting object metadata ' + | ||||||||||||||||||||
'from mongo', { | ||||||||||||||||||||
bucket, | ||||||||||||||||||||
|
@@ -418,7 +365,6 @@ class MongoQueueProcessor { | |||||||||||||||||||
}); | ||||||||||||||||||||
return done(err); | ||||||||||||||||||||
} | ||||||||||||||||||||
this._produceMetricCompletionEntry(location); | ||||||||||||||||||||
log.end().info('object metadata deleted from mongo', { | ||||||||||||||||||||
entry: sourceEntry.getLogInfo(), | ||||||||||||||||||||
location, | ||||||||||||||||||||
|
@@ -443,7 +389,6 @@ class MongoQueueProcessor { | |||||||||||||||||||
this._getZenkoObjectMetadata(log, sourceEntry, bucketInfo, | ||||||||||||||||||||
(err, zenkoObjMd) => { | ||||||||||||||||||||
if (err) { | ||||||||||||||||||||
this._normalizePendingMetric(location); | ||||||||||||||||||||
log.end().error('error processing object queue entry', { | ||||||||||||||||||||
method: 'MongoQueueProcessor._processObjectQueueEntry', | ||||||||||||||||||||
entry: sourceEntry.getLogInfo(), | ||||||||||||||||||||
|
@@ -454,7 +399,6 @@ class MongoQueueProcessor { | |||||||||||||||||||
|
||||||||||||||||||||
const content = getContentType(sourceEntry, zenkoObjMd); | ||||||||||||||||||||
if (content.length === 0) { | ||||||||||||||||||||
this._normalizePendingMetric(location); | ||||||||||||||||||||
log.end().debug('skipping duplicate entry', { | ||||||||||||||||||||
method: 'MongoQueueProcessor._processObjectQueueEntry', | ||||||||||||||||||||
entry: sourceEntry.getLogInfo(), | ||||||||||||||||||||
|
@@ -498,7 +442,6 @@ class MongoQueueProcessor { | |||||||||||||||||||
return this._mongoClient.putObject(bucket, key, objVal, params, | ||||||||||||||||||||
this.logger, err => { | ||||||||||||||||||||
if (err) { | ||||||||||||||||||||
this._normalizePendingMetric(location); | ||||||||||||||||||||
log.end().error('error putting object metadata ' + | ||||||||||||||||||||
'to mongo', { | ||||||||||||||||||||
bucket, | ||||||||||||||||||||
|
@@ -509,7 +452,6 @@ class MongoQueueProcessor { | |||||||||||||||||||
}); | ||||||||||||||||||||
return done(err); | ||||||||||||||||||||
} | ||||||||||||||||||||
this._produceMetricCompletionEntry(location); | ||||||||||||||||||||
log.end().info('object metadata put to mongo', { | ||||||||||||||||||||
entry: sourceEntry.getLogInfo(), | ||||||||||||||||||||
location, | ||||||||||||||||||||
|
@@ -519,49 +461,6 @@ class MongoQueueProcessor { | |||||||||||||||||||
}); | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
/** | ||||||||||||||||||||
* Send accrued metrics by location to kafka | ||||||||||||||||||||
* @return {undefined} | ||||||||||||||||||||
*/ | ||||||||||||||||||||
_sendMetrics() { | ||||||||||||||||||||
Object.keys(this._accruedMetrics).forEach(loc => { | ||||||||||||||||||||
const count = this._accruedMetrics[loc]; | ||||||||||||||||||||
|
||||||||||||||||||||
// only report metrics if something has been recorded for location | ||||||||||||||||||||
if (count > 0) { | ||||||||||||||||||||
this._accruedMetrics[loc] = 0; | ||||||||||||||||||||
const metric = { [loc]: { ops: count } }; | ||||||||||||||||||||
this._mProducer.publishMetrics(metric, metricsTypeCompleted, | ||||||||||||||||||||
metricsExtension, () => {}); | ||||||||||||||||||||
} | ||||||||||||||||||||
}); | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
/** | ||||||||||||||||||||
* Accrue metrics in-mem every METRIC_REPORT_INTERVAL_MS | ||||||||||||||||||||
* @param {string} location - zenko storage location name | ||||||||||||||||||||
* @return {undefined} | ||||||||||||||||||||
*/ | ||||||||||||||||||||
_produceMetricCompletionEntry(location) { | ||||||||||||||||||||
if (this._accruedMetrics[location]) { | ||||||||||||||||||||
this._accruedMetrics[location] += 1; | ||||||||||||||||||||
} else { | ||||||||||||||||||||
this._accruedMetrics[location] = 1; | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
/** | ||||||||||||||||||||
* For cases where we experience an error or skip an entry, we need to | ||||||||||||||||||||
* normalize pending metric. This means we will see pending metrics stuck | ||||||||||||||||||||
* above 0 and will need to bring those metrics down | ||||||||||||||||||||
* @param {string} location - location constraint name | ||||||||||||||||||||
* @return {undefined} | ||||||||||||||||||||
*/ | ||||||||||||||||||||
_normalizePendingMetric(location) { | ||||||||||||||||||||
const metric = { [location]: { ops: 1 } }; | ||||||||||||||||||||
this._mProducer.publishMetrics(metric, metricsTypePendingOnly, | ||||||||||||||||||||
metricsExtension, () => {}); | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
/** | ||||||||||||||||||||
* Get bucket info in memoize state if exists, otherwise fetch from Mongo | ||||||||||||||||||||
|
@@ -639,7 +538,6 @@ class MongoQueueProcessor { | |||||||||||||||||||
entryType: sourceEntry.constructor.name, | ||||||||||||||||||||
method: 'MongoQueueProcessor.processKafkaEntry', | ||||||||||||||||||||
}); | ||||||||||||||||||||
this._normalizePendingMetric(location); | ||||||||||||||||||||
return process.nextTick(done); | ||||||||||||||||||||
}); | ||||||||||||||||||||
} | ||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,6 @@ const QueueEntry = require('../../../lib/models/QueueEntry'); | |
const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry'); | ||
const FailedCRRProducer = require('../failedCRR/FailedCRRProducer'); | ||
const ReplayProducer = require('../replay/ReplayProducer'); | ||
const MetricsProducer = require('../../../lib/MetricsProducer'); | ||
const { http: HttpAgent, https: HttpsAgent } = require('httpagent'); | ||
|
||
// StatsClient constant default for site metrics | ||
|
@@ -216,7 +215,6 @@ class ReplicationStatusProcessor { | |
this.gcConfig = gcConfig; | ||
this._consumer = null; | ||
this._gcProducer = null; | ||
this._mProducer = null; | ||
|
||
this.logger = | ||
new Logger('Backbeat:Replication:ReplicationStatusProcessor'); | ||
|
@@ -336,7 +334,6 @@ class ReplicationStatusProcessor { | |
sourceHTTPAgent: this.sourceHTTPAgent, | ||
vaultclientCache: this.vaultclientCache, | ||
gcProducer: this._gcProducer, | ||
mProducer: this._mProducer, | ||
statsClient: this._statsClient, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same in other files. |
||
failedCRRProducer: this._failedCRRProducer, | ||
replayProducers: this._ReplayProducers, | ||
|
@@ -379,11 +376,6 @@ class ReplicationStatusProcessor { | |
done(); | ||
} | ||
}, | ||
done => { | ||
this._mProducer = new MetricsProducer(this.kafkaConfig, | ||
this.mConfig); | ||
this._mProducer.setupProducer(done); | ||
}, | ||
done => { | ||
let consumerReady = false; | ||
this._consumer = new BackbeatConsumer({ | ||
|
@@ -434,18 +426,6 @@ class ReplicationStatusProcessor { | |
method: 'ReplicationStatusProcessor.stop', | ||
}); | ||
return next(); | ||
}, | ||
next => { | ||
if (this._mProducer) { | ||
this.logger.debug('closing metrics producer', { | ||
method: 'ReplicationStatusProcessor.stop', | ||
}); | ||
return this._mProducer.close(next); | ||
} | ||
this.logger.debug('no metrics producer to close', { | ||
method: 'ReplicationStatusProcessor.stop', | ||
}); | ||
return next(); | ||
} | ||
], done); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can keep this log, won't hurt