From 90deaae844fee392f1e33a83b43f472878326fee Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Thu, 8 Jan 2026 14:13:31 +0800 Subject: [PATCH] feat: update special remote write label name (#7527) * feat: update special remote write label name * chore: mark schema_label as deprecated --- src/frontend/src/instance/promql.rs | 4 +- src/servers/src/http/prometheus.rs | 4 +- src/servers/src/prom_store.rs | 70 ++++++++++++++++++++--------- src/servers/src/proto.rs | 10 +++-- 4 files changed, 60 insertions(+), 28 deletions(-) diff --git a/src/frontend/src/instance/promql.rs b/src/frontend/src/instance/promql.rs index f527b18a71..419be8d96e 100644 --- a/src/frontend/src/instance/promql.rs +++ b/src/frontend/src/instance/promql.rs @@ -23,7 +23,7 @@ use common_telemetry::tracing; use promql_parser::label::{MatchOp, Matcher, Matchers}; use query::promql; use query::promql::planner::PromPlanner; -use servers::prom_store::{DATABASE_LABEL, SCHEMA_LABEL}; +use servers::prom_store::is_database_selection_label; use servers::prometheus; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; @@ -112,7 +112,7 @@ impl Instance { let table_schema = matchers .iter() .find_map(|m| { - if (m.name == SCHEMA_LABEL || m.name == DATABASE_LABEL) && m.op == MatchOp::Equal { + if is_database_selection_label(&m.name) && m.op == MatchOp::Equal { Some(m.value.clone()) } else { None diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index 31a712a68b..fd4f5491f0 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -70,7 +70,7 @@ use crate::error::{ UnexpectedResultSnafu, }; use crate::http::header::collect_plan_metrics; -use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL}; +use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL, is_database_selection_label}; use crate::prometheus_handler::PrometheusHandlerRef; /// For [ValueType::Vector] result type @@ -1342,7 +1342,7 @@ pub async fn label_values_query( field_columns.sort_unstable(); truncate_results(&mut field_columns, params.limit); return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns)); - } else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL { + } else if is_database_selection_label(&label_name) { let catalog_manager = handler.catalog_manager(); let mut schema_names = try_call_return_response!( diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index 9ed2d00268..487bd59812 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -41,18 +41,56 @@ use crate::row_writer::{self, MultiTableData}; pub const METRIC_NAME_LABEL: &str = "__name__"; pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__"; -pub const DATABASE_LABEL: &str = "__database__"; -pub const DATABASE_LABEL_BYTES: &[u8] = b"__database__"; +/// special label for selecting database name on remote write +pub const DATABASE_LABEL: &str = "x_greptime_database"; +pub const DATABASE_LABEL_BYTES: &[u8] = b"x_greptime_database"; +pub const DATABASE_LABEL_ALT: &str = "__database__"; +pub const DATABASE_LABEL_ALT_BYTES: &[u8] = b"__database__"; +/// deprecated, use DATABASE_LABEL instead +#[deprecated(note = "use DATABASE_LABEL instead")] pub const SCHEMA_LABEL: &str = "__schema__"; +#[deprecated(note = "use DATABASE_LABEL_BYTES instead")] pub const SCHEMA_LABEL_BYTES: &[u8] = b"__schema__"; -pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__"; -pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = b"__physical_table__"; +/// special label for selecting physical table name on remote write +pub const PHYSICAL_TABLE_LABEL: &str = "x_greptime_physical_table"; +pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = b"x_greptime_physical_table"; +pub const PHYSICAL_TABLE_LABEL_ALT: &str = "__physical_table__"; +pub const PHYSICAL_TABLE_LABEL_ALT_BYTES: &[u8] = b"__physical_table__"; /// The same as `FIELD_COLUMN_MATCHER` in `promql` crate pub const FIELD_NAME_LABEL: &str = "__field__"; +/// Check if given label is a special label for remote write +#[allow(deprecated)] +pub fn is_remote_write_special_label(label: &str) -> bool { + label == DATABASE_LABEL + || label == DATABASE_LABEL_ALT + || label == PHYSICAL_TABLE_LABEL + || label == PHYSICAL_TABLE_LABEL_ALT + || label == SCHEMA_LABEL +} + +#[allow(deprecated)] +pub fn is_remote_read_special_label(label: &str) -> bool { + label == METRIC_NAME_LABEL + || label == DATABASE_LABEL + || label == DATABASE_LABEL_ALT + || label == SCHEMA_LABEL +} + +/// Check if given label is a database selection label +#[allow(deprecated)] +pub fn is_database_selection_label(label: &str) -> bool { + label == DATABASE_LABEL || label == DATABASE_LABEL_ALT || label == SCHEMA_LABEL +} + +/// Check if given label is a physical table selection label +pub fn is_physical_table_selection_label(label: &str) -> bool { + label == PHYSICAL_TABLE_LABEL || label == PHYSICAL_TABLE_LABEL_ALT +} + /// Metrics for push gateway protocol pub struct Metrics { pub exposition: MetricsExposition, @@ -77,20 +115,12 @@ pub fn table_name(q: &Query) -> Result { } /// Extract schema from remote read request. Returns the first schema found from any query's matchers. -/// Prioritizes __schema__ over __database__ labels. pub fn extract_schema_from_read_request(request: &ReadRequest) -> Option { for query in &request.queries { for matcher in &query.matchers { - if matcher.name == SCHEMA_LABEL && matcher.r#type == MatcherType::Eq as i32 { - return Some(matcher.value.clone()); - } - } - } - - // If no __schema__ found, look for __database__ - for query in &request.queries { - for matcher in &query.matchers { - if matcher.name == DATABASE_LABEL && matcher.r#type == MatcherType::Eq as i32 { + if is_database_selection_label(&matcher.name) + && matcher.r#type == MatcherType::Eq as i32 + { return Some(matcher.value.clone()); } } @@ -115,7 +145,7 @@ pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result { for m in label_matches { let name = &m.name; - if name == METRIC_NAME_LABEL || name == SCHEMA_LABEL || name == DATABASE_LABEL { + if is_remote_read_special_label(name) { continue; } @@ -546,8 +576,8 @@ pub fn mock_timeseries_special_labels() -> Vec { let idc3_schema = TimeSeries { labels: vec![ new_label(METRIC_NAME_LABEL.to_string(), "idc3_lo_table".to_string()), - new_label("__database__".to_string(), "idc3".to_string()), - new_label("__physical_table__".to_string(), "f1".to_string()), + new_label(DATABASE_LABEL.to_string(), "idc3".to_string()), + new_label(PHYSICAL_TABLE_LABEL.to_string(), "f1".to_string()), ], samples: vec![Sample { value: 42.0, @@ -561,8 +591,8 @@ pub fn mock_timeseries_special_labels() -> Vec { METRIC_NAME_LABEL.to_string(), "idc4_local_table".to_string(), ), - new_label("__database__".to_string(), "idc4".to_string()), - new_label("__physical_table__".to_string(), "f2".to_string()), + new_label(DATABASE_LABEL.to_string(), "idc4".to_string()), + new_label(PHYSICAL_TABLE_LABEL.to_string(), "f2".to_string()), ], samples: vec![Sample { value: 99.0, diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs index 1ef01c443b..79e5b38e77 100644 --- a/src/servers/src/proto.rs +++ b/src/servers/src/proto.rs @@ -36,7 +36,8 @@ use crate::http::event::PipelineIngestRequest; use crate::pipeline::run_pipeline; use crate::prom_row_builder::{PromCtx, TablesBuilder}; use crate::prom_store::{ - DATABASE_LABEL_BYTES, METRIC_NAME_LABEL_BYTES, PHYSICAL_TABLE_LABEL_BYTES, SCHEMA_LABEL_BYTES, + DATABASE_LABEL_ALT_BYTES, DATABASE_LABEL_BYTES, METRIC_NAME_LABEL_BYTES, + PHYSICAL_TABLE_LABEL_ALT_BYTES, PHYSICAL_TABLE_LABEL_BYTES, }; use crate::query_handler::PipelineHandlerRef; use crate::repeated_field::{Clear, RepeatedField}; @@ -201,18 +202,19 @@ impl PromTimeSeries { self.table_name = prom_validation_mode.decode_string(&label.value)?; self.labels.truncate(self.labels.len() - 1); // remove last label } - SCHEMA_LABEL_BYTES => { + #[allow(deprecated)] + crate::prom_store::SCHEMA_LABEL_BYTES => { self.schema = Some(prom_validation_mode.decode_string(&label.value)?); self.labels.truncate(self.labels.len() - 1); // remove last label } - DATABASE_LABEL_BYTES => { + DATABASE_LABEL_BYTES | DATABASE_LABEL_ALT_BYTES => { // Only set schema from __database__ if __schema__ hasn't been set yet if self.schema.is_none() { self.schema = Some(prom_validation_mode.decode_string(&label.value)?); } self.labels.truncate(self.labels.len() - 1); // remove last label } - PHYSICAL_TABLE_LABEL_BYTES => { + PHYSICAL_TABLE_LABEL_BYTES | PHYSICAL_TABLE_LABEL_ALT_BYTES => { self.physical_table = Some(prom_validation_mode.decode_string(&label.value)?); self.labels.truncate(self.labels.len() - 1); // remove last label