feat: collect stager metrics (#5553)

* feat: collect stager metrics

* Apply suggestions from code review

Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>

* Update src/mito2/src/metrics.rs

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Yingwen
2025-02-17 15:09:15 +08:00
committed by GitHub
parent f359eeb667
commit 6bba5e0afa
2 changed files with 81 additions and 5 deletions

View File

@@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use lazy_static::lazy_static;
use prometheus::*;
use puffin::puffin_manager::stager::StagerNotifier;
/// Stage label.
pub const STAGE_LABEL: &str = "stage";
@@ -28,6 +31,10 @@ pub const FILE_TYPE_LABEL: &str = "file_type";
pub const WORKER_LABEL: &str = "worker";
/// Partition label.
pub const PARTITION_LABEL: &str = "partition";
/// Type label value for the index staging directory.
pub const STAGING_TYPE: &str = "index_staging";
/// Type label value for the recycle bin.
pub const RECYCLE_TYPE: &str = "recycle_bin";
lazy_static! {
/// Global write buffer size in bytes.
@@ -381,3 +388,68 @@ lazy_static! {
exponential_buckets(0.01, 10.0, 6).unwrap(),
).unwrap();
}
/// Stager notifier to collect metrics.
///
/// Holds the metric handles that the `StagerNotifier` callbacks update, so
/// each callback only touches an already-resolved counter, gauge or histogram.
pub struct StagerMetrics {
/// Incremented once per reported cache hit.
cache_hit: IntCounter,
/// Incremented once per reported cache miss.
cache_miss: IntCounter,
/// Grows by the reported size on cache insert and shrinks by it on cache evict.
staging_cache_bytes: IntGauge,
/// Grows by the reported size on recycle insert and shrinks by it on recycle clear.
recycle_cache_bytes: IntGauge,
/// Incremented once per reported cache eviction.
cache_eviction: IntCounter,
/// Observes the duration, in seconds, reported for directory and blob loads.
staging_miss_read: Histogram,
}
impl StagerMetrics {
    /// Builds a stager notifier with all of its metric handles resolved up front.
    ///
    /// Hit, miss, eviction and staging byte metrics are bound to the
    /// [`STAGING_TYPE`] label value; the recycle byte gauge is bound to
    /// [`RECYCLE_TYPE`].
    pub fn new() -> Self {
        let cache_hit = CACHE_HIT.with_label_values(&[STAGING_TYPE]);
        let cache_miss = CACHE_MISS.with_label_values(&[STAGING_TYPE]);
        let staging_cache_bytes = CACHE_BYTES.with_label_values(&[STAGING_TYPE]);
        let recycle_cache_bytes = CACHE_BYTES.with_label_values(&[RECYCLE_TYPE]);
        let cache_eviction = CACHE_EVICTION.with_label_values(&[STAGING_TYPE, "size"]);
        let staging_miss_read = READ_STAGE_ELAPSED.with_label_values(&["staging_miss_read"]);

        Self {
            cache_hit,
            cache_miss,
            staging_cache_bytes,
            recycle_cache_bytes,
            cache_eviction,
            staging_miss_read,
        }
    }
}
impl Default for StagerMetrics {
fn default() -> Self {
Self::new()
}
}
impl StagerNotifier for StagerMetrics {
    /// Counts a cache hit; the reported size is not recorded.
    fn on_cache_hit(&self, _size: u64) {
        self.cache_hit.inc();
    }

    /// Counts a cache miss; the reported size is not recorded.
    fn on_cache_miss(&self, _size: u64) {
        self.cache_miss.inc();
    }

    /// Adds the inserted entry's size to the staging bytes gauge.
    fn on_cache_insert(&self, size: u64) {
        let added = size as i64;
        self.staging_cache_bytes.add(added);
    }

    /// Counts an eviction and subtracts the evicted size from the staging bytes gauge.
    fn on_cache_evict(&self, size: u64) {
        let removed = size as i64;
        self.cache_eviction.inc();
        self.staging_cache_bytes.sub(removed);
    }

    /// Adds the inserted size to the recycle bin bytes gauge.
    fn on_recycle_insert(&self, size: u64) {
        let added = size as i64;
        self.recycle_cache_bytes.add(added);
    }

    /// Subtracts the cleared size from the recycle bin bytes gauge.
    fn on_recycle_clear(&self, size: u64) {
        let removed = size as i64;
        self.recycle_cache_bytes.sub(removed);
    }

    /// Records how long a directory load took, in seconds.
    fn on_load_dir(&self, duration: Duration) {
        let secs = duration.as_secs_f64();
        self.staging_miss_read.observe(secs);
    }

    /// Records how long a blob load took, in seconds, in the same histogram as directory loads.
    fn on_load_blob(&self, duration: Duration) {
        let secs = duration.as_secs_f64();
        self.staging_miss_read.observe(secs);
    }
}

View File

@@ -27,8 +27,8 @@ use snafu::ResultExt;
use crate::error::{PuffinInitStagerSnafu, Result};
use crate::metrics::{
INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL, INDEX_PUFFIN_READ_OP_TOTAL,
INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL,
StagerMetrics, INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL,
INDEX_PUFFIN_READ_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL,
};
use crate::sst::index::store::{self, InstrumentedStore};
@@ -63,9 +63,13 @@ impl PuffinManagerFactory {
write_buffer_size: Option<usize>,
) -> Result<Self> {
let staging_dir = aux_path.as_ref().join(STAGING_DIR);
let stager = BoundedStager::new(staging_dir, staging_capacity, None)
.await
.context(PuffinInitStagerSnafu)?;
let stager = BoundedStager::new(
staging_dir,
staging_capacity,
Some(Arc::new(StagerMetrics::default())),
)
.await
.context(PuffinInitStagerSnafu)?;
Ok(Self {
stager: Arc::new(stager),
write_buffer_size,