Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue occupancy histogram #912

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,4 @@ Please add your name to the end of this file and include this file to the PR, un
* Eran Gampel
* Tamás Lévai
* Matthew Mussomele
* David Naylor
103 changes: 103 additions & 0 deletions bessctl/module_tests/queue_occupancy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Copyright (c) 2016-2019, Nefeli Networks, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the names of the copyright holders nor the names of their
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from test_utils import *


class BessQueueOccupancyTest(BessModuleTestCase):
def _send_packets(self, q):
eth = scapy.Ether(src='02:1e:67:9f:4d:ae', dst='06:16:3e:1b:72:32')
ip = scapy.IP(src='172.16.0.2', dst='8.8.8.8')
tcp = scapy.TCP(sport=52428, dport=80)
l7 = 'helloworld'
pkt = eth / ip / tcp / l7

pkts = [pkt] * 100
_ = self.run_module(q, 0, pkts, [0])
return len(pkts)

def test_hist_enable(self):
q = Queue(size=1024, track_occupancy=True)
sent = self._send_packets(q)
resp = q.get_status()
self.assertEqual(resp.enqueued, sent)
self.assertEqual(resp.dequeued, sent)
self.assertEqual(resp.occupancy_summary.count, sent)

def test_hist_disable(self):
q = Queue(size=1024, track_occupancy=False)
sent = self._send_packets(q)
resp = q.get_status()
self.assertEqual(resp.enqueued, sent)
self.assertEqual(resp.dequeued, sent)
self.assertEqual(resp.occupancy_summary.count, 0)

def test_hist_size(self):
q = Queue(size=1024, track_occupancy=True)
resp = q.get_status()
self.assertEqual(resp.size, 1024)
self.assertEqual(resp.occupancy_summary.num_buckets, 32)
self.assertEqual(resp.occupancy_summary.bucket_width, 32)

q.set_size(size=2048)
resp = q.get_status()
self.assertEqual(resp.size, 2048)
self.assertEqual(resp.occupancy_summary.num_buckets, 32)
self.assertEqual(resp.occupancy_summary.bucket_width, 64)

q = Queue(size=1024, track_occupancy=True, occupancy_hist_buckets=64)
resp = q.get_status()
self.assertEqual(resp.size, 1024)
self.assertEqual(resp.occupancy_summary.num_buckets, 64)
self.assertEqual(resp.occupancy_summary.bucket_width, 16)

def test_hist_summary(self):
q = Queue(size=1024, track_occupancy=True)
sent = self._send_packets(q)

resp = q.get_status(occupancy_percentiles=[0.5, 0.9, 0.99])
self.assertEqual(resp.occupancy_summary.count, 100)
self.assertEqual(len(resp.occupancy_summary.percentile_values), 3)

resp = q.get_status(occupancy_percentiles=[0, 0.5, 0.9, 0.99])
self.assertEqual(resp.occupancy_summary.count, 100)
self.assertEqual(len(resp.occupancy_summary.percentile_values), 4)

resp = q.get_status(clear_hist=True)
self.assertEqual(resp.occupancy_summary.count, 100)

resp = q.get_status()
self.assertEqual(resp.occupancy_summary.count, 0)


suite = unittest.TestLoader().loadTestsFromTestCase(BessQueueOccupancyTest)
results = unittest.TextTestRunner(verbosity=2, stream=sys.stdout).run(suite)

if results.failures or results.errors:
sys.exit(1)
15 changes: 5 additions & 10 deletions core/modules/measure.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,6 @@ void Measure::Clear() {
mcs_unlock(&lock_, &mynode);
}

static bool IsValidPercentiles(const std::vector<double> &percentiles) {
if (percentiles.empty()) {
return true;
}

return std::is_sorted(percentiles.cbegin(), percentiles.cend()) &&
*std::min_element(percentiles.cbegin(), percentiles.cend()) >= 0.0 &&
*std::max_element(percentiles.cbegin(), percentiles.cend()) <= 100.0;
}

CommandResponse Measure::CommandGetSummary(
const bess::pb::MeasureCommandGetSummaryArg &arg) {
bess::pb::MeasureCommandGetSummaryResponse r;
Expand All @@ -211,9 +201,14 @@ CommandResponse Measure::CommandGetSummary(
const auto &rtt = rtt_hist_.Summarize(latency_percentiles);
const auto &jitter = jitter_hist_.Summarize(jitter_percentiles);

// TODO(dnaylor): latency and jitter are deprecated in favor of latency_ns
// and jitter_ns; remove these eventually.
SetHistogram(r.mutable_latency(), rtt, rtt_hist_.bucket_width());
SetHistogram(r.mutable_jitter(), jitter, jitter_hist_.bucket_width());

SetSummary(r.mutable_latency_ns(), rtt);
SetSummary(r.mutable_jitter_ns(), jitter);

if (arg.clear()) {
// Note that some samples might be lost due to the small gap between
// Summarize() and the next mcs_lock... but we posit that smaller
Expand Down
61 changes: 57 additions & 4 deletions core/modules/queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@

#include "../utils/format.h"

#define DEFAULT_QUEUE_SIZE 1024

const Commands Queue::cmds = {
{"set_burst", "QueueCommandSetBurstArg",
MODULE_CMD_FUNC(&Queue::CommandSetBurst), Command::THREAD_SAFE},
Expand Down Expand Up @@ -79,6 +77,10 @@ int Queue::Resize(int slots) {
queue_ = new_queue;
size_ = slots;

if (track_occupancy_) {
occupancy_hist_.Resize(occupancy_buckets_, slots / occupancy_buckets_);
}

if (backpressure_) {
AdjustWaterLevels();
}
Expand All @@ -97,6 +99,15 @@ CommandResponse Queue::Init(const bess::pb::QueueArg &arg) {

burst_ = bess::PacketBatch::kMaxBurst;

if (arg.track_occupancy()) {
track_occupancy_ = true;
occupancy_buckets_ = kDefaultBuckets;
if (arg.occupancy_hist_buckets() != 0) {
occupancy_buckets_ = arg.occupancy_hist_buckets();
}
VLOG(1) << "Occupancy tracking enabled for " << name() << "::Queue (" << occupancy_buckets_ << " buckets)";
}

if (arg.backpressure()) {
VLOG(1) << "Backpressure enabled for " << name() << "::Queue";
backpressure_ = true;
Expand Down Expand Up @@ -191,7 +202,19 @@ struct task_result Queue::RunTask(Context *ctx, bess::PacketBatch *batch,

RunNextModule(ctx, batch);

if (backpressure_ && llring_count(queue_) < low_water_) {
uint32_t occupancy;
if (track_occupancy_ || backpressure_) {
occupancy = llring_count(queue_);
}

if (track_occupancy_) {
mcslock_node_t mynode;
mcs_lock(&lock_, &mynode);
occupancy_hist_.Insert(occupancy);
mcs_unlock(&lock_, &mynode);
}

if (backpressure_ && occupancy < low_water_) {
SignalUnderload();
}

Expand Down Expand Up @@ -236,16 +259,46 @@ CommandResponse Queue::CommandSetSize(
}

CommandResponse Queue::CommandGetStatus(
const bess::pb::QueueCommandGetStatusArg &) {
const bess::pb::QueueCommandGetStatusArg &arg) {
bess::pb::QueueCommandGetStatusResponse resp;

std::vector<double> occupancy_percentiles;
std::copy(arg.occupancy_percentiles().begin(), arg.occupancy_percentiles().end(),
back_inserter(occupancy_percentiles));
if (!IsValidPercentiles(occupancy_percentiles)) {
return CommandFailure(EINVAL, "invalid 'occupancy_percentiles'");
}
const auto &occupancy_summary = occupancy_hist_.Summarize(occupancy_percentiles);

resp.set_count(llring_count(queue_));
resp.set_size(size_);
resp.set_enqueued(stats_.enqueued);
resp.set_dequeued(stats_.dequeued);
resp.set_dropped(stats_.dropped);
SetSummary(resp.mutable_occupancy_summary(), occupancy_summary);

if (arg.clear_hist()) {
// Note that some samples might be lost due to the small gap between
// Summarize() and the next mcs_lock... but we posit that smaller
// critical section is more important.
ClearOccupancyHist();
}

return CommandSuccess(resp);
}

void Queue::ClearOccupancyHist() {
// vector initialization is expensive thus should be out of critical section
decltype(occupancy_hist_) new_occupancy_hist(occupancy_hist_.num_buckets(),
occupancy_hist_.bucket_width());

// Use move semantics to minimize critical section
mcslock_node_t mynode;
mcs_lock(&lock_, &mynode);
occupancy_hist_ = std::move(new_occupancy_hist);
mcs_unlock(&lock_, &mynode);
}

void Queue::AdjustWaterLevels() {
high_water_ = static_cast<uint64_t>(size_ * kHighWaterRatio);
low_water_ = static_cast<uint64_t>(size_ * kLowWaterRatio);
Expand Down
18 changes: 17 additions & 1 deletion core/modules/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
#include "../kmod/llring.h"
#include "../module.h"
#include "../pb/module_msg.pb.h"
#include "../utils/histogram.h"
#include "../utils/mcslock.h"

#define DEFAULT_QUEUE_SIZE 1024

class Queue : public Module {
public:
Expand All @@ -48,7 +52,9 @@ class Queue : public Module {
size_(),
high_water_(),
low_water_(),
stats_() {
stats_(),
track_occupancy_(),
occupancy_hist_(kDefaultBuckets, kDefaultBucketWidth) {
is_task_ = true;
propagate_workers_ = false;
max_allowed_workers_ = Worker::kMaxWorkers;
Expand Down Expand Up @@ -77,6 +83,8 @@ class Queue : public Module {

int Resize(int slots);

void ClearOccupancyHist();

// Readjusts the water level according to `size_`.
void AdjustWaterLevels();

Expand Down Expand Up @@ -105,6 +113,14 @@ class Queue : public Module {
uint64_t dequeued;
uint64_t dropped;
} stats_;

// Queue occupancy statistics
const uint64_t kDefaultBuckets = 32;
const uint64_t kDefaultBucketWidth = DEFAULT_QUEUE_SIZE / kDefaultBuckets;
bool track_occupancy_;
uint64_t occupancy_buckets_;
Histogram<uint64_t> occupancy_hist_;
mcslock lock_;
};

#endif // BESS_MODULES_QUEUE_H_
40 changes: 40 additions & 0 deletions core/utils/histogram.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2016-2019, Nefeli Networks, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// * Neither the names of the copyright holders nor the names of their
// contributors may be used to endorse or promote products derived from this
// software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.

#include "histogram.h"

bool IsValidPercentiles(const std::vector<double> &percentiles) {
if (percentiles.empty()) {
return true;
}

return std::is_sorted(percentiles.cbegin(), percentiles.cend()) &&
*std::min_element(percentiles.cbegin(), percentiles.cend()) >= 0.0 &&
*std::max_element(percentiles.cbegin(), percentiles.cend()) <= 100.0;
}
29 changes: 26 additions & 3 deletions core/utils/histogram.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) 2014-2016, The Regents of the University of California.
// Copyright (c) 2016-2017, Nefeli Networks, Inc.
// Copyright (c) 2016-2019, Nefeli Networks, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -42,6 +42,8 @@

#include <glog/logging.h>

#include "../pb/util_msg.pb.h"

// Class for general purpose histogram. T generally should be an
// integral type, though floating point types will also work.
// A bin b_i corresponds for the range [i * width, (i + 1) * width)
Expand All @@ -52,7 +54,9 @@ class Histogram {
public:
static_assert(std::is_arithmetic<T>::value, "Arithmetic type required.");
struct Summary {
size_t count; // # of all samples. If 0, min, max and avg are also 0
size_t num_buckets; // Number of buckets in the histogram
size_t bucket_width; // Resolution of the measured data
size_t count; // # of samples (including above_range). If 0, min, max and avg are also 0
size_t above_range; // # of samples beyond the histogram range
T min; // Min value
T max; // Max value. May be underestimated if above_range > 0
Expand Down Expand Up @@ -124,6 +128,8 @@ class Histogram {
// percentile_values
const Summary Summarize(const std::vector<double> &percentiles = {}) const {
Summary ret = {};
ret.num_buckets = num_buckets();
ret.bucket_width = bucket_width_;
uint64_t count = std::accumulate(buckets_.begin(), buckets_.end(), 0);
ret.count = count;
ret.above_range = buckets_.back();
Expand Down Expand Up @@ -175,7 +181,7 @@ class Histogram {
return ret;
}

size_t num_buckets() const { return buckets_.size(); }
size_t num_buckets() const { return buckets_.size() - 1; }
T bucket_width() const { return bucket_width_; }

size_t max_num_buckets() const {
Expand All @@ -201,4 +207,21 @@ class Histogram {
std::vector<std::atomic<uint64_t>> buckets_;
};

bool IsValidPercentiles(const std::vector<double> &percentiles);

template <typename T>
void SetSummary(bess::pb::HistogramSummary *r, const T &summary) {
r->set_num_buckets(summary.num_buckets);
r->set_bucket_width(summary.bucket_width);
r->set_count(summary.count);
r->set_above_range(summary.above_range);
r->set_min(summary.min);
r->set_max(summary.max);
r->set_avg(summary.avg);
r->set_total(summary.total);
for (const auto &val : summary.percentile_values) {
r->add_percentile_values(val);
}
}

#endif // BESS_UTILS_HISTOGRAM_H_
Loading