feat: implement drop multiple tables (#4085)

* feat: implement drop multiple tables

* fix: pass fmt and clippy checks

* add: drop multiple sqlness test

* update: accept review suggestions

* update: accept reviem suggestion

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* fix: pass clippy check

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
This commit is contained in:
sarailQAQ
2024-06-04 21:11:41 +08:00
committed by GitHub
parent c0aed1d267
commit 98c19ed0fa
7 changed files with 207 additions and 54 deletions

View File

@@ -185,12 +185,15 @@ impl StatementExecutor {
}
Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await,
Statement::DropTable(stmt) => {
let (catalog, schema, table) =
table_idents_to_full_name(stmt.table_name(), &query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
let table_name = TableName::new(catalog, schema, table);
self.drop_table(table_name, stmt.drop_if_exists(), query_ctx)
let mut table_names = Vec::with_capacity(stmt.table_names().len());
for table_name_stmt in stmt.table_names() {
let (catalog, schema, table) =
table_idents_to_full_name(table_name_stmt, &query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
table_names.push(TableName::new(catalog, schema, table));
}
self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
.await
}
Statement::DropDatabase(stmt) => {

View File

@@ -643,18 +643,44 @@ impl StatementExecutor {
drop_if_exists: bool,
query_context: QueryContextRef,
) -> Result<Output> {
if let Some(table) = self
.catalog_manager
.table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
)
// Reserved for grpc call
self.drop_tables(&[table_name], drop_if_exists, query_context)
.await
.context(CatalogSnafu)?
{
let table_id = table.table_info().table_id();
self.drop_table_procedure(&table_name, table_id, drop_if_exists, query_context)
}
#[tracing::instrument(skip_all)]
pub async fn drop_tables(
&self,
table_names: &[TableName],
drop_if_exists: bool,
query_context: QueryContextRef,
) -> Result<Output> {
let mut tables = Vec::with_capacity(table_names.len());
for table_name in table_names {
if let Some(table) = self
.catalog_manager
.table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
)
.await
.context(CatalogSnafu)?
{
tables.push(table.table_info().table_id());
} else if drop_if_exists {
// DROP TABLE IF EXISTS meets table not found - ignored
continue;
} else {
return TableNotFoundSnafu {
table_name: table_name.to_string(),
}
.fail();
}
}
for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
.await?;
// Invalidates local cache ASAP.
@@ -668,17 +694,8 @@ impl StatementExecutor {
)
.await
.context(error::InvalidateTableCacheSnafu)?;
Ok(Output::new_with_affected_rows(0))
} else if drop_if_exists {
// DROP TABLE IF EXISTS meets table not found - ignored
Ok(Output::new_with_affected_rows(0))
} else {
TableNotFoundSnafu {
table_name: table_name.to_string(),
}
.fail()
}
Ok(Output::new_with_affected_rows(0))
}
#[tracing::instrument(skip_all)]