diff --git a/src/common/meta/src/ddl/alter_logical_tables.rs b/src/common/meta/src/ddl/alter_logical_tables.rs index b2b8b6858b..5c8e99211b 100644 --- a/src/common/meta/src/ddl/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/alter_logical_tables.rs @@ -39,7 +39,7 @@ use crate::key::table_route::PhysicalTableRouteValue; use crate::key::DeserializedValueWithBytes; use crate::lock_key::{CatalogLock, SchemaLock, TableLock}; use crate::rpc::ddl::AlterTableTask; -use crate::rpc::router::{find_leader_regions, find_leaders}; +use crate::rpc::router::find_leaders; use crate::{cache_invalidator, metrics, ClusterId}; pub struct AlterLogicalTablesProcedure { @@ -118,20 +118,14 @@ impl AlterLogicalTablesProcedure { for peer in leaders { let requester = self.context.datanode_manager.datanode(&peer).await; - let region_numbers = find_leader_regions(&physical_table_route.region_routes, &peer); + let request = self.make_request(&peer, &physical_table_route.region_routes)?; - for region_number in region_numbers { - let request = self.make_request(region_number)?; - let peer = peer.clone(); - let requester = requester.clone(); - - alter_region_tasks.push(async move { - requester - .handle(request) - .await - .map_err(add_peer_context_if_needed(peer)) - }); - } + alter_region_tasks.push(async move { + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(peer)) + }); } // Collects responses from all the alter region tasks. diff --git a/src/common/meta/src/ddl/alter_logical_tables/region_request.rs b/src/common/meta/src/ddl/alter_logical_tables/region_request.rs index 1f6fa2e67b..012cecf12d 100644 --- a/src/common/meta/src/ddl/alter_logical_tables/region_request.rs +++ b/src/common/meta/src/ddl/alter_logical_tables/region_request.rs @@ -19,16 +19,22 @@ use api::v1::region::{ RegionColumnDef, RegionRequest, RegionRequestHeader, }; use common_telemetry::tracing_context::TracingContext; -use store_api::storage::{RegionId, RegionNumber}; +use store_api::storage::RegionId; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; use crate::error::Result; use crate::key::table_info::TableInfoValue; +use crate::peer::Peer; use crate::rpc::ddl::AlterTableTask; +use crate::rpc::router::{find_leader_regions, RegionRoute}; impl AlterLogicalTablesProcedure { - pub(crate) fn make_request(&self, region_number: RegionNumber) -> Result { - let alter_requests = self.make_alter_region_requests(region_number)?; + pub(crate) fn make_request( + &self, + peer: &Peer, + region_routes: &[RegionRoute], + ) -> Result { + let alter_requests = self.make_alter_region_requests(peer, region_routes)?; let request = RegionRequest { header: Some(RegionRequestHeader { tracing_context: TracingContext::from_current_span().to_w3c(), @@ -40,17 +46,25 @@ impl AlterLogicalTablesProcedure { Ok(request) } - fn make_alter_region_requests(&self, region_number: RegionNumber) -> Result { - let mut requests = Vec::with_capacity(self.data.tasks.len()); + fn make_alter_region_requests( + &self, + peer: &Peer, + region_routes: &[RegionRoute], + ) -> Result { + let tasks = &self.data.tasks; + let regions_on_this_peer = find_leader_regions(region_routes, peer); + let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len()); for (task, table) in self .data .tasks .iter() .zip(self.data.table_info_values.iter()) { - let region_id = RegionId::new(table.table_info.ident.table_id, region_number); - let request = self.make_alter_region_request(region_id, task, table)?; - requests.push(request); + for region_number in ®ions_on_this_peer { + let region_id = RegionId::new(table.table_info.ident.table_id, *region_number); + let request = self.make_alter_region_request(region_id, task, table)?; + requests.push(request); + } } Ok(AlterRequests { requests })