Currently, `FirehoseProducer` uses a very simple linear backoff model. When the Firehose stream rate-limits requests, recovery time could be greatly improved by adopting a strategy similar to `AdaptivePollingRecordPublisher`'s `adaptRecordsToRead` method:
```java
/**
 * Calculates how many records to read each time through the loop based on a target throughput
 * and the measured frequency of the loop.
 * @param runLoopTimeNanos The total time of one pass through the loop
 * @param numRecords The number of records of the last read operation
 * @param recordBatchSizeBytes The total batch size of the last read operation
 * @param maxNumberOfRecordsPerFetch The current maxNumberOfRecordsPerFetch
 */
private int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes,
        int maxNumberOfRecordsPerFetch) {
    if (numRecords != 0 && runLoopTimeNanos != 0) {
        long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
        // Adjust number of records to fetch from the shard depending on current average record size
        // to optimize 2 Mb / sec read limits
        double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos;
        double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
        maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
        // Ensure the value is greater than 0 and not more than 10000L
        maxNumberOfRecordsPerFetch = Math.max(1, Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX));
        // Set metrics
        metricsReporter.setLoopFrequencyHz(loopFrequencyHz);
        metricsReporter.setBytesPerRead(bytesPerRead);
    }
    return maxNumberOfRecordsPerFetch;
}
```
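For illustration, here is a minimal sketch of what an analogous adaptive backoff could look like on the producer side. It is not part of the actual `FirehoseProducer` API; the class, method names, and the byte-per-second limit below are assumptions introduced only to show the idea of deriving the pause from measured throughput rather than a fixed linear increment.

```java
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical sketch of an adaptive backoff calculator for a Firehose producer.
 * Mirrors the idea in adaptRecordsToRead: instead of growing the wait time
 * linearly, derive the pause before the next flush from the measured throughput
 * of the previous flush and an assumed per-stream byte-per-second limit.
 */
public class AdaptiveFirehoseBackoff {

    // Assumed per-stream ingestion limit used as the throughput target (illustrative only).
    private static final long FIREHOSE_BYTES_PER_SECOND_LIMIT = 5L * 1024 * 1024;

    private static final long MIN_BACKOFF_MILLIS = 50;
    private static final long MAX_BACKOFF_MILLIS = 10_000;

    /**
     * Computes how long to wait before the next PutRecordBatch attempt based on
     * how far the previous flush exceeded the target throughput.
     *
     * @param lastBatchSizeBytes total bytes sent in the previous flush
     * @param lastFlushTimeNanos wall-clock duration of the previous flush
     * @param wasThrottled       whether the previous flush hit a rate limit
     */
    public long nextBackoffMillis(long lastBatchSizeBytes, long lastFlushTimeNanos, boolean wasThrottled) {
        if (!wasThrottled || lastFlushTimeNanos == 0) {
            return 0;
        }
        // Measured throughput of the previous flush in bytes per second.
        double observedBytesPerSecond = lastBatchSizeBytes * 1_000_000_000.0d / lastFlushTimeNanos;

        // How far we overshot the target; 2x over the limit -> roughly 4x the base pause.
        double overshootRatio = Math.max(1.0d, observedBytesPerSecond / FIREHOSE_BYTES_PER_SECOND_LIMIT);
        long backoffMillis = (long) (MIN_BACKOFF_MILLIS * overshootRatio * overshootRatio);

        // Add jitter so parallel subtasks do not retry in lockstep, then clamp to sane bounds.
        backoffMillis += ThreadLocalRandom.current().nextLong(MIN_BACKOFF_MILLIS);
        return Math.min(MAX_BACKOFF_MILLIS, Math.max(MIN_BACKOFF_MILLIS, backoffMillis));
    }
}
```

The quadratic weighting of the overshoot ratio is just one possible shape; the key point is that recovery time becomes proportional to how badly the limit was exceeded, instead of increasing by a fixed step on every retry.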