mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-08 06:12:55 +00:00
feat: add catalog/schema/table count as catalog metrics (#1499)
* feat: add catalog/schema/table count as catalog metrics * test: add integration tests for catalog metrics
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1192,6 +1192,7 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"log-store",
|
||||
"meta-client",
|
||||
"metrics",
|
||||
"mito",
|
||||
"object-store",
|
||||
"parking_lot",
|
||||
|
||||
@@ -27,6 +27,7 @@ futures-util.workspace = true
|
||||
key-lock = "0.1"
|
||||
lazy_static = "1.4"
|
||||
meta-client = { path = "../meta-client" }
|
||||
metrics.workspace = true
|
||||
parking_lot = "0.12"
|
||||
regex = "1.6"
|
||||
serde = "1.0"
|
||||
|
||||
@@ -33,6 +33,7 @@ pub mod error;
|
||||
pub mod helper;
|
||||
pub(crate) mod information_schema;
|
||||
pub mod local;
|
||||
mod metrics;
|
||||
pub mod remote;
|
||||
pub mod schema;
|
||||
pub mod system;
|
||||
|
||||
@@ -26,6 +26,7 @@ use common_telemetry::{error, info};
|
||||
use datatypes::prelude::ScalarVector;
|
||||
use datatypes::vectors::{BinaryVector, UInt8Vector};
|
||||
use futures_util::lock::Mutex;
|
||||
use metrics::increment_gauge;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::engine::manager::TableEngineManagerRef;
|
||||
use table::engine::EngineContext;
|
||||
@@ -370,6 +371,11 @@ impl CatalogManager for LocalCatalogManager {
|
||||
schema
|
||||
.register_table(request.table_name, request.table)
|
||||
.await?;
|
||||
increment_gauge!(
|
||||
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
|
||||
1.0,
|
||||
&[crate::metrics::db_label(catalog_name, schema_name)],
|
||||
);
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
@@ -499,11 +505,15 @@ impl CatalogManager for LocalCatalogManager {
|
||||
catalog
|
||||
.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))
|
||||
.await?;
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
||||
async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
|
||||
let catalog_name = request.create_table_request.catalog_name.clone();
|
||||
let schema_name = request.create_table_request.schema_name.clone();
|
||||
|
||||
ensure!(
|
||||
!*self.init_lock.lock().await,
|
||||
IllegalManagerStateSnafu {
|
||||
@@ -513,7 +523,11 @@ impl CatalogManager for LocalCatalogManager {
|
||||
|
||||
let mut sys_table_requests = self.system_table_requests.lock().await;
|
||||
sys_table_requests.push(request);
|
||||
|
||||
increment_gauge!(
|
||||
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
|
||||
1.0,
|
||||
&[crate::metrics::db_label(&catalog_name, &schema_name)],
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ use std::sync::{Arc, RwLock};
|
||||
use async_trait::async_trait;
|
||||
use common_catalog::consts::MIN_USER_TABLE_ID;
|
||||
use common_telemetry::error;
|
||||
use metrics::{decrement_gauge, increment_gauge};
|
||||
use snafu::{ensure, OptionExt};
|
||||
use table::metadata::TableId;
|
||||
use table::table::TableIdProvider;
|
||||
@@ -86,6 +87,11 @@ impl CatalogManager for MemoryCatalogManager {
|
||||
catalog: &request.catalog,
|
||||
schema: &request.schema,
|
||||
})?;
|
||||
increment_gauge!(
|
||||
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
|
||||
1.0,
|
||||
&[crate::metrics::db_label(&request.catalog, &request.schema)],
|
||||
);
|
||||
schema
|
||||
.register_table(request.table_name, request.table)
|
||||
.await
|
||||
@@ -124,6 +130,11 @@ impl CatalogManager for MemoryCatalogManager {
|
||||
catalog: &request.catalog,
|
||||
schema: &request.schema,
|
||||
})?;
|
||||
decrement_gauge!(
|
||||
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
|
||||
1.0,
|
||||
&[crate::metrics::db_label(&request.catalog, &request.schema)],
|
||||
);
|
||||
schema
|
||||
.deregister_table(&request.table_name)
|
||||
.await
|
||||
@@ -139,6 +150,7 @@ impl CatalogManager for MemoryCatalogManager {
|
||||
catalog
|
||||
.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))
|
||||
.await?;
|
||||
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0);
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
@@ -180,6 +192,7 @@ impl CatalogManager for MemoryCatalogManager {
|
||||
name: String,
|
||||
catalog: CatalogProviderRef,
|
||||
) -> Result<Option<CatalogProviderRef>> {
|
||||
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0);
|
||||
self.register_catalog_sync(name, catalog)
|
||||
}
|
||||
|
||||
@@ -255,6 +268,7 @@ impl MemoryCatalogProvider {
|
||||
!schemas.contains_key(&name),
|
||||
error::SchemaExistsSnafu { schema: &name }
|
||||
);
|
||||
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0);
|
||||
Ok(schemas.insert(name, schema))
|
||||
}
|
||||
|
||||
|
||||
26
src/catalog/src/metrics.rs
Normal file
26
src/catalog/src/metrics.rs
Normal file
@@ -0,0 +1,26 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_catalog::build_db_string;
|
||||
|
||||
pub(crate) const METRIC_DB_LABEL: &str = "db";
|
||||
|
||||
pub(crate) const METRIC_CATALOG_MANAGER_CATALOG_COUNT: &str = "catalog.catalog_count";
|
||||
pub(crate) const METRIC_CATALOG_MANAGER_SCHEMA_COUNT: &str = "catalog.schema_count";
|
||||
pub(crate) const METRIC_CATALOG_MANAGER_TABLE_COUNT: &str = "catalog.table_count";
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn db_label(catalog: &str, schema: &str) -> (&'static str, String) {
|
||||
(METRIC_DB_LABEL, build_db_string(catalog, schema))
|
||||
}
|
||||
@@ -24,6 +24,7 @@ use common_telemetry::{debug, error, info, warn};
|
||||
use dashmap::DashMap;
|
||||
use futures::Stream;
|
||||
use futures_util::{StreamExt, TryStreamExt};
|
||||
use metrics::{decrement_gauge, increment_gauge};
|
||||
use parking_lot::RwLock;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use table::engine::manager::TableEngineManagerRef;
|
||||
@@ -185,6 +186,7 @@ impl RemoteCatalogManager {
|
||||
.entry(catalog_name.clone())
|
||||
.or_insert_with(|| self.new_catalog_provider(&catalog_name))
|
||||
.clone();
|
||||
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0);
|
||||
|
||||
self.initiate_schemas(&catalog_name, catalog, &mut max_table_id)
|
||||
.await?;
|
||||
@@ -228,6 +230,11 @@ impl RemoteCatalogManager {
|
||||
"Fetch schema from metasrv: {}.{}",
|
||||
&catalog_name, &schema_name
|
||||
);
|
||||
increment_gauge!(
|
||||
crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT,
|
||||
1.0,
|
||||
&[crate::metrics::db_label(&catalog_name, &schema_name)],
|
||||
);
|
||||
self.initiate_tables(&catalog_name, &schema_name, schema, max_table_id)
|
||||
.await?;
|
||||
}
|
||||
@@ -269,6 +276,11 @@ impl RemoteCatalogManager {
|
||||
table_num += 1;
|
||||
}
|
||||
|
||||
increment_gauge!(
|
||||
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
|
||||
1.0,
|
||||
&[crate::metrics::db_label(catalog_name, schema_name)],
|
||||
);
|
||||
info!(
|
||||
"initialized tables in {}.{}, total: {}",
|
||||
catalog_name, schema_name, table_num
|
||||
@@ -453,9 +465,16 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
increment_gauge!(
|
||||
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
|
||||
1.0,
|
||||
&[crate::metrics::db_label(&catalog_name, &schema_name)],
|
||||
);
|
||||
schema_provider
|
||||
.register_table(request.table_name, request.table)
|
||||
.await?;
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
@@ -471,6 +490,11 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
})?
|
||||
.deregister_table(&request.table_name)
|
||||
.await?;
|
||||
decrement_gauge!(
|
||||
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
|
||||
1.0,
|
||||
&[crate::metrics::db_label(catalog_name, schema_name)],
|
||||
);
|
||||
Ok(result.is_none())
|
||||
}
|
||||
|
||||
@@ -487,6 +511,7 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
catalog_provider
|
||||
.register_schema(schema_name, schema_provider)
|
||||
.await?;
|
||||
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0);
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
@@ -517,8 +542,16 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
}
|
||||
|
||||
async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
|
||||
let catalog_name = request.create_table_request.catalog_name.clone();
|
||||
let schema_name = request.create_table_request.schema_name.clone();
|
||||
|
||||
let mut requests = self.system_table_requests.lock().await;
|
||||
requests.push(request);
|
||||
increment_gauge!(
|
||||
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
|
||||
1.0,
|
||||
&[crate::metrics::db_label(&catalog_name, &schema_name)],
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -595,6 +628,7 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
.context(InvalidCatalogValueSnafu)?,
|
||||
)
|
||||
.await?;
|
||||
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0);
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use consts::DEFAULT_CATALOG_NAME;
|
||||
|
||||
pub mod consts;
|
||||
pub mod error;
|
||||
|
||||
@@ -20,3 +22,23 @@ pub mod error;
|
||||
pub fn format_full_table_name(catalog: &str, schema: &str, table: &str) -> String {
|
||||
format!("{catalog}.{schema}.{table}")
|
||||
}
|
||||
|
||||
/// Build db name from catalog and schema string
|
||||
pub fn build_db_string(catalog: &str, schema: &str) -> String {
|
||||
if catalog == DEFAULT_CATALOG_NAME {
|
||||
schema.to_string()
|
||||
} else {
|
||||
format!("{catalog}-{schema}")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_db_string() {
|
||||
assert_eq!("test", build_db_string(DEFAULT_CATALOG_NAME, "test"));
|
||||
assert_eq!("a0b1c2d3-test", build_db_string("a0b1c2d3", "test"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use common_catalog::build_db_string;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_telemetry::debug;
|
||||
|
||||
@@ -96,11 +97,7 @@ impl QueryContext {
|
||||
pub fn get_db_string(&self) -> String {
|
||||
let catalog = self.current_catalog();
|
||||
let schema = self.current_schema();
|
||||
if catalog == DEFAULT_CATALOG_NAME {
|
||||
schema
|
||||
} else {
|
||||
format!("{catalog}-{schema}")
|
||||
}
|
||||
build_db_string(&catalog, &schema)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -56,6 +56,7 @@ macro_rules! http_tests {
|
||||
test_prometheus_promql_api,
|
||||
test_prom_http_api,
|
||||
test_metrics_api,
|
||||
test_catalog_metrics,
|
||||
test_scripts_api,
|
||||
test_health_api,
|
||||
test_dashboard_path,
|
||||
@@ -337,6 +338,51 @@ pub async fn test_metrics_api(store_type: StorageType) {
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_catalog_metrics(store_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
common_telemetry::init_default_metrics_recorder();
|
||||
|
||||
let (app, mut guard) = setup_test_http_app(store_type, "metrics_api").await;
|
||||
let client = TestClient::new(app);
|
||||
|
||||
// Call metrics api
|
||||
let body = client.get("/metrics").send().await.text().await;
|
||||
assert!(body.contains("catalog_schema_count 3"));
|
||||
assert!(body.contains("catalog_table_count{db=\"public\"}"));
|
||||
|
||||
// create new schema
|
||||
let res = client.get("/v1/sql?sql=create database tommy").send().await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
// Call metrics api
|
||||
let body = client.get("/metrics").send().await.text().await;
|
||||
assert!(body.contains("catalog_schema_count 4"));
|
||||
assert!(body.contains("catalog_table_count{db=\"public\"}"));
|
||||
|
||||
// create new schema
|
||||
let res = client
|
||||
.get("/v1/sql?sql=create table ints (i BIGINT, ii TIMESTAMP TIME INDEX)")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
// Call metrics api
|
||||
let body = client.get("/metrics").send().await.text().await;
|
||||
assert!(body.contains("catalog_schema_count 4"));
|
||||
assert!(body.contains("catalog_table_count{db=\"public\"}"));
|
||||
|
||||
// create new schema
|
||||
let res = client.get("/v1/sql?sql=drop table ints").send().await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
// Call metrics api
|
||||
let body = client.get("/metrics").send().await.text().await;
|
||||
assert!(body.contains("catalog_schema_count 4"));
|
||||
assert!(body.contains("catalog_table_count{db=\"public\"}"));
|
||||
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_scripts_api(store_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "script_api").await;
|
||||
|
||||
Reference in New Issue
Block a user