fix: incorrect timestamp index inference (#7530)

* add sqlness case, but can't reproduce

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* reproduction

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix wildcard rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sort result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-01-07 19:18:25 +08:00
committed by GitHub
parent ef6dd5b99f
commit fce1687fa7
5 changed files with 502 additions and 1 deletions

View File

@@ -162,7 +162,7 @@ impl TreeNodeVisitor<'_> for TimeIndexFinder {
{
let table_info = adapter.table().table_info();
self.table_alias
.get_or_insert(TableReference::bare(table_info.name.clone()));
.get_or_insert(table_scan.table_name.clone());
self.time_index_col = table_info
.meta
.schema
@@ -196,9 +196,21 @@ impl TimeIndexFinder {
mod test {
use std::sync::Arc;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::ext::{BoxedError, ErrorExt, StackError};
use common_error::status_code::StatusCode;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::functions_aggregate::count::count_all;
use datafusion_common::Column;
use datafusion_expr::LogicalPlanBuilder;
use datafusion_sql::TableReference;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use store_api::data_source::DataSource;
use store_api::storage::ScanRequest;
use table::metadata::{FilterPushDownType, TableInfoBuilder, TableMetaBuilder, TableType};
use table::table::numbers::NumbersTable;
use table::{Table, TableRef};
use super::*;
@@ -224,4 +236,160 @@ mod test {
assert_eq!(finder.table_alias, Some(TableReference::bare("FgHiJ")));
assert!(finder.time_index_col.is_none());
}
#[test]
fn bare_table_name_time_index() {
let table_ref = TableReference::bare("multi_partitioned_test_1");
let table =
build_time_index_table("multi_partitioned_test_1", "public", DEFAULT_CATALOG_NAME);
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(table),
)));
let plan =
LogicalPlanBuilder::scan_with_filters(table_ref.clone(), table_source, None, vec![])
.unwrap()
.aggregate(Vec::<Expr>::new(), vec![count_all()])
.unwrap()
.build()
.unwrap();
let time_index = CountWildcardToTimeIndexRule::try_find_time_index_col(&plan);
assert_eq!(
time_index,
Some(Column::new(Some(table_ref), "greptime_timestamp"))
);
}
#[test]
fn schema_qualified_table_name_time_index() {
let table_ref = TableReference::partial("telemetry_events", "multi_partitioned_test_1");
let table = build_time_index_table(
"multi_partitioned_test_1",
"telemetry_events",
DEFAULT_CATALOG_NAME,
);
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(table),
)));
let plan =
LogicalPlanBuilder::scan_with_filters(table_ref.clone(), table_source, None, vec![])
.unwrap()
.aggregate(Vec::<Expr>::new(), vec![count_all()])
.unwrap()
.build()
.unwrap();
let time_index = CountWildcardToTimeIndexRule::try_find_time_index_col(&plan);
assert_eq!(
time_index,
Some(Column::new(Some(table_ref), "greptime_timestamp"))
);
}
#[test]
fn fully_qualified_table_name_time_index() {
let table_ref = TableReference::full(
"telemetry_catalog",
"telemetry_events",
"multi_partitioned_test_1",
);
let table = build_time_index_table(
"multi_partitioned_test_1",
"telemetry_events",
"telemetry_catalog",
);
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(table),
)));
let plan =
LogicalPlanBuilder::scan_with_filters(table_ref.clone(), table_source, None, vec![])
.unwrap()
.aggregate(Vec::<Expr>::new(), vec![count_all()])
.unwrap()
.build()
.unwrap();
let time_index = CountWildcardToTimeIndexRule::try_find_time_index_col(&plan);
assert_eq!(
time_index,
Some(Column::new(Some(table_ref), "greptime_timestamp"))
);
}
fn build_time_index_table(table_name: &str, schema_name: &str, catalog_name: &str) -> TableRef {
let column_schemas = vec![
ColumnSchema::new(
"greptime_timestamp",
ConcreteDataType::timestamp_nanosecond_datatype(),
false,
)
.with_time_index(true),
];
let schema = SchemaBuilder::try_from_columns(column_schemas)
.unwrap()
.build()
.unwrap();
let meta = TableMetaBuilder::new_external_table()
.schema(Arc::new(schema))
.next_column_id(1)
.build()
.unwrap();
let info = TableInfoBuilder::new(table_name.to_string(), meta)
.table_id(1)
.table_version(0)
.catalog_name(catalog_name)
.schema_name(schema_name)
.table_type(TableType::Base)
.build()
.unwrap();
let data_source = Arc::new(DummyDataSource);
Arc::new(Table::new(
Arc::new(info),
FilterPushDownType::Unsupported,
data_source,
))
}
struct DummyDataSource;
impl DataSource for DummyDataSource {
fn get_stream(
&self,
_request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
Err(BoxedError::new(DummyDataSourceError))
}
}
#[derive(Debug)]
struct DummyDataSourceError;
impl std::fmt::Display for DummyDataSourceError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "dummy data source error")
}
}
impl std::error::Error for DummyDataSourceError {}
impl StackError for DummyDataSourceError {
fn debug_fmt(&self, _: usize, _: &mut Vec<String>) {}
fn next(&self) -> Option<&dyn StackError> {
None
}
}
impl ErrorExt for DummyDataSourceError {
fn status_code(&self) -> StatusCode {
StatusCode::Internal
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
}