mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
fix: fix create table ddl return incorrect table id (#3232)
* fix: fix create table ddl return incorrect table id * refactor: refactor param of Status::done_with_output
This commit is contained in:
@@ -104,18 +104,18 @@ impl CreateTableProcedure {
|
||||
/// Checks whether the table exists.
|
||||
async fn on_prepare(&mut self) -> Result<Status> {
|
||||
let expr = &self.creator.data.task.create_table;
|
||||
let exist = self
|
||||
let table_name_value = self
|
||||
.context
|
||||
.table_metadata_manager
|
||||
.table_name_manager()
|
||||
.exists(TableNameKey::new(
|
||||
.get(TableNameKey::new(
|
||||
&expr.catalog_name,
|
||||
&expr.schema_name,
|
||||
&expr.table_name,
|
||||
))
|
||||
.await?;
|
||||
|
||||
if exist {
|
||||
if let Some(value) = table_name_value {
|
||||
ensure!(
|
||||
self.creator.data.task.create_table.create_if_not_exists,
|
||||
error::TableAlreadyExistsSnafu {
|
||||
@@ -123,7 +123,8 @@ impl CreateTableProcedure {
|
||||
}
|
||||
);
|
||||
|
||||
return Ok(Status::done());
|
||||
let table_id = value.table_id();
|
||||
return Ok(Status::done_with_output(table_id));
|
||||
}
|
||||
|
||||
self.creator.data.state = CreateTableState::DatanodeCreateRegions;
|
||||
@@ -315,7 +316,7 @@ impl CreateTableProcedure {
|
||||
.await?;
|
||||
info!("Created table metadata for table {table_id}");
|
||||
|
||||
Ok(Status::done())
|
||||
Ok(Status::done_with_output(table_id))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
|
||||
use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId};
|
||||
use common_telemetry::tracing_context::{FutureExt, TracingContext};
|
||||
use common_telemetry::{info, tracing};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -163,7 +163,7 @@ impl DdlManager {
|
||||
alter_table_task: AlterTableTask,
|
||||
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
|
||||
physical_table_info: Option<(TableId, TableName)>,
|
||||
) -> Result<ProcedureId> {
|
||||
) -> Result<(ProcedureId, Option<Output>)> {
|
||||
let context = self.create_context();
|
||||
|
||||
let procedure = AlterTableProcedure::new(
|
||||
@@ -187,7 +187,7 @@ impl DdlManager {
|
||||
create_table_task: CreateTableTask,
|
||||
table_route: TableRouteValue,
|
||||
region_wal_options: HashMap<RegionNumber, String>,
|
||||
) -> Result<ProcedureId> {
|
||||
) -> Result<(ProcedureId, Option<Output>)> {
|
||||
let context = self.create_context();
|
||||
|
||||
let procedure = CreateTableProcedure::new(
|
||||
@@ -211,7 +211,7 @@ impl DdlManager {
|
||||
drop_table_task: DropTableTask,
|
||||
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
|
||||
table_route_value: DeserializedValueWithBytes<TableRouteValue>,
|
||||
) -> Result<ProcedureId> {
|
||||
) -> Result<(ProcedureId, Option<Output>)> {
|
||||
let context = self.create_context();
|
||||
|
||||
let procedure = DropTableProcedure::new(
|
||||
@@ -235,7 +235,7 @@ impl DdlManager {
|
||||
truncate_table_task: TruncateTableTask,
|
||||
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
|
||||
region_routes: Vec<RegionRoute>,
|
||||
) -> Result<ProcedureId> {
|
||||
) -> Result<(ProcedureId, Option<Output>)> {
|
||||
let context = self.create_context();
|
||||
let procedure = TruncateTableProcedure::new(
|
||||
cluster_id,
|
||||
@@ -250,7 +250,10 @@ impl DdlManager {
|
||||
self.submit_procedure(procedure_with_id).await
|
||||
}
|
||||
|
||||
async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
|
||||
async fn submit_procedure(
|
||||
&self,
|
||||
procedure_with_id: ProcedureWithId,
|
||||
) -> Result<(ProcedureId, Option<Output>)> {
|
||||
let procedure_id = procedure_with_id.id;
|
||||
|
||||
let mut watcher = self
|
||||
@@ -259,11 +262,11 @@ impl DdlManager {
|
||||
.await
|
||||
.context(SubmitProcedureSnafu)?;
|
||||
|
||||
watcher::wait(&mut watcher)
|
||||
let output = watcher::wait(&mut watcher)
|
||||
.await
|
||||
.context(WaitProcedureSnafu)?;
|
||||
|
||||
Ok(procedure_id)
|
||||
Ok((procedure_id, output))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -288,7 +291,7 @@ async fn handle_truncate_table_task(
|
||||
|
||||
let table_route = table_route_value.into_inner().region_routes()?.clone();
|
||||
|
||||
let id = ddl_manager
|
||||
let (id, _) = ddl_manager
|
||||
.submit_truncate_table_task(
|
||||
cluster_id,
|
||||
truncate_table_task,
|
||||
@@ -363,7 +366,7 @@ async fn handle_alter_table_task(
|
||||
))
|
||||
};
|
||||
|
||||
let id = ddl_manager
|
||||
let (id, _) = ddl_manager
|
||||
.submit_alter_table_task(
|
||||
cluster_id,
|
||||
alter_table_task,
|
||||
@@ -405,7 +408,7 @@ async fn handle_drop_table_task(
|
||||
let table_route_value =
|
||||
DeserializedValueWithBytes::from_inner(TableRouteValue::Physical(table_route_value));
|
||||
|
||||
let id = ddl_manager
|
||||
let (id, _) = ddl_manager
|
||||
.submit_drop_table_task(
|
||||
cluster_id,
|
||||
drop_table_task,
|
||||
@@ -443,7 +446,7 @@ async fn handle_create_table_task(
|
||||
|
||||
create_table_task.table_info.ident.table_id = table_id;
|
||||
|
||||
let id = ddl_manager
|
||||
let (id, output) = ddl_manager
|
||||
.submit_create_table_task(
|
||||
cluster_id,
|
||||
create_table_task,
|
||||
@@ -451,8 +454,10 @@ async fn handle_create_table_task(
|
||||
region_wal_options,
|
||||
)
|
||||
.await?;
|
||||
let output = output.context(error::ProcedureOutputSnafu)?;
|
||||
|
||||
info!("Table: {table_id:?} is created via procedure_id {id:?}");
|
||||
let table_id = *(output.downcast_ref::<u32>().unwrap());
|
||||
info!("Table: {table_id} is created via procedure_id {id:?}");
|
||||
|
||||
Ok(SubmitDdlTaskResponse {
|
||||
key: id.to_string().into(),
|
||||
|
||||
@@ -118,6 +118,9 @@ pub enum Error {
|
||||
source: datatypes::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to get procedure output"))]
|
||||
ProcedureOutput { location: Location },
|
||||
|
||||
#[snafu(display("Primary key '{key}' not found when creating region request"))]
|
||||
PrimaryKeyNotFound { key: String, location: Location },
|
||||
|
||||
@@ -396,7 +399,8 @@ impl ErrorExt for Error {
|
||||
| ProduceRecord { .. }
|
||||
| CreateKafkaWalTopic { .. }
|
||||
| EmptyTopicPool { .. }
|
||||
| UnexpectedLogicalRouteTable { .. } => StatusCode::Unexpected,
|
||||
| UnexpectedLogicalRouteTable { .. }
|
||||
| ProcedureOutput { .. } => StatusCode::Unexpected,
|
||||
|
||||
SendMessage { .. }
|
||||
| GetKvCache { .. }
|
||||
|
||||
@@ -25,7 +25,7 @@ pub mod watcher;
|
||||
|
||||
pub use crate::error::{Error, Result};
|
||||
pub use crate::procedure::{
|
||||
BoxedProcedure, Context, ContextProvider, LockKey, Procedure, ProcedureId, ProcedureManager,
|
||||
ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
|
||||
BoxedProcedure, Context, ContextProvider, LockKey, Output, Procedure, ProcedureId,
|
||||
ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
|
||||
};
|
||||
pub use crate::watcher::Watcher;
|
||||
|
||||
@@ -58,9 +58,9 @@ impl Status {
|
||||
}
|
||||
|
||||
/// Returns a [Status::Done] with output.
|
||||
pub fn done_with_output(output: Output) -> Status {
|
||||
pub fn done_with_output<T: Any + Send + Sync>(output: T) -> Status {
|
||||
Status::Done {
|
||||
output: Some(output),
|
||||
output: Some(Arc::new(output)),
|
||||
}
|
||||
}
|
||||
/// Returns `true` if the procedure is done.
|
||||
|
||||
@@ -89,7 +89,7 @@ mod tests {
|
||||
self.error = !self.error;
|
||||
Err(Error::retry_later(MockError::new(StatusCode::Internal)))
|
||||
} else {
|
||||
Ok(Status::done_with_output(Arc::new("hello")))
|
||||
Ok(Status::done_with_output("hello"))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user