fix: alter procedure table not found issue (#1993)

* fix: alter procedure table not found issue

* chore: apply suggestions

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2023-07-19 17:26:13 +09:00
committed by GitHub
parent bb8468437e
commit 2e2a82689c
4 changed files with 146 additions and 32 deletions

View File

@@ -476,6 +476,12 @@ pub enum Error {
source: common_runtime::JoinError,
location: Location,
},
#[snafu(display("Unexpected, violated: {}", violated))]
Unexpected {
violated: String,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -530,7 +536,7 @@ impl ErrorExt for Error {
| MissingWalDirConfig { .. }
| PrepareImmutableTable { .. } => StatusCode::InvalidArguments,
EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } => {
EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } | Unexpected { .. } => {
StatusCode::Unexpected
}

View File

@@ -306,7 +306,7 @@ mod test {
use api::v1::{
alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
CreateDatabaseExpr, CreateTableExpr, DropTableExpr, InsertRequest, InsertRequests,
QueryRequest, TableId,
QueryRequest, RenameTable, TableId,
};
use common_catalog::consts::MITO_ENGINE;
use common_error::ext::ErrorExt;
@@ -554,6 +554,79 @@ mod test {
assert_eq!(err.status_code(), StatusCode::TableColumnExists);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_rename_table_twice() {
common_telemetry::init_default_ut_logging();
let instance = MockInstance::new("test_alter_table_twice").await;
let instance = instance.inner();
let query = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "my_database".to_string(),
create_if_not_exists: true,
})),
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
let query = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(CreateTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "my_database".to_string(),
table_name: "my_table".to_string(),
desc: "blabla".to_string(),
column_defs: vec![
ColumnDef {
name: "a".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: vec![],
},
ColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
is_nullable: false,
default_constraint: vec![],
},
],
time_index: "ts".to_string(),
engine: MITO_ENGINE.to_string(),
table_id: Some(TableId { id: 1025 }),
..Default::default()
})),
});
let output = instance
.do_query(query.clone(), QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
let query = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(AlterExpr {
catalog_name: "greptime".to_string(),
schema_name: "my_database".to_string(),
table_name: "my_table".to_string(),
kind: Some(alter_expr::Kind::RenameTable(RenameTable {
new_table_name: "new_my_table".to_string(),
})),
table_id: Some(TableId { id: 1025 }),
..Default::default()
})),
});
let output = instance
.do_query(query.clone(), QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
// renames it again.
let output = instance
.do_query(query.clone(), QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_handle_ddl() {
let instance = MockInstance::new("test_handle_ddl").await;

View File

@@ -70,21 +70,27 @@ impl Instance {
}
pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> Result<Output> {
let table = self
.catalog_manager
.table(&expr.catalog_name, &expr.schema_name, &expr.table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(
&expr.catalog_name,
&expr.schema_name,
&expr.table_name,
),
})?;
let table_id = match expr.table_id.as_ref() {
None => {
self.catalog_manager
.table(&expr.catalog_name, &expr.schema_name, &expr.table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(
&expr.catalog_name,
&expr.schema_name,
&expr.table_name,
),
})?
.table_info()
.ident
.table_id
}
Some(table_id) => table_id.id, // For requests from Metasrv.
};
let request = alter_expr_to_request(table.table_info().ident.table_id, expr)
.context(AlterExprToRequestSnafu)?;
let request = alter_expr_to_request(table_id, expr).context(AlterExprToRequestSnafu)?;
self.sql_handler()
.execute(SqlRequest::Alter(request), QueryContext::arc())
.await

View File

@@ -23,7 +23,7 @@ use table::metadata::TableId;
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest};
use table_procedure::AlterTableProcedure;
use crate::error::{self, Result};
use crate::error::{self, Result, UnexpectedSnafu};
use crate::sql::SqlHandler;
impl SqlHandler {
@@ -35,25 +35,54 @@ impl SqlHandler {
table: &table_name,
};
let table = self.get_table(&table_ref).await?;
let engine_procedure = self.engine_procedure(table)?;
match self.get_table(&table_ref).await {
Ok(table) => {
let engine_procedure = self.engine_procedure(table)?;
let procedure =
AlterTableProcedure::new(req, self.catalog_manager.clone(), engine_procedure);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
let procedure =
AlterTableProcedure::new(req, self.catalog_manager.clone(), engine_procedure);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
info!("Alter table {} by procedure {}", table_name, procedure_id);
info!("Alter table {} by procedure {}", table_name, procedure_id);
let mut watcher = self
.procedure_manager
.submit(procedure_with_id)
.await
.context(error::SubmitProcedureSnafu { procedure_id })?;
let mut watcher = self
.procedure_manager
.submit(procedure_with_id)
.await
.context(error::SubmitProcedureSnafu { procedure_id })?;
watcher::wait(&mut watcher)
.await
.context(error::WaitProcedureSnafu { procedure_id })?;
}
Err(err) => {
// TODO(weny): Retrieves table by table_id
if let AlterKind::RenameTable { new_table_name } = req.alter_kind {
let new_table_ref = TableReference {
catalog: &req.catalog_name,
schema: &req.schema_name,
table: &new_table_name,
};
let table = self.get_table(&new_table_ref).await?;
ensure!(
table.table_info().table_id() == req.table_id,
UnexpectedSnafu {
violated: format!(
"expected table id: {}, actual: {}",
req.table_id,
table.table_info().table_id()
)
}
)
} else {
return Err(err);
}
}
}
watcher::wait(&mut watcher)
.await
.context(error::WaitProcedureSnafu { procedure_id })?;
// Tried in MySQL, it really prints "Affected Rows: 0".
Ok(Output::AffectedRows(0))
}