Close #3: Implement basic metrics
Signed-off-by: Lee Smet <[email protected]>
LeeSmet committed Dec 27, 2021
1 parent b9753a4 commit da505b5
Showing 3 changed files with 122 additions and 8 deletions.
12 changes: 11 additions & 1 deletion src/cas/block_stream.rs
@@ -1,3 +1,5 @@
use crate::metrics::SharedMetrics;

use super::range_request::RangeRequest;
use futures::{ready, AsyncRead, AsyncSeek, Future, Stream};
use hyper::body::Bytes;
@@ -13,6 +15,7 @@ pub struct BlockStream {
paths: Vec<(PathBuf, usize)>,
fp: usize, // pointer to current file path
size: usize,
metrics: SharedMetrics,
processed: usize,
has_seeked: bool,
range: RangeRequest,
@@ -21,12 +24,18 @@ pub struct BlockStream {
}

impl BlockStream {
pub fn new(paths: Vec<(PathBuf, usize)>, size: usize, range: RangeRequest) -> Self {
pub fn new(
paths: Vec<(PathBuf, usize)>,
size: usize,
range: RangeRequest,
metrics: SharedMetrics,
) -> Self {
Self {
paths,
fp: 0,
file: None,
size,
metrics,
has_seeked: true,
processed: 0,
open_fut: None,
@@ -91,6 +100,7 @@ impl Stream for BlockStream {
Poll::Ready(Ok(n)) => {
self.processed += n;
buf.truncate(n);
self.metrics.bytes_sent(n);
Poll::Ready(Some(Ok(buf.into())))
}
};
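The API change in this file is the extra SharedMetrics argument on BlockStream::new; each successful read in poll_next then accounts the chunk it yields through bytes_sent. A minimal sketch of the updated call site, assuming paths, block_size, range and a SharedMetrics handle named metrics are already in scope (illustrative only, not part of the diff; the same call appears in src/cas/fs.rs below):

    // Illustrative: the stream now carries a metrics handle and reports
    // every chunk it yields to the s3_data_bytes_sent counter.
    let block_stream = BlockStream::new(paths, block_size, range, metrics.clone());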
20 changes: 15 additions & 5 deletions src/cas/fs.rs
@@ -242,6 +242,7 @@ impl CasFS {
if let Ok(bytes) = maybe_bytes {
content_hash.update(bytes);
size += bytes.len() as u64;
self.metrics.bytes_received(bytes.len());
}
})
.zip(stream::repeat((tx, block_map, path_map)))
@@ -318,12 +319,13 @@ impl CasFS {
return;
}
Ok(false) => {
self.metrics.block_ignored();
if let Err(e) = tx.send(Ok((idx, block_hash))).await {
eprintln!("Could not send block id: {}", e);
}
return;
}
Ok(true) => {}
Ok(true) => self.metrics.block_pending(),
// We don't abort manually so this can't happen
Err(sled::transaction::TransactionError::Abort(_)) => unreachable!(),
};
@@ -338,6 +340,7 @@ impl CasFS {
Ok(None) => unreachable!(),
Err(e) => {
if let Err(e) = tx.send(Err(e.into())).await {
self.metrics.block_write_error();
eprintln!("Could not send db error: {}", e);
}
return;
@@ -347,17 +350,24 @@ impl CasFS {
let block_path = block.disk_path(self.root.clone());
if let Err(e) = async_fs::create_dir_all(block_path.parent().unwrap()).await {
if let Err(e) = tx.send(Err(e)).await {
self.metrics.block_write_error();
eprintln!("Could not send path create error: {}", e);
return;
}
}
if let Err(e) = async_fs::write(block_path, bytes).await {
if let Err(e) = async_fs::write(block_path, &bytes).await {
if let Err(e) = tx.send(Err(e)).await {
self.metrics.block_write_error();
eprintln!("Could not send block write error: {}", e);
return;
}
}

self.metrics.block_written(bytes.len());

if let Err(e) = tx.send(Ok((idx, block_hash))).await {
eprintln!("Could not send block id: {}", e);
return;
}
},
)
@@ -561,6 +571,8 @@ impl S3Storage for CasFS {

trace_try!(self.bucket_delete(&bucket).await);

self.metrics.dec_bucket_count();

Ok(DeleteBucketOutput)
}

@@ -576,8 +588,6 @@ impl S3Storage for CasFS {

trace_try!(self.delete_object(&bucket, &key).await);

self.metrics.dec_bucket_count();

Ok(DeleteObjectOutput::default())
}

@@ -674,7 +684,7 @@ impl S3Storage for CasFS {
paths.push((block_meta.disk_path(self.root.clone()), block_meta.size()));
}
debug_assert!(obj_meta.size() as usize == block_size);
let block_stream = BlockStream::new(paths, block_size, range);
let block_stream = BlockStream::new(paths, block_size, range, self.metrics.clone());

// TODO: part count
let stream = ByteStream::new_with_size(block_stream, stream_size as usize);
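The gauge bookkeeping in the write path above follows a fixed lifecycle: block_pending is called once the transaction decides a block is new, and exactly one of block_written or block_write_error follows, so s3_data_blocks_pending_write counts blocks that have been accepted but are not yet on disk. A minimal sketch of that pairing, assuming a Metrics value built via its Default impl and a single instance per process as the TODO in src/metrics.rs notes (illustrative only, not part of the commit):

    // Illustrative: the pending -> written/error pairing used in CasFS above.
    let metrics = Metrics::default();
    metrics.block_pending(); // transaction marked the block as new
    match std::fs::write("/tmp/block", b"data") {
        // on success: dec pending, inc blocks written, add 4 bytes written
        Ok(()) => metrics.block_written(4),
        // on failure: dec pending, inc write errors
        Err(_) => metrics.block_write_error(),
    }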
98 changes: 96 additions & 2 deletions src/metrics.rs
@@ -1,5 +1,8 @@
use async_trait::async_trait;
use prometheus::{register_int_counter_vec, register_int_gauge, IntCounterVec, IntGauge};
use prometheus::{
register_int_counter, register_int_counter_vec, register_int_gauge, IntCounter, IntCounterVec,
IntGauge,
};
use s3_server::S3Storage;
use std::{ops::Deref, sync::Arc};

@@ -52,6 +55,13 @@ impl Deref for SharedMetrics {
pub struct Metrics {
method_calls: IntCounterVec,
bucket_count: IntGauge,
data_bytes_received: IntCounter,
data_bytes_sent: IntCounter,
data_bytes_written: IntCounter,
data_blocks_written: IntCounter,
data_blocks_ignored: IntCounter,
data_blocks_pending_write: IntGauge,
data_blocks_write_errors: IntCounter,
}

// TODO: this can be improved, make sure this does not crash on multiple instances;
@@ -72,11 +82,58 @@ impl Metrics {
"s3_bucket_count",
"Amount of active buckets in the S3 instance"
)
.expect("can register an int gauge");
.expect("can register an int gauge in the default registry");

let data_bytes_received = register_int_counter!(
"s3_data_bytes_received",
"Amount of bytes of actual data received"
)
.expect("can register an int counter in the default registry");

let data_bytes_sent =
register_int_counter!("s3_data_bytes_sent", "Amount of bytes of actual data sent")
.expect("can register an int counter in the default registry");

let data_bytes_written = register_int_counter!(
"s3_data_bytes_written",
"Amount of bytes of actual data written to block storage"
)
.expect("can register an int counter in the default registry");

let data_blocks_written = register_int_counter!(
"s3_data_blocks_written",
"Amount of data blocks written to block storage"
)
.expect("can register an int counter in the default registry");

let data_blocks_ignored = register_int_counter!(
"s3_data_blocks_ignored",
"Amount of data blocks not written to block storage, because a block with the same hash is already present"
)
.expect("can register an int counter in the default registry");

let data_blocks_pending_write = register_int_gauge!(
"s3_data_blocks_pending_write",
"Amount of data blocks in memory, waiting to be written to block storage"
)
.expect("can register an int gauge in the default registry");

let data_blocks_write_errors = register_int_counter!(
"s3_data_blocks_write_errors",
"Amount of data blocks which could not be written to block storage"
)
.expect("can register an int counter in the default registry");

Self {
method_calls,
bucket_count,
data_bytes_received,
data_bytes_sent,
data_bytes_written,
data_blocks_written,
data_blocks_ignored,
data_blocks_pending_write,
data_blocks_write_errors,
}
}

@@ -95,6 +152,43 @@ impl Metrics {
pub fn dec_bucket_count(&self) {
self.bucket_count.dec()
}

pub fn bytes_received(&self, amount: usize) {
self.data_bytes_received.inc_by(amount as u64)
}

pub fn bytes_sent(&self, amount: usize) {
self.data_bytes_sent.inc_by(amount as u64)
}

pub fn bytes_written(&self, amount: usize) {
self.data_bytes_written.inc_by(amount as u64)
}

pub fn block_pending(&self) {
self.data_blocks_pending_write.inc()
}

pub fn block_written(&self, block_size: usize) {
self.data_bytes_written.inc_by(block_size as u64);
self.data_blocks_pending_write.dec();
self.data_blocks_written.inc()
}

pub fn block_write_error(&self) {
self.data_blocks_pending_write.dec();
self.data_blocks_write_errors.inc()
}

pub fn block_ignored(&self) {
self.data_blocks_ignored.inc()
}
}

impl Default for Metrics {
fn default() -> Self {
Self::new()
}
}

pub struct MetricFs<T> {
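All of the new counters and gauges are registered in prometheus's default registry, which is what the "can register ... in the default registry" expect messages refer to, so a metrics endpoint can render them by gathering that registry. A minimal sketch using only the prometheus crate's text encoder (illustrative, not part of this commit):

    // Illustrative: render everything in the default registry in the
    // Prometheus text exposition format.
    use prometheus::{Encoder, TextEncoder};

    fn render_metrics() -> String {
        let metric_families = prometheus::gather();
        let mut buf = Vec::new();
        TextEncoder::new()
            .encode(&metric_families, &mut buf)
            .expect("can encode gathered metrics");
        String::from_utf8(buf).expect("text format is valid UTF-8")
    }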