mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-14 03:50:39 +00:00
fix: get correct table info when insert create/alter table (#7641)
* fix: set partition column&other newly acquired table info Signed-off-by: discord9 <discord9@163.com> * fix: update after alter table info Signed-off-by: discord9 <discord9@163.com> * refactor: use table name instead Signed-off-by: discord9 <discord9@163.com> * chore: log Signed-off-by: discord9 <discord9@163.com> --------- Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
@@ -540,6 +540,7 @@ impl Inserter {
|
||||
|
||||
let mut create_tables = vec![];
|
||||
let mut alter_tables = vec![];
|
||||
let mut need_refresh_table_infos = HashSet::new();
|
||||
let mut instant_table_ids = HashSet::new();
|
||||
|
||||
for req in &mut requests.inserts {
|
||||
@@ -549,7 +550,6 @@ impl Inserter {
|
||||
if table_info.is_ttl_instant_table() {
|
||||
instant_table_ids.insert(table_info.table_id());
|
||||
}
|
||||
table_infos.insert(table_info.table_id(), table.table_info());
|
||||
if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
|
||||
req,
|
||||
&table,
|
||||
@@ -558,6 +558,13 @@ impl Inserter {
|
||||
is_single_value,
|
||||
)? {
|
||||
alter_tables.push(alter_expr);
|
||||
need_refresh_table_infos.insert((
|
||||
catalog.to_string(),
|
||||
schema.clone(),
|
||||
req.table_name.clone(),
|
||||
));
|
||||
} else {
|
||||
table_infos.insert(table_info.table_id(), table.table_info());
|
||||
}
|
||||
}
|
||||
None => {
|
||||
@@ -696,6 +703,22 @@ impl Inserter {
|
||||
}
|
||||
}
|
||||
|
||||
// refresh table infos for altered tables
|
||||
for (catalog, schema, table_name) in need_refresh_table_infos {
|
||||
let table = self
|
||||
.get_table(&catalog, &schema, &table_name)
|
||||
.await?
|
||||
.context(TableNotFoundSnafu {
|
||||
table_name: common_catalog::format_full_table_name(
|
||||
&catalog,
|
||||
&schema,
|
||||
&table_name,
|
||||
),
|
||||
})?;
|
||||
let table_info = table.table_info();
|
||||
table_infos.insert(table_info.table_id(), table.table_info());
|
||||
}
|
||||
|
||||
Ok(CreateAlterTableResult {
|
||||
instant_table_ids,
|
||||
table_infos,
|
||||
|
||||
@@ -417,7 +417,7 @@ impl StatementExecutor {
|
||||
);
|
||||
}
|
||||
|
||||
let mut raw_tables_info = create_table_exprs
|
||||
let raw_tables_info = create_table_exprs
|
||||
.iter()
|
||||
.map(|create| create_table_info(create, vec![]))
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
@@ -428,7 +428,7 @@ impl StatementExecutor {
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let resp = self
|
||||
.create_logical_tables_procedure(tables_data, query_context)
|
||||
.create_logical_tables_procedure(tables_data, query_context.clone())
|
||||
.await?;
|
||||
|
||||
let table_ids = resp.table_ids;
|
||||
@@ -444,18 +444,51 @@ impl StatementExecutor {
|
||||
);
|
||||
info!("Successfully created logical tables: {:?}", table_ids);
|
||||
|
||||
for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
|
||||
table_info.ident.table_id = table_ids[i];
|
||||
}
|
||||
let tables_info = raw_tables_info
|
||||
.into_iter()
|
||||
.map(|x| x.try_into().context(CreateTableInfoSnafu))
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
// Reacquire table infos from catalog so logical tables inherit the latest partition
|
||||
// metadata (e.g. partition_key_indices) from their physical tables.
|
||||
// And the returned table info also included extra partition columns that are in physical table but not in logical table's create table expr
|
||||
let mut tables_info = Vec::with_capacity(table_ids.len());
|
||||
for (table_id, create_table) in table_ids.iter().zip(create_table_exprs.iter()) {
|
||||
let table = self
|
||||
.catalog_manager
|
||||
.table(
|
||||
&create_table.catalog_name,
|
||||
&create_table.schema_name,
|
||||
&create_table.table_name,
|
||||
Some(&query_context),
|
||||
)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: format_full_table_name(
|
||||
&create_table.catalog_name,
|
||||
&create_table.schema_name,
|
||||
&create_table.table_name,
|
||||
),
|
||||
})?;
|
||||
|
||||
Ok(tables_info
|
||||
.into_iter()
|
||||
.map(|x| DistTable::table(Arc::new(x)))
|
||||
.collect())
|
||||
let table_info = table.table_info();
|
||||
// Safety check: ensure we are returning the table info that matches the newly created table id.
|
||||
ensure!(
|
||||
table_info.table_id() == *table_id,
|
||||
CreateLogicalTablesSnafu {
|
||||
reason: format!(
|
||||
"Table id mismatch after creation, expected {}, got {} for table {}",
|
||||
table_id,
|
||||
table_info.table_id(),
|
||||
format_full_table_name(
|
||||
&create_table.catalog_name,
|
||||
&create_table.schema_name,
|
||||
&create_table.table_name
|
||||
)
|
||||
)
|
||||
}
|
||||
);
|
||||
|
||||
tables_info.push(table_info);
|
||||
}
|
||||
|
||||
Ok(tables_info.into_iter().map(DistTable::table).collect())
|
||||
}
|
||||
|
||||
async fn validate_logical_table_partition_rule(
|
||||
|
||||
Reference in New Issue
Block a user