diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 0276e549d6..2366db7897 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -540,6 +540,7 @@ impl Inserter { let mut create_tables = vec![]; let mut alter_tables = vec![]; + let mut need_refresh_table_infos = HashSet::new(); let mut instant_table_ids = HashSet::new(); for req in &mut requests.inserts { @@ -549,7 +550,6 @@ impl Inserter { if table_info.is_ttl_instant_table() { instant_table_ids.insert(table_info.table_id()); } - table_infos.insert(table_info.table_id(), table.table_info()); if let Some(alter_expr) = self.get_alter_table_expr_on_demand( req, &table, @@ -558,6 +558,13 @@ impl Inserter { is_single_value, )? { alter_tables.push(alter_expr); + need_refresh_table_infos.insert(( + catalog.to_string(), + schema.clone(), + req.table_name.clone(), + )); + } else { + table_infos.insert(table_info.table_id(), table.table_info()); } } None => { @@ -696,6 +703,22 @@ impl Inserter { } } + // refresh table infos for altered tables + for (catalog, schema, table_name) in need_refresh_table_infos { + let table = self + .get_table(&catalog, &schema, &table_name) + .await? + .context(TableNotFoundSnafu { + table_name: common_catalog::format_full_table_name( + &catalog, + &schema, + &table_name, + ), + })?; + let table_info = table.table_info(); + table_infos.insert(table_info.table_id(), table.table_info()); + } + Ok(CreateAlterTableResult { instant_table_ids, table_infos, diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 18212be337..e2a395a2e2 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -417,7 +417,7 @@ impl StatementExecutor { ); } - let mut raw_tables_info = create_table_exprs + let raw_tables_info = create_table_exprs .iter() .map(|create| create_table_info(create, vec![])) .collect::>>()?; @@ -428,7 +428,7 @@ impl StatementExecutor { .collect::>(); let resp = self - .create_logical_tables_procedure(tables_data, query_context) + .create_logical_tables_procedure(tables_data, query_context.clone()) .await?; let table_ids = resp.table_ids; @@ -444,18 +444,51 @@ impl StatementExecutor { ); info!("Successfully created logical tables: {:?}", table_ids); - for (i, table_info) in raw_tables_info.iter_mut().enumerate() { - table_info.ident.table_id = table_ids[i]; - } - let tables_info = raw_tables_info - .into_iter() - .map(|x| x.try_into().context(CreateTableInfoSnafu)) - .collect::>>()?; + // Reacquire table infos from catalog so logical tables inherit the latest partition + // metadata (e.g. partition_key_indices) from their physical tables. + // And the returned table info also included extra partition columns that are in physical table but not in logical table's create table expr + let mut tables_info = Vec::with_capacity(table_ids.len()); + for (table_id, create_table) in table_ids.iter().zip(create_table_exprs.iter()) { + let table = self + .catalog_manager + .table( + &create_table.catalog_name, + &create_table.schema_name, + &create_table.table_name, + Some(&query_context), + ) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: format_full_table_name( + &create_table.catalog_name, + &create_table.schema_name, + &create_table.table_name, + ), + })?; - Ok(tables_info - .into_iter() - .map(|x| DistTable::table(Arc::new(x))) - .collect()) + let table_info = table.table_info(); + // Safety check: ensure we are returning the table info that matches the newly created table id. + ensure!( + table_info.table_id() == *table_id, + CreateLogicalTablesSnafu { + reason: format!( + "Table id mismatch after creation, expected {}, got {} for table {}", + table_id, + table_info.table_id(), + format_full_table_name( + &create_table.catalog_name, + &create_table.schema_name, + &create_table.table_name + ) + ) + } + ); + + tables_info.push(table_info); + } + + Ok(tables_info.into_iter().map(DistTable::table).collect()) } async fn validate_logical_table_partition_rule(