simplification

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-04-22 13:36:35 +08:00
parent f1947da869
commit e7c47c40b5

View File

@@ -493,23 +493,30 @@ impl RangePlanRewriter {
schema: &Arc<DFSchema>,
need_default_by: bool,
) -> Result<(Expr, Vec<Expr>)> {
let mut time_index_expr = None;
#[allow(deprecated)]
let mut time_index_expr = Expr::Wildcard {
qualifier: None,
options: Box::new(WildcardOptions::default()),
};
let mut default_by = vec![];
let metadata_time_index_expr = find_time_index_by_metadata(schema);
let mut table_refs = Vec::new();
let metadata_time_index_expr = (0..schema.fields().len()).find_map(|i| {
let (qualifier, field) = schema.qualified_field(i);
if field.metadata().contains_key(TIME_INDEX_KEY)
&& matches!(field.data_type(), DataType::Timestamp(_, _))
{
Some(Expr::Column(Column::new(
qualifier.cloned(),
field.name().clone(),
)))
} else {
None
}
});
for i in 0..schema.fields().len() {
let (qualifier, _) = schema.qualified_field(i);
if let Some(table_ref) = qualifier
&& !table_refs.contains(table_ref)
{
table_refs.push(table_ref.clone());
}
}
for table_ref in table_refs {
let table_source = match self.table_provider.resolve_table(table_ref.clone()).await {
if let Some(table_ref) = qualifier {
let table_source = match self.table_provider.resolve_table(table_ref.clone()).await
{
Ok(table_source) => table_source,
Err(error) => {
if matches!(&error, catalog::error::Error::TableNotExist { .. })
@@ -520,16 +527,15 @@ impl RangePlanRewriter {
return Err(error).context(CatalogSnafu);
}
};
let default_table_source = table_source
let table = table_source
.as_any()
.downcast_ref::<DefaultTableSource>()
.context(UnknownTableSnafu)?;
let adapter = default_table_source
.context(UnknownTableSnafu)?
.table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
.context(UnknownTableSnafu)?;
let table = adapter.table();
.context(UnknownTableSnafu)?
.table();
let schema = table.schema();
let time_index_column =
schema
@@ -551,48 +557,35 @@ impl RangePlanRewriter {
if default_by.is_empty() {
default_by = vec![1.lit()];
}
time_index_expr = Some(Expr::Column(Column::new(
time_index_expr = Expr::Column(Column::new(
Some(table_ref.clone()),
time_index_column.name.clone(),
)));
));
}
}
if let Some(expr) = time_index_expr {
return Ok((expr, default_by));
}
if let Some(expr) = metadata_time_index_expr {
#[allow(deprecated)]
if matches!(time_index_expr, Expr::Wildcard { .. })
&& let Some(expr) = metadata_time_index_expr
{
ensure!(
!need_default_by,
RangeQuerySnafu {
msg: "Cannot infer default BY columns from derived range query input"
}
);
return Ok((expr, vec![]));
time_index_expr = expr;
}
#[allow(deprecated)]
if matches!(time_index_expr, Expr::Wildcard { .. }) {
TimeIndexNotFoundSnafu {
table: schema.to_string(),
}
.fail()
}
}
fn find_time_index_by_metadata(schema: &DFSchema) -> Option<Expr> {
(0..schema.fields().len()).find_map(|i| {
let (qualifier, field) = schema.qualified_field(i);
if field.metadata().contains_key(TIME_INDEX_KEY)
&& matches!(field.data_type(), DataType::Timestamp(_, _))
{
Some(Expr::Column(Column::new(
qualifier.cloned(),
field.name().clone(),
)))
} else {
None
Ok((time_index_expr, default_by))
}
})
}
}
fn have_range_in_exprs(exprs: &[Expr]) -> bool {