mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: metrics for storage engine (#1574)
* feat: add storage engine region count gauge * test: remove catalog metrics because we can't get a correct number * feat: add metrics for log store write and compaction * fix: address review issues
This commit is contained in:
@@ -16,7 +16,7 @@ use std::collections::HashSet;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_telemetry::{debug, error};
|
||||
use common_telemetry::{debug, error, timer};
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
@@ -140,6 +140,7 @@ impl<S: LogStore> CompactionTaskImpl<S> {
|
||||
#[async_trait::async_trait]
|
||||
impl<S: LogStore> CompactionTask for CompactionTaskImpl<S> {
|
||||
async fn run(mut self) -> Result<()> {
|
||||
let _timer = timer!(crate::metrics::COMPACT_ELAPSED);
|
||||
self.mark_files_compacting(true);
|
||||
|
||||
let (output, mut compacted) = self.merge_ssts().await.map_err(|e| {
|
||||
|
||||
@@ -22,3 +22,9 @@ pub const FLUSH_REQUESTS_TOTAL: &str = "storage.flush.requests_total";
|
||||
pub const FLUSH_ERRORS_TOTAL: &str = "storage.flush.errors_total";
|
||||
/// Elapsed time of a flush job.
|
||||
pub const FLUSH_ELAPSED: &str = "storage.flush.elapsed";
|
||||
/// Gauge for open regions
|
||||
pub const REGION_COUNT: &str = "storage.region_count";
|
||||
/// Timer for logstore write
|
||||
pub const LOG_STORE_WRITE_ELAPSED: &str = "storage.logstore.write.elapsed";
|
||||
/// Elapsed time of a compact job.
|
||||
pub const COMPACT_ELAPSED: &str = "storage.compact.elapsed";
|
||||
|
||||
@@ -23,6 +23,7 @@ use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_telemetry::logging;
|
||||
use metrics::{decrement_gauge, increment_gauge};
|
||||
use snafu::ResultExt;
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
|
||||
@@ -123,6 +124,7 @@ impl<S: LogStore> Region for RegionImpl<S> {
|
||||
}
|
||||
|
||||
async fn close(&self) -> Result<()> {
|
||||
decrement_gauge!(crate::metrics::REGION_COUNT, 1.0);
|
||||
self.inner.close().await
|
||||
}
|
||||
|
||||
@@ -215,6 +217,7 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
store_config.file_purger.clone(),
|
||||
);
|
||||
let region = RegionImpl::new(version, store_config);
|
||||
increment_gauge!(crate::metrics::REGION_COUNT, 1.0);
|
||||
|
||||
Ok(region)
|
||||
}
|
||||
@@ -354,6 +357,7 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
manifest: store_config.manifest,
|
||||
});
|
||||
|
||||
increment_gauge!(crate::metrics::REGION_COUNT, 1.0);
|
||||
Ok(Some(RegionImpl { inner }))
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_error::prelude::BoxedError;
|
||||
use common_telemetry::timer;
|
||||
use futures::{stream, Stream, TryStreamExt};
|
||||
use prost::Message;
|
||||
use snafu::{ensure, ResultExt};
|
||||
@@ -106,6 +107,7 @@ impl<S: LogStore> Wal<S> {
|
||||
mut header: WalHeader,
|
||||
payload: Option<&Payload>,
|
||||
) -> Result<Id> {
|
||||
let _timer = timer!(crate::metrics::LOG_STORE_WRITE_ELAPSED);
|
||||
if let Some(p) = payload {
|
||||
header.mutation_types = wal::gen_mutation_types(p);
|
||||
}
|
||||
|
||||
@@ -56,7 +56,6 @@ macro_rules! http_tests {
|
||||
test_prometheus_promql_api,
|
||||
test_prom_http_api,
|
||||
test_metrics_api,
|
||||
test_catalog_metrics,
|
||||
test_scripts_api,
|
||||
test_health_api,
|
||||
test_dashboard_path,
|
||||
@@ -338,51 +337,6 @@ pub async fn test_metrics_api(store_type: StorageType) {
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_catalog_metrics(store_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
common_telemetry::init_default_metrics_recorder();
|
||||
|
||||
let (app, mut guard) = setup_test_http_app(store_type, "metrics_api").await;
|
||||
let client = TestClient::new(app);
|
||||
|
||||
// Call metrics api
|
||||
let body = client.get("/metrics").send().await.text().await;
|
||||
assert!(body.contains("catalog_schema_count 3"));
|
||||
assert!(body.contains("catalog_table_count{db=\"public\"}"));
|
||||
|
||||
// create new schema
|
||||
let res = client.get("/v1/sql?sql=create database tommy").send().await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
// Call metrics api
|
||||
let body = client.get("/metrics").send().await.text().await;
|
||||
assert!(body.contains("catalog_schema_count 4"));
|
||||
assert!(body.contains("catalog_table_count{db=\"public\"}"));
|
||||
|
||||
// create new schema
|
||||
let res = client
|
||||
.get("/v1/sql?sql=create table ints (i BIGINT, ii TIMESTAMP TIME INDEX)")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
// Call metrics api
|
||||
let body = client.get("/metrics").send().await.text().await;
|
||||
assert!(body.contains("catalog_schema_count 4"));
|
||||
assert!(body.contains("catalog_table_count{db=\"public\"}"));
|
||||
|
||||
// create new schema
|
||||
let res = client.get("/v1/sql?sql=drop table ints").send().await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
// Call metrics api
|
||||
let body = client.get("/metrics").send().await.text().await;
|
||||
assert!(body.contains("catalog_schema_count 4"));
|
||||
assert!(body.contains("catalog_table_count{db=\"public\"}"));
|
||||
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_scripts_api(store_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "script_api").await;
|
||||
|
||||
Reference in New Issue
Block a user