diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs
index 65a8e1dc85..fee9044ae9 100644
--- a/src/mito2/src/metrics.rs
+++ b/src/mito2/src/metrics.rs
@@ -12,8 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::time::Duration;
+
 use lazy_static::lazy_static;
 use prometheus::*;
+use puffin::puffin_manager::stager::StagerNotifier;
 
 /// Stage label.
 pub const STAGE_LABEL: &str = "stage";
@@ -28,6 +31,10 @@ pub const FILE_TYPE_LABEL: &str = "file_type";
 pub const WORKER_LABEL: &str = "worker";
 /// Partition label.
 pub const PARTITION_LABEL: &str = "partition";
+/// Label value that tags metrics of the index staging directory.
+pub const STAGING_TYPE: &str = "index_staging";
+/// Label value that tags metrics of the recycle bin.
+pub const RECYCLE_TYPE: &str = "recycle_bin";
 
 lazy_static! {
     /// Global write buffer size in bytes.
@@ -381,3 +388,96 @@ lazy_static! {
         exponential_buckets(0.01, 10.0, 6).unwrap(),
     ).unwrap();
 }
+
+/// [`StagerNotifier`] implementation that records stager events as Prometheus metrics.
+///
+/// All labelled metric handles are resolved once in [`StagerMetrics::new`], so the
+/// callbacks only update handles that already exist.
+pub struct StagerMetrics {
+    /// Number of staging cache hits.
+    cache_hit: IntCounter,
+    /// Number of staging cache misses.
+    cache_miss: IntCounter,
+    /// Bytes accounted to the staging area.
+    staging_cache_bytes: IntGauge,
+    /// Bytes accounted to the recycle bin.
+    recycle_cache_bytes: IntGauge,
+    /// Number of staging cache evictions.
+    cache_eviction: IntCounter,
+    /// Load durations reported through `on_load_dir` and `on_load_blob`.
+    staging_miss_read: Histogram,
+}
+
+impl StagerMetrics {
+    /// Creates a notifier bound to the staging and recycle bin metric series.
+    pub fn new() -> Self {
+        Self {
+            cache_hit: CACHE_HIT.with_label_values(&[STAGING_TYPE]),
+            cache_miss: CACHE_MISS.with_label_values(&[STAGING_TYPE]),
+            staging_cache_bytes: CACHE_BYTES.with_label_values(&[STAGING_TYPE]),
+            recycle_cache_bytes: CACHE_BYTES.with_label_values(&[RECYCLE_TYPE]),
+            // NOTE(review): "size" is presumably the eviction cause label; confirm
+            // against the definition of `CACHE_EVICTION`.
+            cache_eviction: CACHE_EVICTION.with_label_values(&[STAGING_TYPE, "size"]),
+            staging_miss_read: READ_STAGE_ELAPSED.with_label_values(&["staging_miss_read"]),
+        }
+    }
+
+    /// Converts a byte count into an [`IntGauge`] delta, clamping at `i64::MAX`.
+    ///
+    /// A bare `as i64` cast wraps values above `i64::MAX` to negative numbers, which
+    /// would move the gauge in the wrong direction.
+    fn gauge_delta(size: u64) -> i64 {
+        // The cast is lossless because the value was clamped to `i64::MAX` first.
+        size.min(i64::MAX as u64) as i64
+    }
+}
+
+impl Default for StagerMetrics {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl StagerNotifier for StagerMetrics {
+    /// Counts one cache hit; the reported size is not recorded.
+    fn on_cache_hit(&self, _size: u64) {
+        self.cache_hit.inc();
+    }
+
+    /// Counts one cache miss; the reported size is not recorded.
+    fn on_cache_miss(&self, _size: u64) {
+        self.cache_miss.inc();
+    }
+
+    /// Adds `size` bytes to the staging bytes gauge.
+    fn on_cache_insert(&self, size: u64) {
+        self.staging_cache_bytes.add(Self::gauge_delta(size));
+    }
+
+    /// Records the directory load duration, in seconds.
+    fn on_load_dir(&self, duration: Duration) {
+        self.staging_miss_read.observe(duration.as_secs_f64());
+    }
+
+    /// Records the blob load duration, in seconds, in the same histogram as `on_load_dir`.
+    fn on_load_blob(&self, duration: Duration) {
+        self.staging_miss_read.observe(duration.as_secs_f64());
+    }
+
+    /// Counts one eviction and removes `size` bytes from the staging bytes gauge.
+    fn on_cache_evict(&self, size: u64) {
+        self.cache_eviction.inc();
+        self.staging_cache_bytes.sub(Self::gauge_delta(size));
+    }
+
+    /// Adds `size` bytes to the recycle bin bytes gauge.
+    fn on_recycle_insert(&self, size: u64) {
+        self.recycle_cache_bytes.add(Self::gauge_delta(size));
+    }
+
+    /// Removes `size` bytes from the recycle bin bytes gauge.
+    fn on_recycle_clear(&self, size: u64) {
+        self.recycle_cache_bytes.sub(Self::gauge_delta(size));
+    }
+}
diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs
index 4b8b6a3dcb..d8559d2e07 100644
--- a/src/mito2/src/sst/index/puffin_manager.rs
+++ b/src/mito2/src/sst/index/puffin_manager.rs
@@ -27,8 +27,8 @@ use snafu::ResultExt;
 
 use crate::error::{PuffinInitStagerSnafu, Result};
 use crate::metrics::{
-    INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL, INDEX_PUFFIN_READ_OP_TOTAL,
-    INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL,
+    StagerMetrics, INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL,
+    INDEX_PUFFIN_READ_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL,
INDEX_PUFFIN_WRITE_OP_TOTAL, }; use crate::sst::index::store::{self, InstrumentedStore}; @@ -63,9 +63,13 @@ impl PuffinManagerFactory { write_buffer_size: Option, ) -> Result { let staging_dir = aux_path.as_ref().join(STAGING_DIR); - let stager = BoundedStager::new(staging_dir, staging_capacity, None) - .await - .context(PuffinInitStagerSnafu)?; + let stager = BoundedStager::new( + staging_dir, + staging_capacity, + Some(Arc::new(StagerMetrics::default())), + ) + .await + .context(PuffinInitStagerSnafu)?; Ok(Self { stager: Arc::new(stager), write_buffer_size,