diff --git a/config/config.md b/config/config.md index 15d431e2af..f8ce99cae1 100644 --- a/config/config.md +++ b/config/config.md @@ -209,6 +209,7 @@ | `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. | | `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. | | `logging.max_log_files` | Integer | `720` | The maximum amount of log files. | +| `logging.enable_per_region_metrics` | Bool | `false` | Whether to enable per-region metrics.
Default to false. | | `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. | | `logging.otlp_headers` | -- | -- | Additional OTLP headers, only valid when using OTLP http | | `logging.tracing_sample_ratio` | -- | Unset | The percentage of tracing will be sampled and exported.
Valid range is `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled. The default value is 1.
ratio > 1 are treated as 1. Fractions < 0 are treated as 0 | @@ -328,6 +329,7 @@ | `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. | | `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. | | `logging.max_log_files` | Integer | `720` | The maximum amount of log files. | +| `logging.enable_per_region_metrics` | Bool | `false` | Whether to enable per-region metrics.
Default to false. | | `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. | | `logging.otlp_headers` | -- | -- | Additional OTLP headers, only valid when using OTLP http | | `logging.tracing_sample_ratio` | -- | Unset | The percentage of tracing will be sampled and exported.
Valid range is `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled. The default value is 1.
ratio > 1 are treated as 1. Fractions < 0 are treated as 0 | diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 23de91fbb2..2331cdf028 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -306,6 +306,10 @@ log_format = "text" ## The maximum amount of log files. max_log_files = 720 +## Whether to enable per-region metrics. +## Default to false. +enable_per_region_metrics = false + ## The OTLP tracing export protocol. Can be `grpc`/`http`. otlp_export_protocol = "http" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 7d9366b6a2..79ed2814f3 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -823,6 +823,10 @@ log_format = "text" ## The maximum amount of log files. max_log_files = 720 +## Whether to enable per-region metrics. +## Default to false. +enable_per_region_metrics = false + ## The OTLP tracing export protocol. Can be `grpc`/`http`. otlp_export_protocol = "http" diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index 73fb6c3d0b..74e4127de8 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -253,6 +253,7 @@ fn test_load_flownode_example_config() { parallelism: 1, allow_query_fallback: false, memory_pool_size: MemoryLimit::Percentage(50), + enable_per_region_metrics: false, }, meta_client: Some(MetaClientOptions { metasrv_addrs: vec!["127.0.0.1:3002".to_string()], diff --git a/src/common/telemetry/src/logging.rs b/src/common/telemetry/src/logging.rs index 1b371c1d78..953dbbc2df 100644 --- a/src/common/telemetry/src/logging.rs +++ b/src/common/telemetry/src/logging.rs @@ -271,6 +271,9 @@ pub struct LoggingOptions { /// Additional HTTP headers for OTLP exporter. #[serde(skip_serializing_if = "HashMap::is_empty")] pub otlp_headers: HashMap, + + /// Whether to enable per-region metrics. + pub enable_per_region_metrics: bool, } /// The protocol of OTLP export. 
@@ -346,6 +349,7 @@ impl PartialEq for LoggingOptions { && self.otlp_endpoint == other.otlp_endpoint && self.tracing_sample_ratio == other.tracing_sample_ratio && self.append_stdout == other.append_stdout + && self.enable_per_region_metrics == other.enable_per_region_metrics } } @@ -373,6 +377,7 @@ impl Default for LoggingOptions { max_log_files: 720, otlp_export_protocol: None, otlp_headers: HashMap::new(), + enable_per_region_metrics: false, } } } diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 3d77251953..bfdaf32a1c 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -418,12 +418,10 @@ impl HeartbeatTask { if let Some(serialized) = region_stat.serialize_to_vec() { extensions.insert(REGION_STATISTIC_KEY.to_string(), serialized); } - RegionStat { region_id: stat.region_id.as_u64(), engine: stat.engine, role: RegionRole::from(stat.role).into(), - // TODO(weny): w/rcus rcus: 0, wcus: 0, approximate_bytes: region_stat.estimated_disk_size() as i64, diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 351d176db0..a7c1af5b45 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -129,6 +129,7 @@ impl Default for FlownodeOptions { parallelism: 1, allow_query_fallback: false, memory_pool_size: MemoryLimit::default(), + enable_per_region_metrics: false, }, memory: MemoryOptions::default(), } diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index 238de303d5..3fa7fca33f 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -240,6 +240,8 @@ impl FrontendBuilder { Arc::new(FlowMetadataManager::new(kv_backend.clone())); let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone()); + let mut query_options = self.options.query.clone(); + query_options.enable_per_region_metrics = self.options.logging.enable_per_region_metrics; let query_engine = 
QueryEngineFactory::new_with_plugins( self.catalog_manager.clone(), Some(partition_manager.clone()), @@ -249,7 +251,7 @@ impl FrontendBuilder { Some(Arc::new(flow_service)), true, plugins.clone(), - self.options.query.clone(), + query_options, ) .query_engine(); diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index f58d09d1cc..72c269bd0e 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -34,6 +34,7 @@ use store_api::ManifestVersion; use store_api::codec::PrimaryKeyEncoding; use store_api::logstore::provider::Provider; use store_api::metadata::RegionMetadataRef; +use store_api::metrics::{REGION_QUERY_CPU_TIME, REGION_QUERY_SCANNED_BYTES}; use store_api::region_engine::{ RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState, }; @@ -208,6 +209,13 @@ impl StagingPartitionInfo { } impl MitoRegion { + fn remove_region_metrics(&self) { + let region_id = self.region_id.as_u64().to_string(); + let labels = &[region_id.as_str()]; + let _ = REGION_QUERY_CPU_TIME.remove_label_values(labels); + let _ = REGION_QUERY_SCANNED_BYTES.remove_label_values(labels); + } + /// Stop background managers for this region. pub(crate) async fn stop(&self) { self.manifest_ctx @@ -978,6 +986,12 @@ impl MitoRegion { } } +impl Drop for MitoRegion { + fn drop(&mut self) { + self.remove_region_metrics(); + } +} + /// Context to update the region manifest. 
#[derive(Debug)] pub(crate) struct ManifestContext { diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 5d0db51e02..9615bc2a13 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -46,6 +46,7 @@ use greptime_proto::v1::region::RegionRequestHeader; use meter_core::data::ReadItem; use meter_macros::read_meter; use session::context::QueryContextRef; +use store_api::metrics::{REGION_QUERY_CPU_TIME, REGION_QUERY_SCANNED_BYTES}; use store_api::storage::RegionId; use table::table::scan::REGION_SCAN_EXEC_NAME; use table::table_name::TableName; @@ -221,6 +222,7 @@ pub struct MergeScanExec { captured_remote_dyn_filters: Arc>>, target_partition: usize, partition_cols: AliasMapping, + enable_per_region_metrics: bool, } impl std::fmt::Debug for MergeScanExec { @@ -246,6 +248,7 @@ impl MergeScanExec { target_partition: usize, partition_cols: AliasMapping, remote_dyn_filter_producer_id: Option, + enable_per_region_metrics: bool, ) -> Result { // TODO(CookiePieWw): Initially we removed the metadata from the schema in #2000, but we have to // keep it for #4619 to identify json type in src/datatypes/src/schema/column_schema.rs. @@ -322,6 +325,7 @@ impl MergeScanExec { captured_remote_dyn_filters: Arc::default(), target_partition, partition_cols, + enable_per_region_metrics, }) } @@ -497,17 +501,9 @@ impl MergeScanExec { // process metrics after all data is drained. 
if let Some(metrics) = stream.metrics() { - let output_bytes = scan_output_bytes(&metrics); + let load = region_scan_load(&metrics); let (c, s) = parse_catalog_and_schema_from_db_string(&dbname); - let value = read_meter!( - c, - s, - ReadItem { - cpu_time: metrics.elapsed_compute as u64, - table_scan: output_bytes as u64, - }, - current_channel as u8 - ); + let value = read_meter!(c, s, load, current_channel as u8); metric.record_greptime_exec_cost(value as usize); // record metrics from sub sgates @@ -598,6 +594,7 @@ impl MergeScanExec { captured_remote_dyn_filters: self.captured_remote_dyn_filters.clone(), target_partition: self.target_partition, partition_cols: self.partition_cols.clone(), + enable_per_region_metrics: self.enable_per_region_metrics, }) } @@ -929,6 +926,37 @@ fn scan_output_bytes(metrics: &RecordBatchMetrics) -> usize { .sum() } +fn region_scan_load(metrics: &RecordBatchMetrics) -> ReadItem { + ReadItem { + cpu_time: metrics.elapsed_compute as u64, + table_scan: scan_output_bytes(metrics) as u64, + } +} + +fn report_region_query_load(region_id: RegionId, load: &ReadItem) { + let region_id = region_id.to_string(); + REGION_QUERY_CPU_TIME + .with_label_values(&[®ion_id]) + .inc_by(load.cpu_time); + REGION_QUERY_SCANNED_BYTES + .with_label_values(&[®ion_id]) + .inc_by(load.table_scan); +} + +impl Drop for MergeScanExec { + fn drop(&mut self) { + if !self.enable_per_region_metrics { + return; + } + + let metrics = self.sub_stage_metrics.lock().unwrap(); + for (region_id, metrics) in metrics.iter() { + let load = region_scan_load(metrics); + report_region_query_load(*region_id, &load); + } + } +} + #[derive(Debug, Clone)] struct MergeScanMetric { /// Nanosecond elapsed till the scan operator is ready to emit data @@ -1147,6 +1175,7 @@ mod tests { target_partition, partition_cols, Some(remote_dyn_filter_producer_id), + false, ) .unwrap(); @@ -1200,6 +1229,7 @@ mod tests { 1, AliasMapping::new(), Some(remote_dyn_filter_producer_id), + false, ) 
.unwrap(); let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new( @@ -1277,4 +1307,68 @@ mod tests { assert_eq!(scan_output_bytes(&metrics), 60); } + + #[test] + fn merge_scan_reports_region_query_load_on_drop() { + use store_api::metrics::{REGION_QUERY_CPU_TIME, REGION_QUERY_SCANNED_BYTES}; + + let region_id = RegionId::new(1024, 10002); + let region_id_label = region_id.to_string(); + let labels = [®ion_id_label]; + let _ = REGION_QUERY_CPU_TIME.remove_label_values(&labels); + let _ = REGION_QUERY_SCANNED_BYTES.remove_label_values(&labels); + + let plan = LogicalPlanBuilder::empty(true) + .project(vec![lit(1i32).alias("col1")]) + .unwrap() + .build() + .unwrap(); + let schema = plan.schema().as_arrow().clone(); + let exec = MergeScanExec::new( + &SessionStateBuilder::new().build(), + TableName::new("catalog", "schema", "table"), + vec![region_id], + plan, + &schema, + Arc::new(TestRegionQueryHandler), + QueryContext::arc(), + 1, + AliasMapping::new(), + None, + true, + ) + .unwrap(); + + let metrics = RecordBatchMetrics { + elapsed_compute: 42, + plan_metrics: vec![PlanMetrics { + plan: "RegionScanExec: region=1".to_string(), + plan_name: REGION_SCAN_EXEC_NAME.to_string(), + level: 0, + metrics: vec![("output_bytes".to_string(), 24)], + }], + ..Default::default() + }; + exec.sub_stage_metrics + .lock() + .unwrap() + .insert(region_id, metrics); + + assert_eq!(REGION_QUERY_CPU_TIME.with_label_values(&labels).get(), 0); + assert_eq!( + REGION_QUERY_SCANNED_BYTES.with_label_values(&labels).get(), + 0 + ); + + drop(exec); + + assert_eq!(REGION_QUERY_CPU_TIME.with_label_values(&labels).get(), 42); + assert_eq!( + REGION_QUERY_SCANNED_BYTES.with_label_values(&labels).get(), + 24 + ); + + let _ = REGION_QUERY_CPU_TIME.remove_label_values(&labels); + let _ = REGION_QUERY_SCANNED_BYTES.remove_label_values(&labels); + } } diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index a14aefbf5b..f613807b29 100644 --- 
a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -108,6 +108,7 @@ pub struct DistExtensionPlanner { catalog_manager: CatalogManagerRef, partition_rule_manager: PartitionRuleManagerRef, region_query_handler: RegionQueryHandlerRef, + enable_per_region_metrics: bool, } impl DistExtensionPlanner { @@ -115,11 +116,13 @@ impl DistExtensionPlanner { catalog_manager: CatalogManagerRef, partition_rule_manager: PartitionRuleManagerRef, region_query_handler: RegionQueryHandlerRef, + enable_per_region_metrics: bool, ) -> Self { Self { catalog_manager, partition_rule_manager, region_query_handler, + enable_per_region_metrics, } } } @@ -180,6 +183,7 @@ impl ExtensionPlanner for DistExtensionPlanner { session_state.config().target_partitions(), merge_scan.partition_cols().clone(), merge_scan.remote_dyn_filter_producer_id(), + self.enable_per_region_metrics, )?; Ok(Some(Arc::new(merge_scan_plan) as _)) } diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index eb8644c2cf..973917b7cb 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -184,7 +184,6 @@ impl TableProvider for DummyTableProvider { request.projection_input = projection.map(|p| p.clone().into()); request.filters = filters.to_vec(); request.limit = limit; - if let Some(query_ctx) = &self.query_ctx { let is_sink_scan = is_sink_scan(query_ctx, self.region_id) .map_err(|e| DataFusionError::External(Box::new(e)))?; diff --git a/src/query/src/metrics.rs b/src/query/src/metrics.rs index b1483faea2..bee436660c 100644 --- a/src/query/src/metrics.rs +++ b/src/query/src/metrics.rs @@ -460,6 +460,7 @@ mod tests { 1, BTreeMap::>::new(), Some(RemoteDynFilterProducerId::new(0)), + false, ) .unwrap(), ) diff --git a/src/query/src/optimizer/pass_distribution.rs b/src/query/src/optimizer/pass_distribution.rs index e9cd5f028d..22e2b90359 100644 --- a/src/query/src/optimizer/pass_distribution.rs +++ b/src/query/src/optimizer/pass_distribution.rs @@ 
-293,6 +293,7 @@ mod tests { 32, partition_cols, Some(RemoteDynFilterProducerId::new(1)), + false, ) .unwrap() } diff --git a/src/query/src/options.rs b/src/query/src/options.rs index 721d79ea8d..51418c10a5 100644 --- a/src/query/src/options.rs +++ b/src/query/src/options.rs @@ -43,6 +43,9 @@ pub struct QueryOptions { /// Supports absolute size (e.g., "2GB") or percentage (e.g., "50%"). /// When this limit is reached, queries will fail with ResourceExhausted error. pub memory_pool_size: MemoryLimit, + /// Whether to expose per-region query load metrics. + #[serde(skip)] + pub enable_per_region_metrics: bool, } #[allow(clippy::derivable_impls)] @@ -52,6 +55,7 @@ impl Default for QueryOptions { parallelism: 0, allow_query_fallback: false, memory_pool_size: MemoryLimit::default(), + enable_per_region_metrics: false, } } } diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 44c75292b1..50785c4a13 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -233,6 +233,7 @@ impl QueryEngineState { catalog_list.clone(), partition_rule_manager, region_query_handler.clone(), + options.enable_per_region_metrics, ))) .with_optimizer_rules(optimizer.rules) .with_physical_optimizer_rules(physical_optimizer.rules) @@ -511,6 +512,7 @@ impl DfQueryPlanner { catalog_manager: CatalogManagerRef, partition_rule_manager: Option, region_query_handler: Option, + enable_per_region_metrics: bool, ) -> Self { let mut planners: Vec> = vec![ Arc::new(PromExtensionPlanner), @@ -524,6 +526,7 @@ impl DfQueryPlanner { catalog_manager, partition_rule_manager, region_query_handler, + enable_per_region_metrics, ))); planners.push(Arc::new(MergeSortExtensionPlanner {})); } diff --git a/src/store-api/src/lib.rs b/src/store-api/src/lib.rs index f97e348842..30c9ec50f1 100644 --- a/src/store-api/src/lib.rs +++ b/src/store-api/src/lib.rs @@ -19,7 +19,7 @@ pub mod data_source; pub mod logstore; pub mod metadata; pub mod 
metric_engine_consts; -mod metrics; +pub mod metrics; pub mod mito_engine_options; pub mod path_utils; pub mod region_engine; diff --git a/src/store-api/src/metrics.rs b/src/store-api/src/metrics.rs index 1eab3021c3..f939a1bb86 100644 --- a/src/store-api/src/metrics.rs +++ b/src/store-api/src/metrics.rs @@ -13,7 +13,10 @@ // limitations under the License. use lazy_static::lazy_static; -use prometheus::{HistogramVec, register_histogram_vec}; +use prometheus::{HistogramVec, IntCounterVec, register_histogram_vec, register_int_counter_vec}; + +/// Region id label. +pub const REGION_LABEL: &str = "region_id"; lazy_static! { pub static ref CONVERT_REGION_BULK_REQUEST: HistogramVec = register_histogram_vec!( @@ -26,4 +29,18 @@ lazy_static! { ] ) .unwrap(); + /// Query CPU time accumulated by each region. + pub static ref REGION_QUERY_CPU_TIME: IntCounterVec = register_int_counter_vec!( + "greptime_mito_region_query_cpu_time", + "mito region query CPU time in nanoseconds", + &[REGION_LABEL] + ) + .unwrap(); + /// Query scanned bytes accumulated by each region. + pub static ref REGION_QUERY_SCANNED_BYTES: IntCounterVec = register_int_counter_vec!( + "greptime_mito_region_query_scanned_bytes", + "mito region query scanned bytes", + &[REGION_LABEL] + ) + .unwrap(); } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index bef0ba1530..b37cb6ee81 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1612,6 +1612,7 @@ read_preference = "Leader" max_log_files = 720 append_stdout = true enable_otlp_tracing = false +enable_per_region_metrics = false [[region_engine]]