diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs index e90e1d3457..9cafac7a52 100644 --- a/src/query/src/range_select/plan_rewrite.rs +++ b/src/query/src/range_select/plan_rewrite.rs @@ -493,23 +493,30 @@ impl RangePlanRewriter { schema: &Arc, need_default_by: bool, ) -> Result<(Expr, Vec)> { - let mut time_index_expr = None; + #[allow(deprecated)] + let mut time_index_expr = Expr::Wildcard { + qualifier: None, + options: Box::new(WildcardOptions::default()), + }; let mut default_by = vec![]; - - let metadata_time_index_expr = find_time_index_by_metadata(schema); - - let mut table_refs = Vec::new(); + let metadata_time_index_expr = (0..schema.fields().len()).find_map(|i| { + let (qualifier, field) = schema.qualified_field(i); + if field.metadata().contains_key(TIME_INDEX_KEY) + && matches!(field.data_type(), DataType::Timestamp(_, _)) + { + Some(Expr::Column(Column::new( + qualifier.cloned(), + field.name().clone(), + ))) + } else { + None + } + }); for i in 0..schema.fields().len() { let (qualifier, _) = schema.qualified_field(i); - if let Some(table_ref) = qualifier - && !table_refs.contains(table_ref) - { - table_refs.push(table_ref.clone()); - } - } - - for table_ref in table_refs { - let table_source = match self.table_provider.resolve_table(table_ref.clone()).await { + if let Some(table_ref) = qualifier { + let table_source = match self.table_provider.resolve_table(table_ref.clone()).await + { Ok(table_source) => table_source, Err(error) => { if matches!(&error, catalog::error::Error::TableNotExist { .. }) @@ -520,16 +527,15 @@ impl RangePlanRewriter { return Err(error).context(CatalogSnafu); } }; - let default_table_source = table_source + let table = table_source .as_any() .downcast_ref::() - .context(UnknownTableSnafu)?; - let adapter = default_table_source + .context(UnknownTableSnafu)? .table_provider .as_any() .downcast_ref::() - .context(UnknownTableSnafu)?; - let table = adapter.table(); + .context(UnknownTableSnafu)? + .table(); let schema = table.schema(); let time_index_column = schema @@ -551,48 +557,35 @@ impl RangePlanRewriter { if default_by.is_empty() { default_by = vec![1.lit()]; } - time_index_expr = Some(Expr::Column(Column::new( + time_index_expr = Expr::Column(Column::new( Some(table_ref.clone()), time_index_column.name.clone(), - ))); + )); } } - - if let Some(expr) = time_index_expr { - return Ok((expr, default_by)); } - - if let Some(expr) = metadata_time_index_expr { + #[allow(deprecated)] + if matches!(time_index_expr, Expr::Wildcard { .. }) + && let Some(expr) = metadata_time_index_expr + { ensure!( !need_default_by, RangeQuerySnafu { msg: "Cannot infer default BY columns from derived range query input" } ); - return Ok((expr, vec![])); + time_index_expr = expr; } - + #[allow(deprecated)] + if matches!(time_index_expr, Expr::Wildcard { .. }) { TimeIndexNotFoundSnafu { table: schema.to_string(), } .fail() - } -} - -fn find_time_index_by_metadata(schema: &DFSchema) -> Option { - (0..schema.fields().len()).find_map(|i| { - let (qualifier, field) = schema.qualified_field(i); - if field.metadata().contains_key(TIME_INDEX_KEY) - && matches!(field.data_type(), DataType::Timestamp(_, _)) - { - Some(Expr::Column(Column::new( - qualifier.cloned(), - field.name().clone(), - ))) } else { - None + Ok((time_index_expr, default_by)) } - }) + } } fn have_range_in_exprs(exprs: &[Expr]) -> bool {