Rough fix: resolve the time index for RANGE queries over UNION subqueries

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-04-22 05:45:04 +08:00
parent 80c395ee23
commit 2c86309dee
3 changed files with 249 additions and 40 deletions

View File

@@ -35,14 +35,13 @@ use datafusion_expr::{
};
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::TIME_INDEX_KEY;
use promql_parser::util::parse_duration;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{OptionExt, ensure};
use table::table::adapter::DfTableProviderAdapter;
use crate::error::{
CatalogSnafu, RangeQuerySnafu, Result, TimeIndexNotFoundSnafu, UnknownTableSnafu,
};
use crate::error::{RangeQuerySnafu, Result, TimeIndexNotFoundSnafu};
use crate::plan::ExtractExpr;
use crate::range_select::plan::{Fill, RangeFn, RangeSelect};
@@ -495,19 +494,23 @@ impl RangePlanRewriter {
for i in 0..schema.fields().len() {
let (qualifier, _) = schema.qualified_field(i);
if let Some(table_ref) = qualifier {
let table = self
.table_provider
.resolve_table(table_ref.clone())
.await
.context(CatalogSnafu)?
.as_any()
.downcast_ref::<DefaultTableSource>()
.context(UnknownTableSnafu)?
let Ok(table_source) = self.table_provider.resolve_table(table_ref.clone()).await
else {
continue;
};
let Some(default_table_source) =
table_source.as_any().downcast_ref::<DefaultTableSource>()
else {
continue;
};
let Some(adapter) = default_table_source
.table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
.context(UnknownTableSnafu)?
.table();
else {
continue;
};
let table = adapter.table();
let schema = table.schema();
let time_index_column =
schema
@@ -537,6 +540,20 @@ impl RangePlanRewriter {
}
}
// Fallback lookup: `time_index_expr` is still the wildcard placeholder here, which
// means no time index has been picked yet. Search the plan schema itself for a
// column that is marked as the time index.
// NOTE(review): the `allow(deprecated)` is presumably needed because `Expr::Wildcard`
// is deprecated upstream — confirm.
#[allow(deprecated)]
if matches!(time_index_expr, Expr::Wildcard { .. }) {
for i in 0..schema.fields().len() {
let (qualifier, field) = schema.qualified_field(i);
// A candidate must carry the `TIME_INDEX_KEY` metadata entry and have a
// timestamp data type.
if field.metadata().contains_key(TIME_INDEX_KEY)
&& matches!(field.data_type(), DataType::Timestamp(_, _))
{
// NOTE(review): the literal `1` looks like a constant default BY key (a single
// group) — confirm against how `default_by` is consumed.
default_by = vec![1.lit()];
// Keep the field's qualifier (if any) so the column reference matches the schema.
time_index_expr =
Expr::Column(Column::new(qualifier.cloned(), field.name().clone()));
// Only the first matching field is used.
break;
}
}
}
#[allow(deprecated)]
if matches!(time_index_expr, Expr::Wildcard { .. }) {
TimeIndexNotFoundSnafu {
table: schema.to_string(),
@@ -614,6 +631,7 @@ mod test {
use datatypes::schema::{ColumnSchema, Schema};
use session::context::QueryContext;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use table::table::TableRef;
use table::test_util::EmptyTable;
use super::*;
@@ -622,7 +640,45 @@ mod test {
use crate::{QueryEngineFactory, QueryEngineRef};
async fn create_test_engine() -> QueryEngineRef {
let table_name = "test".to_string();
create_test_engine_with_tables(&["test"], false).await
}
/// Builds a query engine with two registered tables, `test_0` and `test_1`,
/// both created with the extra timestamp column enabled.
async fn create_union_test_engine() -> QueryEngineRef {
    let union_tables = ["test_0", "test_1"];
    create_test_engine_with_tables(&union_tables, true).await
}
/// Builds a query engine backed by an in-memory catalog holding one test table
/// per entry of `table_names`.
///
/// Every table is registered under the default catalog and schema, with table
/// ids assigned consecutively starting at 1024. `with_extra_timestamp` is
/// forwarded unchanged to `create_test_table`.
///
/// Panics if registering any table fails.
async fn create_test_engine_with_tables(
    table_names: &[&str],
    with_extra_timestamp: bool,
) -> QueryEngineRef {
    let catalog_manager = MemoryCatalogManager::with_default_setup();

    for (idx, &name) in table_names.iter().enumerate() {
        let table = create_test_table(name, with_extra_timestamp);
        let request = RegisterTableRequest {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: name.to_string(),
            table_id: 1024 + idx as u32,
            table,
        };
        let registered = catalog_manager.register_table_sync(request);
        assert!(registered.is_ok());
    }

    let factory = QueryEngineFactory::new(
        catalog_manager,
        None,
        None,
        None,
        None,
        false,
        QueryOptions::default(),
    );
    factory.query_engine()
}
fn create_test_table(table_name: &str, with_extra_timestamp: bool) -> TableRef {
let mut columns = vec![];
for i in 0..5 {
columns.push(ColumnSchema::new(
@@ -639,6 +695,13 @@ mod test {
)
.with_time_index(true),
);
if with_extra_timestamp {
columns.push(ColumnSchema::new(
"timestamp_2".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
true,
));
}
for i in 0..5 {
columns.push(ColumnSchema::new(
format!("field_{i}"),
@@ -650,38 +713,20 @@ mod test {
let table_meta = TableMetaBuilder::empty()
.schema(schema)
.primary_key_indices((0..5).collect())
.value_indices((6..11).collect())
.value_indices(if with_extra_timestamp {
(6..12).collect()
} else {
(6..11).collect()
})
.next_column_id(1024)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name(&table_name)
.name(table_name)
.meta(table_meta)
.build()
.unwrap();
let table = EmptyTable::from_table_info(&table_info);
let catalog_list = MemoryCatalogManager::with_default_setup();
assert!(
catalog_list
.register_table_sync(RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name,
table_id: 1024,
table,
})
.is_ok()
);
QueryEngineFactory::new(
catalog_list,
None,
None,
None,
None,
false,
QueryOptions::default(),
)
.query_engine()
EmptyTable::from_table_info(&table_info)
}
async fn do_query(sql: &str) -> Result<LogicalPlan> {
@@ -690,6 +735,12 @@ mod test {
engine.planner().plan(&stmt, QueryContext::arc()).await
}
/// Plans `sql` against the engine returned by `create_union_test_engine`.
///
/// Panics if `sql` cannot be parsed; planning errors are returned to the caller.
async fn do_union_query(sql: &str) -> Result<LogicalPlan> {
    let statement = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
    let query_engine = create_union_test_engine().await;
    let planner = query_engine.planner();
    planner.plan(&statement, QueryContext::arc()).await
}
async fn query_plan_compare(sql: &str, expected: String) {
let plan = do_query(sql).await.unwrap();
assert_eq!(plan.display_indent_schema().to_string(), expected);
@@ -765,6 +816,40 @@ mod test {
query_plan_compare(query, expected).await;
}
/// A RANGE query over a `UNION ALL` subquery must plan successfully, both with
/// and without an alias on the subquery, and must pick `timestamp` as the time
/// index even though the inputs also expose a second timestamp column.
#[tokio::test]
async fn range_from_union_query() {
    // Subquery without an alias: columns are referenced unqualified.
    let unaliased = r#"SELECT timestamp, tag_0, avg(field_0) RANGE '5m'
FROM (
SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0
UNION ALL
SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1
)
WHERE timestamp >= '1970-01-01 00:00:00'
ALIGN '1h' by (tag_0)"#;
    // Same query, but the subquery is aliased and every column is qualified.
    let aliased = r#"SELECT tmp.timestamp, tmp.tag_0, avg(tmp.field_0) RANGE '5m'
FROM (
SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0
UNION ALL
SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1
) AS tmp
WHERE tmp.timestamp >= '1970-01-01 00:00:00'
ALIGN '1h' by (tmp.tag_0)"#;

    for sql in [unaliased, aliased] {
        let logical_plan = do_union_query(sql).await.unwrap();
        let rendered = logical_plan.display_indent_schema().to_string();
        assert!(rendered.contains("RangeSelect"));
        assert!(rendered.contains("Union"));
        assert!(rendered.contains("time_index=timestamp"));
    }
}
#[tokio::test]
async fn range_in_expr() {
let query = r#"SELECT sin(avg(field_0 + field_1) RANGE '5m' + 1) FROM test ALIGN '1h' by (tag_0,tag_1);"#;

View File

@@ -45,6 +45,82 @@ SELECT ts, b, min(c) RANGE '5s' FROM (SELECT ts, host AS b, val AS c FROM host W
| 1970-01-01T00:00:20 | host1 | 2 |
+---------------------+-------+-----------------+
CREATE TABLE host_union_0 (
ts timestamp(3) time index,
host STRING PRIMARY KEY,
val BIGINT,
ts2 timestamp(3),
);
Affected Rows: 0
CREATE TABLE host_union_1 (
ts timestamp(3) time index,
host STRING PRIMARY KEY,
val BIGINT,
ts2 timestamp(3),
);
Affected Rows: 0
INSERT INTO TABLE host_union_0 VALUES
(0, 'host1', 3, 0),
(5000, 'host1', 2, 5000),
(10000, 'host1', 1, 10000);
Affected Rows: 3
INSERT INTO TABLE host_union_1 VALUES
(0, 'host1', 6, 0),
(5000, 'host1', 5, 5000),
(10000, 'host1', 4, 10000);
Affected Rows: 3
SELECT ts, host, min(val ORDER BY ts ASC) RANGE '5s'
FROM (
SELECT ts, host, val, ts2 FROM host_union_0
UNION ALL
SELECT ts, host, val, ts2 FROM host_union_1
)
WHERE ts >= '1970-01-01 00:00:00'
ALIGN '5s' BY (host)
ORDER BY host, ts;
+---------------------+-------+------------------------------------------------+
| ts | host | min(val) ORDER BY [ts ASC NULLS LAST] RANGE 5s |
+---------------------+-------+------------------------------------------------+
| 1970-01-01T00:00:00 | host1 | 3 |
| 1970-01-01T00:00:05 | host1 | 2 |
| 1970-01-01T00:00:10 | host1 | 1 |
+---------------------+-------+------------------------------------------------+
SELECT tmp.ts, tmp.host, min(tmp.val ORDER BY tmp.ts ASC) RANGE '5s'
FROM (
SELECT ts, host, val, ts2 FROM host_union_0
UNION ALL
SELECT ts, host, val, ts2 FROM host_union_1
) AS tmp
WHERE tmp.ts >= '1970-01-01 00:00:00'
ALIGN '5s' BY (tmp.host)
ORDER BY tmp.host, tmp.ts;
+---------------------+-------+--------------------------------------------------------+
| ts | host | min(tmp.val) ORDER BY [tmp.ts ASC NULLS LAST] RANGE 5s |
+---------------------+-------+--------------------------------------------------------+
| 1970-01-01T00:00:00 | host1 | 3 |
| 1970-01-01T00:00:05 | host1 | 2 |
| 1970-01-01T00:00:10 | host1 | 1 |
+---------------------+-------+--------------------------------------------------------+
DROP TABLE host_union_0;
Affected Rows: 0
DROP TABLE host_union_1;
Affected Rows: 0
-- Test EXPLAIN and ANALYZE
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _

View File

@@ -22,6 +22,54 @@ SELECT ts, host, foo FROM (SELECT ts, host, min(val) RANGE '5s' AS foo FROM host
SELECT ts, b, min(c) RANGE '5s' FROM (SELECT ts, host AS b, val AS c FROM host WHERE host = 'host1') ALIGN '5s' BY (b) ORDER BY b, ts;
CREATE TABLE host_union_0 (
ts timestamp(3) time index,
host STRING PRIMARY KEY,
val BIGINT,
ts2 timestamp(3),
);
CREATE TABLE host_union_1 (
ts timestamp(3) time index,
host STRING PRIMARY KEY,
val BIGINT,
ts2 timestamp(3),
);
INSERT INTO TABLE host_union_0 VALUES
(0, 'host1', 3, 0),
(5000, 'host1', 2, 5000),
(10000, 'host1', 1, 10000);
INSERT INTO TABLE host_union_1 VALUES
(0, 'host1', 6, 0),
(5000, 'host1', 5, 5000),
(10000, 'host1', 4, 10000);
SELECT ts, host, min(val ORDER BY ts ASC) RANGE '5s'
FROM (
SELECT ts, host, val, ts2 FROM host_union_0
UNION ALL
SELECT ts, host, val, ts2 FROM host_union_1
)
WHERE ts >= '1970-01-01 00:00:00'
ALIGN '5s' BY (host)
ORDER BY host, ts;
SELECT tmp.ts, tmp.host, min(tmp.val ORDER BY tmp.ts ASC) RANGE '5s'
FROM (
SELECT ts, host, val, ts2 FROM host_union_0
UNION ALL
SELECT ts, host, val, ts2 FROM host_union_1
) AS tmp
WHERE tmp.ts >= '1970-01-01 00:00:00'
ALIGN '5s' BY (tmp.host)
ORDER BY tmp.host, tmp.ts;
DROP TABLE host_union_0;
DROP TABLE host_union_1;
-- Test EXPLAIN and ANALYZE