fix: remove metric engine's internal column from promql's query (#5032)

* fix: remove metric engine's internal column from promql's query

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unwrap

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* filter out physical table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add integration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-01-26 11:10:59 +08:00
committed by GitHub
parent 4111c18d44
commit f165bfb0af
4 changed files with 84 additions and 23 deletions

View File

@@ -258,7 +258,10 @@ impl Stream for SeriesDivideStream {
let timer = std::time::Instant::now();
loop {
if !self.buffer.is_empty() {
let cut_at = self.find_first_diff_row();
let cut_at = match self.find_first_diff_row() {
Ok(cut_at) => cut_at,
Err(e) => return Poll::Ready(Some(Err(e))),
};
if let Some((batch_index, row_index)) = cut_at {
// slice out the first time series and return it.
let half_batch_of_first_series =
@@ -322,10 +325,10 @@ impl SeriesDivideStream {
/// Return the position to cut buffer.
/// None implies the current buffer only contains one time series.
fn find_first_diff_row(&mut self) -> Option<(usize, usize)> {
fn find_first_diff_row(&mut self) -> DataFusionResult<Option<(usize, usize)>> {
// fast path: no tag columns means all data belongs to the same series.
if self.tag_indices.is_empty() {
return None;
return Ok(None);
}
let mut resumed_batch_index = self.inspect_start;
@@ -341,18 +344,26 @@ impl SeriesDivideStream {
for index in &self.tag_indices {
let current_array = batch.column(*index);
let last_array = last_batch.column(*index);
let current_value = current_array
let current_string_array = current_array
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(0);
let last_value = last_array
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal(
"Failed to downcast tag column to StringArray".to_string(),
)
})?;
let last_string_array = last_array
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(last_row);
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal(
"Failed to downcast tag column to StringArray".to_string(),
)
})?;
let current_value = current_string_array.value(0);
let last_value = last_string_array.value(last_row);
if current_value != last_value {
return Some((resumed_batch_index, 0));
return Ok(Some((resumed_batch_index, 0)));
}
}
}
@@ -360,7 +371,15 @@ impl SeriesDivideStream {
// check column by column
for index in &self.tag_indices {
let array = batch.column(*index);
let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
let string_array =
array
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal(
"Failed to downcast tag column to StringArray".to_string(),
)
})?;
// the first row number that is not equal to the next row.
let mut same_until = 0;
while same_until < num_rows - 1 {
@@ -376,12 +395,12 @@ impl SeriesDivideStream {
// all rows are the same, inspect next batch
resumed_batch_index += 1;
} else {
return Some((resumed_batch_index, result_index));
return Ok(Some((resumed_batch_index, result_index)));
}
}
self.inspect_start = resumed_batch_index;
None
Ok(None)
}
}

View File

@@ -63,6 +63,9 @@ use promql_parser::parser::{
VectorMatchCardinality, VectorSelector,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use table::table::adapter::DfTableProviderAdapter;
use crate::promql::error::{
@@ -1146,6 +1149,10 @@ impl PromPlanner {
.table_info()
.meta
.row_key_column_names()
.filter(|col| {
// remove metric engine's internal columns
col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
})
.cloned()
.collect();
self.ctx.tag_columns = tags;

View File

@@ -45,7 +45,7 @@ use serde_json::Value;
use session::context::{QueryContext, QueryContextRef};
use snafu::{Location, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
pub use super::result::prometheus_resp::PrometheusJsonResponse;
@@ -941,16 +941,30 @@ pub async fn label_values_query(
.start_timer();
if label_name == METRIC_NAME_LABEL {
let mut table_names = match handler
.catalog_manager()
.table_names(&catalog, &schema, Some(&query_ctx))
.await
{
Ok(table_names) => table_names,
Err(e) => {
return PrometheusJsonResponse::error(e.status_code(), e.output_msg());
let catalog_manager = handler.catalog_manager();
let mut tables_stream = catalog_manager.tables(&catalog, &schema, Some(&query_ctx));
let mut table_names = Vec::new();
while let Some(table) = tables_stream.next().await {
// filter out physical tables
match table {
Ok(table) => {
if table
.table_info()
.meta
.options
.extra_options
.contains_key(PHYSICAL_TABLE_METADATA_KEY)
{
continue;
}
table_names.push(table.table_info().name.clone());
}
Err(e) => {
return PrometheusJsonResponse::error(e.status_code(), e.output_msg());
}
}
};
}
table_names.sort_unstable();
return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
} else if label_name == FIELD_NAME_LABEL {

View File

@@ -704,6 +704,18 @@ pub async fn test_prom_http_api(store_type: StorageType) {
assert!(prom_resp.error_type.is_none());
// query `__name__` without match[]
// create a physical table and a logical table
let res = client
.get("/v1/sql?sql=create table physical_table (`ts` timestamp time index, message string) with ('physical_metric_table' = 'true');")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
let res = client
.get("/v1/sql?sql=create table logic_table (`ts` timestamp time index, message string) with ('on_physical_table' = 'physical_table');")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
// query `__name__`
let res = client
.get("/v1/prometheus/api/v1/label/__name__/values")
.send()
@@ -713,6 +725,15 @@ pub async fn test_prom_http_api(store_type: StorageType) {
assert_eq!(prom_resp.status, "success");
assert!(prom_resp.error.is_none());
assert!(prom_resp.error_type.is_none());
assert_eq!(
prom_resp.data,
PrometheusResponse::Labels(vec![
"demo".to_string(),
"demo_metrics".to_string(),
"logic_table".to_string(),
"numbers".to_string()
])
);
// buildinfo
let res = client