perf(prom): optimize label values query (#5653)

perf: optimize label values query
This commit is contained in:
Weny Xu
2025-03-10 21:20:47 +08:00
committed by GitHub
parent 3811e3f632
commit 0bd322a078
13 changed files with 331 additions and 105 deletions

View File

@@ -238,6 +238,13 @@ pub enum Error {
source: servers::error::Error,
},
#[snafu(display("Failed to create logical plan for prometheus label values query"))]
PrometheusLabelValuesQueryPlan {
#[snafu(implicit)]
location: Location,
source: query::promql::error::Error,
},
#[snafu(display("Failed to describe schema for given statement"))]
DescribeStatement {
#[snafu(implicit)]
@@ -366,6 +373,8 @@ impl ErrorExt for Error {
| Error::PrometheusMetricNamesQueryPlan { source, .. }
| Error::ExecutePromql { source, .. } => source.status_code(),
Error::PrometheusLabelValuesQueryPlan { source, .. } => source.status_code(),
Error::CollectRecordbatch { .. } => StatusCode::EngineExecuteQuery,
Error::SqlExecIntercepted { source, .. } => source.status_code(),

View File

@@ -26,6 +26,7 @@ mod region_query;
pub mod standalone;
use std::sync::Arc;
use std::time::SystemTime;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
@@ -471,6 +472,21 @@ impl PrometheusHandler for Instance {
.context(ExecuteQuerySnafu)
}
/// Delegates to [`Instance::handle_query_label_values`] and converts any
/// failure into a server-side `ExecuteQuery` error.
async fn query_label_values(
    &self,
    metric: String,
    label_name: String,
    matchers: Vec<Matcher>,
    start: SystemTime,
    end: SystemTime,
    ctx: &QueryContextRef,
) -> server_error::Result<Vec<String>> {
    let values = self
        .handle_query_label_values(metric, label_name, matchers, start, end, ctx)
        .await;

    // Box the frontend error so it can be carried as the source of the server error.
    values.map_err(BoxedError::new).context(ExecuteQuerySnafu)
}
/// Returns a clone of this instance's `catalog_manager` handle.
fn catalog_manager(&self) -> CatalogManagerRef {
self.catalog_manager.clone()
}

View File

@@ -12,20 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::SystemTime;
use catalog::information_schema::TABLES;
use client::OutputData;
use common_catalog::consts::INFORMATION_SCHEMA_NAME;
use common_catalog::format_full_table_name;
use common_recordbatch::util;
use common_telemetry::tracing;
use datatypes::prelude::Value;
use promql_parser::label::Matcher;
use promql_parser::label::{Matcher, Matchers};
use query::promql;
use query::promql::planner::PromPlanner;
use servers::prometheus;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use crate::error::{
CatalogSnafu, CollectRecordbatchSnafu, ExecLogicalPlanSnafu,
PrometheusMetricNamesQueryPlanSnafu, ReadTableSnafu, Result, TableNotFoundSnafu,
PrometheusLabelValuesQueryPlanSnafu, PrometheusMetricNamesQueryPlanSnafu, ReadTableSnafu,
Result, TableNotFoundSnafu,
};
use crate::instance::Instance;
@@ -96,4 +102,77 @@ impl Instance {
Ok(results)
}
/// Handles label values query request, returns the values.
///
/// Looks up the table named `metric` in the current catalog/schema of `ctx`,
/// turns `matchers` into filter expressions, and executes the plan produced by
/// `promql::label_values::rewrite_label_values_query` for `label_name` within
/// the `[start, end]` range.
///
/// Only string values are returned: NULL values, and values of a column that is
/// not a string column, are skipped instead of aborting the request.
///
/// # Errors
///
/// Returns an error if the table cannot be found or read, if the logical plan
/// cannot be built, or if executing the plan / collecting its output fails.
#[tracing::instrument(skip_all)]
pub(crate) async fn handle_query_label_values(
    &self,
    metric: String,
    label_name: String,
    matchers: Vec<Matcher>,
    start: SystemTime,
    end: SystemTime,
    ctx: &QueryContextRef,
) -> Result<Vec<String>> {
    let table_schema = ctx.current_schema();
    let table = self
        .catalog_manager
        .table(ctx.current_catalog(), &table_schema, &metric, Some(ctx))
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            table_name: format_full_table_name(ctx.current_catalog(), &table_schema, &metric),
        })?;

    let dataframe = self
        .query_engine
        .read_table(table.clone())
        .with_context(|_| ReadTableSnafu {
            table_name: format_full_table_name(ctx.current_catalog(), &table_schema, &metric),
        })?;

    let scan_plan = dataframe.into_logical_plan();
    let filter_conditions =
        PromPlanner::matchers_to_expr(Matchers::new(matchers), scan_plan.schema())
            .context(PrometheusLabelValuesQueryPlanSnafu)?;
    let logical_plan = promql::label_values::rewrite_label_values_query(
        table,
        scan_plan,
        filter_conditions,
        label_name,
        start,
        end,
    )
    .context(PrometheusLabelValuesQueryPlanSnafu)?;

    let results = self
        .query_engine
        .execute(logical_plan, ctx.clone())
        .await
        .context(ExecLogicalPlanSnafu)?;

    let batches = match results.data {
        OutputData::Stream(stream) => util::collect(stream)
            .await
            .context(CollectRecordbatchSnafu)?,
        OutputData::RecordBatches(rbs) => rbs.take(),
        _ => unreachable!("should not happen"),
    };

    let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
    for batch in batches {
        // Only one column in the results, ensured by the final projection in
        // `promql::label_values::rewrite_label_values_query`.
        let names = batch.column(0);
        // The requested column may contain NULLs or may not be a string column at
        // all; such values are not label values, so skip them rather than panic.
        results.extend((0..names.len()).filter_map(|i| match names.get(i) {
            Value::String(name) => Some(name.into_string()),
            _ => None,
        }));
    }

    Ok(results)
}
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use datafusion::dataframe::DataFrame as DfDataFrame;
use datafusion_expr::LogicalPlan;
/// DataFrame represents a logical set of rows with the same named columns.
/// Similar to a Pandas DataFrame or Spark DataFrame
@@ -20,3 +21,11 @@ use datafusion::dataframe::DataFrame as DfDataFrame;
pub enum DataFrame {
/// A data frame backed by DataFusion's own `DataFrame` type.
DataFusion(DfDataFrame),
}
impl DataFrame {
    /// Consumes the data frame and returns its logical plan.
    pub fn into_logical_plan(self) -> LogicalPlan {
        // `DataFrame` has a single variant, so this pattern is irrefutable.
        let Self::DataFusion(dataframe) = self;
        // `into_parts` yields a pair; only its second element, the plan, is kept.
        let (_state, plan) = dataframe.into_parts();
        plan
    }
}

View File

@@ -188,7 +188,7 @@ impl QueryLanguageParser {
Ok(QueryStatement::Promql(eval_stmt))
}
fn parse_promql_timestamp(timestamp: &str) -> Result<SystemTime> {
pub fn parse_promql_timestamp(timestamp: &str) -> Result<SystemTime> {
// try rfc3339 format
let rfc3339_result = DateTime::parse_from_rfc3339(timestamp)
.context(ParseTimestampSnafu { raw: timestamp })

View File

@@ -12,5 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod error;
pub mod error;
pub mod label_values;
pub mod planner;

View File

@@ -0,0 +1,107 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::{SystemTime, UNIX_EPOCH};
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::expr::Alias;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, Cast, Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion_sql::TableReference;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::prelude::ConcreteDataType;
use snafu::{OptionExt, ResultExt};
use table::TableRef;
use crate::promql::error::{DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu};
/// Builds the predicate `time_index >= start AND time_index <= end`, where
/// `start` and `end` are wrapped as millisecond timestamp literals without a
/// time zone.
fn build_time_filter(time_index_expr: Expr, start: i64, end: i64) -> Expr {
    let millis_literal =
        |value: i64| Expr::Literal(ScalarValue::TimestampMillisecond(Some(value), None));

    let lower_bound = time_index_expr.clone().gt_eq(millis_literal(start));
    let upper_bound = time_index_expr.lt_eq(millis_literal(end));
    lower_bound.and(upper_bound)
}
/// Converts a [`SystemTime`] into milliseconds relative to the Unix epoch.
///
/// Instants before the epoch yield a negative value instead of panicking, and
/// magnitudes that do not fit into an `i64` saturate.
fn system_time_to_millis(time: SystemTime) -> i64 {
    match time.duration_since(UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX),
        Err(before_epoch) => {
            -(i64::try_from(before_epoch.duration().as_millis()).unwrap_or(i64::MAX))
        }
    }
}

/// Rewrite label values query to DataFusion logical plan.
///
/// The resulting plan filters `scan_plan` by `conditions` and by the inclusive
/// time range `[start, end]`, projects the `label_name` column and removes
/// duplicated values.
///
/// `scan_plan` is expected to be a scan of `table`. If the time index of `table`
/// is not in millisecond precision, it is cast to milliseconds before the time
/// filter is applied; all other columns of `scan_plan` are kept so that
/// `conditions` referring to other labels can still be resolved.
///
/// # Errors
///
/// Returns an error if `table` has no time index column or if DataFusion fails
/// to build any step of the plan (for example when `label_name` is not a column
/// of the table).
pub fn rewrite_label_values_query(
    table: TableRef,
    mut scan_plan: LogicalPlan,
    mut conditions: Vec<Expr>,
    label_name: String,
    start: SystemTime,
    end: SystemTime,
) -> Result<LogicalPlan> {
    let table_ref = TableReference::partial(
        table.table_info().schema_name.as_str(),
        table.table_info().name.as_str(),
    );
    let schema = table.schema();
    let ts_column = schema
        .timestamp_column()
        .with_context(|| TimeIndexNotFoundSnafu {
            table: table.table_info().full_table_name(),
        })?;

    let is_time_index_ms =
        ts_column.data_type == ConcreteDataType::timestamp_millisecond_datatype();
    let time_index_expr = col(Column::from_name(ts_column.name.clone()));

    if !is_time_index_ms {
        // Cast the time index to millisecond precision while keeping every other
        // column untouched: the matcher conditions applied below may reference any
        // of them, not only `label_name`.
        let projection = scan_plan
            .schema()
            .fields()
            .iter()
            .map(|field| {
                let column_expr = col(Column::from_name(field.name().clone()));
                if field.name() == &ts_column.name {
                    Expr::Alias(Alias {
                        expr: Box::new(Expr::Cast(Cast {
                            expr: Box::new(column_expr),
                            data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                        })),
                        relation: Some(table_ref.clone()),
                        name: ts_column.name.clone(),
                    })
                } else {
                    column_expr
                }
            })
            .collect::<Vec<_>>();
        scan_plan = LogicalPlanBuilder::from(scan_plan)
            .project(projection)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;
    };

    // Builds the time filter. The range comes from the request, so it may lie
    // before the Unix epoch and must not be unwrapped blindly.
    let start = system_time_to_millis(start);
    let end = system_time_to_millis(end);
    conditions.push(build_time_filter(time_index_expr, start, end));

    // `conditions` holds at least the time filter pushed above, so `conjunction`
    // cannot return `None`.
    let filter = conjunction(conditions).expect("conditions contains the time filter");

    let logical_plan = LogicalPlanBuilder::from(scan_plan)
        .filter(filter)
        .context(DataFusionPlanningSnafu)?
        .project(vec![col(Column::from_name(label_name))])
        .context(DataFusionPlanningSnafu)?
        .distinct()
        .context(DataFusionPlanningSnafu)?
        .build()
        .context(DataFusionPlanningSnafu)?;

    Ok(logical_plan)
}

View File

@@ -939,7 +939,7 @@ impl PromPlanner {
Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
None => 0,
};
let mut scan_filters = self.matchers_to_expr(label_matchers.clone(), table_schema)?;
let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
scan_filters.push(time_index_filter);
}
@@ -1135,8 +1135,7 @@ impl PromPlanner {
}
// TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
fn matchers_to_expr(
&self,
pub fn matchers_to_expr(
label_matchers: Matchers,
table_schema: &DFSchemaRef,
) -> Result<Vec<DfExpr>> {

View File

@@ -410,6 +410,15 @@ pub enum Error {
source: query::error::Error,
},
#[snafu(display("Failed to parse timestamp: {}", timestamp))]
ParseTimestamp {
timestamp: String,
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: query::error::Error,
},
#[snafu(display("{}", reason))]
UnexpectedResult {
reason: String,
@@ -685,7 +694,8 @@ impl ErrorExt for Error {
| PrepareStatementNotFound { .. }
| FailedToParseQuery { .. }
| InvalidElasticsearchInput { .. }
| InvalidJaegerQuery { .. } => StatusCode::InvalidArguments,
| InvalidJaegerQuery { .. }
| ParseTimestamp { .. } => StatusCode::InvalidArguments,
Catalog { source, .. } => source.status_code(),
RowWriter { source, .. } => source.status_code(),

View File

@@ -29,7 +29,7 @@ use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
use common_version::OwnedBuildInfo;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Float64Vector, StringVector};
use datatypes::vectors::Float64Vector;
use futures::future::join_all;
use futures::StreamExt;
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
@@ -38,7 +38,7 @@ use promql_parser::parser::{
AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr,
UnaryExpr, VectorSelector,
};
use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
use query::parser::{PromQuery, QueryLanguageParser, DEFAULT_LOOKBACK_STRING};
use query::promql::planner::normalize_matcher;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Serialize};
@@ -51,8 +51,8 @@ use store_api::metric_engine_consts::{
pub use super::result::prometheus_resp::PrometheusJsonResponse;
use crate::error::{
CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, Result, TableNotFoundSnafu,
UnexpectedResultSnafu,
CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, ParseTimestampSnafu, Result,
TableNotFoundSnafu, UnexpectedResultSnafu,
};
use crate::http::header::collect_plan_metrics;
use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL};
@@ -994,44 +994,58 @@ pub async fn label_values_query(
let start = params.start.unwrap_or_else(yesterday_rfc3339);
let end = params.end.unwrap_or_else(current_time_rfc3339);
let lookback = params
.lookback
.unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
let mut label_values = HashSet::new();
let mut merge_map = HashMap::new();
let start = try_call_return_response!(QueryLanguageParser::parse_promql_timestamp(&start)
.context(ParseTimestampSnafu { timestamp: &start }));
let end = try_call_return_response!(QueryLanguageParser::parse_promql_timestamp(&end)
.context(ParseTimestampSnafu { timestamp: &end }));
for query in queries {
let prom_query = PromQuery {
query,
start: start.clone(),
end: end.clone(),
step: DEFAULT_LOOKBACK_STRING.to_string(),
lookback: lookback.clone(),
let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
let PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) = promql_expr else {
return PrometheusJsonResponse::error(
StatusCode::InvalidArguments,
"expected vector selector",
);
};
let result = handler.do_query(&prom_query, query_ctx.clone()).await;
if let Err(err) =
retrieve_label_values(result, &label_name, &mut label_values, &mut merge_map).await
{
// Prometheus won't report error if querying nonexist label and metric
if err.status_code() != StatusCode::TableNotFound
&& err.status_code() != StatusCode::TableColumnNotFound
{
return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
let Some(name) = name else {
return PrometheusJsonResponse::error(
StatusCode::InvalidArguments,
"expected metric name",
);
};
// Only use and filter matchers.
let matchers = matchers.matchers;
let result = handler
.query_label_values(
name,
label_name.to_string(),
matchers,
start,
end,
&query_ctx,
)
.await;
match result {
Ok(result) => {
label_values.extend(result.into_iter());
}
Err(err) => {
// Prometheus won't report error if querying nonexist label and metric
if err.status_code() != StatusCode::TableNotFound
&& err.status_code() != StatusCode::TableColumnNotFound
{
return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
}
}
}
}
let merge_map = merge_map
.into_iter()
.map(|(k, v)| (k, Value::from(v)))
.collect();
let mut label_values: Vec<_> = label_values.into_iter().collect();
label_values.sort_unstable();
let mut resp = PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values));
resp.resp_metrics = merge_map;
resp
PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
}
async fn retrieve_field_names(
@@ -1076,71 +1090,6 @@ async fn retrieve_field_names(
Ok(field_columns)
}
/// Collects the values of `label_name` from a query result into
/// `labels_values` and, when the result carries a plan, merges its metrics into
/// `metrics`.
///
/// Fails if `result` is an error, if the output stream cannot be collected, or
/// if the output is an affected-rows count rather than data.
async fn retrieve_label_values(
    result: Result<Output>,
    label_name: &str,
    labels_values: &mut HashSet<String>,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let output = result?;

    // Normalize both data-carrying output kinds into record batches first.
    let batches = match output.data {
        OutputData::RecordBatches(batches) => batches,
        OutputData::Stream(stream) => RecordBatches::try_collect(stream)
            .await
            .context(CollectRecordbatchSnafu)?,
        OutputData::AffectedRows(_) => {
            return UnexpectedResultSnafu {
                reason: "expected data result, but got affected rows".to_string(),
            }
            .fail();
        }
    };
    retrieve_label_values_from_record_batch(batches, label_name, labels_values).await?;

    if let Some(plan) = output.meta.plan.as_ref() {
        collect_plan_metrics(plan, &mut [metrics]);
    }

    Ok(())
}
/// Inserts every non-null value of the string column `label_name` found in
/// `batches` into `labels_values`.
///
/// Does nothing when the column is absent or is not a string column.
async fn retrieve_label_values_from_record_batch(
    batches: RecordBatches,
    label_name: &str,
    labels_values: &mut HashSet<String>,
) -> Result<()> {
    let schema = batches.schema();
    let Some(label_col_idx) = schema.column_index_by_name(label_name) else {
        return Ok(());
    };

    // check whether label_name belongs to tag column
    let is_string_column = matches!(
        schema.column_schema_by_name(label_name).unwrap().data_type,
        ConcreteDataType::String(_)
    );
    if !is_string_column {
        return Ok(());
    }

    for batch in batches.iter() {
        let label_column = batch
            .column(label_col_idx)
            .as_any()
            .downcast_ref::<StringVector>()
            .unwrap();
        let values = (0..batch.num_rows())
            .filter_map(|row_index| label_column.get_data(row_index))
            .map(|label_value| label_value.to_string());
        labels_values.extend(values);
    }

    Ok(())
}
/// Try to parse and extract the name of referenced metric from the promql query.
///
/// Returns the metric name if a single metric is referenced, otherwise None.

View File

@@ -15,6 +15,7 @@
//! prom supply the prometheus HTTP API Server compliance
use std::sync::Arc;
use std::time::SystemTime;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
@@ -40,5 +41,15 @@ pub trait PrometheusHandler {
ctx: &QueryContextRef,
) -> Result<Vec<String>>;
/// Queries the values of the label `label_name` for the metric `metric`.
///
/// `matchers` carries the label matchers of the selector, and `start` / `end`
/// delimit the queried time range. Returns the label values as strings.
async fn query_label_values(
&self,
metric: String,
label_name: String,
matchers: Vec<Matcher>,
start: SystemTime,
end: SystemTime,
ctx: &QueryContextRef,
) -> Result<Vec<String>>;
/// Returns the catalog manager used by this handler.
fn catalog_manager(&self) -> CatalogManagerRef;
}

View File

@@ -465,6 +465,7 @@ pub async fn setup_test_prom_app_with_frontend(
run_sql(sql, &instance).await;
let sql = "CREATE TABLE demo_metrics (ts timestamp time index, val double, idc string primary key) engine=metric with ('on_physical_table' = 'phy')";
run_sql(sql, &instance).await;
// insert rows
let sql = "INSERT INTO demo(host, val, ts) VALUES ('host1', 1.1, 0), ('host2', 2.1, 600000)";
run_sql(sql, &instance).await;
@@ -472,6 +473,14 @@ pub async fn setup_test_prom_app_with_frontend(
"INSERT INTO demo_metrics(idc, val, ts) VALUES ('idc1', 1.1, 0), ('idc2', 2.1, 600000)";
run_sql(sql, &instance).await;
// build physical table
let sql = "CREATE TABLE phy2 (ts timestamp(9) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
run_sql(sql, &instance).await;
let sql = "CREATE TABLE demo_metrics_with_nanos(ts timestamp(9) time index, val double, idc string primary key) engine=metric with ('on_physical_table' = 'phy2')";
run_sql(sql, &instance).await;
let sql = "INSERT INTO demo_metrics_with_nanos(idc, val, ts) VALUES ('idc1', 1.1, 0)";
run_sql(sql, &instance).await;
let http_opts = HttpOptions {
addr: format!("127.0.0.1:{}", ports::get_port()),
..Default::default()

View File

@@ -666,6 +666,32 @@ pub async fn test_prom_http_api(store_type: StorageType) {
serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
);
// single match[] with a time range (0s..300s) that only covers the first inserted row
let res = client
.get("/v1/prometheus/api/v1/label/host/values?match[]=demo&start=0&end=300")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["host1"])).unwrap()
);
// single match[] against a table whose time index has nanosecond precision
let res = client
.get("/v1/prometheus/api/v1/label/idc/values?match[]=demo_metrics_with_nanos&start=0&end=600")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["idc1"])).unwrap()
);
// search field name
let res = client
.get("/v1/prometheus/api/v1/label/__field__/values?match[]=demo")
@@ -745,6 +771,7 @@ pub async fn test_prom_http_api(store_type: StorageType) {
PrometheusResponse::Labels(vec![
"demo".to_string(),
"demo_metrics".to_string(),
"demo_metrics_with_nanos".to_string(),
"logic_table".to_string(),
"numbers".to_string()
])