mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-07-03 12:30:40 +00:00
feat: expose region read load metrics (#8316)
* feat: expose region read load through Prometheus metrics and heartbeat

Introduce region-level query load tracking (CPU time and scanned bytes) collected by `RegionScanExec`, exposed via Prometheus metrics and optionally reported through heartbeat region stats.

- **Region metrics** (`src/mito2/src/metrics.rs`, `src/store-api/src/metrics.rs`): Add `greptime_mito_region_query_cpu_time`, `greptime_mito_region_query_scanned_bytes`, and `greptime_mito_region_written_bytes_since_open` gauge metrics.
- **MitoRegion** (`src/mito2/src/region.rs`, `src/mito2/src/region/opener.rs`, `src/mito2/src/region_write_ctx.rs`): Replace `AtomicU64` `written_bytes` with `IntGauge`; add `query_cpu_time`/`query_scanned_bytes` fields with lifecycle management (init, reset, remove-on-drop).
- **RegionStatistic** (`src/store-api/src/region_engine.rs`, `src/store-api/src/storage/requests.rs`): Add `query_cpu_time` and `query_scanned_bytes` fields.
- **Metric-engine** (`src/metric-engine/src/utils.rs`): Aggregate query load from metadata and data regions.
- **Heartbeat** (`src/datanode/src/heartbeat.rs`, `src/common/meta/src/datanode.rs`): Relay region query load via heartbeat `RegionStat`; add test.
- **Query engine** (`src/query/src/options.rs`, `src/query/src/query_engine/state.rs`, `src/query/src/datafusion.rs`, `src/query/src/dist_plan/merge_scan.rs`, `src/query/src/dist_plan/analyzer.rs`, `src/query/src/dummy_catalog.rs`): Add `enable_region_query_load_report` config; wire `RegionScanExec` to accumulate CPU time and scanned bytes.
- **Table scan** (`src/table/src/table/scan.rs`, `src/table/src/table/metrics.rs`): Wire table scan metrics.
- **Config** (`config/standalone.example.toml`, `config/datanode.example.toml`, `config/frontend.example.toml`, `config/config.md`): Add example config and documentation for `enable_region_query_load_report`.
- **Tests** (`src/mito2/src/engine/basic_test.rs`, `src/mito2/src/engine/close_test.rs`, `src/cmd/tests/load_config_test.rs`, `src/flow/src/adapter.rs`): Add unit tests for region query load reporting and metric cleanup on region close; set default config values.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat: move region read load report config from query layer to mito engine

Move the `enable_region_query_load_report` setting from query-level config (`QueryOptions`/`DistPlannerOptions`) into the mito2 storage engine config (`MitoConfig`), and expose it through the `RegionScanner` trait instead of `ScanRequest`/`PrepareRequest`.

- Mito config: `src/mito2/src/config.rs`, `src/mito2/src/engine.rs`
- Scan region plumbing: `src/mito2/src/read/scan_region.rs`
- RegionScanner trait: `src/store-api/src/region_engine.rs`
- Scanner impls: `src/mito2/src/read/seq_scan.rs`, `src/mito2/src/read/series_scan.rs`, `src/mito2/src/read/unordered_scan.rs`
- RegionScanExec: `src/table/src/table/scan.rs`
- Removed from query layer: `src/query/src/options.rs`, `src/query/src/dist_plan/analyzer.rs`, `src/query/src/query_engine/state.rs`, `src/query/src/datafusion.rs`, `src/query/src/dummy_catalog.rs`
- Removed from test/config: `src/query/src/dist_plan/analyzer/test.rs`, `src/flow/src/adapter.rs`, `src/cmd/tests/load_config_test.rs`, `src/store-api/src/storage/requests.rs`
- Config docs: `config/config.md`, `config/datanode.example.toml`, `config/frontend.example.toml`, `config/standalone.example.toml`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat: move region query load report config from MitoConfig to LoggingOptions

Relocate the `enable_region_query_load_report` setting from `MitoConfig` to `LoggingOptions` (as `enable_per_region_metrics`), and thread it into `MitoEngineBuilder` instead of reading from the engine config directly. This makes the region read-load reporting a per-node logging/observability concern rather than a per-engine storage setting.

- `config/config.md`
- `config/datanode.example.toml`
- `config/standalone.example.toml`
- `src/common/telemetry/src/logging.rs`
- `src/datanode/src/datanode.rs`
- `src/mito2/src/config.rs`
- `src/mito2/src/engine.rs`
- `src/mito2/src/region.rs`

Signed-off-by: Lei Huang <lei@huang.to>
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat: report region query load on stream drop instead of stream end

Move `report_region_query_load()` from `StreamWithMetricWrapper::poll_next()` to `Drop::drop()` so that region query load is reported even when the stream is dropped prematurely (not just when fully consumed).

Affected files:
- `src/table/src/table/scan.rs`

Signed-off-by: Lei, Huang <huanglei@qiyi.com>
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat: make region query load reporting configurable

Introduce `enable_region_query_load_report` flag to optionally report per-region `query_cpu_time` and `query_scanned_bytes` metrics instead of always creating them. When disabled, the Prometheus gauges are not created (`None`), avoiding metric churn for workloads that do not need query-level load tracking.

- `src/common/meta/src/datanode.rs` -- Placeholder fields for query load
- `src/mito2/src/region.rs` -- Make query metrics `Option<IntGauge>`, conditional create/remove/reset
- `src/mito2/src/region/opener.rs` -- Thread flag through `RegionOpener`
- `src/mito2/src/worker.rs` -- Thread flag through `WorkerGroup`/`WorkerStarter`/`RegionWorkerLoop`
- `src/mito2/src/worker/handle_catchup.rs` -- Pass flag on region open
- `src/mito2/src/worker/handle_create.rs` -- Pass flag on region create
- `src/mito2/src/worker/handle_open.rs` -- Pass flag on region open
- `src/mito2/src/engine.rs` -- Pass flag from `MitoEngineBuilder`
- `src/mito2/src/test_util.rs` -- Test helpers for both modes
- `src/mito2/src/engine/basic_test.rs` -- Cover disabled and preserve cases
- `src/mito2/src/engine/close_test.rs` -- Adapt to optional metrics

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor: remove elapsed_compute metric from scan stream

The elapsed_compute metric conflated poll-wait time with actual CPU computation, making it misleading. Removed the metric and its recording path from StreamMetrics and StreamWithMetricWrapper. Added a test asserting that poll duration is not reported as elapsed_compute.

- `src/table/src/table/metrics.rs` -- removed elapsed_compute field, builder, and record_elapsed_compute method
- `src/table/src/table/scan.rs` -- removed record_elapsed_compute call; added SlowRecordBatchStream test helper and wrapper_poll_time_is_not_elapsed_compute test

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat: disable region query load report for compaction scans

Compaction scans are internal operations initiated by the engine, not user queries. Disable region query load reporting when the scan input is marked as compaction to avoid misleading load metrics.

- `src/mito2/src/read/scan_region.rs` -- set `enable_region_query_load_report` to `false` when compaction is enabled; add unit test

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* test: add `enable_per_region_metrics` config to HTTP integration test

- Enable per-region metrics config in HTTP test setup `tests-integration/tests/http.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor: remove region query load reporting tests and helpers

Remove the region query load reporting feature from the codebase, including tests, test utilities, and helper infrastructure that were part of this now-deprecated functionality. Specifically:

- Remove region query load reporting tests from `src/mito2/src/engine/basic_test.rs` and `src/table/src/table/scan.rs`, and the region close metrics test from `src/mito2/src/engine/close_test.rs`
- Remove region query load report test utilities and simplify engine construction helpers in `src/mito2/src/test_util.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* perf: avoid disabled region query load timing

Summary:
- Avoid per-poll `Instant::now` and elapsed-time accumulation when `enable_region_query_load_report` is disabled.
- Keep region query-load CPU accounting active only when reporting is enabled.

Files:
- `src/table/src/table/scan.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat: move per-region query load reporting from storage to query engine

Move `enable_per_region_metrics` from datanode to frontend config and migrate query load tracking (CPU time, scanned bytes) from mito2 storage engine to the query engine's distributed scan planner. The storage-level metrics plumbing and `enable_region_query_load_report` flag are removed from mito2, `ScanInput`, `ScanRegion`, and `RegionScanner`. Query-level metrics are now collected in `merge_scan.rs` via `scan_region_load`.

- `src/mito2/` -- Remove `query_cpu_time`, `query_scanned_bytes` metrics, `enable_region_query_load_report` plumbing from engine, region, opener, scanner types, workers
- `src/store-api/` -- Remove `query_cpu_time`, `query_scanned_bytes` from `RegionStatistic`
- `src/metric-engine/` -- Remove query load fields from `get_region_statistic`
- `src/query/` -- Add `enable_per_region_metrics` to `QueryOptions`; wire through planner, optimizer, merge scan with `scan_region_load` metrics
- `src/frontend/` -- Pass `enable_per_region_metrics` into `QueryOptions`
- `src/common/meta/` -- Remove TODO for query load fields
- `config/` -- Move `enable_per_region_metrics` from datanode to frontend and standalone example configs
- `src/cmd/tests/` -- Add `enable_per_region_metrics` to flownode config test
- `src/flow/` -- Add `enable_per_region_metrics` default to flownode options
- `src/table/` -- Remove unused query load fields from scan
- `src/datanode/` -- Remove `with_enable_region_query_load_report` calls

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor: remove obsolete mito write load metric

Remove obsolete mito-side region written-bytes metric plumbing that is not needed by the frontend read-load reporting path.

Related files:
- `src/mito2/src/metrics.rs`
- `src/mito2/src/region.rs`
- `src/mito2/src/region/opener.rs`
- `src/mito2/src/region_write_ctx.rs`
- `src/mito2/src/engine/basic_test.rs`
- `src/mito2/src/worker.rs`
- `src/mito2/src/config.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat: change region query load metrics from gauge to counter

Change `REGION_QUERY_CPU_TIME` and `REGION_QUERY_SCANNED_BYTES` from `IntGaugeVec` to `IntCounterVec` since these values are monotonically increasing and do not need gauge semantics. Update corresponding `add` calls to `inc_by` in merge scan reporting.

Files:
- `src/store-api/src/metrics.rs` -- metric type and label changes
- `src/query/src/dist_plan/merge_scan.rs` -- caller adaptation

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor: pass ReadItem directly to report_region_query_load

Move `region_scan_load` call to the caller, so `report_region_query_load` accepts the already-computed `ReadItem` instead of `RecordBatchMetrics`.

- `src/query/src/dist_plan/merge_scan.rs` -- update signature, inline call, remove stale test

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat: ensure region query load is reported on MergeScanExec drop

Remove the `enable_per_region_metrics` parameter from `report_region_query_load` so region load metrics are always emitted. Add a `Drop` impl for `MergeScanExec` that reports sub-stage metrics when the executor is dropped, covering edge cases where per-region metric emission was missed. Add a unit test verifying CPU time and scanned bytes are recorded on drop.

Affected file: `src/query/src/dist_plan/merge_scan.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: gate region query load reporting

Guard drop-time region query load reporting with the configured per-region metrics flag.

Related files:
- `src/query/src/dist_plan/merge_scan.rs`

Symbols:
- `MergeScanExec::drop`
- `enable_per_region_metrics`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: clean region query load metrics on drop

Remove per-region query load metric labels when a region is dropped so stale label series do not remain in the registry.

Related files:
- `src/mito2/src/region.rs`

Symbols:
- `MitoRegion::drop`
- `REGION_QUERY_CPU_TIME`
- `REGION_QUERY_SCANNED_BYTES`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Signed-off-by: Lei Huang <lei@huang.to>
Signed-off-by: Lei, Huang <huanglei@qiyi.com>
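The drop-time, flag-gated reporting described in the commit message can be sketched with the standard library alone. This is a minimal illustration, not the code from this commit: `CounterVec`, `RegionLoad`, `Scan`, and `run_scan` are hypothetical stand-ins (the real change uses the `prometheus` crate's `IntCounterVec` and `MergeScanExec`).

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a labelled counter family such as `IntCounterVec`.
#[derive(Default)]
struct CounterVec {
    series: Mutex<HashMap<String, u64>>,
}

impl CounterVec {
    /// Adds `v` to the series identified by `label`, creating it on first use.
    fn inc_by(&self, label: &str, v: u64) {
        *self.series.lock().unwrap().entry(label.to_string()).or_insert(0) += v;
    }

    /// Returns the current value, or 0 if the series was never created.
    fn get(&self, label: &str) -> u64 {
        self.series.lock().unwrap().get(label).copied().unwrap_or(0)
    }
}

/// Load accumulated for one region while the scan runs (assumed shape).
struct RegionLoad {
    cpu_time_ns: u64,
    scanned_bytes: u64,
}

/// Hypothetical scan operator that reports its load when it is dropped.
struct Scan {
    enable_per_region_metrics: bool,
    loads: Vec<(u64, RegionLoad)>,
    cpu: Arc<CounterVec>,
    bytes: Arc<CounterVec>,
}

impl Drop for Scan {
    fn drop(&mut self) {
        // Reporting is skipped entirely when the flag is off.
        if !self.enable_per_region_metrics {
            return;
        }
        // Runs on normal completion and on early cancellation alike.
        for (region_id, load) in &self.loads {
            let label = region_id.to_string();
            self.cpu.inc_by(&label, load.cpu_time_ns);
            self.bytes.inc_by(&label, load.scanned_bytes);
        }
    }
}

/// Builds one scan, drops it, and returns the counters for region 7.
fn run_scan(enabled: bool) -> (u64, u64) {
    let cpu = Arc::new(CounterVec::default());
    let bytes = Arc::new(CounterVec::default());
    let scan = Scan {
        enable_per_region_metrics: enabled,
        loads: vec![(7, RegionLoad { cpu_time_ns: 42, scanned_bytes: 24 })],
        cpu: cpu.clone(),
        bytes: bytes.clone(),
    };
    drop(scan);
    (cpu.get("7"), bytes.get("7"))
}

fn main() {
    println!("enabled:  {:?}", run_scan(true));
    println!("disabled: {:?}", run_scan(false));
}
```

Reporting from `Drop` rather than at end-of-stream means a query that is cancelled halfway still contributes whatever load it had accumulated, which is the motivation the commit message gives for the move.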
@@ -209,6 +209,7 @@
 | `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
 | `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
 | `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
+| `logging.enable_per_region_metrics` | Bool | `false` | Whether to enable per-region metrics.<br/>Default to false. |
 | `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
 | `logging.otlp_headers` | -- | -- | Additional OTLP headers, only valid when using OTLP http |
 | `logging.tracing_sample_ratio` | -- | Unset | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
@@ -328,6 +329,7 @@
 | `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
 | `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
 | `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
+| `logging.enable_per_region_metrics` | Bool | `false` | Whether to enable per-region metrics.<br/>Default to false. |
 | `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
 | `logging.otlp_headers` | -- | -- | Additional OTLP headers, only valid when using OTLP http |
 | `logging.tracing_sample_ratio` | -- | Unset | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
@@ -306,6 +306,10 @@ log_format = "text"
 ## The maximum amount of log files.
 max_log_files = 720

+## Whether to enable per-region metrics.
+## Default to false.
+enable_per_region_metrics = false
+
 ## The OTLP tracing export protocol. Can be `grpc`/`http`.
 otlp_export_protocol = "http"

@@ -823,6 +823,10 @@ log_format = "text"
 ## The maximum amount of log files.
 max_log_files = 720

+## Whether to enable per-region metrics.
+## Default to false.
+enable_per_region_metrics = false
+
 ## The OTLP tracing export protocol. Can be `grpc`/`http`.
 otlp_export_protocol = "http"

@@ -253,6 +253,7 @@ fn test_load_flownode_example_config() {
             parallelism: 1,
             allow_query_fallback: false,
             memory_pool_size: MemoryLimit::Percentage(50),
+            enable_per_region_metrics: false,
         },
         meta_client: Some(MetaClientOptions {
             metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
@@ -271,6 +271,9 @@ pub struct LoggingOptions {
     /// Additional HTTP headers for OTLP exporter.
     #[serde(skip_serializing_if = "HashMap::is_empty")]
     pub otlp_headers: HashMap<String, String>,
+
+    /// Whether to enable per-region metrics.
+    pub enable_per_region_metrics: bool,
 }

 /// The protocol of OTLP export.
@@ -346,6 +349,7 @@ impl PartialEq for LoggingOptions {
             && self.otlp_endpoint == other.otlp_endpoint
             && self.tracing_sample_ratio == other.tracing_sample_ratio
             && self.append_stdout == other.append_stdout
+            && self.enable_per_region_metrics == other.enable_per_region_metrics
     }
 }

@@ -373,6 +377,7 @@ impl Default for LoggingOptions {
             max_log_files: 720,
             otlp_export_protocol: None,
             otlp_headers: HashMap::new(),
+            enable_per_region_metrics: false,
         }
     }
 }
@@ -418,12 +418,10 @@ impl HeartbeatTask {
                 if let Some(serialized) = region_stat.serialize_to_vec() {
                     extensions.insert(REGION_STATISTIC_KEY.to_string(), serialized);
                 }
-
                 RegionStat {
                     region_id: stat.region_id.as_u64(),
                     engine: stat.engine,
                     role: RegionRole::from(stat.role).into(),
-                    // TODO(weny): w/rcus
                     rcus: 0,
                     wcus: 0,
                     approximate_bytes: region_stat.estimated_disk_size() as i64,
@@ -129,6 +129,7 @@ impl Default for FlownodeOptions {
                 parallelism: 1,
                 allow_query_fallback: false,
                 memory_pool_size: MemoryLimit::default(),
+                enable_per_region_metrics: false,
             },
             memory: MemoryOptions::default(),
         }
@@ -240,6 +240,8 @@ impl FrontendBuilder {
             Arc::new(FlowMetadataManager::new(kv_backend.clone()));
         let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());

+        let mut query_options = self.options.query.clone();
+        query_options.enable_per_region_metrics = self.options.logging.enable_per_region_metrics;
         let query_engine = QueryEngineFactory::new_with_plugins(
             self.catalog_manager.clone(),
             Some(partition_manager.clone()),
@@ -249,7 +251,7 @@ impl FrontendBuilder {
             Some(Arc::new(flow_service)),
             true,
             plugins.clone(),
-            self.options.query.clone(),
+            query_options,
         )
         .query_engine();

@@ -34,6 +34,7 @@ use store_api::ManifestVersion;
 use store_api::codec::PrimaryKeyEncoding;
 use store_api::logstore::provider::Provider;
 use store_api::metadata::RegionMetadataRef;
+use store_api::metrics::{REGION_QUERY_CPU_TIME, REGION_QUERY_SCANNED_BYTES};
 use store_api::region_engine::{
     RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
 };
@@ -208,6 +209,13 @@ impl StagingPartitionInfo {
 }

 impl MitoRegion {
+    fn remove_region_metrics(&self) {
+        let region_id = self.region_id.as_u64().to_string();
+        let labels = &[region_id.as_str()];
+        let _ = REGION_QUERY_CPU_TIME.remove_label_values(labels);
+        let _ = REGION_QUERY_SCANNED_BYTES.remove_label_values(labels);
+    }
+
     /// Stop background managers for this region.
     pub(crate) async fn stop(&self) {
         self.manifest_ctx
@@ -978,6 +986,12 @@ impl MitoRegion {
     }
 }

+impl Drop for MitoRegion {
+    fn drop(&mut self) {
+        self.remove_region_metrics();
+    }
+}
+
 /// Context to update the region manifest.
 #[derive(Debug)]
 pub(crate) struct ManifestContext {
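The cleanup-on-drop idea in the `MitoRegion` change above can be illustrated without the `prometheus` crate. The sketch below is an assumption-laden stand-in: `Registry`, `Region`, and `series_before_and_after_close` are hypothetical names, and a plain `HashMap` plays the role of the labelled metric family.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical registry: one entry per label value, like a labelled metric family.
type Registry = Arc<Mutex<HashMap<String, u64>>>;

/// Hypothetical region handle that owns the lifetime of its label series.
struct Region {
    id: u64,
    registry: Registry,
}

impl Region {
    /// The label must be derived the same way when recording and when removing.
    fn label(&self) -> String {
        self.id.to_string()
    }
}

impl Drop for Region {
    fn drop(&mut self) {
        // The result is ignored: the series may never have been created.
        let _ = self.registry.lock().unwrap().remove(&self.label());
    }
}

/// Returns the number of live series before and after the region is closed.
fn series_before_and_after_close() -> (usize, usize) {
    let registry: Registry = Arc::default();
    let region = Region { id: 1, registry: registry.clone() };

    // Some query load is recorded under the region's label.
    registry.lock().unwrap().insert(region.label(), 42);
    let before = registry.lock().unwrap().len();

    // Closing the region drops it, which removes the series.
    drop(region);
    let after = registry.lock().unwrap().len();
    (before, after)
}

fn main() {
    println!("{:?}", series_before_and_after_close());
}
```

One design point worth keeping in mind: removal only takes effect if the label string used at removal time is formatted exactly like the one used when the value was recorded, which is why the sketch routes both through a single `label()` helper.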
@@ -46,6 +46,7 @@ use greptime_proto::v1::region::RegionRequestHeader;
 use meter_core::data::ReadItem;
 use meter_macros::read_meter;
 use session::context::QueryContextRef;
+use store_api::metrics::{REGION_QUERY_CPU_TIME, REGION_QUERY_SCANNED_BYTES};
 use store_api::storage::RegionId;
 use table::table::scan::REGION_SCAN_EXEC_NAME;
 use table::table_name::TableName;
@@ -221,6 +222,7 @@ pub struct MergeScanExec {
     captured_remote_dyn_filters: Arc<Mutex<Vec<CapturedDynFilter>>>,
     target_partition: usize,
     partition_cols: AliasMapping,
+    enable_per_region_metrics: bool,
 }

 impl std::fmt::Debug for MergeScanExec {
@@ -246,6 +248,7 @@ impl MergeScanExec {
         target_partition: usize,
         partition_cols: AliasMapping,
         remote_dyn_filter_producer_id: Option<RemoteDynFilterProducerId>,
+        enable_per_region_metrics: bool,
     ) -> Result<Self> {
         // TODO(CookiePieWw): Initially we removed the metadata from the schema in #2000, but we have to
         // keep it for #4619 to identify json type in src/datatypes/src/schema/column_schema.rs.
@@ -322,6 +325,7 @@ impl MergeScanExec {
             captured_remote_dyn_filters: Arc::default(),
             target_partition,
             partition_cols,
+            enable_per_region_metrics,
         })
     }

@@ -497,17 +501,9 @@ impl MergeScanExec {

             // process metrics after all data is drained.
             if let Some(metrics) = stream.metrics() {
-                let output_bytes = scan_output_bytes(&metrics);
+                let load = region_scan_load(&metrics);
                 let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
-                let value = read_meter!(
-                    c,
-                    s,
-                    ReadItem {
-                        cpu_time: metrics.elapsed_compute as u64,
-                        table_scan: output_bytes as u64,
-                    },
-                    current_channel as u8
-                );
+                let value = read_meter!(c, s, load, current_channel as u8);
                 metric.record_greptime_exec_cost(value as usize);

                 // record metrics from sub sgates
@@ -598,6 +594,7 @@ impl MergeScanExec {
             captured_remote_dyn_filters: self.captured_remote_dyn_filters.clone(),
             target_partition: self.target_partition,
             partition_cols: self.partition_cols.clone(),
+            enable_per_region_metrics: self.enable_per_region_metrics,
         })
     }

@@ -929,6 +926,37 @@ fn scan_output_bytes(metrics: &RecordBatchMetrics) -> usize {
         .sum()
 }

+fn region_scan_load(metrics: &RecordBatchMetrics) -> ReadItem {
+    ReadItem {
+        cpu_time: metrics.elapsed_compute as u64,
+        table_scan: scan_output_bytes(metrics) as u64,
+    }
+}
+
+fn report_region_query_load(region_id: RegionId, load: &ReadItem) {
+    let region_id = region_id.to_string();
+    REGION_QUERY_CPU_TIME
+        .with_label_values(&[&region_id])
+        .inc_by(load.cpu_time);
+    REGION_QUERY_SCANNED_BYTES
+        .with_label_values(&[&region_id])
+        .inc_by(load.table_scan);
+}
+
+impl Drop for MergeScanExec {
+    fn drop(&mut self) {
+        if !self.enable_per_region_metrics {
+            return;
+        }
+
+        let metrics = self.sub_stage_metrics.lock().unwrap();
+        for (region_id, metrics) in metrics.iter() {
+            let load = region_scan_load(metrics);
+            report_region_query_load(*region_id, &load);
+        }
+    }
+}
+
 #[derive(Debug, Clone)]
 struct MergeScanMetric {
     /// Nanosecond elapsed till the scan operator is ready to emit data
@@ -1147,6 +1175,7 @@ mod tests {
             target_partition,
             partition_cols,
             Some(remote_dyn_filter_producer_id),
+            false,
         )
         .unwrap();

@@ -1200,6 +1229,7 @@ mod tests {
             1,
             AliasMapping::new(),
             Some(remote_dyn_filter_producer_id),
+            false,
         )
         .unwrap();
         let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(
@@ -1277,4 +1307,68 @@ mod tests {

         assert_eq!(scan_output_bytes(&metrics), 60);
     }
+
+    #[test]
+    fn merge_scan_reports_region_query_load_on_drop() {
+        use store_api::metrics::{REGION_QUERY_CPU_TIME, REGION_QUERY_SCANNED_BYTES};
+
+        let region_id = RegionId::new(1024, 10002);
+        let region_id_label = region_id.to_string();
+        let labels = [&region_id_label];
+        let _ = REGION_QUERY_CPU_TIME.remove_label_values(&labels);
+        let _ = REGION_QUERY_SCANNED_BYTES.remove_label_values(&labels);
+
+        let plan = LogicalPlanBuilder::empty(true)
+            .project(vec![lit(1i32).alias("col1")])
+            .unwrap()
+            .build()
+            .unwrap();
+        let schema = plan.schema().as_arrow().clone();
+        let exec = MergeScanExec::new(
+            &SessionStateBuilder::new().build(),
+            TableName::new("catalog", "schema", "table"),
+            vec![region_id],
+            plan,
+            &schema,
+            Arc::new(TestRegionQueryHandler),
+            QueryContext::arc(),
+            1,
+            AliasMapping::new(),
+            None,
+            true,
+        )
+        .unwrap();
+
+        let metrics = RecordBatchMetrics {
+            elapsed_compute: 42,
+            plan_metrics: vec![PlanMetrics {
+                plan: "RegionScanExec: region=1".to_string(),
+                plan_name: REGION_SCAN_EXEC_NAME.to_string(),
+                level: 0,
+                metrics: vec![("output_bytes".to_string(), 24)],
+            }],
+            ..Default::default()
+        };
+        exec.sub_stage_metrics
+            .lock()
+            .unwrap()
+            .insert(region_id, metrics);
+
+        assert_eq!(REGION_QUERY_CPU_TIME.with_label_values(&labels).get(), 0);
+        assert_eq!(
+            REGION_QUERY_SCANNED_BYTES.with_label_values(&labels).get(),
+            0
+        );
+
+        drop(exec);
+
+        assert_eq!(REGION_QUERY_CPU_TIME.with_label_values(&labels).get(), 42);
+        assert_eq!(
+            REGION_QUERY_SCANNED_BYTES.with_label_values(&labels).get(),
+            24
+        );
+
+        let _ = REGION_QUERY_CPU_TIME.remove_label_values(&labels);
+        let _ = REGION_QUERY_SCANNED_BYTES.remove_label_values(&labels);
+    }
 }
@@ -108,6 +108,7 @@ pub struct DistExtensionPlanner {
     catalog_manager: CatalogManagerRef,
     partition_rule_manager: PartitionRuleManagerRef,
     region_query_handler: RegionQueryHandlerRef,
+    enable_per_region_metrics: bool,
 }

 impl DistExtensionPlanner {
@@ -115,11 +116,13 @@ impl DistExtensionPlanner {
         catalog_manager: CatalogManagerRef,
         partition_rule_manager: PartitionRuleManagerRef,
         region_query_handler: RegionQueryHandlerRef,
+        enable_per_region_metrics: bool,
     ) -> Self {
         Self {
             catalog_manager,
             partition_rule_manager,
             region_query_handler,
+            enable_per_region_metrics,
         }
     }
 }
@@ -180,6 +183,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
                 session_state.config().target_partitions(),
                 merge_scan.partition_cols().clone(),
                 merge_scan.remote_dyn_filter_producer_id(),
+                self.enable_per_region_metrics,
             )?;
             Ok(Some(Arc::new(merge_scan_plan) as _))
         }
@@ -184,7 +184,6 @@ impl TableProvider for DummyTableProvider {
         request.projection_input = projection.map(|p| p.clone().into());
         request.filters = filters.to_vec();
         request.limit = limit;
-
         if let Some(query_ctx) = &self.query_ctx {
             let is_sink_scan = is_sink_scan(query_ctx, self.region_id)
                 .map_err(|e| DataFusionError::External(Box::new(e)))?;
@@ -460,6 +460,7 @@ mod tests {
                 1,
                 BTreeMap::<String, BTreeSet<datafusion_common::Column>>::new(),
                 Some(RemoteDynFilterProducerId::new(0)),
+                false,
             )
             .unwrap(),
         )
@@ -293,6 +293,7 @@ mod tests {
             32,
             partition_cols,
             Some(RemoteDynFilterProducerId::new(1)),
+            false,
         )
         .unwrap()
     }
@@ -43,6 +43,9 @@ pub struct QueryOptions {
     /// Supports absolute size (e.g., "2GB") or percentage (e.g., "50%").
     /// When this limit is reached, queries will fail with ResourceExhausted error.
     pub memory_pool_size: MemoryLimit,
+    /// Whether to expose per-region query load metrics.
+    #[serde(skip)]
+    pub enable_per_region_metrics: bool,
 }

 #[allow(clippy::derivable_impls)]
@@ -52,6 +55,7 @@ impl Default for QueryOptions {
             parallelism: 0,
             allow_query_fallback: false,
             memory_pool_size: MemoryLimit::default(),
+            enable_per_region_metrics: false,
         }
     }
 }
@@ -233,6 +233,7 @@ impl QueryEngineState {
                 catalog_list.clone(),
                 partition_rule_manager,
                 region_query_handler.clone(),
+                options.enable_per_region_metrics,
             )))
             .with_optimizer_rules(optimizer.rules)
             .with_physical_optimizer_rules(physical_optimizer.rules)
@@ -511,6 +512,7 @@ impl DfQueryPlanner {
         catalog_manager: CatalogManagerRef,
         partition_rule_manager: Option<PartitionRuleManagerRef>,
         region_query_handler: Option<RegionQueryHandlerRef>,
+        enable_per_region_metrics: bool,
     ) -> Self {
         let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> = vec![
             Arc::new(PromExtensionPlanner),
@@ -524,6 +526,7 @@ impl DfQueryPlanner {
                 catalog_manager,
                 partition_rule_manager,
                 region_query_handler,
+                enable_per_region_metrics,
             )));
             planners.push(Arc::new(MergeSortExtensionPlanner {}));
         }
@@ -19,7 +19,7 @@ pub mod data_source;
 pub mod logstore;
 pub mod metadata;
 pub mod metric_engine_consts;
-mod metrics;
+pub mod metrics;
 pub mod mito_engine_options;
 pub mod path_utils;
 pub mod region_engine;
@@ -13,7 +13,10 @@
 // limitations under the License.

 use lazy_static::lazy_static;
-use prometheus::{HistogramVec, register_histogram_vec};
+use prometheus::{HistogramVec, IntCounterVec, register_histogram_vec, register_int_counter_vec};
+
+/// Region id label.
+pub const REGION_LABEL: &str = "region_id";

 lazy_static! {
     pub static ref CONVERT_REGION_BULK_REQUEST: HistogramVec = register_histogram_vec!(
@@ -26,4 +29,18 @@ lazy_static! {
         ]
     )
     .unwrap();
+    /// Query CPU time accumulated by each region.
+    pub static ref REGION_QUERY_CPU_TIME: IntCounterVec = register_int_counter_vec!(
+        "greptime_mito_region_query_cpu_time",
+        "mito region query CPU time in nanoseconds",
+        &[REGION_LABEL]
+    )
+    .unwrap();
+    /// Query scanned bytes accumulated by each region.
+    pub static ref REGION_QUERY_SCANNED_BYTES: IntCounterVec = register_int_counter_vec!(
+        "greptime_mito_region_query_scanned_bytes",
+        "mito region query scanned bytes",
+        &[REGION_LABEL]
+    )
+    .unwrap();
 }
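Because both metrics are registered as monotonically increasing counters, a consumer normally looks at their rate of change rather than the raw value. The helper below is a small sketch of that arithmetic; `counter_rate` is a hypothetical name, and treating a decrease as a reset to zero is an assumption made for illustration rather than something this commit defines.

```rust
/// Per-second increase between two samples of a monotonically increasing counter.
///
/// Assumption for this sketch: if the current sample is lower than the previous
/// one, the counter was reset (for example after a restart or after its label
/// series was removed), so the current value is taken as the whole increase.
fn counter_rate(prev: u64, curr: u64, elapsed_secs: f64) -> f64 {
    if elapsed_secs <= 0.0 {
        return 0.0;
    }
    let delta = if curr >= prev { curr - prev } else { curr };
    delta as f64 / elapsed_secs
}

fn main() {
    // Scanned bytes went from 1000 to 4000 over 30 seconds.
    println!("{} bytes/s", counter_rate(1000, 4000, 30.0));
    // The counter restarted between samples: 4000 -> 500 over 10 seconds.
    println!("{} bytes/s", counter_rate(4000, 500, 10.0));
}
```

Applied to `greptime_mito_region_query_cpu_time`, which is recorded in nanoseconds, the same rate divided by 1e9 gives an approximate number of CPU cores spent on queries for that region.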
@@ -1612,6 +1612,7 @@ read_preference = "Leader"
 max_log_files = 720
 append_stdout = true
 enable_otlp_tracing = false
+enable_per_region_metrics = false

 [[region_engine]]
