Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvement/bb 614 remove redis metrics #2588

Open
wants to merge 4 commits into
base: development/9.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 18 additions & 120 deletions extensions/mongoProcessor/MongoQueueProcessor.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
'use strict'; // eslint-disable-line

const async = require('async');

const Logger = require('werelogs').Logger;
const errors = require('arsenal').errors;
const { replicationBackends, emptyFileMd5 } = require('arsenal').constants;
Expand All @@ -15,16 +13,11 @@ const BackbeatConsumer = require('../../lib/BackbeatConsumer');
const QueueEntry = require('../../lib/models/QueueEntry');
const DeleteOpQueueEntry = require('../../lib/models/DeleteOpQueueEntry');
const ObjectQueueEntry = require('../../lib/models/ObjectQueueEntry');
const MetricsProducer = require('../../lib/MetricsProducer');
const { metricsExtension, metricsTypeCompleted, metricsTypePendingOnly } =
require('../ingestion/constants');

const getContentType = require('./utils/contentTypeHelper');
const BucketMemState = require('./utils/BucketMemState');
const MongoProcessorMetrics = require('./MongoProcessorMetrics');

// batch metrics by location and send to kafka metrics topic every 5 seconds
const METRIC_REPORT_INTERVAL_MS = process.env.CI === 'true' ? 1000 : 5000;

// TODO - ADD PREFIX BASED ON SOURCE
// april 6, 2018

Expand Down Expand Up @@ -76,19 +69,6 @@ class MongoQueueProcessor {
this._mongoClient = new MongoClient(this.mongoClientConfig);
this._bucketMemState = new BucketMemState(Config);

// in-mem batch of metrics, we only track total entry count by location
// this._accruedMetrics = { zenko-location: 10 }
this._accruedMetrics = {};

setInterval(() => {
this._sendMetrics();
}, METRIC_REPORT_INTERVAL_MS);
}

_setupMetricsClients(cb) {
// Metrics Producer
this._mProducer = new MetricsProducer(this.kafkaConfig, this._mConfig);
this._mProducer.setupProducer(cb);
}

/**
Expand All @@ -97,32 +77,15 @@ class MongoQueueProcessor {
* @return {undefined}
*/
start() {
this.logger.info('starting mongo queue processor');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can keep this log, won't hurt

async.series([
next => this._setupMetricsClients(err => {
if (err) {
this.logger.error('error setting up metrics client', {
method: 'MongoQueueProcessor.start',
error: err,
});
}
return next(err);
}),
next => this._mongoClient.setup(err => {
if (err) {
this.logger.error('could not connect to MongoDB', {
method: 'MongoQueueProcessor.start',
error: err.message,
});
}
return next(err);
}),
], error => {
if (error) {
this._mongoClient.setup(err => {
if (err) {
this.logger.error('could not connect to MongoDB', {
method: 'MongoQueueProcessor.start',
error: err.message,
});
this.logger.fatal('error starting mongo queue processor');
Comment on lines +82 to 86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to log twice I would say, just log the fatal message with all the info

Suggested change
this.logger.error('could not connect to MongoDB', {
method: 'MongoQueueProcessor.start',
error: err.message,
});
this.logger.fatal('error starting mongo queue processor');
this.logger.fatal('could not connect to MongoDB, error starting mongo queue processor', {
method: 'MongoQueueProcessor.start',
error: err.message,
});

process.exit(1);
}

this._bootstrapList = Config.getBootstrapList();
Config.on('bootstrap-list-update', () => {
this._bootstrapList = Config.getBootstrapList();
Expand Down Expand Up @@ -164,32 +127,17 @@ class MongoQueueProcessor {
* @return {undefined}
*/
stop(done) {
async.parallel([
next => {
if (this._consumer) {
this.logger.debug('closing kafka consumer', {
method: 'MongoQueueProcessor.stop',
});
return this._consumer.close(next);
}
this.logger.debug('no kafka consumer to close', {
method: 'MongoQueueProcessor.stop',
});
return next();
},
next => {
if (this._mProducer) {
this.logger.debug('closing metrics producer', {
method: 'MongoQueueProcessor.stop',
});
return this._mProducer.close(next);
}
this.logger.debug('no metrics producer to close', {
method: 'MongoQueueProcessor.stop',
});
return next();
},
], done);
if (this._consumer) {
this.logger.debug('closing kafka consumer', {
method: 'MongoQueueProcessor.stop',
});
this._consumer.close(done);
} else {
this.logger.debug('no kafka consumer to close', {
method: 'MongoQueueProcessor.stop',
});
done();
}
}

_getZenkoObjectMetadata(log, entry, bucketInfo, done) {
Expand Down Expand Up @@ -408,7 +356,6 @@ class MongoQueueProcessor {
return this._mongoClient.deleteObject(bucket, key, options, log,
err => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error deleting object metadata ' +
'from mongo', {
bucket,
Expand All @@ -418,7 +365,6 @@ class MongoQueueProcessor {
});
return done(err);
}
this._produceMetricCompletionEntry(location);
log.end().info('object metadata deleted from mongo', {
entry: sourceEntry.getLogInfo(),
location,
Expand All @@ -443,7 +389,6 @@ class MongoQueueProcessor {
this._getZenkoObjectMetadata(log, sourceEntry, bucketInfo,
(err, zenkoObjMd) => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error processing object queue entry', {
method: 'MongoQueueProcessor._processObjectQueueEntry',
entry: sourceEntry.getLogInfo(),
Expand All @@ -454,7 +399,6 @@ class MongoQueueProcessor {

const content = getContentType(sourceEntry, zenkoObjMd);
if (content.length === 0) {
this._normalizePendingMetric(location);
log.end().debug('skipping duplicate entry', {
method: 'MongoQueueProcessor._processObjectQueueEntry',
entry: sourceEntry.getLogInfo(),
Expand Down Expand Up @@ -498,7 +442,6 @@ class MongoQueueProcessor {
return this._mongoClient.putObject(bucket, key, objVal, params,
this.logger, err => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error putting object metadata ' +
'to mongo', {
bucket,
Expand All @@ -509,7 +452,6 @@ class MongoQueueProcessor {
});
return done(err);
}
this._produceMetricCompletionEntry(location);
log.end().info('object metadata put to mongo', {
entry: sourceEntry.getLogInfo(),
location,
Expand All @@ -519,49 +461,6 @@ class MongoQueueProcessor {
});
}

/**
* Send accrued metrics by location to kafka
* @return {undefined}
*/
_sendMetrics() {
Object.keys(this._accruedMetrics).forEach(loc => {
const count = this._accruedMetrics[loc];

// only report metrics if something has been recorded for location
if (count > 0) {
this._accruedMetrics[loc] = 0;
const metric = { [loc]: { ops: count } };
this._mProducer.publishMetrics(metric, metricsTypeCompleted,
metricsExtension, () => {});
}
});
}

/**
* Accrue metrics in-mem every METRIC_REPORT_INTERVAL_MS
* @param {string} location - zenko storage location name
* @return {undefined}
*/
_produceMetricCompletionEntry(location) {
if (this._accruedMetrics[location]) {
this._accruedMetrics[location] += 1;
} else {
this._accruedMetrics[location] = 1;
}
}

/**
* For cases where we experience an error or skip an entry, we need to
* normalize pending metric. This means we will see pending metrics stuck
* above 0 and will need to bring those metrics down
* @param {string} location - location constraint name
* @return {undefined}
*/
_normalizePendingMetric(location) {
const metric = { [location]: { ops: 1 } };
this._mProducer.publishMetrics(metric, metricsTypePendingOnly,
metricsExtension, () => {});
}

/**
* Get bucket info in memoize state if exists, otherwise fetch from Mongo
Expand Down Expand Up @@ -639,7 +538,6 @@ class MongoQueueProcessor {
entryType: sourceEntry.constructor.name,
method: 'MongoQueueProcessor.processKafkaEntry',
});
this._normalizePendingMetric(location);
return process.nextTick(done);
});
}
Expand Down
26 changes: 0 additions & 26 deletions extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ const EchoBucket = require('../tasks/EchoBucket');
const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');
const BucketQueueEntry = require('../../../lib/models/BucketQueueEntry');
const ActionQueueEntry = require('../../../lib/models/ActionQueueEntry');
const MetricsProducer = require('../../../lib/MetricsProducer');
const libConstants = require('../../../lib/constants');
const { wrapCounterInc, wrapHistogramObserve } = require('../../../lib/util/metrics');
const { http: HttpAgent, https: HttpsAgent } = require('httpagent');
Expand Down Expand Up @@ -222,7 +221,6 @@ class QueueProcessor extends EventEmitter {
this.replicationStatusProducer = null;
this._consumer = null;
this._dataMoverConsumer = null;
this._mProducer = null;
this.site = site;
this.mConfig = mConfig;
this.serviceName = this.isReplayTopic ?
Expand Down Expand Up @@ -695,7 +693,6 @@ class QueueProcessor extends EventEmitter {
vaultclientCache: this.vaultclientCache,
accountCredsCache: this.accountCredsCache,
replicationStatusProducer: this.replicationStatusProducer,
mProducer: this._mProducer,
logger: this.logger,
site: this.site,
consumer: this._consumer,
Expand All @@ -722,16 +719,7 @@ class QueueProcessor extends EventEmitter {
* @return {undefined}
*/
start(options) {
this._mProducer = new MetricsProducer(this.kafkaConfig, this.mConfig);
return async.parallel([
done => this._mProducer.setupProducer(err => {
if (err) {
this.logger.info('error setting up metrics producer',
{ error: err.message });
process.exit(1);
}
return done();
}),
done => this._setupProducer(err => {
if (err) {
this.logger.info('error setting up kafka producer',
Expand Down Expand Up @@ -842,20 +830,6 @@ class QueueProcessor extends EventEmitter {
});
return next();
},
next => {
if (this._mProducer) {
this.logger.debug('closing metrics producer', {
method: 'QueueProcessor.stop',
site: this.site,
});
return this._mProducer.close(next);
}
this.logger.debug('no metrics producer to close', {
method: 'QueueProcessor.stop',
site: this.site,
});
return next();
},
], done);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ const QueueEntry = require('../../../lib/models/QueueEntry');
const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');
const FailedCRRProducer = require('../failedCRR/FailedCRRProducer');
const ReplayProducer = require('../replay/ReplayProducer');
const MetricsProducer = require('../../../lib/MetricsProducer');
const { http: HttpAgent, https: HttpsAgent } = require('httpagent');

// StatsClient constant default for site metrics
Expand Down Expand Up @@ -216,7 +215,6 @@ class ReplicationStatusProcessor {
this.gcConfig = gcConfig;
this._consumer = null;
this._gcProducer = null;
this._mProducer = null;

this.logger =
new Logger('Backbeat:Replication:ReplicationStatusProcessor');
Expand Down Expand Up @@ -336,7 +334,6 @@ class ReplicationStatusProcessor {
sourceHTTPAgent: this.sourceHTTPAgent,
vaultclientCache: this.vaultclientCache,
gcProducer: this._gcProducer,
mProducer: this._mProducer,
statsClient: this._statsClient,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think `_statsClient` is also part of the "Redis metrics". To be double-checked, but I think it also needs to be removed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same in other files.

failedCRRProducer: this._failedCRRProducer,
replayProducers: this._ReplayProducers,
Expand Down Expand Up @@ -379,11 +376,6 @@ class ReplicationStatusProcessor {
done();
}
},
done => {
this._mProducer = new MetricsProducer(this.kafkaConfig,
this.mConfig);
this._mProducer.setupProducer(done);
},
done => {
let consumerReady = false;
this._consumer = new BackbeatConsumer({
Expand Down Expand Up @@ -434,18 +426,6 @@ class ReplicationStatusProcessor {
method: 'ReplicationStatusProcessor.stop',
});
return next();
},
next => {
if (this._mProducer) {
this.logger.debug('closing metrics producer', {
method: 'ReplicationStatusProcessor.stop',
});
return this._mProducer.close(next);
}
this.logger.debug('no metrics producer to close', {
method: 'ReplicationStatusProcessor.stop',
});
return next();
}
], done);
}
Expand Down
Loading
Loading