From 5943b4106789516a4f2beac74d7583a358a9f03e Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 26 May 2026 20:54:54 +0800 Subject: [PATCH] refactor: split repartition region descriptors (#8172) * refactor: split repartition region descriptors Signed-off-by: WenyXu * feat(meta-srv): support default source repartition planning Signed-off-by: WenyXu * feat(meta-srv): support default source repartition metadata Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/meta-srv/src/procedure/repartition.rs | 68 ++-- .../procedure/repartition/allocate_region.rs | 32 +- .../src/procedure/repartition/dispatch.rs | 8 +- .../src/procedure/repartition/group.rs | 12 +- .../group/apply_staging_manifest.rs | 4 +- .../repartition/group/enter_staging_region.rs | 12 +- .../repartition/group/remap_manifest.rs | 8 +- .../repartition/group/repartition_start.rs | 118 ++++-- .../update_metadata/apply_staging_region.rs | 97 ++++- .../update_metadata/exit_staging_region.rs | 62 ++- .../src/procedure/repartition/plan.rs | 360 ++++++++++++++++-- .../repartition/repartition_start.rs | 198 +++++++++- .../src/procedure/repartition/test_util.rs | 6 +- .../src/procedure/repartition/utils.rs | 100 +++-- 14 files changed, 894 insertions(+), 191 deletions(-) diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs index f314a40080..be060b7424 100644 --- a/src/meta-srv/src/procedure/repartition.rs +++ b/src/meta-srv/src/procedure/repartition.rs @@ -853,7 +853,7 @@ mod tests { use crate::procedure::repartition::deallocate_region::DeallocateRegion; use crate::procedure::repartition::dispatch::Dispatch; use crate::procedure::repartition::group::update_metadata::UpdateMetadata; - use crate::procedure::repartition::plan::RegionDescriptor; + use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor}; use crate::procedure::repartition::repartition_end::RepartitionEnd; use crate::procedure::repartition::test_util::{ TestingEnv, assert_parent_state, current_parent_region_routes, extract_subprocedure_ids, @@ -864,16 +864,16 @@ mod tests { fn test_plan(table_id: TableId) -> RepartitionPlanEntry { RepartitionPlanEntry { group_id: uuid::Uuid::new_v4(), - source_regions: vec![RegionDescriptor { - region_id: RegionId::new(table_id, 1), - partition_expr: range_expr("x", 0, 100), - }], + source_regions: vec![SourceRegionDescriptor::partitioned( + RegionId::new(table_id, 1), + range_expr("x", 0, 100), + )], target_regions: vec![ - RegionDescriptor { + TargetRegionDescriptor { region_id: RegionId::new(table_id, 1), partition_expr: range_expr("x", 0, 50), }, - RegionDescriptor { + TargetRegionDescriptor { region_id: RegionId::new(table_id, 3), partition_expr: range_expr("x", 50, 100), }, @@ -1209,16 +1209,16 @@ mod tests { ); let succeeded_plan = RepartitionPlanEntry { group_id: Uuid::new_v4(), - source_regions: vec![RegionDescriptor { - region_id: RegionId::new(table_id, 2), - partition_expr: range_expr("x", 100, 200), - }], + source_regions: vec![SourceRegionDescriptor::partitioned( + RegionId::new(table_id, 2), + range_expr("x", 100, 200), + )], target_regions: vec![ - RegionDescriptor { + TargetRegionDescriptor { region_id: RegionId::new(table_id, 2), partition_expr: range_expr("x", 100, 150), }, - RegionDescriptor { + TargetRegionDescriptor { region_id: RegionId::new(table_id, 4), partition_expr: range_expr("x", 150, 200), }, @@ -1292,16 +1292,16 @@ mod tests { ); let succeeded_plan = RepartitionPlanEntry { group_id: Uuid::new_v4(), - source_regions: vec![RegionDescriptor { - region_id: RegionId::new(table_id, 2), - partition_expr: range_expr("x", 100, 200), - }], + source_regions: vec![SourceRegionDescriptor::partitioned( + RegionId::new(table_id, 2), + range_expr("x", 100, 200), + )], target_regions: vec![ - RegionDescriptor { + TargetRegionDescriptor { region_id: RegionId::new(table_id, 2), partition_expr: range_expr("x", 100, 150), }, - RegionDescriptor { + TargetRegionDescriptor { region_id: RegionId::new(table_id, 4), partition_expr: range_expr("x", 150, 200), }, @@ -1567,16 +1567,16 @@ mod tests { let failed_merge_plan = RepartitionPlanEntry { group_id: Uuid::new_v4(), source_regions: vec![ - RegionDescriptor { - region_id: RegionId::new(table_id, 1), - partition_expr: range_expr("x", 0, 100), - }, - RegionDescriptor { - region_id: RegionId::new(table_id, 2), - partition_expr: range_expr("x", 100, 200), - }, + SourceRegionDescriptor::partitioned( + RegionId::new(table_id, 1), + range_expr("x", 0, 100), + ), + SourceRegionDescriptor::partitioned( + RegionId::new(table_id, 2), + range_expr("x", 100, 200), + ), ], - target_regions: vec![RegionDescriptor { + target_regions: vec![TargetRegionDescriptor { region_id: RegionId::new(table_id, 1), partition_expr: range_expr("x", 0, 200), }], @@ -1587,16 +1587,16 @@ mod tests { }; let succeeded_split_plan = RepartitionPlanEntry { group_id: Uuid::new_v4(), - source_regions: vec![RegionDescriptor { - region_id: RegionId::new(table_id, 3), - partition_expr: range_expr("x", 200, 300), - }], + source_regions: vec![SourceRegionDescriptor::partitioned( + RegionId::new(table_id, 3), + range_expr("x", 200, 300), + )], target_regions: vec![ - RegionDescriptor { + TargetRegionDescriptor { region_id: RegionId::new(table_id, 3), partition_expr: range_expr("x", 200, 250), }, - RegionDescriptor { + TargetRegionDescriptor { region_id: RegionId::new(table_id, 4), partition_expr: range_expr("x", 250, 300), }, diff --git a/src/meta-srv/src/procedure/repartition/allocate_region.rs b/src/meta-srv/src/procedure/repartition/allocate_region.rs index a3293e8c3e..c49866ac8a 100644 --- a/src/meta-srv/src/procedure/repartition/allocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/allocate_region.rs @@ -35,7 +35,7 @@ use tokio::time::Instant; use crate::error::{self, Result}; use crate::procedure::repartition::dispatch::Dispatch; use crate::procedure::repartition::plan::{ - AllocationPlanEntry, RegionDescriptor, RepartitionPlanEntry, + AllocationPlanEntry, RepartitionPlanEntry, TargetRegionDescriptor, convert_allocation_plan_to_repartition_plan, }; use crate::procedure::repartition::{Context, State}; @@ -324,7 +324,7 @@ impl AllocateRegion { /// Collects all regions that need to be allocated from the repartition plan entries. fn collect_allocate_regions( repartition_plan_entries: &[RepartitionPlanEntry], - ) -> Vec<&RegionDescriptor> { + ) -> Vec<&TargetRegionDescriptor> { repartition_plan_entries .iter() .flat_map(|p| p.allocate_regions()) @@ -333,7 +333,7 @@ impl AllocateRegion { /// Prepares region allocation data: region numbers and their partition expressions. fn prepare_region_allocation_data( - allocate_regions: &[&RegionDescriptor], + allocate_regions: &[&TargetRegionDescriptor], ) -> Result> { allocate_regions .iter() @@ -417,6 +417,7 @@ mod tests { use super::*; use crate::procedure::repartition::State; + use crate::procedure::repartition::plan::SourceRegionDescriptor; use crate::procedure::repartition::test_util::{ TestingEnv, current_parent_region_routes, new_parent_context, range_expr, test_region_wal_options, @@ -428,8 +429,21 @@ mod tests { col: &str, start: i64, end: i64, - ) -> RegionDescriptor { - RegionDescriptor { + ) -> SourceRegionDescriptor { + SourceRegionDescriptor::partitioned( + RegionId::new(table_id, region_number), + range_expr(col, start, end), + ) + } + + fn create_target_region_descriptor( + table_id: TableId, + region_number: u32, + col: &str, + start: i64, + end: i64, + ) -> TargetRegionDescriptor { + TargetRegionDescriptor { region_id: RegionId::new(table_id, region_number), partition_expr: range_expr(col, start, end), } @@ -700,10 +714,10 @@ mod tests { fn test_prepare_region_allocation_data() { let table_id = 1024; let regions = [ - create_region_descriptor(table_id, 10, "x", 0, 50), - create_region_descriptor(table_id, 11, "x", 50, 100), + create_target_region_descriptor(table_id, 10, "x", 0, 50), + create_target_region_descriptor(table_id, 11, "x", 50, 100), ]; - let region_refs: Vec<&RegionDescriptor> = regions.iter().collect(); + let region_refs: Vec<&TargetRegionDescriptor> = regions.iter().collect(); let result = AllocateRegion::prepare_region_allocation_data(®ion_refs).unwrap(); @@ -732,7 +746,7 @@ mod tests { ctx.persistent_ctx.plans = vec![RepartitionPlanEntry { group_id: Uuid::new_v4(), source_regions: vec![], - target_regions: vec![create_region_descriptor(table_id, 3, "x", 0, 100)], + target_regions: vec![create_target_region_descriptor(table_id, 3, "x", 0, 100)], allocated_region_ids: vec![RegionId::new(table_id, 3)], pending_deallocate_region_ids: vec![], transition_map: vec![], diff --git a/src/meta-srv/src/procedure/repartition/dispatch.rs b/src/meta-srv/src/procedure/repartition/dispatch.rs index 3a9f9376f1..4377123887 100644 --- a/src/meta-srv/src/procedure/repartition/dispatch.rs +++ b/src/meta-srv/src/procedure/repartition/dispatch.rs @@ -25,22 +25,22 @@ use store_api::storage::RegionId; use crate::error::Result; use crate::procedure::repartition::collect::{Collect, ProcedureMeta}; use crate::procedure::repartition::group::RepartitionGroupProcedure; -use crate::procedure::repartition::plan::RegionDescriptor; +use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor}; use crate::procedure::repartition::{self, Context, State}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Dispatch; pub(crate) fn build_region_mapping( - source_regions: &[RegionDescriptor], - target_regions: &[RegionDescriptor], + source_regions: &[SourceRegionDescriptor], + target_regions: &[TargetRegionDescriptor], transition_map: &[Vec], ) -> HashMap> { transition_map .iter() .enumerate() .map(|(source_idx, indices)| { - let source_region = source_regions[source_idx].region_id; + let source_region = source_regions[source_idx].region_id(); let target_regions = indices .iter() .map(|&target_idx| target_regions[target_idx].region_id) diff --git a/src/meta-srv/src/procedure/repartition/group.rs b/src/meta-srv/src/procedure/repartition/group.rs index 12374e8ada..2dc1117467 100644 --- a/src/meta-srv/src/procedure/repartition/group.rs +++ b/src/meta-srv/src/procedure/repartition/group.rs @@ -49,7 +49,7 @@ use uuid::Uuid; use crate::error::{self, Result}; use crate::procedure::repartition::group::repartition_start::RepartitionStart; -use crate::procedure::repartition::plan::RegionDescriptor; +use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor}; use crate::procedure::repartition::utils::get_datanode_table_value; use crate::procedure::repartition::{self}; use crate::service::mailbox::MailboxRef; @@ -330,9 +330,9 @@ pub struct PersistentContext { /// The schema name of the repartition group. pub schema_name: String, /// The source regions of the repartition group. - pub sources: Vec, + pub sources: Vec, /// The target regions of the repartition group. - pub targets: Vec, + pub targets: Vec, /// For each `source region`, the corresponding /// `target regions` that overlap with it. pub region_mapping: HashMap>, @@ -360,8 +360,8 @@ impl PersistentContext { table_id: TableId, catalog_name: String, schema_name: String, - sources: Vec, - targets: Vec, + sources: Vec, + targets: Vec, region_mapping: HashMap>, sync_region: bool, allocated_region_ids: Vec, @@ -392,7 +392,7 @@ impl PersistentContext { SchemaLock::read(&self.catalog_name, &self.schema_name).into(), ]); for source in &self.sources { - lock_keys.push(RegionLock::Write(source.region_id).into()); + lock_keys.push(RegionLock::Write(source.region_id()).into()); } lock_keys } diff --git a/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs b/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs index 43e5ee31d9..6148901ffa 100644 --- a/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs +++ b/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs @@ -37,7 +37,7 @@ use crate::procedure::repartition::group::utils::{ HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results, }; use crate::procedure::repartition::group::{Context, State}; -use crate::procedure::repartition::plan::RegionDescriptor; +use crate::procedure::repartition::plan::TargetRegionDescriptor; use crate::service::mailbox::{Channel, MailboxRef}; #[derive(Debug, Serialize, Deserialize)] @@ -75,7 +75,7 @@ impl ApplyStagingManifest { fn build_apply_staging_manifest_instructions( staging_manifest_paths: &HashMap, target_routes: &[RegionRoute], - targets: &[RegionDescriptor], + targets: &[TargetRegionDescriptor], central_region_id: RegionId, ) -> Result { let target_partition_expr_by_region = targets diff --git a/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs index c1957031d5..d1be2ca9d0 100644 --- a/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs @@ -38,7 +38,7 @@ use crate::procedure::repartition::group::utils::{ HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results, }; use crate::procedure::repartition::group::{Context, GroupId, GroupPrepareResult, State}; -use crate::procedure::repartition::plan::RegionDescriptor; +use crate::procedure::repartition::plan::TargetRegionDescriptor; use crate::procedure::utils::{self, ErrorStrategy}; use crate::service::mailbox::{Channel, MailboxRef}; @@ -77,7 +77,7 @@ impl EnterStagingRegion { fn build_enter_staging_instructions( group_id: GroupId, prepare_result: &GroupPrepareResult, - targets: &[RegionDescriptor], + targets: &[TargetRegionDescriptor], pending_deallocate_region_ids: &[RegionId], ) -> Result>> { let target_partition_expr_by_region = targets @@ -454,7 +454,7 @@ mod tests { use crate::error::{self, Error}; use crate::procedure::repartition::group::GroupPrepareResult; use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion; - use crate::procedure::repartition::plan::RegionDescriptor; + use crate::procedure::repartition::plan::TargetRegionDescriptor; use crate::procedure::repartition::test_util::{ TestingEnv, new_persistent_context, range_expr, }; @@ -720,13 +720,13 @@ mod tests { } } - fn test_targets() -> Vec { + fn test_targets() -> Vec { vec![ - RegionDescriptor { + TargetRegionDescriptor { region_id: RegionId::new(1024, 1), partition_expr: range_expr("x", 0, 10), }, - RegionDescriptor { + TargetRegionDescriptor { region_id: RegionId::new(1024, 2), partition_expr: range_expr("x", 10, 20), }, diff --git a/src/meta-srv/src/procedure/repartition/group/remap_manifest.rs b/src/meta-srv/src/procedure/repartition/group/remap_manifest.rs index 1d6a75100e..d8259a354f 100644 --- a/src/meta-srv/src/procedure/repartition/group/remap_manifest.rs +++ b/src/meta-srv/src/procedure/repartition/group/remap_manifest.rs @@ -30,7 +30,7 @@ use crate::error::{self, Result}; use crate::handler::HeartbeatMailbox; use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest; use crate::procedure::repartition::group::{Context, State}; -use crate::procedure::repartition::plan::RegionDescriptor; +use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor}; use crate::service::mailbox::{Channel, MailboxRef}; #[derive(Debug, Serialize, Deserialize)] @@ -98,8 +98,8 @@ impl State for RemapManifest { impl RemapManifest { fn build_remap_manifest_instructions( - source_regions: &[RegionDescriptor], - target_regions: &[RegionDescriptor], + source_regions: &[SourceRegionDescriptor], + target_regions: &[TargetRegionDescriptor], region_mapping: &HashMap>, central_region_id: RegionId, ) -> Result { @@ -117,7 +117,7 @@ impl RemapManifest { Ok(common_meta::instruction::RemapManifest { region_id: central_region_id, - input_regions: source_regions.iter().map(|r| r.region_id).collect(), + input_regions: source_regions.iter().map(|r| r.region_id()).collect(), region_mapping: region_mapping.clone(), new_partition_exprs, }) diff --git a/src/meta-srv/src/procedure/repartition/group/repartition_start.rs b/src/meta-srv/src/procedure/repartition/group/repartition_start.rs index 8b8b5208b4..8392890911 100644 --- a/src/meta-srv/src/procedure/repartition/group/repartition_start.rs +++ b/src/meta-srv/src/procedure/repartition/group/repartition_start.rs @@ -19,7 +19,7 @@ use common_meta::rpc::router::RegionRoute; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::debug; use serde::{Deserialize, Serialize}; -use snafu::{OptionExt, ResultExt, ensure}; +use snafu::{OptionExt, ensure}; use crate::error::{self, Result}; use crate::procedure::repartition::group::sync_region::SyncRegion; @@ -27,21 +27,18 @@ use crate::procedure::repartition::group::update_metadata::UpdateMetadata; use crate::procedure::repartition::group::{ Context, GroupId, GroupPrepareResult, State, region_routes, }; -use crate::procedure::repartition::plan::RegionDescriptor; +use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor}; #[derive(Debug, Serialize, Deserialize)] pub struct RepartitionStart; -/// Ensures that the partition expression of the region route matches the partition expression of the region descriptor. -fn ensure_region_route_expr_match( +/// Ensures that the partition expression of the source region route matches the source descriptor. +fn ensure_source_region_route_expr_match( region_route: &RegionRoute, - region_descriptor: &RegionDescriptor, + source: &SourceRegionDescriptor, ) -> Result { let actual = region_route.region.partition_expr(); - let expected = region_descriptor - .partition_expr - .as_json_str() - .context(error::SerializePartitionExprSnafu)?; + let expected = source.route_expr_for_rollback()?; ensure!( actual == expected, error::PartitionExprMismatchSnafu { @@ -60,8 +57,8 @@ impl RepartitionStart { fn ensure_route_present( group_id: GroupId, region_routes: &[RegionRoute], - sources: &[RegionDescriptor], - targets: &[RegionDescriptor], + sources: &[SourceRegionDescriptor], + targets: &[TargetRegionDescriptor], ) -> Result { ensure!( !sources.is_empty(), @@ -78,12 +75,12 @@ impl RepartitionStart { .iter() .map(|s| { region_routes_map - .get(&s.region_id) + .get(&s.region_id()) .context(error::RepartitionSourceRegionMissingSnafu { group_id, - region_id: s.region_id, + region_id: s.region_id(), }) - .and_then(|r| ensure_region_route_expr_match(r, s)) + .and_then(|r| ensure_source_region_route_expr_match(r, s)) }) .collect::>>()?; let target_region_routes = targets @@ -109,7 +106,7 @@ impl RepartitionStart { } ); } - let central_region = sources[0].region_id; + let central_region = sources[0].region_id(); let central_region_datanode = source_region_routes[0] .leader_peer .as_ref() @@ -216,16 +213,14 @@ mod tests { use crate::error::Error; use crate::procedure::repartition::group::repartition_start::RepartitionStart; - use crate::procedure::repartition::plan::RegionDescriptor; + use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor}; use crate::procedure::repartition::test_util::range_expr; #[test] fn test_ensure_route_present_missing_source_region() { - let source_region = RegionDescriptor { - region_id: RegionId::new(1024, 1), - partition_expr: range_expr("x", 0, 100), - }; - let target_region = RegionDescriptor { + let source_region = + SourceRegionDescriptor::partitioned(RegionId::new(1024, 1), range_expr("x", 0, 100)); + let target_region = TargetRegionDescriptor { region_id: RegionId::new(1024, 2), partition_expr: range_expr("x", 0, 10), }; @@ -249,11 +244,9 @@ mod tests { #[test] fn test_ensure_route_present_partition_expr_mismatch() { - let source_region = RegionDescriptor { - region_id: RegionId::new(1024, 1), - partition_expr: range_expr("x", 0, 100), - }; - let target_region = RegionDescriptor { + let source_region = + SourceRegionDescriptor::partitioned(RegionId::new(1024, 1), range_expr("x", 0, 100)); + let target_region = TargetRegionDescriptor { region_id: RegionId::new(1024, 2), partition_expr: range_expr("x", 0, 10), }; @@ -277,12 +270,69 @@ mod tests { } #[test] - fn test_ensure_route_present_missing_target_region() { - let source_region = RegionDescriptor { + fn test_ensure_route_present_default_source_matches_empty_partition_expr() { + let source_region = SourceRegionDescriptor::Default { region_id: RegionId::new(1024, 1), - partition_expr: range_expr("x", 0, 100), }; - let target_region = RegionDescriptor { + let target_region = TargetRegionDescriptor { + region_id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 10), + }; + let region_routes = vec![RegionRoute { + region: Region { + id: RegionId::new(1024, 1), + partition_expr: String::new(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }]; + + let result = RepartitionStart::ensure_route_present( + Uuid::new_v4(), + ®ion_routes, + &[source_region], + &[target_region], + ); + + assert!(result.is_ok()); + } + + #[test] + fn test_ensure_route_present_default_source_rejects_non_empty_partition_expr() { + let source_region = SourceRegionDescriptor::Default { + region_id: RegionId::new(1024, 1), + }; + let target_region = TargetRegionDescriptor { + region_id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 10), + }; + let region_routes = vec![RegionRoute { + region: Region { + id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }]; + + let err = RepartitionStart::ensure_route_present( + Uuid::new_v4(), + ®ion_routes, + &[source_region], + &[target_region], + ) + .unwrap_err(); + + assert_matches!(err, Error::PartitionExprMismatch { .. }); + } + + #[test] + fn test_ensure_route_present_missing_target_region() { + let source_region = + SourceRegionDescriptor::partitioned(RegionId::new(1024, 1), range_expr("x", 0, 100)); + let target_region = TargetRegionDescriptor { region_id: RegionId::new(1024, 2), partition_expr: range_expr("x", 0, 10), }; @@ -307,11 +357,9 @@ mod tests { #[test] fn test_ensure_route_present_legacy_partition_expr_source() { - let source_region = RegionDescriptor { - region_id: RegionId::new(1024, 1), - partition_expr: range_expr("x", 0, 100), - }; - let target_region = RegionDescriptor { + let source_region = + SourceRegionDescriptor::partitioned(RegionId::new(1024, 1), range_expr("x", 0, 100)); + let target_region = TargetRegionDescriptor { region_id: RegionId::new(1024, 2), partition_expr: range_expr("x", 0, 10), }; diff --git a/src/meta-srv/src/procedure/repartition/group/update_metadata/apply_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/update_metadata/apply_staging_region.rs index ff01161ff5..13fc486467 100644 --- a/src/meta-srv/src/procedure/repartition/group/update_metadata/apply_staging_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/update_metadata/apply_staging_region.rs @@ -22,7 +22,7 @@ use snafu::{OptionExt, ResultExt}; use crate::error::{self, Result}; use crate::procedure::repartition::group::update_metadata::UpdateMetadata; use crate::procedure::repartition::group::{Context, GroupId, region_routes}; -use crate::procedure::repartition::plan::RegionDescriptor; +use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor}; impl UpdateMetadata { /// Applies the new partition expressions for staging regions. @@ -32,8 +32,8 @@ impl UpdateMetadata { /// - Source region not found. pub(crate) fn apply_staging_region_routes( group_id: GroupId, - sources: &[RegionDescriptor], - targets: &[RegionDescriptor], + sources: &[SourceRegionDescriptor], + targets: &[TargetRegionDescriptor], pending_deallocate_region_ids: &[store_api::storage::RegionId], current_region_routes: &[RegionRoute], ) -> Result> { @@ -61,15 +61,16 @@ impl UpdateMetadata { } for source in sources { - let region_route = region_routes_map.get_mut(&source.region_id).context( + let region_id = source.region_id(); + let region_route = region_routes_map.get_mut(®ion_id).context( error::RepartitionSourceRegionMissingSnafu { group_id, - region_id: source.region_id, + region_id, }, )?; // Set leader staging state for the source region route. region_route.set_leader_staging(); - if pending_deallocate_region_ids.contains(&source.region_id) { + if pending_deallocate_region_ids.contains(®ion_id) { // When a region is pending deallocation, it should ignore all writes. region_route.set_ignore_all_writes(); } @@ -130,7 +131,7 @@ mod tests { use uuid::Uuid; use crate::procedure::repartition::group::update_metadata::UpdateMetadata; - use crate::procedure::repartition::plan::RegionDescriptor; + use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor}; use crate::procedure::repartition::test_util::range_expr; #[test] @@ -166,11 +167,11 @@ mod tests { ..Default::default() }, ]; - let source_region = RegionDescriptor { - region_id: RegionId::new(table_id, 1), - partition_expr: range_expr("x", 0, 100), - }; - let target_region = RegionDescriptor { + let source_region = SourceRegionDescriptor::partitioned( + RegionId::new(table_id, 1), + range_expr("x", 0, 100), + ); + let target_region = TargetRegionDescriptor { region_id: RegionId::new(table_id, 2), partition_expr: range_expr("x", 0, 10), }; @@ -196,6 +197,68 @@ mod tests { assert!(!new_region_routes[2].is_leader_staging()); } + #[test] + fn test_generate_region_routes_with_reused_default_source_region() { + let group_id = Uuid::new_v4(); + let table_id = 1024; + let default_region_id = RegionId::new(table_id, 1); + let region_routes = vec![ + RegionRoute { + region: Region { + id: default_region_id, + partition_expr: String::new(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }, + RegionRoute { + region: Region { + id: RegionId::new(table_id, 2), + partition_expr: String::new(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }, + ]; + let source_region = SourceRegionDescriptor::Default { + region_id: default_region_id, + }; + let reused_target_expr = range_expr("x", 0, 10); + let target_regions = vec![ + TargetRegionDescriptor { + region_id: default_region_id, + partition_expr: reused_target_expr.clone(), + }, + TargetRegionDescriptor { + region_id: RegionId::new(table_id, 2), + partition_expr: range_expr("x", 10, 20), + }, + ]; + + let new_region_routes = UpdateMetadata::apply_staging_region_routes( + group_id, + &[source_region], + &target_regions, + &[], + ®ion_routes, + ) + .unwrap(); + + assert_eq!( + new_region_routes[0].region.partition_expr, + reused_target_expr.as_json_str().unwrap() + ); + assert!(new_region_routes[0].is_leader_staging()); + assert!(!new_region_routes[0].is_ignore_all_writes()); + assert_eq!( + new_region_routes[1].region.partition_expr, + range_expr("x", 10, 20).as_json_str().unwrap() + ); + assert!(new_region_routes[1].is_leader_staging()); + } + #[test] fn test_generate_region_routes_mark_pending_deallocate_reject_all_writes() { let group_id = Uuid::new_v4(); @@ -221,11 +284,11 @@ mod tests { ..Default::default() }, ]; - let source_region = RegionDescriptor { - region_id: pending_deallocate_region_id, - partition_expr: range_expr("x", 0, 100), - }; - let target_region = RegionDescriptor { + let source_region = SourceRegionDescriptor::partitioned( + pending_deallocate_region_id, + range_expr("x", 0, 100), + ); + let target_region = TargetRegionDescriptor { region_id: RegionId::new(table_id, 2), partition_expr: range_expr("x", 0, 10), }; diff --git a/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs index 50864daa93..325f859a98 100644 --- a/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs @@ -22,13 +22,13 @@ use snafu::{OptionExt, ResultExt}; use crate::error::{self, Result}; use crate::procedure::repartition::group::update_metadata::UpdateMetadata; use crate::procedure::repartition::group::{Context, GroupId, region_routes}; -use crate::procedure::repartition::plan::RegionDescriptor; +use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor}; impl UpdateMetadata { pub(crate) fn exit_staging_region_routes( group_id: GroupId, - sources: &[RegionDescriptor], - targets: &[RegionDescriptor], + sources: &[SourceRegionDescriptor], + targets: &[TargetRegionDescriptor], current_region_routes: &[RegionRoute], ) -> Result> { let mut region_routes = current_region_routes.to_vec(); @@ -48,10 +48,11 @@ impl UpdateMetadata { } for source in sources { - let region_route = region_routes_map.get_mut(&source.region_id).context( + let region_id = source.region_id(); + let region_route = region_routes_map.get_mut(®ion_id).context( error::RepartitionSourceRegionMissingSnafu { group_id, - region_id: source.region_id, + region_id, }, )?; region_route.clear_leader_staging(); @@ -113,24 +114,25 @@ mod tests { use uuid::Uuid; use crate::procedure::repartition::group::update_metadata::UpdateMetadata; - use crate::procedure::repartition::plan::RegionDescriptor; + use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor}; use crate::procedure::repartition::test_util::range_expr; #[test] fn test_exit_staging_region_routes_keep_reject_all_writes() { let group_id = Uuid::new_v4(); let table_id = 1024; - let source_region = RegionDescriptor { - region_id: RegionId::new(table_id, 1), - partition_expr: range_expr("x", 0, 100), - }; - let target_region = RegionDescriptor { + let source_region = SourceRegionDescriptor::partitioned( + RegionId::new(table_id, 1), + range_expr("x", 0, 100), + ); + let source_region_id = source_region.region_id(); + let target_region = TargetRegionDescriptor { region_id: RegionId::new(table_id, 2), partition_expr: range_expr("x", 0, 50), }; let mut source_route = RegionRoute { region: Region { - id: source_region.region_id, + id: source_region_id, partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(), ..Default::default() }, @@ -165,4 +167,40 @@ mod tests { assert!(!new_region_routes[1].is_leader_staging()); assert!(new_region_routes[1].is_ignore_all_writes()); } + + #[test] + fn test_exit_staging_region_routes_with_reused_default_source_region() { + let group_id = Uuid::new_v4(); + let table_id = 1024; + let default_region_id = RegionId::new(table_id, 1); + let source_region = SourceRegionDescriptor::Default { + region_id: default_region_id, + }; + let target_region = TargetRegionDescriptor { + region_id: default_region_id, + partition_expr: range_expr("x", 0, 50), + }; + let target_expr = target_region.partition_expr.as_json_str().unwrap(); + let region_route = RegionRoute { + region: Region { + id: default_region_id, + partition_expr: target_expr.clone(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + leader_state: Some(LeaderState::Staging), + ..Default::default() + }; + + let new_region_routes = UpdateMetadata::exit_staging_region_routes( + group_id, + &[source_region], + &[target_region], + &[region_route], + ) + .unwrap(); + + assert!(!new_region_routes[0].is_leader_staging()); + assert_eq!(new_region_routes[0].region.partition_expr, target_expr); + } } diff --git a/src/meta-srv/src/procedure/repartition/plan.rs b/src/meta-srv/src/procedure/repartition/plan.rs index 063a64341b..1d11d7aa56 100644 --- a/src/meta-srv/src/procedure/repartition/plan.rs +++ b/src/meta-srv/src/procedure/repartition/plan.rs @@ -16,17 +16,137 @@ use std::cmp::Ordering; use common_meta::rpc::router::RegionRoute; use partition::expr::PartitionExpr; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; +use snafu::ResultExt; use store_api::storage::{RegionId, RegionNumber, TableId}; +use crate::error::{self, Result}; use crate::procedure::repartition::group::GroupId; -/// Metadata describing a region involved in the plan. +/// Metadata describing a source region involved in the plan. +/// +/// Source regions may represent either an existing partitioned region or the +/// default region of an unpartitioned table. +#[derive(Debug, Clone, Serialize, PartialEq, Eq)] +pub enum SourceRegionDescriptor { + /// A regular partitioned source region. + Partitioned { + /// The region id of the source region. + region_id: RegionId, + /// The partition expression of the source region. + partition_expr: PartitionExpr, + }, + /// The default source region of an unpartitioned table. + Default { + /// The region id of the default source region. + region_id: RegionId, + }, +} + +impl<'de> Deserialize<'de> for SourceRegionDescriptor { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + #[serde(deny_unknown_fields)] + struct PartitionedSourceRegionDescriptor { + region_id: RegionId, + partition_expr: PartitionExpr, + } + + #[derive(Deserialize)] + #[serde(untagged)] + enum SourceRegionDescriptorRepr { + Tagged(SourceRegionDescriptorTaggedRepr), + Legacy(PartitionedSourceRegionDescriptor), + } + + #[derive(Deserialize)] + enum SourceRegionDescriptorTaggedRepr { + Partitioned { + region_id: RegionId, + partition_expr: PartitionExpr, + }, + Default { + region_id: RegionId, + }, + } + + match SourceRegionDescriptorRepr::deserialize(deserializer)? { + SourceRegionDescriptorRepr::Tagged(SourceRegionDescriptorTaggedRepr::Partitioned { + region_id, + partition_expr, + }) => Ok(Self::Partitioned { + region_id, + partition_expr, + }), + SourceRegionDescriptorRepr::Tagged(SourceRegionDescriptorTaggedRepr::Default { + region_id, + }) => Ok(Self::Default { region_id }), + SourceRegionDescriptorRepr::Legacy(descriptor) => Ok(Self::Partitioned { + region_id: descriptor.region_id, + partition_expr: descriptor.partition_expr, + }), + } + } +} + +impl SourceRegionDescriptor { + /// Creates a partitioned source region descriptor. + pub fn partitioned(region_id: RegionId, partition_expr: PartitionExpr) -> Self { + Self::Partitioned { + region_id, + partition_expr, + } + } + + /// Returns the region id of this source descriptor. + pub fn region_id(&self) -> RegionId { + match self { + Self::Partitioned { region_id, .. } => *region_id, + Self::Default { region_id } => *region_id, + } + } + + /// Returns the partition expression if this source is partitioned. + pub fn partition_expr(&self) -> Option<&PartitionExpr> { + match self { + Self::Partitioned { partition_expr, .. } => Some(partition_expr), + Self::Default { .. } => None, + } + } + + /// Returns true if this source descriptor matches the route partition expression. + pub fn matches_route_expr(&self, route_expr: &str) -> Result { + match self { + Self::Partitioned { partition_expr, .. } => { + let expected = partition_expr + .as_json_str() + .context(error::SerializePartitionExprSnafu)?; + Ok(route_expr == expected) + } + Self::Default { .. } => Ok(route_expr.is_empty()), + } + } + + /// Returns the route partition expression to restore during rollback. + pub fn route_expr_for_rollback(&self) -> Result { + match self { + Self::Partitioned { partition_expr, .. } => partition_expr + .as_json_str() + .context(error::SerializePartitionExprSnafu), + Self::Default { .. } => Ok(String::new()), + } + } +} + +/// Metadata describing a target region involved in the plan. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct RegionDescriptor { - /// The region id of the region involved in the plan. +pub struct TargetRegionDescriptor { + /// The region id of the target region. pub region_id: RegionId, - /// The partition expression of the region. + /// The partition expression of the target region. pub partition_expr: PartitionExpr, } @@ -37,7 +157,7 @@ pub struct AllocationPlanEntry { /// The group id for this plan entry. pub group_id: GroupId, /// Source region descriptors involved in the plan. - pub source_regions: Vec, + pub source_regions: Vec, /// The target partition expressions for the new or changed regions. pub target_partition_exprs: Vec, /// For each `source_regions[k]`, the corresponding vector contains global @@ -52,9 +172,9 @@ pub struct RepartitionPlanEntry { /// The group id for this plan entry. pub group_id: GroupId, /// The source region descriptors involved in the plan. - pub source_regions: Vec, + pub source_regions: Vec, /// The target region descriptors involved in the plan. - pub target_regions: Vec, + pub target_regions: Vec, /// The region ids of the allocated regions. pub allocated_region_ids: Vec, /// The region ids of the regions that are pending deallocation. @@ -69,7 +189,7 @@ pub struct RepartitionPlanEntry { impl RepartitionPlanEntry { /// Returns the target regions that are newly allocated. - pub(crate) fn allocate_regions(&self) -> Vec<&RegionDescriptor> { + pub(crate) fn allocate_regions(&self) -> Vec<&TargetRegionDescriptor> { self.target_regions .iter() .filter(|r| self.allocated_region_ids.contains(&r.region_id)) @@ -111,7 +231,7 @@ pub fn convert_allocation_plan_to_repartition_plan( .iter() .skip(source_regions.len()) .map(|target_partition_expr| { - let desc = RegionDescriptor { + let desc = TargetRegionDescriptor { region_id: RegionId::new(table_id, *next_region_number), partition_expr: target_partition_expr.clone(), }; @@ -128,10 +248,12 @@ pub fn convert_allocation_plan_to_repartition_plan( let target_regions = source_regions .iter() .zip(target_partition_exprs.iter()) - .map(|(source_region, target_partition_expr)| RegionDescriptor { - region_id: source_region.region_id, - partition_expr: target_partition_expr.clone(), - }) + .map( + |(source_region, target_partition_expr)| TargetRegionDescriptor { + region_id: source_region.region_id(), + partition_expr: target_partition_expr.clone(), + }, + ) .chain(pending_allocate_target_partition_exprs) .collect::>(); @@ -149,10 +271,12 @@ pub fn convert_allocation_plan_to_repartition_plan( let target_regions = source_regions .iter() .zip(target_partition_exprs.iter()) - .map(|(source_region, target_partition_expr)| RegionDescriptor { - region_id: source_region.region_id, - partition_expr: target_partition_expr.clone(), - }) + .map( + |(source_region, target_partition_expr)| TargetRegionDescriptor { + region_id: source_region.region_id(), + partition_expr: target_partition_expr.clone(), + }, + ) .collect::>(); RepartitionPlanEntry { @@ -171,16 +295,18 @@ pub fn convert_allocation_plan_to_repartition_plan( .iter() .take(target_partition_exprs.len()) .zip(target_partition_exprs.iter()) - .map(|(source_region, target_partition_expr)| RegionDescriptor { - region_id: source_region.region_id, - partition_expr: target_partition_expr.clone(), - }) + .map( + |(source_region, target_partition_expr)| TargetRegionDescriptor { + region_id: source_region.region_id(), + partition_expr: target_partition_expr.clone(), + }, + ) .collect::>(); let pending_deallocate_region_ids = source_regions .iter() .skip(target_partition_exprs.len()) - .map(|source_region| source_region.region_id) + .map(|source_region| source_region.region_id()) .collect::>(); RepartitionPlanEntry { @@ -210,11 +336,140 @@ mod tests { col: &str, start: i64, end: i64, - ) -> RegionDescriptor { - RegionDescriptor { - region_id: RegionId::new(table_id, region_number), - partition_expr: range_expr(col, start, end), - } + ) -> SourceRegionDescriptor { + SourceRegionDescriptor::partitioned( + RegionId::new(table_id, region_number), + range_expr(col, start, end), + ) + } + + #[test] + fn test_source_region_descriptor_deserializes_legacy_partitioned_shape() { + let table_id = 1024; + let region_id = RegionId::new(table_id, 1); + let partition_expr = range_expr("x", 0, 100); + let legacy_json = serde_json::json!({ + "region_id": region_id, + "partition_expr": partition_expr, + }); + + let descriptor: SourceRegionDescriptor = serde_json::from_value(legacy_json).unwrap(); + + assert_eq!( + descriptor, + SourceRegionDescriptor::partitioned(region_id, partition_expr) + ); + } + + #[test] + fn test_source_region_descriptor_rejects_legacy_default_shape() { + let region_id = RegionId::new(1024, 1); + let default_json = serde_json::json!({ + "region_id": region_id, + }); + + let err = serde_json::from_value::(default_json).unwrap_err(); + + assert!(err.to_string().contains("data did not match any variant")); + } + + #[test] + fn test_source_region_descriptor_roundtrip_tagged_partitioned_shape() { + let region_id = RegionId::new(1024, 1); + let partition_expr = range_expr("x", 0, 100); + let descriptor = SourceRegionDescriptor::partitioned(region_id, partition_expr.clone()); + + let value = serde_json::to_value(&descriptor).unwrap(); + let decoded = serde_json::from_value::(value.clone()).unwrap(); + + assert_eq!( + value, + serde_json::json!({ + "Partitioned": { + "region_id": region_id, + "partition_expr": partition_expr, + } + }) + ); + assert_eq!(decoded, descriptor); + } + + #[test] + fn test_source_region_descriptor_roundtrip_tagged_default_shape() { + let region_id = RegionId::new(1024, 1); + let descriptor = SourceRegionDescriptor::Default { region_id }; + + let value = serde_json::to_value(&descriptor).unwrap(); + let decoded = serde_json::from_value::(value.clone()).unwrap(); + + assert_eq!( + value, + serde_json::json!({ + "Default": { + "region_id": region_id, + } + }) + ); + assert_eq!(decoded, descriptor); + } + + #[test] + fn test_source_region_descriptor_rejects_invalid_partition_expr_shape() { + let region_id = RegionId::new(1024, 1); + let invalid_json = serde_json::json!({ + "region_id": region_id, + "partition_expr": 42, + }); + + let err = serde_json::from_value::(invalid_json).unwrap_err(); + + assert!(err.to_string().contains("data did not match any variant")); + } + + #[test] + fn test_repartition_plan_entry_deserializes_legacy_source_regions() { + let group_id = Uuid::new_v4(); + let table_id = 1024; + let source_region_id = RegionId::new(table_id, 1); + let target_region_id = RegionId::new(table_id, 2); + let source_partition_expr = range_expr("x", 0, 100); + let target_partition_expr = range_expr("x", 0, 50); + let legacy_json = serde_json::json!({ + "group_id": group_id, + "source_regions": [{ + "region_id": source_region_id, + "partition_expr": source_partition_expr, + }], + "target_regions": [{ + "region_id": target_region_id, + "partition_expr": target_partition_expr, + }], + "allocated_region_ids": [target_region_id], + "pending_deallocate_region_ids": [], + "transition_map": [[0]], + }); + + let plan: RepartitionPlanEntry = serde_json::from_value(legacy_json).unwrap(); + + assert_eq!(plan.group_id, group_id); + assert_eq!( + plan.source_regions, + vec![SourceRegionDescriptor::partitioned( + source_region_id, + source_partition_expr + )] + ); + assert_eq!( + plan.target_regions, + vec![TargetRegionDescriptor { + region_id: target_region_id, + partition_expr: target_partition_expr, + }] + ); + assert_eq!(plan.allocated_region_ids, vec![target_region_id]); + assert!(plan.pending_deallocate_region_ids.is_empty()); + assert_eq!(plan.transition_map, vec![vec![0]]); + assert!(plan.original_target_routes.is_empty()); } #[test] @@ -468,6 +723,55 @@ mod tests { assert_eq!(next_region_number, 6); } + #[test] + fn test_convert_plan_allocate_default_source_region() { + let group_id = Uuid::new_v4(); + let table_id = 1024; + let mut next_region_number = 5; + let source_regions = vec![SourceRegionDescriptor::Default { + region_id: RegionId::new(table_id, 1), + }]; + let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)]; + let allocation_plan = AllocationPlanEntry { + group_id, + source_regions: source_regions.clone(), + target_partition_exprs: target_partition_exprs.clone(), + transition_map: vec![vec![0, 1]], + }; + + let result = convert_allocation_plan_to_repartition_plan( + table_id, + &mut next_region_number, + &allocation_plan, + ); + + assert_eq!(result.source_regions, source_regions); + assert_eq!(result.target_regions.len(), 2); + assert_eq!( + result.target_regions[0].region_id, + RegionId::new(table_id, 1) + ); + assert_eq!( + result.target_regions[0].partition_expr, + target_partition_exprs[0] + ); + assert_eq!( + result.target_regions[1].region_id, + RegionId::new(table_id, 5) + ); + assert_eq!( + result.target_regions[1].partition_expr, + target_partition_exprs[1] + ); + assert_eq!( + result.allocated_region_ids, + vec![RegionId::new(table_id, 5)] + ); + assert!(result.pending_deallocate_region_ids.is_empty()); + assert_eq!(result.transition_map, vec![vec![0, 1]]); + assert_eq!(next_region_number, 6); + } + #[test] fn test_convert_plan_deallocate_to_single_region() { let group_id = Uuid::new_v4(); diff --git a/src/meta-srv/src/procedure/repartition/repartition_start.rs b/src/meta-srv/src/procedure/repartition/repartition_start.rs index 5c6bcfdb06..6e14e6a0e6 100644 --- a/src/meta-srv/src/procedure/repartition/repartition_start.rs +++ b/src/meta-srv/src/procedure/repartition/repartition_start.rs @@ -17,6 +17,7 @@ use std::any::Any; use common_meta::key::table_route::PhysicalTableRouteValue; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::debug; +use partition::collider::Collider; use partition::expr::PartitionExpr; use partition::subtask::{self, RepartitionSubtask}; use serde::{Deserialize, Serialize}; @@ -26,7 +27,7 @@ use uuid::Uuid; use crate::error::{self, Result}; use crate::procedure::repartition::allocate_region::AllocateRegion; -use crate::procedure::repartition::plan::{AllocationPlanEntry, RegionDescriptor}; +use crate::procedure::repartition::plan::{AllocationPlanEntry, SourceRegionDescriptor}; use crate::procedure::repartition::repartition_end::RepartitionEnd; use crate::procedure::repartition::{Context, State}; @@ -107,8 +108,12 @@ impl RepartitionStart { from_exprs: &[PartitionExpr], to_exprs: &[PartitionExpr], ) -> Result> { - let subtasks = subtask::create_subtasks(from_exprs, to_exprs) - .context(error::RepartitionCreateSubtasksSnafu)?; + let subtasks = if from_exprs.is_empty() { + Self::default_source_subtasks(to_exprs)? + } else { + subtask::create_subtasks(from_exprs, to_exprs) + .context(error::RepartitionCreateSubtasksSnafu)? + }; if subtasks.is_empty() { return Ok(vec![]); } @@ -123,7 +128,7 @@ impl RepartitionStart { fn build_plan_entries( subtasks: Vec, - source_index: &[RegionDescriptor], + source_index: &[SourceRegionDescriptor], target_exprs: &[PartitionExpr], ) -> Vec { subtasks @@ -151,10 +156,32 @@ impl RepartitionStart { .collect::>() } + fn default_source_subtasks(to_exprs: &[PartitionExpr]) -> Result> { + ensure!( + !to_exprs.is_empty(), + error::UnexpectedSnafu { + violated: "Default source repartition expects non-empty target partition exprs", + } + ); + + Collider::new(to_exprs).context(error::RepartitionCreateSubtasksSnafu)?; + + let to_expr_indices = (0..to_exprs.len()).collect::>(); + Ok(vec![RepartitionSubtask { + from_expr_indices: vec![0], + to_expr_indices: to_expr_indices.clone(), + transition_map: vec![to_expr_indices], + }]) + } + fn source_region_descriptors( from_exprs: &[PartitionExpr], physical_route: &PhysicalTableRouteValue, - ) -> Result> { + ) -> Result> { + if from_exprs.is_empty() { + return Self::default_source_region_descriptors(physical_route); + } + let existing_regions = physical_route .region_routes .iter() @@ -178,13 +205,166 @@ impl RepartitionStart { debug!("Failed to find matching region for partition expression: {}, existing regions: {:?}", expr_json, existing_regions); })?; - Ok(RegionDescriptor { - region_id: matched_region_id, - partition_expr: expr.clone(), - }) + Ok(SourceRegionDescriptor::partitioned( + matched_region_id, + expr.clone(), + )) }) .collect::>>()?; Ok(descriptors) } + + fn default_source_region_descriptors( + physical_route: &PhysicalTableRouteValue, + ) -> Result> { + ensure!( + physical_route.region_routes.len() == 1, + error::UnexpectedSnafu { + violated: format!( + "Default source repartition expects exactly one source region, but got {}", + physical_route.region_routes.len() + ), + } + ); + + let source_region = &physical_route.region_routes[0].region; + ensure!( + source_region.partition_expr().is_empty(), + error::UnexpectedSnafu { + violated: format!( + "Default source repartition expects an empty partition expr, but got {}", + source_region.partition_expr() + ), + } + ); + + Ok(vec![SourceRegionDescriptor::Default { + region_id: source_region.id, + }]) + } +} + +#[cfg(test)] +mod tests { + use common_meta::key::table_route::PhysicalTableRouteValue; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use datatypes::prelude::Value; + use partition::expr::{Operand, RestrictedOp}; + use store_api::storage::RegionId; + + use super::*; + use crate::procedure::repartition::test_util::{range_expr, test_region_route}; + + fn physical_route(region_routes: Vec) -> PhysicalTableRouteValue { + PhysicalTableRouteValue::new(region_routes) + } + + #[test] + fn test_build_plan_with_default_source_region() { + let table_id = 1024; + let physical_route = + physical_route(vec![test_region_route(RegionId::new(table_id, 1), "")]); + let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)]; + + let plans = RepartitionStart::build_plan(&physical_route, &[], &to_exprs).unwrap(); + + assert_eq!(plans.len(), 1); + let plan = &plans[0]; + assert_eq!( + plan.source_regions, + vec![SourceRegionDescriptor::Default { + region_id: RegionId::new(table_id, 1) + }] + ); + assert_eq!(plan.target_partition_exprs, to_exprs); + assert_eq!(plan.transition_map, vec![vec![0, 1]]); + } + + #[test] + fn test_build_plan_with_default_source_rejects_non_empty_partition_expr() { + let table_id = 1024; + let physical_route = physical_route(vec![test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + )]); + let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)]; + + let err = RepartitionStart::build_plan(&physical_route, &[], &to_exprs).unwrap_err(); + + assert!(err.to_string().contains("empty partition expr")); + } + + #[test] + fn test_build_plan_with_default_source_rejects_multiple_regions() { + let table_id = 1024; + let physical_route = physical_route(vec![ + test_region_route(RegionId::new(table_id, 1), ""), + test_region_route(RegionId::new(table_id, 2), ""), + ]); + let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)]; + + let err = RepartitionStart::build_plan(&physical_route, &[], &to_exprs).unwrap_err(); + + assert!(err.to_string().contains("exactly one source region")); + } + + #[test] + fn test_build_plan_with_default_source_rejects_empty_targets() { + let table_id = 1024; + let physical_route = + physical_route(vec![test_region_route(RegionId::new(table_id, 1), "")]); + + let err = RepartitionStart::build_plan(&physical_route, &[], &[]).unwrap_err(); + + assert!(err.to_string().contains("non-empty target partition exprs")); + } + + #[test] + fn test_build_plan_with_default_source_rejects_invalid_targets() { + let table_id = 1024; + let physical_route = + physical_route(vec![test_region_route(RegionId::new(table_id, 1), "")]); + let invalid_to_expr = PartitionExpr::new( + Operand::Value(Value::Int64(1)), + RestrictedOp::Eq, + Operand::Value(Value::Int64(2)), + ); + + let err = + RepartitionStart::build_plan(&physical_route, &[], &[invalid_to_expr]).unwrap_err(); + + assert!( + err.to_string() + .contains("Failed to create repartition subtasks") + ); + } + + #[test] + fn test_build_plan_keeps_partitioned_source_matching() { + let table_id = 1024; + let from_exprs = vec![range_expr("x", 0, 100)]; + let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)]; + let physical_route = physical_route(vec![RegionRoute { + region: Region { + id: RegionId::new(table_id, 1), + partition_expr: from_exprs[0].as_json_str().unwrap(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }]); + + let plans = RepartitionStart::build_plan(&physical_route, &from_exprs, &to_exprs).unwrap(); + + assert_eq!(plans.len(), 1); + assert_eq!( + plans[0].source_regions, + vec![SourceRegionDescriptor::partitioned( + RegionId::new(table_id, 1), + from_exprs[0].clone() + )] + ); + } } diff --git a/src/meta-srv/src/procedure/repartition/test_util.rs b/src/meta-srv/src/procedure/repartition/test_util.rs index 83856a49e6..122f8e3953 100644 --- a/src/meta-srv/src/procedure/repartition/test_util.rs +++ b/src/meta-srv/src/procedure/repartition/test_util.rs @@ -42,7 +42,7 @@ use uuid::Uuid; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::metasrv::MetasrvInfo; use crate::procedure::repartition::group::{Context, PersistentContext, VolatileContext}; -use crate::procedure::repartition::plan::RegionDescriptor; +use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor}; use crate::procedure::repartition::{ Context as ParentContext, PersistentContext as ParentPersistentContext, RepartitionProcedure, }; @@ -177,8 +177,8 @@ pub fn test_region_wal_options(region_numbers: &[RegionNumber]) -> HashMap, - targets: Vec, + sources: Vec, + targets: Vec, ) -> PersistentContext { PersistentContext { group_id: Uuid::new_v4(), diff --git a/src/meta-srv/src/procedure/repartition/utils.rs b/src/meta-srv/src/procedure/repartition/utils.rs index 6f274e9596..f255ca618f 100644 --- a/src/meta-srv/src/procedure/repartition/utils.rs +++ b/src/meta-srv/src/procedure/repartition/utils.rs @@ -23,7 +23,7 @@ use store_api::storage::{RegionId, RegionNumber, TableId}; use crate::error::{self, Result}; use crate::procedure::repartition::group::GroupId; -use crate::procedure::repartition::plan::RegionDescriptor; +use crate::procedure::repartition::plan::SourceRegionDescriptor; /// Returns the `datanode_table_value` /// @@ -138,21 +138,23 @@ pub fn merge_and_validate_region_wal_options( /// restored here. pub fn rollback_group_metadata_routes( group_id: GroupId, - source_regions: &[RegionDescriptor], + source_regions: &[SourceRegionDescriptor], original_target_routes: &[RegionRoute], allocated_region_ids: &[RegionId], pending_deallocate_region_ids: &[RegionId], region_routes_map: &mut HashMap, ) -> Result<()> { for source in source_regions { - let region_route = region_routes_map.get_mut(&source.region_id).context( + let region_id = source.region_id(); + let region_route = region_routes_map.get_mut(®ion_id).context( error::RepartitionSourceRegionMissingSnafu { group_id, - region_id: source.region_id, + region_id, }, )?; region_route.clear_leader_staging(); - if pending_deallocate_region_ids.contains(&source.region_id) { + region_route.region.partition_expr = source.route_expr_for_rollback()?; + if pending_deallocate_region_ids.contains(®ion_id) { region_route.clear_ignore_all_writes(); } } @@ -191,7 +193,7 @@ mod tests { use super::*; use crate::procedure::repartition::group::update_metadata::UpdateMetadata; - use crate::procedure::repartition::plan::RegionDescriptor; + use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor}; use crate::procedure::repartition::test_util::range_expr; /// Helper function to create a Kafka WAL option string from a topic name. @@ -242,7 +244,7 @@ mod tests { fn original_target_routes( region_routes: &[RegionRoute], - targets: &[RegionDescriptor], + targets: &[TargetRegionDescriptor], ) -> Vec { let target_ids = targets .iter() @@ -380,16 +382,16 @@ mod tests { ), new_staged_region_route(RegionId::new(table_id, 3), "", None, false), ]; - let sources = vec![RegionDescriptor { - region_id: RegionId::new(table_id, 1), - partition_expr: range_expr("x", 0, 100), - }]; + let sources = vec![SourceRegionDescriptor::partitioned( + RegionId::new(table_id, 1), + range_expr("x", 0, 100), + )]; let targets = vec![ - RegionDescriptor { + TargetRegionDescriptor { region_id: RegionId::new(table_id, 1), partition_expr: range_expr("x", 0, 50), }, - RegionDescriptor { + TargetRegionDescriptor { region_id: RegionId::new(table_id, 3), partition_expr: range_expr("x", 50, 100), }, @@ -420,6 +422,60 @@ mod tests { assert_eq!(applied_region_routes, original_region_routes); } + #[test] + fn test_rollback_group_metadata_routes_default_source_restores_empty_expr() { + let group_id = Uuid::new_v4(); + let table_id = 1024; + let default_region_id = RegionId::new(table_id, 1); + let allocated_region_id = RegionId::new(table_id, 2); + let source_regions = vec![SourceRegionDescriptor::Default { + region_id: default_region_id, + }]; + let target_regions = vec![ + TargetRegionDescriptor { + region_id: default_region_id, + partition_expr: range_expr("x", 0, 50), + }, + TargetRegionDescriptor { + region_id: allocated_region_id, + partition_expr: range_expr("x", 50, 100), + }, + ]; + let current_region_routes = vec![ + new_staged_region_route(default_region_id, "", None, false), + new_staged_region_route(allocated_region_id, "", None, false), + ]; + let original_target_routes = vec![current_region_routes[0].clone()]; + let mut applied_region_routes = UpdateMetadata::apply_staging_region_routes( + group_id, + &source_regions, + &target_regions, + &[], + ¤t_region_routes, + ) + .unwrap(); + assert_eq!( + applied_region_routes[0].region.partition_expr, + range_expr("x", 0, 50).as_json_str().unwrap() + ); + + rollback_group_metadata_routes( + group_id, + &source_regions, + &original_target_routes, + &[allocated_region_id], + &[], + &mut applied_region_routes + .iter_mut() + .map(|route| (route.region.id, route)) + .collect(), + ) + .unwrap(); + + assert_eq!(applied_region_routes[0].region.partition_expr, ""); + assert!(!applied_region_routes[0].is_leader_staging()); + } + #[test] fn test_rollback_group_metadata_routes_merge_case_is_idempotent() { let group_id = Uuid::new_v4(); @@ -445,16 +501,16 @@ mod tests { ), ]; let sources = vec![ - RegionDescriptor { - region_id: RegionId::new(table_id, 1), - partition_expr: range_expr("x", 0, 100), - }, - RegionDescriptor { - region_id: RegionId::new(table_id, 2), - partition_expr: range_expr("x", 100, 200), - }, + SourceRegionDescriptor::partitioned( + RegionId::new(table_id, 1), + range_expr("x", 0, 100), + ), + SourceRegionDescriptor::partitioned( + RegionId::new(table_id, 2), + range_expr("x", 100, 200), + ), ]; - let targets = vec![RegionDescriptor { + let targets = vec![TargetRegionDescriptor { region_id: RegionId::new(table_id, 1), partition_expr: range_expr("x", 0, 200), }];