diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index 4b867147be..7a72de63a1 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -21,7 +21,7 @@ use api::v1::CreateTableExpr; use async_trait::async_trait; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; -use common_telemetry::warn; +use common_telemetry::{debug, warn}; use futures_util::future::join_all; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -143,7 +143,12 @@ impl CreateLogicalTablesProcedure { for peer in leaders { let requester = self.context.node_manager.datanode(&peer).await; - let request = self.make_request(&peer, region_routes)?; + let Some(request) = self.make_request(&peer, region_routes)? else { + debug!("no region request to send to datanode {}", peer); + // We can skip the rest of the datanodes, + // the rest of the datanodes should have the same result. + break; + }; create_region_tasks.push(async move { requester diff --git a/src/common/meta/src/ddl/create_logical_tables/check.rs b/src/common/meta/src/ddl/create_logical_tables/check.rs index 2f86b8ac44..6ed94e7826 100644 --- a/src/common/meta/src/ddl/create_logical_tables/check.rs +++ b/src/common/meta/src/ddl/create_logical_tables/check.rs @@ -25,7 +25,7 @@ impl CreateLogicalTablesProcedure { Ok(()) } - pub(crate) async fn check_tables_already_exist(&mut self) -> Result<()> { + pub async fn check_tables_already_exist(&mut self) -> Result<()> { let table_name_keys = self .data .all_create_table_exprs() diff --git a/src/common/meta/src/ddl/create_logical_tables/region_request.rs b/src/common/meta/src/ddl/create_logical_tables/region_request.rs index bc0d290c4e..9448f67a14 100644 --- a/src/common/meta/src/ddl/create_logical_tables/region_request.rs +++ b/src/common/meta/src/ddl/create_logical_tables/region_request.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader}; +use common_telemetry::debug; use common_telemetry::tracing_context::TracingContext; use store_api::storage::RegionId; @@ -31,11 +32,15 @@ impl CreateLogicalTablesProcedure { &self, peer: &Peer, region_routes: &[RegionRoute], - ) -> Result { + ) -> Result> { let tasks = &self.data.tasks; + let table_ids_already_exists = &self.data.table_ids_already_exists; let regions_on_this_peer = find_leader_regions(region_routes, peer); let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len()); - for task in tasks { + for (task, table_id_already_exists) in tasks.iter().zip(table_ids_already_exists) { + if table_id_already_exists.is_some() { + continue; + } let create_table_expr = &task.create_table; let catalog = &create_table_expr.catalog_name; let schema = &create_table_expr.schema_name; @@ -51,13 +56,18 @@ impl CreateLogicalTablesProcedure { } } - Ok(RegionRequest { + if requests.is_empty() { + debug!("no region request to send to datanodes"); + return Ok(None); + } + + Ok(Some(RegionRequest { header: Some(RegionRequestHeader { tracing_context: TracingContext::from_current_span().to_w3c(), ..Default::default() }), body: Some(region_request::Body::Creates(CreateRequests { requests })), - }) + })) } fn create_region_request_builder( diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 4f1c97f14c..5690b8fedd 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -291,6 +291,7 @@ async fn test_on_datanode_create_logical_regions() { } }); + procedure.check_tables_already_exist().await.unwrap(); let status = procedure.on_datanode_create_regions().await.unwrap(); assert!(matches!(status, Status::Executing { persist: true })); assert!(matches!(