From 8bade8f8e495e7fc2551bb0daac8261e06bc37f2 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 25 Jan 2024 22:58:43 +0900 Subject: [PATCH] fix: fix create table ddl return incorrect table id (#3232) * fix: fix create table ddl return incorrect table id * refactor: refactor param of Status::done_with_output --- src/common/meta/src/ddl/create_table.rs | 11 +++++---- src/common/meta/src/ddl_manager.rs | 31 ++++++++++++++----------- src/common/meta/src/error.rs | 6 ++++- src/common/procedure/src/lib.rs | 4 ++-- src/common/procedure/src/procedure.rs | 4 ++-- src/common/procedure/src/watcher.rs | 2 +- 6 files changed, 34 insertions(+), 24 deletions(-) diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index ca9ac1e0b0..848e9f2d8e 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -104,18 +104,18 @@ impl CreateTableProcedure { /// Checks whether the table exists. async fn on_prepare(&mut self) -> Result { let expr = &self.creator.data.task.create_table; - let exist = self + let table_name_value = self .context .table_metadata_manager .table_name_manager() - .exists(TableNameKey::new( + .get(TableNameKey::new( &expr.catalog_name, &expr.schema_name, &expr.table_name, )) .await?; - if exist { + if let Some(value) = table_name_value { ensure!( self.creator.data.task.create_table.create_if_not_exists, error::TableAlreadyExistsSnafu { @@ -123,7 +123,8 @@ impl CreateTableProcedure { } ); - return Ok(Status::done()); + let table_id = value.table_id(); + return Ok(Status::done_with_output(table_id)); } self.creator.data.state = CreateTableState::DatanodeCreateRegions; @@ -315,7 +316,7 @@ impl CreateTableProcedure { .await?; info!("Created table metadata for table {table_id}"); - Ok(Status::done()) + Ok(Status::done_with_output(table_id)) } } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 655d66126b..d7c74ae519 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; -use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId}; +use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{info, tracing}; use snafu::{OptionExt, ResultExt}; @@ -163,7 +163,7 @@ impl DdlManager { alter_table_task: AlterTableTask, table_info_value: DeserializedValueWithBytes, physical_table_info: Option<(TableId, TableName)>, - ) -> Result { + ) -> Result<(ProcedureId, Option)> { let context = self.create_context(); let procedure = AlterTableProcedure::new( @@ -187,7 +187,7 @@ impl DdlManager { create_table_task: CreateTableTask, table_route: TableRouteValue, region_wal_options: HashMap, - ) -> Result { + ) -> Result<(ProcedureId, Option)> { let context = self.create_context(); let procedure = CreateTableProcedure::new( @@ -211,7 +211,7 @@ impl DdlManager { drop_table_task: DropTableTask, table_info_value: DeserializedValueWithBytes, table_route_value: DeserializedValueWithBytes, - ) -> Result { + ) -> Result<(ProcedureId, Option)> { let context = self.create_context(); let procedure = DropTableProcedure::new( @@ -235,7 +235,7 @@ impl DdlManager { truncate_table_task: TruncateTableTask, table_info_value: DeserializedValueWithBytes, region_routes: Vec, - ) -> Result { + ) -> Result<(ProcedureId, Option)> { let context = self.create_context(); let procedure = TruncateTableProcedure::new( cluster_id, @@ -250,7 +250,10 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } - async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result { + async fn submit_procedure( + &self, + procedure_with_id: ProcedureWithId, + ) -> Result<(ProcedureId, Option)> { let procedure_id = procedure_with_id.id; let mut watcher = self @@ -259,11 +262,11 @@ impl DdlManager { .await .context(SubmitProcedureSnafu)?; - watcher::wait(&mut watcher) + let output = watcher::wait(&mut watcher) .await .context(WaitProcedureSnafu)?; - Ok(procedure_id) + Ok((procedure_id, output)) } } @@ -288,7 +291,7 @@ async fn handle_truncate_table_task( let table_route = table_route_value.into_inner().region_routes()?.clone(); - let id = ddl_manager + let (id, _) = ddl_manager .submit_truncate_table_task( cluster_id, truncate_table_task, @@ -363,7 +366,7 @@ async fn handle_alter_table_task( )) }; - let id = ddl_manager + let (id, _) = ddl_manager .submit_alter_table_task( cluster_id, alter_table_task, @@ -405,7 +408,7 @@ async fn handle_drop_table_task( let table_route_value = DeserializedValueWithBytes::from_inner(TableRouteValue::Physical(table_route_value)); - let id = ddl_manager + let (id, _) = ddl_manager .submit_drop_table_task( cluster_id, drop_table_task, @@ -443,7 +446,7 @@ async fn handle_create_table_task( create_table_task.table_info.ident.table_id = table_id; - let id = ddl_manager + let (id, output) = ddl_manager .submit_create_table_task( cluster_id, create_table_task, @@ -451,8 +454,10 @@ async fn handle_create_table_task( region_wal_options, ) .await?; + let output = output.context(error::ProcedureOutputSnafu)?; - info!("Table: {table_id:?} is created via procedure_id {id:?}"); + let table_id = *(output.downcast_ref::().unwrap()); + info!("Table: {table_id} is created via procedure_id {id:?}"); Ok(SubmitDdlTaskResponse { key: id.to_string().into(), diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 9da624fab9..246a09711d 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -118,6 +118,9 @@ pub enum Error { source: datatypes::Error, }, + #[snafu(display("Failed to get procedure output"))] + ProcedureOutput { location: Location }, + #[snafu(display("Primary key '{key}' not found when creating region request"))] PrimaryKeyNotFound { key: String, location: Location }, @@ -396,7 +399,8 @@ impl ErrorExt for Error { | ProduceRecord { .. } | CreateKafkaWalTopic { .. } | EmptyTopicPool { .. } - | UnexpectedLogicalRouteTable { .. } => StatusCode::Unexpected, + | UnexpectedLogicalRouteTable { .. } + | ProcedureOutput { .. } => StatusCode::Unexpected, SendMessage { .. } | GetKvCache { .. } diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs index 3386640572..36ccd6b190 100644 --- a/src/common/procedure/src/lib.rs +++ b/src/common/procedure/src/lib.rs @@ -25,7 +25,7 @@ pub mod watcher; pub use crate::error::{Error, Result}; pub use crate::procedure::{ - BoxedProcedure, Context, ContextProvider, LockKey, Procedure, ProcedureId, ProcedureManager, - ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey, + BoxedProcedure, Context, ContextProvider, LockKey, Output, Procedure, ProcedureId, + ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey, }; pub use crate::watcher::Watcher; diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index a60d935c3e..93d8711edd 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -58,9 +58,9 @@ impl Status { } /// Returns a [Status::Done] with output. - pub fn done_with_output(output: Output) -> Status { + pub fn done_with_output(output: T) -> Status { Status::Done { - output: Some(output), + output: Some(Arc::new(output)), } } /// Returns `true` if the procedure is done. diff --git a/src/common/procedure/src/watcher.rs b/src/common/procedure/src/watcher.rs index 93aa91d5bc..de15b545c1 100644 --- a/src/common/procedure/src/watcher.rs +++ b/src/common/procedure/src/watcher.rs @@ -89,7 +89,7 @@ mod tests { self.error = !self.error; Err(Error::retry_later(MockError::new(StatusCode::Internal))) } else { - Ok(Status::done_with_output(Arc::new("hello"))) + Ok(Status::done_with_output("hello")) } }