From 497b1f9dc987e118bbd482e0b92e36889a33db7c Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Mon, 15 May 2023 07:22:00 +0000 Subject: [PATCH] feat: metrics for storage engine (#1574) * feat: add storage engine region count gauge * test: remove catalog metrics because we can't get a correct number * feat: add metrics for log store write and compaction * fix: address review issues --- src/storage/src/compaction/task.rs | 3 +- src/storage/src/metrics.rs | 6 ++++ src/storage/src/region.rs | 4 +++ src/storage/src/wal.rs | 2 ++ tests-integration/tests/http.rs | 46 ------------------------------ 5 files changed, 14 insertions(+), 47 deletions(-) diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs index 87b077321c..b37b3ff9bf 100644 --- a/src/storage/src/compaction/task.rs +++ b/src/storage/src/compaction/task.rs @@ -16,7 +16,7 @@ use std::collections::HashSet; use std::fmt::{Debug, Formatter}; use common_base::readable_size::ReadableSize; -use common_telemetry::{debug, error}; +use common_telemetry::{debug, error, timer}; use store_api::logstore::LogStore; use store_api::storage::RegionId; @@ -140,6 +140,7 @@ impl CompactionTaskImpl { #[async_trait::async_trait] impl CompactionTask for CompactionTaskImpl { async fn run(mut self) -> Result<()> { + let _timer = timer!(crate::metrics::COMPACT_ELAPSED); self.mark_files_compacting(true); let (output, mut compacted) = self.merge_ssts().await.map_err(|e| { diff --git a/src/storage/src/metrics.rs b/src/storage/src/metrics.rs index d37af341d0..640c6b5914 100644 --- a/src/storage/src/metrics.rs +++ b/src/storage/src/metrics.rs @@ -22,3 +22,9 @@ pub const FLUSH_REQUESTS_TOTAL: &str = "storage.flush.requests_total"; pub const FLUSH_ERRORS_TOTAL: &str = "storage.flush.errors_total"; /// Elapsed time of a flush job. pub const FLUSH_ELAPSED: &str = "storage.flush.elapsed"; +/// Gauge for open regions +pub const REGION_COUNT: &str = "storage.region_count"; +/// Timer for logstore write +pub const LOG_STORE_WRITE_ELAPSED: &str = "storage.logstore.write.elapsed"; +/// Elapsed time of a compact job. +pub const COMPACT_ELAPSED: &str = "storage.compact.elapsed"; diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index f9482e4748..69ba0dfd8f 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -23,6 +23,7 @@ use std::time::Duration; use async_trait::async_trait; use common_telemetry::logging; +use metrics::{decrement_gauge, increment_gauge}; use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator}; @@ -123,6 +124,7 @@ impl Region for RegionImpl { } async fn close(&self) -> Result<()> { + decrement_gauge!(crate::metrics::REGION_COUNT, 1.0); self.inner.close().await } @@ -215,6 +217,7 @@ impl RegionImpl { store_config.file_purger.clone(), ); let region = RegionImpl::new(version, store_config); + increment_gauge!(crate::metrics::REGION_COUNT, 1.0); Ok(region) } @@ -354,6 +357,7 @@ impl RegionImpl { manifest: store_config.manifest, }); + increment_gauge!(crate::metrics::REGION_COUNT, 1.0); Ok(Some(RegionImpl { inner })) } diff --git a/src/storage/src/wal.rs b/src/storage/src/wal.rs index fa415bd3db..9b86f022d4 100644 --- a/src/storage/src/wal.rs +++ b/src/storage/src/wal.rs @@ -16,6 +16,7 @@ use std::pin::Pin; use std::sync::Arc; use common_error::prelude::BoxedError; +use common_telemetry::timer; use futures::{stream, Stream, TryStreamExt}; use prost::Message; use snafu::{ensure, ResultExt}; @@ -106,6 +107,7 @@ impl Wal { mut header: WalHeader, payload: Option<&Payload>, ) -> Result { + let _timer = timer!(crate::metrics::LOG_STORE_WRITE_ELAPSED); if let Some(p) = payload { header.mutation_types = wal::gen_mutation_types(p); } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 5fe6bdc5cc..8ee4093ac1 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -56,7 +56,6 @@ macro_rules! http_tests { test_prometheus_promql_api, test_prom_http_api, test_metrics_api, - test_catalog_metrics, test_scripts_api, test_health_api, test_dashboard_path, @@ -338,51 +337,6 @@ pub async fn test_metrics_api(store_type: StorageType) { guard.remove_all().await; } -pub async fn test_catalog_metrics(store_type: StorageType) { - common_telemetry::init_default_ut_logging(); - common_telemetry::init_default_metrics_recorder(); - - let (app, mut guard) = setup_test_http_app(store_type, "metrics_api").await; - let client = TestClient::new(app); - - // Call metrics api - let body = client.get("/metrics").send().await.text().await; - assert!(body.contains("catalog_schema_count 3")); - assert!(body.contains("catalog_table_count{db=\"public\"}")); - - // create new schema - let res = client.get("/v1/sql?sql=create database tommy").send().await; - assert_eq!(res.status(), StatusCode::OK); - - // Call metrics api - let body = client.get("/metrics").send().await.text().await; - assert!(body.contains("catalog_schema_count 4")); - assert!(body.contains("catalog_table_count{db=\"public\"}")); - - // create new schema - let res = client - .get("/v1/sql?sql=create table ints (i BIGINT, ii TIMESTAMP TIME INDEX)") - .send() - .await; - assert_eq!(res.status(), StatusCode::OK); - - // Call metrics api - let body = client.get("/metrics").send().await.text().await; - assert!(body.contains("catalog_schema_count 4")); - assert!(body.contains("catalog_table_count{db=\"public\"}")); - - // create new schema - let res = client.get("/v1/sql?sql=drop table ints").send().await; - assert_eq!(res.status(), StatusCode::OK); - - // Call metrics api - let body = client.get("/metrics").send().await.text().await; - assert!(body.contains("catalog_schema_count 4")); - assert!(body.contains("catalog_table_count{db=\"public\"}")); - - guard.remove_all().await; -} - pub async fn test_scripts_api(store_type: StorageType) { common_telemetry::init_default_ut_logging(); let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "script_api").await;