fix: infer time index from column meta on derived table (#8013)

* rough fix

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* reorganize

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplification

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix format

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* enhance default by infer

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* supply comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-05-12 01:18:46 -07:00
committed by GitHub
parent d709fd29ef
commit 0d90f7407c
5 changed files with 323 additions and 39 deletions

View File

@@ -35,6 +35,7 @@ use datafusion_expr::{
};
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::TIME_INDEX_KEY;
use promql_parser::util::parse_duration;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
@@ -376,7 +377,7 @@ impl RangePlanRewriter {
}
.fail();
};
let (time_index, default_by) = self.get_index_by(input.schema()).await?;
let query_ctx = self.query_ctx.clone();
let mut range_rewriter = RangeExprRewriter {
input_plan: &input,
align: Duration::default(),
@@ -384,13 +385,16 @@ impl RangePlanRewriter {
by: vec![],
range_fn: BTreeSet::new(),
sub_aggr: aggr_plan,
query_ctx: &self.query_ctx,
query_ctx: &query_ctx,
};
let new_expr = expr
.iter()
.map(|expr| expr.clone().rewrite(&mut range_rewriter).map(|x| x.data))
.collect::<DFResult<Vec<_>>>()?;
if range_rewriter.by.is_empty() {
let need_default_by = range_rewriter.by.is_empty();
let (time_index, default_by) =
self.get_index_by(input.schema(), need_default_by).await?;
if need_default_by {
range_rewriter.by = default_by;
}
let range_select = RangeSelect::try_new(
@@ -481,25 +485,58 @@ impl RangePlanRewriter {
}
}
/// this function use to find the time_index column and row columns from input schema,
/// return `(time_index, [row_columns])` to the rewriter.
/// If the user does not explicitly use the `by` keyword to indicate time series,
/// `[row_columns]` will be use as default time series
async fn get_index_by(&mut self, schema: &Arc<DFSchema>) -> Result<(Expr, Vec<Expr>)> {
/// Finds the time index column and default row-key grouping from the input schema.
///
/// Returns `(time_index, [row_columns])` to the rewriter. If the user omits `BY`,
/// `[row_columns]` is used as the default time-series grouping.
///
/// For derived inputs such as subqueries, joins, or set operations, the source table
/// qualifier may no longer resolve back to a table provider. In that case we can still
/// recover the time index from column metadata, but we cannot safely reconstruct the
/// original row-key columns, so omitted `BY` must be rejected by the caller.
async fn get_index_by(
&mut self,
schema: &Arc<DFSchema>,
need_default_by: bool,
) -> Result<(Expr, Vec<Expr>)> {
#[allow(deprecated)]
let mut time_index_expr = Expr::Wildcard {
qualifier: None,
options: Box::new(WildcardOptions::default()),
};
let mut default_by = vec![];
let metadata_time_index_expr = (0..schema.fields().len()).find_map(|i| {
let (qualifier, field) = schema.qualified_field(i);
if field.metadata().contains_key(TIME_INDEX_KEY)
&& matches!(field.data_type(), DataType::Timestamp(_, _))
{
Some(Expr::Column(Column::new(
qualifier.cloned(),
field.name().clone(),
)))
} else {
None
}
});
for i in 0..schema.fields().len() {
let (qualifier, _) = schema.qualified_field(i);
if let Some(table_ref) = qualifier {
let table = self
.table_provider
.resolve_table(table_ref.clone())
.await
.context(CatalogSnafu)?
let table_source = match self.table_provider.resolve_table(table_ref.clone()).await
{
Ok(table_source) => table_source,
Err(error) => {
// `TableNotExist` here usually means the qualifier now refers to a derived
// input instead of a base table. We can still salvage the time index from
// field metadata, but only when such metadata is present.
if matches!(&error, catalog::error::Error::TableNotExist { .. })
&& metadata_time_index_expr.is_some()
{
continue;
}
return Err(error).context(CatalogSnafu);
}
};
let table = table_source
.as_any()
.downcast_ref::<DefaultTableSource>()
.context(UnknownTableSnafu)?
@@ -537,6 +574,22 @@ impl RangePlanRewriter {
}
}
#[allow(deprecated)]
if matches!(time_index_expr, Expr::Wildcard { .. })
&& let Some(expr) = metadata_time_index_expr
{
common_telemetry::debug!(
"Range query falling back to time-index metadata for derived input schema: {}",
schema
);
ensure!(
!need_default_by,
RangeQuerySnafu {
msg: "Cannot infer default BY columns from derived range query input"
}
);
time_index_expr = expr;
}
#[allow(deprecated)]
if matches!(time_index_expr, Expr::Wildcard { .. }) {
TimeIndexNotFoundSnafu {
table: schema.to_string(),
@@ -614,6 +667,7 @@ mod test {
use datatypes::schema::{ColumnSchema, Schema};
use session::context::QueryContext;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use table::table::TableRef;
use table::test_util::EmptyTable;
use super::*;
@@ -622,7 +676,45 @@ mod test {
use crate::{QueryEngineFactory, QueryEngineRef};
async fn create_test_engine() -> QueryEngineRef {
let table_name = "test".to_string();
create_test_engine_with_tables(&["test"], false).await
}
async fn create_union_test_engine() -> QueryEngineRef {
create_test_engine_with_tables(&["test_0", "test_1"], true).await
}
async fn create_test_engine_with_tables(
table_names: &[&str],
with_extra_timestamp: bool,
) -> QueryEngineRef {
let catalog_list = MemoryCatalogManager::with_default_setup();
for (i, table_name) in table_names.iter().enumerate() {
let table = create_test_table(table_name, with_extra_timestamp);
assert!(
catalog_list
.register_table_sync(RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: (*table_name).to_string(),
table_id: 1024 + i as u32,
table,
})
.is_ok()
);
}
QueryEngineFactory::new(
catalog_list,
None,
None,
None,
None,
false,
QueryOptions::default(),
)
.query_engine()
}
fn create_test_table(table_name: &str, with_extra_timestamp: bool) -> TableRef {
let mut columns = vec![];
for i in 0..5 {
columns.push(ColumnSchema::new(
@@ -639,6 +731,13 @@ mod test {
)
.with_time_index(true),
);
if with_extra_timestamp {
columns.push(ColumnSchema::new(
"timestamp_2".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
true,
));
}
for i in 0..5 {
columns.push(ColumnSchema::new(
format!("field_{i}"),
@@ -650,38 +749,20 @@ mod test {
let table_meta = TableMetaBuilder::empty()
.schema(schema)
.primary_key_indices((0..5).collect())
.value_indices((6..11).collect())
.value_indices(if with_extra_timestamp {
(6..12).collect()
} else {
(6..11).collect()
})
.next_column_id(1024)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name(&table_name)
.name(table_name)
.meta(table_meta)
.build()
.unwrap();
let table = EmptyTable::from_table_info(&table_info);
let catalog_list = MemoryCatalogManager::with_default_setup();
assert!(
catalog_list
.register_table_sync(RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name,
table_id: 1024,
table,
})
.is_ok()
);
QueryEngineFactory::new(
catalog_list,
None,
None,
None,
None,
false,
QueryOptions::default(),
)
.query_engine()
EmptyTable::from_table_info(&table_info)
}
async fn do_query(sql: &str) -> Result<LogicalPlan> {
@@ -690,6 +771,12 @@ mod test {
engine.planner().plan(&stmt, QueryContext::arc()).await
}
async fn do_union_query(sql: &str) -> Result<LogicalPlan> {
let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
let engine = create_union_test_engine().await;
engine.planner().plan(&stmt, QueryContext::arc()).await
}
async fn query_plan_compare(sql: &str, expected: String) {
let plan = do_query(sql).await.unwrap();
assert_eq!(plan.display_indent_schema().to_string(), expected);
@@ -765,6 +852,69 @@ mod test {
query_plan_compare(query, expected).await;
}
#[tokio::test]
async fn range_from_union_query() {
let queries = [
r#"SELECT timestamp, tag_0, avg(field_0) RANGE '5m'
FROM (
SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0
UNION ALL
SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1
)
WHERE timestamp >= '1970-01-01 00:00:00'
ALIGN '1h' by (tag_0)"#,
r#"SELECT tmp.timestamp, tmp.tag_0, avg(tmp.field_0) RANGE '5m'
FROM (
SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0
UNION ALL
SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1
) AS tmp
WHERE tmp.timestamp >= '1970-01-01 00:00:00'
ALIGN '1h' by (tmp.tag_0)"#,
];
for query in queries {
let plan = do_union_query(query)
.await
.unwrap()
.display_indent_schema()
.to_string();
assert!(plan.contains("RangeSelect"));
assert!(plan.contains("Union"));
assert!(plan.contains("time_index=timestamp"));
}
}
#[tokio::test]
async fn range_from_derived_query_without_by_err() {
let queries = [
r#"SELECT timestamp, tag_0, avg(field_0) RANGE '5m'
FROM (
SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0
UNION ALL
SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1
)
WHERE timestamp >= '1970-01-01 00:00:00'
ALIGN '1h'"#,
r#"SELECT tmp.timestamp, tmp.tag_0, avg(tmp.field_0) RANGE '5m'
FROM (
SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0
UNION ALL
SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1
) AS tmp
WHERE tmp.timestamp >= '1970-01-01 00:00:00'
ALIGN '1h'"#,
];
for query in queries {
assert_eq!(
do_union_query(query).await.unwrap_err().to_string(),
"Range Query: Cannot infer default BY columns from derived range query input"
);
}
}
#[tokio::test]
async fn range_in_expr() {
let query = r#"SELECT sin(avg(field_0 + field_1) RANGE '5m' + 1) FROM test ALIGN '1h' by (tag_0,tag_1);"#;

View File

@@ -60,6 +60,12 @@ SELECT min(val) RANGE '5s' FILL PREV FROM host;
Error: 2000(InvalidSyntax), Invalid SQL syntax: sql parser error: ALIGN argument cannot be omitted in the range select query
SELECT tmp.ts, tmp.host, min(tmp.val) RANGE '5s'
FROM (SELECT ts, host, val FROM host) AS tmp
ALIGN '5s';
Error: 2000(InvalidSyntax), Range Query: Cannot infer default BY columns from derived range query input
-- 2.3 type mismatch
SELECT covar(ceil(val), floor(val)) RANGE '20s' FROM host ALIGN '10s';

View File

@@ -42,6 +42,10 @@ SELECT min(val) RANGE '5s' FROM host;
SELECT min(val) RANGE '5s' FILL PREV FROM host;
SELECT tmp.ts, tmp.host, min(tmp.val) RANGE '5s'
FROM (SELECT ts, host, val FROM host) AS tmp
ALIGN '5s';
-- 2.3 type mismatch
SELECT covar(ceil(val), floor(val)) RANGE '20s' FROM host ALIGN '10s';

View File

@@ -45,6 +45,82 @@ SELECT ts, b, min(c) RANGE '5s' FROM (SELECT ts, host AS b, val AS c FROM host W
| 1970-01-01T00:00:20 | host1 | 2 |
+---------------------+-------+-----------------+
CREATE TABLE host_union_0 (
ts timestamp(3) time index,
host STRING PRIMARY KEY,
val BIGINT,
ts2 timestamp(3),
);
Affected Rows: 0
CREATE TABLE host_union_1 (
ts timestamp(3) time index,
host STRING PRIMARY KEY,
val BIGINT,
ts2 timestamp(3),
);
Affected Rows: 0
INSERT INTO TABLE host_union_0 VALUES
(0, 'host1', 3, 0),
(5000, 'host1', 2, 5000),
(10000, 'host1', 1, 10000);
Affected Rows: 3
INSERT INTO TABLE host_union_1 VALUES
(0, 'host1', 6, 0),
(5000, 'host1', 5, 5000),
(10000, 'host1', 4, 10000);
Affected Rows: 3
SELECT ts, host, min(val ORDER BY ts ASC) RANGE '5s'
FROM (
SELECT ts, host, val, ts2 FROM host_union_0
UNION ALL
SELECT ts, host, val, ts2 FROM host_union_1
)
WHERE ts >= '1970-01-01 00:00:00'
ALIGN '5s' BY (host)
ORDER BY host, ts;
+---------------------+-------+------------------------------------------------+
| ts | host | min(val) ORDER BY [ts ASC NULLS LAST] RANGE 5s |
+---------------------+-------+------------------------------------------------+
| 1970-01-01T00:00:00 | host1 | 3 |
| 1970-01-01T00:00:05 | host1 | 2 |
| 1970-01-01T00:00:10 | host1 | 1 |
+---------------------+-------+------------------------------------------------+
SELECT tmp.ts, tmp.host, min(tmp.val ORDER BY tmp.ts ASC) RANGE '5s'
FROM (
SELECT ts, host, val, ts2 FROM host_union_0
UNION ALL
SELECT ts, host, val, ts2 FROM host_union_1
) AS tmp
WHERE tmp.ts >= '1970-01-01 00:00:00'
ALIGN '5s' BY (tmp.host)
ORDER BY tmp.host, tmp.ts;
+---------------------+-------+--------------------------------------------------------+
| ts | host | min(tmp.val) ORDER BY [tmp.ts ASC NULLS LAST] RANGE 5s |
+---------------------+-------+--------------------------------------------------------+
| 1970-01-01T00:00:00 | host1 | 3 |
| 1970-01-01T00:00:05 | host1 | 2 |
| 1970-01-01T00:00:10 | host1 | 1 |
+---------------------+-------+--------------------------------------------------------+
DROP TABLE host_union_0;
Affected Rows: 0
DROP TABLE host_union_1;
Affected Rows: 0
-- Test EXPLAIN and ANALYZE
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _

View File

@@ -22,6 +22,54 @@ SELECT ts, host, foo FROM (SELECT ts, host, min(val) RANGE '5s' AS foo FROM host
SELECT ts, b, min(c) RANGE '5s' FROM (SELECT ts, host AS b, val AS c FROM host WHERE host = 'host1') ALIGN '5s' BY (b) ORDER BY b, ts;
CREATE TABLE host_union_0 (
ts timestamp(3) time index,
host STRING PRIMARY KEY,
val BIGINT,
ts2 timestamp(3),
);
CREATE TABLE host_union_1 (
ts timestamp(3) time index,
host STRING PRIMARY KEY,
val BIGINT,
ts2 timestamp(3),
);
INSERT INTO TABLE host_union_0 VALUES
(0, 'host1', 3, 0),
(5000, 'host1', 2, 5000),
(10000, 'host1', 1, 10000);
INSERT INTO TABLE host_union_1 VALUES
(0, 'host1', 6, 0),
(5000, 'host1', 5, 5000),
(10000, 'host1', 4, 10000);
SELECT ts, host, min(val ORDER BY ts ASC) RANGE '5s'
FROM (
SELECT ts, host, val, ts2 FROM host_union_0
UNION ALL
SELECT ts, host, val, ts2 FROM host_union_1
)
WHERE ts >= '1970-01-01 00:00:00'
ALIGN '5s' BY (host)
ORDER BY host, ts;
SELECT tmp.ts, tmp.host, min(tmp.val ORDER BY tmp.ts ASC) RANGE '5s'
FROM (
SELECT ts, host, val, ts2 FROM host_union_0
UNION ALL
SELECT ts, host, val, ts2 FROM host_union_1
) AS tmp
WHERE tmp.ts >= '1970-01-01 00:00:00'
ALIGN '5s' BY (tmp.host)
ORDER BY tmp.host, tmp.ts;
DROP TABLE host_union_0;
DROP TABLE host_union_1;
-- Test EXPLAIN and ANALYZE