fix: stream tables for prometheus label discovery (#8341)

Signed-off-by: Ritwij Aryan Parmar <ritwij.aryan.parmar@gmail.com>
This commit is contained in:
Ritwij Aryan Parmar
2026-06-25 04:41:12 -04:00
committed by GitHub
parent 9b1672316c
commit a0fcfd2110

View File

@@ -43,8 +43,8 @@ use datafusion_common::ScalarValue;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaRef};
use datatypes::types::jsonb_to_string;
use futures::StreamExt;
use futures::future::join_all;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
use promql_parser::parser::token::{self};
@@ -624,13 +624,9 @@ async fn get_all_column_names(
schema: &str,
manager: &CatalogManagerRef,
) -> std::result::Result<HashSet<String>, catalog::error::Error> {
let table_names = manager.table_names(catalog, schema, None).await?;
let mut labels = HashSet::new();
for table_name in table_names {
let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
continue;
};
let mut tables = manager.tables(catalog, schema, None);
while let Some(table) = tables.try_next().await? {
for column in table.primary_key_columns() {
if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
&& column.name != DATA_SCHEMA_TSID_COLUMN_NAME
@@ -1683,7 +1679,16 @@ pub async fn parse_query(
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;
use catalog::memory::MemoryCatalogManager;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use promql_parser::parser::value::ValueType;
use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType, TableVersion};
use table::test_util::EmptyTable;
use super::*;
@@ -1895,4 +1900,52 @@ mod tests {
}
}
}
#[tokio::test]
async fn test_get_all_column_names_uses_tag_columns() {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new(
"greptime_timestamp",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("region", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("value", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new(
DATA_SCHEMA_TSID_COLUMN_NAME,
ConcreteDataType::uint64_datatype(),
true,
),
]));
let meta = TableMetaBuilder::empty()
.schema(schema)
.primary_key_indices(vec![1, 2, 4])
.engine("metric".to_string())
.next_column_id(5)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.table_id(1024)
.table_version(0 as TableVersion)
.name("cpu_usage")
.catalog_name(DEFAULT_CATALOG_NAME)
.schema_name(DEFAULT_SCHEMA_NAME)
.table_type(TableType::Base)
.meta(meta)
.build()
.unwrap();
let manager: CatalogManagerRef =
MemoryCatalogManager::new_with_table(EmptyTable::from_table_info(&table_info));
let labels = get_all_column_names(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, &manager)
.await
.unwrap();
assert_eq!(
labels,
HashSet::from(["host".to_string(), "region".to_string()])
);
}
}