feat: update special remote write label name (#7527)

* feat: update special remote write label name

* chore: mark schema_label as deprecated
This commit is contained in:
Ning Sun
2026-01-08 14:13:31 +08:00
committed by GitHub
parent a32326c887
commit 90deaae844
4 changed files with 60 additions and 28 deletions

View File

@@ -23,7 +23,7 @@ use common_telemetry::tracing;
use promql_parser::label::{MatchOp, Matcher, Matchers};
use query::promql;
use query::promql::planner::PromPlanner;
use servers::prom_store::{DATABASE_LABEL, SCHEMA_LABEL};
use servers::prom_store::is_database_selection_label;
use servers::prometheus;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
@@ -112,7 +112,7 @@ impl Instance {
let table_schema = matchers
.iter()
.find_map(|m| {
if (m.name == SCHEMA_LABEL || m.name == DATABASE_LABEL) && m.op == MatchOp::Equal {
if is_database_selection_label(&m.name) && m.op == MatchOp::Equal {
Some(m.value.clone())
} else {
None

View File

@@ -70,7 +70,7 @@ use crate::error::{
UnexpectedResultSnafu,
};
use crate::http::header::collect_plan_metrics;
use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL};
use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL, is_database_selection_label};
use crate::prometheus_handler::PrometheusHandlerRef;
/// For [ValueType::Vector] result type
@@ -1342,7 +1342,7 @@ pub async fn label_values_query(
field_columns.sort_unstable();
truncate_results(&mut field_columns, params.limit);
return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
} else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL {
} else if is_database_selection_label(&label_name) {
let catalog_manager = handler.catalog_manager();
let mut schema_names = try_call_return_response!(

View File

@@ -41,18 +41,56 @@ use crate::row_writer::{self, MultiTableData};
pub const METRIC_NAME_LABEL: &str = "__name__";
pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__";
pub const DATABASE_LABEL: &str = "__database__";
pub const DATABASE_LABEL_BYTES: &[u8] = b"__database__";
/// special label for selecting database name on remote write
pub const DATABASE_LABEL: &str = "x_greptime_database";
pub const DATABASE_LABEL_BYTES: &[u8] = b"x_greptime_database";
pub const DATABASE_LABEL_ALT: &str = "__database__";
pub const DATABASE_LABEL_ALT_BYTES: &[u8] = b"__database__";
/// deprecated, use DATABASE_LABEL instead
#[deprecated(note = "use DATABASE_LABEL instead")]
pub const SCHEMA_LABEL: &str = "__schema__";
#[deprecated(note = "use DATABASE_LABEL_BYTES instead")]
pub const SCHEMA_LABEL_BYTES: &[u8] = b"__schema__";
pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__";
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = b"__physical_table__";
/// special label for selecting physical table name on remote write
pub const PHYSICAL_TABLE_LABEL: &str = "x_greptime_physical_table";
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = b"x_greptime_physical_table";
pub const PHYSICAL_TABLE_LABEL_ALT: &str = "__physical_table__";
pub const PHYSICAL_TABLE_LABEL_ALT_BYTES: &[u8] = b"__physical_table__";
/// The same as `FIELD_COLUMN_MATCHER` in `promql` crate
pub const FIELD_NAME_LABEL: &str = "__field__";
/// Check if given label is a special label for remote write
#[allow(deprecated)]
pub fn is_remote_write_special_label(label: &str) -> bool {
label == DATABASE_LABEL
|| label == DATABASE_LABEL_ALT
|| label == PHYSICAL_TABLE_LABEL
|| label == PHYSICAL_TABLE_LABEL_ALT
|| label == SCHEMA_LABEL
}
#[allow(deprecated)]
pub fn is_remote_read_special_label(label: &str) -> bool {
label == METRIC_NAME_LABEL
|| label == DATABASE_LABEL
|| label == DATABASE_LABEL_ALT
|| label == SCHEMA_LABEL
}
/// Check if given label is a database selection label
#[allow(deprecated)]
pub fn is_database_selection_label(label: &str) -> bool {
label == DATABASE_LABEL || label == DATABASE_LABEL_ALT || label == SCHEMA_LABEL
}
/// Check if given label is a physical table selection label
pub fn is_physical_table_selection_label(label: &str) -> bool {
label == PHYSICAL_TABLE_LABEL || label == PHYSICAL_TABLE_LABEL_ALT
}
/// Metrics for push gateway protocol
pub struct Metrics {
pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
@@ -77,20 +115,12 @@ pub fn table_name(q: &Query) -> Result<String> {
}
/// Extract schema from remote read request. Returns the first schema found from any query's matchers.
/// Prioritizes __schema__ over __database__ labels.
pub fn extract_schema_from_read_request(request: &ReadRequest) -> Option<String> {
for query in &request.queries {
for matcher in &query.matchers {
if matcher.name == SCHEMA_LABEL && matcher.r#type == MatcherType::Eq as i32 {
return Some(matcher.value.clone());
}
}
}
// If no __schema__ found, look for __database__
for query in &request.queries {
for matcher in &query.matchers {
if matcher.name == DATABASE_LABEL && matcher.r#type == MatcherType::Eq as i32 {
if is_database_selection_label(&matcher.name)
&& matcher.r#type == MatcherType::Eq as i32
{
return Some(matcher.value.clone());
}
}
@@ -115,7 +145,7 @@ pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
for m in label_matches {
let name = &m.name;
if name == METRIC_NAME_LABEL || name == SCHEMA_LABEL || name == DATABASE_LABEL {
if is_remote_read_special_label(name) {
continue;
}
@@ -546,8 +576,8 @@ pub fn mock_timeseries_special_labels() -> Vec<TimeSeries> {
let idc3_schema = TimeSeries {
labels: vec![
new_label(METRIC_NAME_LABEL.to_string(), "idc3_lo_table".to_string()),
new_label("__database__".to_string(), "idc3".to_string()),
new_label("__physical_table__".to_string(), "f1".to_string()),
new_label(DATABASE_LABEL.to_string(), "idc3".to_string()),
new_label(PHYSICAL_TABLE_LABEL.to_string(), "f1".to_string()),
],
samples: vec![Sample {
value: 42.0,
@@ -561,8 +591,8 @@ pub fn mock_timeseries_special_labels() -> Vec<TimeSeries> {
METRIC_NAME_LABEL.to_string(),
"idc4_local_table".to_string(),
),
new_label("__database__".to_string(), "idc4".to_string()),
new_label("__physical_table__".to_string(), "f2".to_string()),
new_label(DATABASE_LABEL.to_string(), "idc4".to_string()),
new_label(PHYSICAL_TABLE_LABEL.to_string(), "f2".to_string()),
],
samples: vec![Sample {
value: 99.0,

View File

@@ -36,7 +36,8 @@ use crate::http::event::PipelineIngestRequest;
use crate::pipeline::run_pipeline;
use crate::prom_row_builder::{PromCtx, TablesBuilder};
use crate::prom_store::{
DATABASE_LABEL_BYTES, METRIC_NAME_LABEL_BYTES, PHYSICAL_TABLE_LABEL_BYTES, SCHEMA_LABEL_BYTES,
DATABASE_LABEL_ALT_BYTES, DATABASE_LABEL_BYTES, METRIC_NAME_LABEL_BYTES,
PHYSICAL_TABLE_LABEL_ALT_BYTES, PHYSICAL_TABLE_LABEL_BYTES,
};
use crate::query_handler::PipelineHandlerRef;
use crate::repeated_field::{Clear, RepeatedField};
@@ -201,18 +202,19 @@ impl PromTimeSeries {
self.table_name = prom_validation_mode.decode_string(&label.value)?;
self.labels.truncate(self.labels.len() - 1); // remove last label
}
SCHEMA_LABEL_BYTES => {
#[allow(deprecated)]
crate::prom_store::SCHEMA_LABEL_BYTES => {
self.schema = Some(prom_validation_mode.decode_string(&label.value)?);
self.labels.truncate(self.labels.len() - 1); // remove last label
}
DATABASE_LABEL_BYTES => {
DATABASE_LABEL_BYTES | DATABASE_LABEL_ALT_BYTES => {
// Only set schema from __database__ if __schema__ hasn't been set yet
if self.schema.is_none() {
self.schema = Some(prom_validation_mode.decode_string(&label.value)?);
}
self.labels.truncate(self.labels.len() - 1); // remove last label
}
PHYSICAL_TABLE_LABEL_BYTES => {
PHYSICAL_TABLE_LABEL_BYTES | PHYSICAL_TABLE_LABEL_ALT_BYTES => {
self.physical_table =
Some(prom_validation_mode.decode_string(&label.value)?);
self.labels.truncate(self.labels.len() - 1); // remove last label