From da505b5ce09f3ef8ed3cfd16ca4c297e5a9c78cb Mon Sep 17 00:00:00 2001
From: Lee Smet
Date: Mon, 27 Dec 2021 23:30:45 +0100
Subject: [PATCH] Close #3: Implement basic metrics

Signed-off-by: Lee Smet
---
 src/cas/block_stream.rs | 12 ++++-
 src/cas/fs.rs           | 20 ++++++---
 src/metrics.rs          | 98 ++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 122 insertions(+), 8 deletions(-)

diff --git a/src/cas/block_stream.rs b/src/cas/block_stream.rs
index 03e1ccd..f8ee12a 100644
--- a/src/cas/block_stream.rs
+++ b/src/cas/block_stream.rs
@@ -1,3 +1,5 @@
+use crate::metrics::SharedMetrics;
+
 use super::range_request::RangeRequest;
 use futures::{ready, AsyncRead, AsyncSeek, Future, Stream};
 use hyper::body::Bytes;
@@ -13,6 +15,7 @@ pub struct BlockStream {
     paths: Vec<(PathBuf, usize)>,
     fp: usize, // pointer to current file path
     size: usize,
+    metrics: SharedMetrics,
     processed: usize,
     has_seeked: bool,
     range: RangeRequest,
@@ -21,12 +24,18 @@ pub struct BlockStream {
 }
 
 impl BlockStream {
-    pub fn new(paths: Vec<(PathBuf, usize)>, size: usize, range: RangeRequest) -> Self {
+    pub fn new(
+        paths: Vec<(PathBuf, usize)>,
+        size: usize,
+        range: RangeRequest,
+        metrics: SharedMetrics,
+    ) -> Self {
         Self {
             paths,
             fp: 0,
             file: None,
             size,
+            metrics,
             has_seeked: true,
             processed: 0,
             open_fut: None,
@@ -91,6 +100,7 @@ impl Stream for BlockStream {
                 Poll::Ready(Ok(n)) => {
                     self.processed += n;
                     buf.truncate(n);
+                    self.metrics.bytes_sent(n);
                     Poll::Ready(Some(Ok(buf.into())))
                 }
             };
diff --git a/src/cas/fs.rs b/src/cas/fs.rs
index b9e2b96..bd2ba97 100644
--- a/src/cas/fs.rs
+++ b/src/cas/fs.rs
@@ -242,6 +242,7 @@ impl CasFS {
                 if let Ok(bytes) = maybe_bytes {
                     content_hash.update(bytes);
                     size += bytes.len() as u64;
+                    self.metrics.bytes_received(bytes.len());
                 }
             })
             .zip(stream::repeat((tx, block_map, path_map)))
@@ -318,12 +319,13 @@ impl CasFS {
                         return;
                     }
                     Ok(false) => {
+                        self.metrics.block_ignored();
                         if let Err(e) = tx.send(Ok((idx, block_hash))).await {
                             eprintln!("Could not send block id: {}", e);
                         }
                         return;
                     }
-                    Ok(true) => {}
+                    Ok(true) => self.metrics.block_pending(),
                     // We don't abort manually so this can't happen
                     Err(sled::transaction::TransactionError::Abort(_)) => unreachable!(),
                 };
@@ -338,6 +340,7 @@ impl CasFS {
                     Ok(None) => unreachable!(),
                     Err(e) => {
                         if let Err(e) = tx.send(Err(e.into())).await {
+                            self.metrics.block_write_error();
                             eprintln!("Could not send db error: {}", e);
                         }
                         return;
@@ -347,17 +350,24 @@ impl CasFS {
                 let block_path = block.disk_path(self.root.clone());
                 if let Err(e) = async_fs::create_dir_all(block_path.parent().unwrap()).await {
                     if let Err(e) = tx.send(Err(e)).await {
+                        self.metrics.block_write_error();
                         eprintln!("Could not send path create error: {}", e);
+                        return;
                     }
                 }
-                if let Err(e) = async_fs::write(block_path, bytes).await {
+                if let Err(e) = async_fs::write(block_path, &bytes).await {
                     if let Err(e) = tx.send(Err(e)).await {
+                        self.metrics.block_write_error();
                         eprintln!("Could not send block write error: {}", e);
+                        return;
                     }
                 }
 
+                self.metrics.block_written(bytes.len());
+
                 if let Err(e) = tx.send(Ok((idx, block_hash))).await {
                     eprintln!("Could not send block id: {}", e);
+                    return;
                 }
             },
         )
@@ -561,6 +571,8 @@ impl S3Storage for CasFS {
 
         trace_try!(self.bucket_delete(&bucket).await);
 
+        self.metrics.dec_bucket_count();
+
         Ok(DeleteBucketOutput)
     }
 
@@ -576,8 +588,6 @@ impl S3Storage for CasFS {
 
         trace_try!(self.delete_object(&bucket, &key).await);
 
-        self.metrics.dec_bucket_count();
-
         Ok(DeleteObjectOutput::default())
     }
 
@@ -674,7 +684,7 @@ impl S3Storage for CasFS {
             paths.push((block_meta.disk_path(self.root.clone()), block_meta.size()));
         }
         debug_assert!(obj_meta.size() as usize == block_size);
-        let block_stream = BlockStream::new(paths, block_size, range);
+        let block_stream = BlockStream::new(paths, block_size, range, self.metrics.clone());
 
         // TODO: part count
         let stream = ByteStream::new_with_size(block_stream, stream_size as usize);
diff --git a/src/metrics.rs b/src/metrics.rs
index 98fb364..767cf3c 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -1,5 +1,8 @@
 use async_trait::async_trait;
-use prometheus::{register_int_counter_vec, register_int_gauge, IntCounterVec, IntGauge};
+use prometheus::{
+    register_int_counter, register_int_counter_vec, register_int_gauge, IntCounter, IntCounterVec,
+    IntGauge,
+};
 use s3_server::S3Storage;
 use std::{ops::Deref, sync::Arc};
 
@@ -52,6 +55,13 @@ impl Deref for SharedMetrics {
 pub struct Metrics {
     method_calls: IntCounterVec,
     bucket_count: IntGauge,
+    data_bytes_received: IntCounter,
+    data_bytes_sent: IntCounter,
+    data_bytes_written: IntCounter,
+    data_blocks_written: IntCounter,
+    data_blocks_ignored: IntCounter,
+    data_blocks_pending_write: IntGauge,
+    data_blocks_write_errors: IntCounter,
 }
 
 // TODO: this can be improved, make sure this does not crash on multiple instances;
@@ -72,11 +82,58 @@ impl Metrics {
             "s3_bucket_count",
             "Amount of active buckets in the S3 instance"
         )
-        .expect("can register an int gauge");
+        .expect("can register an int gauge in the default registry");
+
+        let data_bytes_received = register_int_counter!(
+            "s3_data_bytes_received",
+            "Amount of bytes of actual data received"
+        )
+        .expect("can register an int counter in the default registry");
+
+        let data_bytes_sent =
+            register_int_counter!("s3_data_bytes_sent", "Amount of bytes of actual data sent")
+                .expect("can register an int counter in the default registry");
+
+        let data_bytes_written = register_int_counter!(
+            "s3_data_bytes_written",
+            "Amount of bytes of actual data written to block storage"
+        )
+        .expect("can register an int counter in the default registry");
+
+        let data_blocks_written = register_int_counter!(
+            "s3_data_blocks_written",
+            "Amount of data blocks written to block storage"
+        )
+        .expect("can register an int counter in the default registry");
+
+        let data_blocks_ignored = register_int_counter!(
+            "s3_data_blocks_ignored",
+            "Amount of data blocks not written to block storage, because a block with the same hash is already present"
+        )
+        .expect("can register an int counter in the default registry");
+
+        let data_blocks_pending_write = register_int_gauge!(
+            "s3_data_blocks_pending_write",
+            "Amount of data blocks in memory, waiting to be written to block storage"
+        )
+        .expect("can register an int gauge in the default registry");
+
+        let data_blocks_write_errors = register_int_counter!(
+            "s3_data_blocks_write_errors",
+            "Amount of data blocks which could not be written to block storage"
+        )
+        .expect("can register an int counter in the default registry");
 
         Self {
             method_calls,
             bucket_count,
+            data_bytes_received,
+            data_bytes_sent,
+            data_bytes_written,
+            data_blocks_written,
+            data_blocks_ignored,
+            data_blocks_pending_write,
+            data_blocks_write_errors,
         }
     }
 
@@ -95,6 +152,43 @@ impl Metrics {
     pub fn dec_bucket_count(&self) {
         self.bucket_count.dec()
     }
+
+    pub fn bytes_received(&self, amount: usize) {
+        self.data_bytes_received.inc_by(amount as u64)
+    }
+
+    pub fn bytes_sent(&self, amount: usize) {
+        self.data_bytes_sent.inc_by(amount as u64)
+    }
+
+    pub fn bytes_written(&self, amount: usize) {
+        self.data_bytes_written.inc_by(amount as u64)
+    }
+
+    pub fn block_pending(&self) {
+        self.data_blocks_pending_write.inc()
+    }
+
+    pub fn block_written(&self, block_size: usize) {
+        self.data_bytes_written.inc_by(block_size as u64);
+        self.data_blocks_pending_write.dec();
+        self.data_blocks_written.inc()
+    }
+
+    pub fn block_write_error(&self) {
+        self.data_blocks_pending_write.dec();
+        self.data_blocks_write_errors.inc()
+    }
+
+    pub fn block_ignored(&self) {
+        self.data_blocks_ignored.inc()
+    }
+}
+
+impl Default for Metrics {
+    fn default() -> Self {
+        Self::new()
+    }
 }
 
 pub struct MetricFs {